Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/137718.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 137718
summary: "OTLP: return correct response type for partial successes"
area: TSDB
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;

import com.google.protobuf.MessageLite;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.BytesRef;
Expand Down Expand Up @@ -49,7 +47,9 @@
import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Transport action for handling OpenTelemetry Protocol (OTLP) Metrics requests.
Expand All @@ -68,6 +68,7 @@ public class OTLPMetricsTransportAction extends HandledTransportAction<
public static final ActionType<MetricsResponse> TYPE = new ActionType<>(NAME);

private static final Logger logger = LogManager.getLogger(OTLPMetricsTransportAction.class);
public static final int IGNORED_DATA_POINTS_MESSAGE_LIMIT = 10;
private final Client client;

@Inject
Expand Down Expand Up @@ -136,6 +137,7 @@ private void addIndexRequest(
.setRequireDataStream(true)
.source(xContentBuilder)
.tsid(tsid)
.setIncludeSourceOnError(false)
.setDynamicTemplates(dynamicTemplates)
);
}
Expand All @@ -158,7 +160,10 @@ private static void handlePartialSuccess(ActionListener<MetricsResponse> listene
// (i.e. when the server accepts only parts of the data and rejects the rest),
// the server MUST respond with HTTP 200 OK.
// https://opentelemetry.io/docs/specs/otlp/#partial-success-1
MessageLite response = responseWithRejectedDataPoints(context.getIgnoredDataPoints(), context.getIgnoredDataPointsMessage());
ExportMetricsServiceResponse response = responseWithRejectedDataPoints(
context.getIgnoredDataPoints(),
context.getIgnoredDataPointsMessage(IGNORED_DATA_POINTS_MESSAGE_LIMIT)
);
listener.onResponse(new MetricsResponse(RestStatus.BAD_REQUEST, response));
}

