diff --git a/docs/changelog/137718.yaml b/docs/changelog/137718.yaml new file mode 100644 index 0000000000000..a40596209ac6d --- /dev/null +++ b/docs/changelog/137718.yaml @@ -0,0 +1,5 @@ +pr: 137718 +summary: "OTLP: return correct response type for partial successes" +area: TSDB +type: bug +issues: [] diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java index 3dcc22e3229bb..17c5f76ee4714 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java @@ -11,8 +11,6 @@ import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; -import com.google.protobuf.MessageLite; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.BytesRef; @@ -49,7 +47,9 @@ import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; import java.io.IOException; +import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; /** * Transport action for handling OpenTelemetry Protocol (OTLP) Metrics requests. 
@@ -68,6 +68,7 @@ public class OTLPMetricsTransportAction extends HandledTransportAction< public static final ActionType TYPE = new ActionType<>(NAME); private static final Logger logger = LogManager.getLogger(OTLPMetricsTransportAction.class); + public static final int IGNORED_DATA_POINTS_MESSAGE_LIMIT = 10; private final Client client; @Inject @@ -136,6 +137,7 @@ private void addIndexRequest( .setRequireDataStream(true) .source(xContentBuilder) .tsid(tsid) + .setIncludeSourceOnError(false) .setDynamicTemplates(dynamicTemplates) ); } @@ -158,7 +160,10 @@ private static void handlePartialSuccess(ActionListener listene // (i.e. when the server accepts only parts of the data and rejects the rest), // the server MUST respond with HTTP 200 OK. // https://opentelemetry.io/docs/specs/otlp/#partial-success-1 - MessageLite response = responseWithRejectedDataPoints(context.getIgnoredDataPoints(), context.getIgnoredDataPointsMessage()); + ExportMetricsServiceResponse response = responseWithRejectedDataPoints( + context.getIgnoredDataPoints(), + context.getIgnoredDataPointsMessage(IGNORED_DATA_POINTS_MESSAGE_LIMIT) + ); listener.onResponse(new MetricsResponse(RestStatus.BAD_REQUEST, response)); } @@ -167,6 +172,7 @@ private static void handlePartialSuccess( DataPointGroupingContext context, ActionListener listener ) { + Map> failureGroups = new HashMap<>(); // If the request is only partially accepted // (i.e. when the server accepts only parts of the data and rejects the rest), // the server MUST respond with HTTP 200 OK. @@ -174,22 +180,54 @@ private static void handlePartialSuccess( RestStatus status = RestStatus.OK; int failures = 0; for (BulkItemResponse bulkItemResponse : bulkItemResponses.getItems()) { - failures += bulkItemResponse.isFailed() ? 
1 : 0; - if (bulkItemResponse.isFailed() && bulkItemResponse.getFailure().getStatus() == RestStatus.TOO_MANY_REQUESTS) { - // If the server receives more requests than the client is allowed or the server is overloaded, - // the server SHOULD respond with HTTP 429 Too Many Requests or HTTP 503 Service Unavailable - // and MAY include “Retry-After” header with a recommended time interval in seconds to wait before retrying. - // https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling - status = RestStatus.TOO_MANY_REQUESTS; + BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); + if (failure != null) { + // we're counting each document as one data point here + // which is an approximation since one document can represent multiple data points + failures++; + if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) { + // If the server receives more requests than the client is allowed or the server is overloaded, + // the server SHOULD respond with HTTP 429 Too Many Requests or HTTP 503 Service Unavailable + // and MAY include “Retry-After” header with a recommended time interval in seconds to wait before retrying. 
+ // https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling + status = RestStatus.TOO_MANY_REQUESTS; + } + FailureGroup failureGroup = failureGroups.computeIfAbsent(failure.getIndex(), k -> new HashMap<>()) + .computeIfAbsent(failure.getStatus(), k -> new FailureGroup(new AtomicInteger(0), failure.getMessage())); + failureGroup.failureCount().incrementAndGet(); } } - MessageLite response = responseWithRejectedDataPoints( + if (bulkItemResponses.getItems().length == failures) { + // all data points failed, so we report total data points as failures + failures = context.totalDataPoints(); + } + StringBuilder failureMessageBuilder = new StringBuilder(); + for (Map.Entry> indexEntry : failureGroups.entrySet()) { + String indexName = indexEntry.getKey(); + for (Map.Entry statusEntry : indexEntry.getValue().entrySet()) { + RestStatus restStatus = statusEntry.getKey(); + FailureGroup failureGroup = statusEntry.getValue(); + failureMessageBuilder.append("Index [") + .append(indexName) + .append("] returned status [") + .append(restStatus) + .append("] for ") + .append(failureGroup.failureCount()) + .append(" documents. 
Sample error message: "); + failureMessageBuilder.append(failureGroup.failureMessageSample()); + failureMessageBuilder.append("\n"); + } + } + failureMessageBuilder.append(context.getIgnoredDataPointsMessage(IGNORED_DATA_POINTS_MESSAGE_LIMIT)); + ExportMetricsServiceResponse response = responseWithRejectedDataPoints( failures + context.getIgnoredDataPoints(), - bulkItemResponses.buildFailureMessage() + context.getIgnoredDataPointsMessage() + failureMessageBuilder.toString() ); listener.onResponse(new MetricsResponse(status, response)); } + record FailureGroup(AtomicInteger failureCount, String failureMessageSample) {} + private static void handleFailure(ActionListener listener, Exception e, DataPointGroupingContext context) { // https://opentelemetry.io/docs/specs/otlp/#failures-1 // If the processing of the request fails, @@ -199,12 +237,12 @@ private static void handleFailure(ActionListener listener, Exce ); } - private static ExportMetricsPartialSuccess responseWithRejectedDataPoints(int rejectedDataPoints, String message) { - return ExportMetricsServiceResponse.newBuilder() - .getPartialSuccessBuilder() + private static ExportMetricsServiceResponse responseWithRejectedDataPoints(int rejectedDataPoints, String message) { + ExportMetricsPartialSuccess partialSuccess = ExportMetricsPartialSuccess.newBuilder() .setRejectedDataPoints(rejectedDataPoints) .setErrorMessage(message) .build(); + return ExportMetricsServiceResponse.newBuilder().setPartialSuccess(partialSuccess).build(); } public static class MetricsRequest extends ActionRequest implements CompositeIndicesRequest { @@ -229,7 +267,7 @@ public static class MetricsResponse extends ActionResponse { private final BytesReference response; private final RestStatus status; - public MetricsResponse(RestStatus status, MessageLite response) { + public MetricsResponse(RestStatus status, ExportMetricsServiceResponse response) { this(status, new BytesArray(response.toByteArray())); } diff --git
a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java index 3dad2dfcfd945..f4fc34a050bd4 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java @@ -119,8 +119,23 @@ public int getIgnoredDataPoints() { return ignoredDataPoints; } - public String getIgnoredDataPointsMessage() { - return ignoredDataPointMessages.isEmpty() ? "" : String.join("\n", ignoredDataPointMessages); + public String getIgnoredDataPointsMessage(int limit) { + if (ignoredDataPointMessages.isEmpty()) { + return ""; + } + StringBuilder sb = new StringBuilder(); + sb.append("Ignored ").append(ignoredDataPoints).append(" data points due to the following reasons:\n"); + int count = 0; + for (String message : ignoredDataPointMessages) { + sb.append(" - ").append(message).append("\n"); + count++; + if (count >= limit && count < ignoredDataPointMessages.size()) { + sb.append(" - ...
and more\n"); + break; + } + } + return sb.toString(); + } private ResourceGroup getOrCreateResourceGroup(ResourceMetrics resourceMetrics) { diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportActionTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportActionTests.java index 648df30555d27..d19b2b1f5d569 100644 --- a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportActionTests.java +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportActionTests.java @@ -37,6 +37,7 @@ import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValue; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; @@ -76,12 +77,13 @@ public void testSuccessEmptyRequest() throws Exception { public void test429() throws Exception { BulkItemResponse[] bulkItemResponses = new BulkItemResponse[] { - failureResponse(RestStatus.TOO_MANY_REQUESTS, "too many requests"), + failureResponse("metrics-generic.otel-default", RestStatus.TOO_MANY_REQUESTS, "too many requests"), successResponse() }; MetricsResponse response = executeRequest(createMetricsRequest(createMetric()), new BulkResponse(bulkItemResponses, 0)); assertThat(response.getStatus(), equalTo(RestStatus.TOO_MANY_REQUESTS)); - ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsPartialSuccess.parseFrom(response.getResponse().array()); + ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsServiceResponse.parseFrom(response.getResponse().array()) + .getPartialSuccess(); assertThat( metricsServiceResponse.getRejectedDataPoints(), 
equalTo(Arrays.stream(bulkItemResponses).filter(BulkItemResponse::isFailed).count()) @@ -92,13 +94,25 @@ public void test429() throws Exception { public void testPartialSuccess() throws Exception { MetricsResponse response = executeRequest( createMetricsRequest(createMetric()), - new BulkResponse(new BulkItemResponse[] { failureResponse(RestStatus.BAD_REQUEST, "bad request") }, 0) + new BulkResponse( + new BulkItemResponse[] { + failureResponse("metrics-generic.otel-default", RestStatus.BAD_REQUEST, "bad request 1"), + failureResponse("metrics-generic.otel-default", RestStatus.BAD_REQUEST, "bad request 2"), + failureResponse("metrics-hostmetricsreceiver.otel-default", RestStatus.BAD_REQUEST, "bad request 3"), + failureResponse("metrics-generic.otel-default", RestStatus.INTERNAL_SERVER_ERROR, "internal server error") }, + 0 + ) ); assertThat(response.getStatus(), equalTo(RestStatus.OK)); - ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsPartialSuccess.parseFrom(response.getResponse().array()); + ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsServiceResponse.parseFrom(response.getResponse().array()) + .getPartialSuccess(); assertThat(metricsServiceResponse.getRejectedDataPoints(), equalTo(1L)); - assertThat(metricsServiceResponse.getErrorMessage(), containsString("bad request")); + // the error message contains only one message per unique index and error status + assertThat(metricsServiceResponse.getErrorMessage(), containsString("bad request 1")); + assertThat(metricsServiceResponse.getErrorMessage(), not(containsString("bad request 2"))); + assertThat(metricsServiceResponse.getErrorMessage(), containsString("bad request 3")); + assertThat(metricsServiceResponse.getErrorMessage(), containsString("internal server error")); } public void testBulkError() throws Exception { @@ -113,7 +127,8 @@ private void assertExceptionStatus(Exception exception, RestStatus restStatus) t MetricsResponse response = 
executeRequest(createMetricsRequest(createMetric()), exception); assertThat(response.getStatus(), equalTo(restStatus)); - ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsPartialSuccess.parseFrom(response.getResponse().array()); + ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsServiceResponse.parseFrom(response.getResponse().array()) + .getPartialSuccess(); assertThat(metricsServiceResponse.getRejectedDataPoints(), equalTo(1L)); assertThat(metricsServiceResponse.getErrorMessage(), equalTo(exception.getMessage())); } @@ -172,11 +187,11 @@ private static BulkItemResponse successResponse() { return BulkItemResponse.success(-1, DocWriteRequest.OpType.CREATE, mock(DocWriteResponse.class)); } - private static BulkItemResponse failureResponse(RestStatus restStatus, String failureMessage) { + private static BulkItemResponse failureResponse(String index, RestStatus restStatus, String failureMessage) { return BulkItemResponse.failure( -1, DocWriteRequest.OpType.CREATE, - new BulkItemResponse.Failure("index", "id", new RuntimeException(failureMessage), restStatus) + new BulkItemResponse.Failure(index, "id", new RuntimeException(failureMessage), restStatus) ); } diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java index 2b2071acc171a..85b91457e0ec1 100644 --- a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java @@ -74,7 +74,7 @@ public void testGroupingSameGroup() throws Exception { context.groupDataPoints(metricsRequest); assertEquals(6, context.totalDataPoints()); assertEquals(0, context.getIgnoredDataPoints()); - assertEquals("", 
context.getIgnoredDataPointsMessage()); + assertEquals("", context.getIgnoredDataPointsMessage(10)); AtomicInteger groupCount = new AtomicInteger(0); context.consume(dataPointGroup -> groupCount.incrementAndGet()); @@ -96,7 +96,7 @@ public void testGroupingDifferentTargetIndex() throws Exception { context.groupDataPoints(metricsRequest); assertEquals(2, context.totalDataPoints()); assertEquals(0, context.getIgnoredDataPoints()); - assertEquals("", context.getIgnoredDataPointsMessage()); + assertEquals("", context.getIgnoredDataPointsMessage(10)); AtomicInteger groupCount = new AtomicInteger(0); List targetIndexes = new ArrayList<>(); @@ -119,7 +119,7 @@ public void testGroupingDifferentGroupUnit() throws Exception { context.groupDataPoints(metricsRequest); assertEquals(2, context.totalDataPoints()); assertEquals(0, context.getIgnoredDataPoints()); - assertEquals("", context.getIgnoredDataPointsMessage()); + assertEquals("", context.getIgnoredDataPointsMessage(10)); AtomicInteger groupCount = new AtomicInteger(0); context.consume(dataPointGroup -> groupCount.incrementAndGet()); @@ -136,7 +136,7 @@ public void testGroupingDuplicateNameSameTimeSeries() throws Exception { context.groupDataPoints(metricsRequest); assertEquals(2, context.totalDataPoints()); assertEquals(1, context.getIgnoredDataPoints()); - assertThat(context.getIgnoredDataPointsMessage(), containsString("Duplicate metric name 'system.cpu.usage' for timestamp")); + assertThat(context.getIgnoredDataPointsMessage(10), containsString("Duplicate metric name 'system.cpu.usage' for timestamp")); AtomicInteger groupCount = new AtomicInteger(0); context.consume(dataPointGroup -> groupCount.incrementAndGet()); @@ -184,7 +184,7 @@ public void testGroupingDifferentResource() throws Exception { context.groupDataPoints(ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(List.of(resource1, resource2)).build()); assertEquals(2, context.totalDataPoints()); assertEquals(0, context.getIgnoredDataPoints()); - 
assertEquals("", context.getIgnoredDataPointsMessage()); + assertEquals("", context.getIgnoredDataPointsMessage(10)); AtomicInteger groupCount = new AtomicInteger(0); context.consume(dataPointGroup -> groupCount.incrementAndGet()); @@ -216,7 +216,7 @@ public void testGroupingDifferentScope() throws Exception { context.groupDataPoints(ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(List.of(resource1, resource2)).build()); assertEquals(2, context.totalDataPoints()); assertEquals(0, context.getIgnoredDataPoints()); - assertEquals("", context.getIgnoredDataPointsMessage()); + assertEquals("", context.getIgnoredDataPointsMessage(10)); AtomicInteger groupCount = new AtomicInteger(0); context.consume(dataPointGroup -> groupCount.incrementAndGet()); @@ -234,7 +234,7 @@ public void testGroupingDifferentGroupTimestamp() throws Exception { context.groupDataPoints(metricsRequest); assertEquals(2, context.totalDataPoints()); assertEquals(0, context.getIgnoredDataPoints()); - assertEquals("", context.getIgnoredDataPointsMessage()); + assertEquals("", context.getIgnoredDataPointsMessage(10)); AtomicInteger groupCount = new AtomicInteger(0); context.consume(dataPointGroup -> groupCount.incrementAndGet()); @@ -256,7 +256,7 @@ public void testGroupingDifferentGroupAttributes() throws Exception { context.groupDataPoints(metricsRequest); assertEquals(2, context.totalDataPoints()); assertEquals(0, context.getIgnoredDataPoints()); - assertEquals("", context.getIgnoredDataPointsMessage()); + assertEquals("", context.getIgnoredDataPointsMessage(10)); AtomicInteger groupCount = new AtomicInteger(0); context.consume(dataPointGroup -> groupCount.incrementAndGet()); @@ -280,7 +280,7 @@ public void testReceiverBasedRouting() throws Exception { context.groupDataPoints(ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(List.of(resource)).build()); assertEquals(1, context.totalDataPoints()); assertEquals(0, context.getIgnoredDataPoints()); - assertEquals("", 
context.getIgnoredDataPointsMessage()); + assertEquals("", context.getIgnoredDataPointsMessage(10)); List targetIndexes = new ArrayList<>(); context.consume(dataPointGroup -> targetIndexes.add(dataPointGroup.targetIndex().index())); @@ -303,7 +303,7 @@ public void testReceiverBasedRoutingWithoutTrailingSlash() throws Exception { context.groupDataPoints(ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(List.of(resource)).build()); assertEquals(1, context.totalDataPoints()); assertEquals(0, context.getIgnoredDataPoints()); - assertEquals("", context.getIgnoredDataPointsMessage()); + assertEquals("", context.getIgnoredDataPointsMessage(10)); List targetIndexes = new ArrayList<>(); context.consume(dataPointGroup -> targetIndexes.add(dataPointGroup.targetIndex().index()));