Expand All @@ -167,29 +172,62 @@ private static void handlePartialSuccess(
DataPointGroupingContext context,
ActionListener<MetricsResponse> listener
) {
Map<String, Map<RestStatus, FailureGroup>> failureGroups = new HashMap<>();
// If the request is only partially accepted
// (i.e. when the server accepts only parts of the data and rejects the rest),
// the server MUST respond with HTTP 200 OK.
// https://opentelemetry.io/docs/specs/otlp/#partial-success-1
RestStatus status = RestStatus.OK;
int failures = 0;
for (BulkItemResponse bulkItemResponse : bulkItemResponses.getItems()) {
failures += bulkItemResponse.isFailed() ? 1 : 0;
if (bulkItemResponse.isFailed() && bulkItemResponse.getFailure().getStatus() == RestStatus.TOO_MANY_REQUESTS) {
// If the server receives more requests than the client is allowed or the server is overloaded,
// the server SHOULD respond with HTTP 429 Too Many Requests or HTTP 503 Service Unavailable
// and MAY include “Retry-After” header with a recommended time interval in seconds to wait before retrying.
// https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling
status = RestStatus.TOO_MANY_REQUESTS;
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
if (failure != null) {
// we're counting each document as one data point here
// which is an approximation since one document can represent multiple data points
failures++;
if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) {
// If the server receives more requests than the client is allowed or the server is overloaded,
// the server SHOULD respond with HTTP 429 Too Many Requests or HTTP 503 Service Unavailable
// and MAY include “Retry-After” header with a recommended time interval in seconds to wait before retrying.
// https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling
status = RestStatus.TOO_MANY_REQUESTS;
}
FailureGroup failureGroup = failureGroups.computeIfAbsent(failure.getIndex(), k -> new HashMap<>())
.computeIfAbsent(failure.getStatus(), k -> new FailureGroup(new AtomicInteger(0), failure.getMessage()));
failureGroup.failureCount().incrementAndGet();
}
}
MessageLite response = responseWithRejectedDataPoints(
if (bulkItemResponses.getItems().length == failures) {
// all data points failed, so we report total data points as failures
failures = context.totalDataPoints();
}
StringBuilder failureMessageBuilder = new StringBuilder();
for (Map.Entry<String, Map<RestStatus, FailureGroup>> indexEntry : failureGroups.entrySet()) {
String indexName = indexEntry.getKey();
for (Map.Entry<RestStatus, FailureGroup> statusEntry : indexEntry.getValue().entrySet()) {
RestStatus restStatus = statusEntry.getKey();
FailureGroup failureGroup = statusEntry.getValue();
failureMessageBuilder.append("Index [")
.append(indexName)
.append("] returned status [")
.append(restStatus)
.append("] for ")
.append(failureGroup.failureCount())
.append(" documents. Sample error message: ");
failureMessageBuilder.append(failureGroup.failureMessageSample());
failureMessageBuilder.append("\n");
}
}
failureMessageBuilder.append(context.getIgnoredDataPointsMessage(10));
ExportMetricsServiceResponse response = responseWithRejectedDataPoints(
failures + context.getIgnoredDataPoints(),
bulkItemResponses.buildFailureMessage() + context.getIgnoredDataPointsMessage()
failureMessageBuilder.toString()
);
listener.onResponse(new MetricsResponse(status, response));
}

record FailureGroup(AtomicInteger failureCount, String failureMessageSample) {}

private static void handleFailure(ActionListener<MetricsResponse> listener, Exception e, DataPointGroupingContext context) {
// https://opentelemetry.io/docs/specs/otlp/#failures-1
// If the processing of the request fails,
Expand All @@ -199,12 +237,12 @@ private static void handleFailure(ActionListener<MetricsResponse> listener, Exce
);
}

private static ExportMetricsPartialSuccess responseWithRejectedDataPoints(int rejectedDataPoints, String message) {
return ExportMetricsServiceResponse.newBuilder()
.getPartialSuccessBuilder()
private static ExportMetricsServiceResponse responseWithRejectedDataPoints(int rejectedDataPoints, String message) {
ExportMetricsPartialSuccess partialSuccess = ExportMetricsPartialSuccess.newBuilder()
.setRejectedDataPoints(rejectedDataPoints)
.setErrorMessage(message)
.build();
return ExportMetricsServiceResponse.newBuilder().setPartialSuccess(partialSuccess).build();
}

public static class MetricsRequest extends ActionRequest implements CompositeIndicesRequest {
Expand All @@ -229,7 +267,7 @@ public static class MetricsResponse extends ActionResponse {
private final BytesReference response;
private final RestStatus status;

public MetricsResponse(RestStatus status, MessageLite response) {
public MetricsResponse(RestStatus status, ExportMetricsServiceResponse response) {
this(status, new BytesArray(response.toByteArray()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,23 @@ public int getIgnoredDataPoints() {
return ignoredDataPoints;
}

public String getIgnoredDataPointsMessage() {
return ignoredDataPointMessages.isEmpty() ? "" : String.join("\n", ignoredDataPointMessages);
public String getIgnoredDataPointsMessage(int limit) {
if (ignoredDataPointMessages.isEmpty()) {
return "";
}
StringBuilder sb = new StringBuilder();
sb.append("Ignored ").append(ignoredDataPoints).append(" data points due to the following reasons:\n");
int count = 0;
for (String message : ignoredDataPointMessages) {
sb.append(" - ").append(message).append("\n");
count++;
if (count >= limit) {
sb.append(" - ... and more\n");
break;
}
}
return sb.toString();

}

private ResourceGroup getOrCreateResourceGroup(ResourceMetrics resourceMetrics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValue;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
Expand Down Expand Up @@ -76,12 +77,13 @@ public void testSuccessEmptyRequest() throws Exception {

public void test429() throws Exception {
BulkItemResponse[] bulkItemResponses = new BulkItemResponse[] {
failureResponse(RestStatus.TOO_MANY_REQUESTS, "too many requests"),
failureResponse("metrics-generic.otel-default", RestStatus.TOO_MANY_REQUESTS, "too many requests"),
successResponse() };
MetricsResponse response = executeRequest(createMetricsRequest(createMetric()), new BulkResponse(bulkItemResponses, 0));

assertThat(response.getStatus(), equalTo(RestStatus.TOO_MANY_REQUESTS));
ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsPartialSuccess.parseFrom(response.getResponse().array());
ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsServiceResponse.parseFrom(response.getResponse().array())
.getPartialSuccess();
assertThat(
metricsServiceResponse.getRejectedDataPoints(),
equalTo(Arrays.stream(bulkItemResponses).filter(BulkItemResponse::isFailed).count())
Expand All @@ -92,13 +94,25 @@ public void test429() throws Exception {
public void testPartialSuccess() throws Exception {
MetricsResponse response = executeRequest(
createMetricsRequest(createMetric()),
new BulkResponse(new BulkItemResponse[] { failureResponse(RestStatus.BAD_REQUEST, "bad request") }, 0)
new BulkResponse(
new BulkItemResponse[] {
failureResponse("metrics-generic.otel-default", RestStatus.BAD_REQUEST, "bad request 1"),
failureResponse("metrics-generic.otel-default", RestStatus.BAD_REQUEST, "bad request 2"),
failureResponse("metrics-hostmetricsreceiver.otel-default", RestStatus.BAD_REQUEST, "bad request 3"),
failureResponse("metrics-generic.otel-default", RestStatus.INTERNAL_SERVER_ERROR, "internal server error") },
0
)
);

assertThat(response.getStatus(), equalTo(RestStatus.OK));
ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsPartialSuccess.parseFrom(response.getResponse().array());
ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsServiceResponse.parseFrom(response.getResponse().array())
.getPartialSuccess();
assertThat(metricsServiceResponse.getRejectedDataPoints(), equalTo(1L));
assertThat(metricsServiceResponse.getErrorMessage(), containsString("bad request"));
// the error message contains only one message per unique index and error status
assertThat(metricsServiceResponse.getErrorMessage(), containsString("bad request 1"));
assertThat(metricsServiceResponse.getErrorMessage(), not(containsString("bad request 2")));
assertThat(metricsServiceResponse.getErrorMessage(), containsString("bad request 3"));
assertThat(metricsServiceResponse.getErrorMessage(), containsString("internal server error"));
}

public void testBulkError() throws Exception {
Expand All @@ -113,7 +127,8 @@ private void assertExceptionStatus(Exception exception, RestStatus restStatus) t
MetricsResponse response = executeRequest(createMetricsRequest(createMetric()), exception);

assertThat(response.getStatus(), equalTo(restStatus));
ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsPartialSuccess.parseFrom(response.getResponse().array());
ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsServiceResponse.parseFrom(response.getResponse().array())
.getPartialSuccess();
assertThat(metricsServiceResponse.getRejectedDataPoints(), equalTo(1L));
assertThat(metricsServiceResponse.getErrorMessage(), equalTo(exception.getMessage()));
}
Expand Down Expand Up @@ -172,11 +187,11 @@ private static BulkItemResponse successResponse() {
return BulkItemResponse.success(-1, DocWriteRequest.OpType.CREATE, mock(DocWriteResponse.class));
}

private static BulkItemResponse failureResponse(RestStatus restStatus, String failureMessage) {
private static BulkItemResponse failureResponse(String index, RestStatus restStatus, String failureMessage) {
return BulkItemResponse.failure(
-1,
DocWriteRequest.OpType.CREATE,
new BulkItemResponse.Failure("index", "id", new RuntimeException(failureMessage), restStatus)
new BulkItemResponse.Failure(index, "id", new RuntimeException(failureMessage), restStatus)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void testGroupingSameGroup() throws Exception {
context.groupDataPoints(metricsRequest);
assertEquals(6, context.totalDataPoints());
assertEquals(0, context.getIgnoredDataPoints());
assertEquals("", context.getIgnoredDataPointsMessage());
assertEquals("", context.getIgnoredDataPointsMessage(10));

AtomicInteger groupCount = new AtomicInteger(0);
context.consume(dataPointGroup -> groupCount.incrementAndGet());
Expand All @@ -96,7 +96,7 @@ public void testGroupingDifferentTargetIndex() throws Exception {
context.groupDataPoints(metricsRequest);
assertEquals(2, context.totalDataPoints());
assertEquals(0, context.getIgnoredDataPoints());
assertEquals("", context.getIgnoredDataPointsMessage());
assertEquals("", context.getIgnoredDataPointsMessage(10));

AtomicInteger groupCount = new AtomicInteger(0);
List<String> targetIndexes = new ArrayList<>();
Expand All @@ -119,7 +119,7 @@ public void testGroupingDifferentGroupUnit() throws Exception {
context.groupDataPoints(metricsRequest);
assertEquals(2, context.totalDataPoints());
assertEquals(0, context.getIgnoredDataPoints());
assertEquals("", context.getIgnoredDataPointsMessage());
assertEquals("", context.getIgnoredDataPointsMessage(10));

AtomicInteger groupCount = new AtomicInteger(0);
context.consume(dataPointGroup -> groupCount.incrementAndGet());
Expand All @@ -136,7 +136,7 @@ public void testGroupingDuplicateNameSameTimeSeries() throws Exception {
context.groupDataPoints(metricsRequest);
assertEquals(2, context.totalDataPoints());
assertEquals(1, context.getIgnoredDataPoints());
assertThat(context.getIgnoredDataPointsMessage(), containsString("Duplicate metric name 'system.cpu.usage' for timestamp"));
assertThat(context.getIgnoredDataPointsMessage(10), containsString("Duplicate metric name 'system.cpu.usage' for timestamp"));

AtomicInteger groupCount = new AtomicInteger(0);
context.consume(dataPointGroup -> groupCount.incrementAndGet());
Expand Down Expand Up @@ -184,7 +184,7 @@ public void testGroupingDifferentResource() throws Exception {
context.groupDataPoints(ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(List.of(resource1, resource2)).build());
assertEquals(2, context.totalDataPoints());
assertEquals(0, context.getIgnoredDataPoints());
assertEquals("", context.getIgnoredDataPointsMessage());
assertEquals("", context.getIgnoredDataPointsMessage(10));

AtomicInteger groupCount = new AtomicInteger(0);
context.consume(dataPointGroup -> groupCount.incrementAndGet());
Expand Down Expand Up @@ -216,7 +216,7 @@ public void testGroupingDifferentScope() throws Exception {
context.groupDataPoints(ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(List.of(resource1, resource2)).build());
assertEquals(2, context.totalDataPoints());
assertEquals(0, context.getIgnoredDataPoints());
assertEquals("", context.getIgnoredDataPointsMessage());
assertEquals("", context.getIgnoredDataPointsMessage(10));

AtomicInteger groupCount = new AtomicInteger(0);
context.consume(dataPointGroup -> groupCount.incrementAndGet());
Expand All @@ -234,7 +234,7 @@ public void testGroupingDifferentGroupTimestamp() throws Exception {
context.groupDataPoints(metricsRequest);
assertEquals(2, context.totalDataPoints());
assertEquals(0, context.getIgnoredDataPoints());
assertEquals("", context.getIgnoredDataPointsMessage());
assertEquals("", context.getIgnoredDataPointsMessage(10));

AtomicInteger groupCount = new AtomicInteger(0);
context.consume(dataPointGroup -> groupCount.incrementAndGet());
Expand All @@ -256,7 +256,7 @@ public void testGroupingDifferentGroupAttributes() throws Exception {
context.groupDataPoints(metricsRequest);
assertEquals(2, context.totalDataPoints());
assertEquals(0, context.getIgnoredDataPoints());
assertEquals("", context.getIgnoredDataPointsMessage());
assertEquals("", context.getIgnoredDataPointsMessage(10));

AtomicInteger groupCount = new AtomicInteger(0);
context.consume(dataPointGroup -> groupCount.incrementAndGet());
Expand All @@ -280,7 +280,7 @@ public void testReceiverBasedRouting() throws Exception {
context.groupDataPoints(ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(List.of(resource)).build());
assertEquals(1, context.totalDataPoints());
assertEquals(0, context.getIgnoredDataPoints());
assertEquals("", context.getIgnoredDataPointsMessage());
assertEquals("", context.getIgnoredDataPointsMessage(10));

List<String> targetIndexes = new ArrayList<>();
context.consume(dataPointGroup -> targetIndexes.add(dataPointGroup.targetIndex().index()));
Expand All @@ -303,7 +303,7 @@ public void testReceiverBasedRoutingWithoutTrailingSlash() throws Exception {
context.groupDataPoints(ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(List.of(resource)).build());
assertEquals(1, context.totalDataPoints());
assertEquals(0, context.getIgnoredDataPoints());
assertEquals("", context.getIgnoredDataPointsMessage());
assertEquals("", context.getIgnoredDataPointsMessage(10));

List<String> targetIndexes = new ArrayList<>();
context.consume(dataPointGroup -> targetIndexes.add(dataPointGroup.targetIndex().index()));
Expand Down