diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpClientDecorator.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpClientDecorator.java index 77e96360435..ac9b20d10aa 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpClientDecorator.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpClientDecorator.java @@ -11,6 +11,8 @@ import datadog.trace.api.InstrumenterConfig; import datadog.trace.api.ProductActivation; import datadog.trace.api.appsec.HttpClientRequest; +import datadog.trace.api.datastreams.AgentDataStreamsMonitoring; +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor; import datadog.trace.api.gateway.BlockResponseFunction; import datadog.trace.api.gateway.Flow; import datadog.trace.api.gateway.RequestContext; @@ -27,6 +29,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.BitSet; +import java.util.List; import java.util.Map; import java.util.function.BiFunction; import org.slf4j.Logger; @@ -70,6 +73,22 @@ protected boolean shouldSetResourceName() { public AgentSpan onRequest(final AgentSpan span, final REQUEST request) { if (request != null) { + // apply extractors if any (disabled if DSM is off) + AgentDataStreamsMonitoring dataStreamsMonitoring = + AgentTracer.get().getDataStreamsMonitoring(); + List extractorList = + dataStreamsMonitoring.getTransactionExtractorsByType( + DataStreamsTransactionExtractor.Type.HTTP_OUT_HEADERS); + if (extractorList != null) { + for (DataStreamsTransactionExtractor extractor : extractorList) { + String transactionId = getRequestHeader(request, extractor.getValue()); + if (transactionId != null && !transactionId.isEmpty()) { + dataStreamsMonitoring.trackTransaction(transactionId, extractor.getName()); + span.setTag(Tags.DSM_TRANSACTION_ID, transactionId); + span.setTag(Tags.DSM_TRANSACTION_CHECKPOINT, extractor.getName()); + } + } + } String method = method(request); span.setTag(Tags.HTTP_METHOD, method); diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java index 76163e9bb6f..8205f01210a 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java @@ -13,6 +13,8 @@ import datadog.context.propagation.Propagators; import datadog.trace.api.Config; import datadog.trace.api.DDTags; +import datadog.trace.api.datastreams.AgentDataStreamsMonitoring; +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor; import datadog.trace.api.function.TriConsumer; import datadog.trace.api.function.TriFunction; import datadog.trace.api.gateway.BlockResponseFunction; @@ -39,6 +41,7 @@ import datadog.trace.bootstrap.instrumentation.decorator.http.ClientIpAddressResolver; import java.net.InetAddress; import java.util.BitSet; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -95,6 +98,10 @@ public abstract class HttpServerDecorator extractorList = + dataStreamsMonitoring.getTransactionExtractorsByType( + DataStreamsTransactionExtractor.Type.HTTP_IN_HEADERS); + if (extractorList != null) { + for (DataStreamsTransactionExtractor extractor : extractorList) { + String transactionId = getRequestHeader(request, extractor.getValue()); + if (transactionId != null) { + dataStreamsMonitoring.trackTransaction(transactionId, extractor.getName()); + span.setTag(Tags.DSM_TRANSACTION_ID, transactionId); + span.setTag(Tags.DSM_TRANSACTION_CHECKPOINT, extractor.getName()); + } + } + } + } + return span; } diff --git a/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecoratorTest.groovy b/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecoratorTest.groovy index 7c3efb1d48c..c56efc5f014 100644 --- a/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecoratorTest.groovy +++ b/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecoratorTest.groovy @@ -468,6 +468,11 @@ class HttpServerDecoratorTest extends ServerDecoratorTest { protected int status(Map m) { return m.status == null ? 0 : m.status } + + @Override + protected String getRequestHeader(Map map, String key) { + return map.getOrDefault(key, null) + } } } diff --git a/dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/InstrumentationSpecification.groovy b/dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/InstrumentationSpecification.groovy index 82bb9c1512f..ba0d60d27c3 100644 --- a/dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/InstrumentationSpecification.groovy +++ b/dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/InstrumentationSpecification.groovy @@ -1,5 +1,15 @@ package datadog.trace.agent.test +import static datadog.communication.http.OkHttpUtils.buildHttpClient +import static datadog.trace.api.ConfigDefaults.DEFAULT_AGENT_HOST +import static datadog.trace.api.ConfigDefaults.DEFAULT_AGENT_TIMEOUT +import static datadog.trace.api.ConfigDefaults.DEFAULT_TRACE_AGENT_PORT +import static datadog.trace.api.config.DebuggerConfig.DYNAMIC_INSTRUMENTATION_SNAPSHOT_URL +import static datadog.trace.api.config.DebuggerConfig.DYNAMIC_INSTRUMENTATION_VERIFY_BYTECODE +import static datadog.trace.api.config.TraceInstrumentationConfig.CODE_ORIGIN_FOR_SPANS_ENABLED +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious +import static datadog.trace.util.AgentThreadFactory.AgentThread.TASK_SCHEDULER + import ch.qos.logback.classic.Level import ch.qos.logback.classic.util.ContextInitializer import com.datadog.debugger.agent.ClassesToRetransformFinder @@ -33,6 +43,7 @@ import datadog.trace.api.TraceConfig import datadog.trace.api.config.GeneralConfig import datadog.trace.api.config.TracerConfig import datadog.trace.api.datastreams.AgentDataStreamsMonitoring +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor import datadog.trace.api.sampling.SamplingRule import datadog.trace.api.time.SystemTimeSource import datadog.trace.bootstrap.ActiveSubsystems @@ -56,6 +67,13 @@ import de.thetaphi.forbiddenapis.SuppressForbidden import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import groovy.transform.stc.ClosureParams import groovy.transform.stc.SimpleType +import java.lang.instrument.ClassFileTransformer +import java.lang.instrument.Instrumentation +import java.nio.ByteBuffer +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException +import java.util.concurrent.atomic.AtomicInteger import net.bytebuddy.agent.ByteBuddyAgent import net.bytebuddy.agent.builder.AgentBuilder import net.bytebuddy.description.type.TypeDescription @@ -69,24 +87,6 @@ import org.slf4j.LoggerFactory import org.spockframework.mock.MockUtil import spock.lang.Shared -import java.lang.instrument.ClassFileTransformer -import java.lang.instrument.Instrumentation -import java.nio.ByteBuffer -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.TimeUnit -import java.util.concurrent.TimeoutException -import java.util.concurrent.atomic.AtomicInteger - -import static datadog.communication.http.OkHttpUtils.buildHttpClient -import static datadog.trace.api.ConfigDefaults.DEFAULT_AGENT_HOST -import static datadog.trace.api.ConfigDefaults.DEFAULT_AGENT_TIMEOUT -import static datadog.trace.api.ConfigDefaults.DEFAULT_TRACE_AGENT_PORT -import static datadog.trace.api.config.DebuggerConfig.DYNAMIC_INSTRUMENTATION_SNAPSHOT_URL -import static datadog.trace.api.config.DebuggerConfig.DYNAMIC_INSTRUMENTATION_VERIFY_BYTECODE -import static datadog.trace.api.config.TraceInstrumentationConfig.CODE_ORIGIN_FOR_SPANS_ENABLED -import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious -import static datadog.trace.util.AgentThreadFactory.AgentThread.TASK_SCHEDULER - /** * A specification that automatically applies instrumentation and exposes a global trace * writer. @@ -250,6 +250,10 @@ abstract class InstrumentationSpecification extends DDSpecification implements A List getTraceSamplingRules() { return null } + + List getDataStreamsTransactionExtractors() { + return null + } } boolean originalAppSecRuntimeValue diff --git a/dd-java-agent/instrumentation/akka/akka-http/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpServerDecorator.java b/dd-java-agent/instrumentation/akka/akka-http/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpServerDecorator.java index efa73e608cd..c4ffd1d83b0 100644 --- a/dd-java-agent/instrumentation/akka/akka-http/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpServerDecorator.java +++ b/dd-java-agent/instrumentation/akka/akka-http/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpServerDecorator.java @@ -88,6 +88,12 @@ protected int status(final HttpResponse httpResponse) { return httpResponse.status().intValue(); } + @Override + protected String getRequestHeader(HttpRequest httpRequest, String key) { + Optional header = httpRequest.getHeader(key); + return header.map(HttpHeader::value).orElse(null); + } + @Override protected boolean isAppSecOnResponseSeparate() { return true; diff --git a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-10.0/src/main/java11/datadog/trace/instrumentation/jetty10/JettyDecorator.java b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-10.0/src/main/java11/datadog/trace/instrumentation/jetty10/JettyDecorator.java index ed42375dd12..8a009440cb5 100644 --- a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-10.0/src/main/java11/datadog/trace/instrumentation/jetty10/JettyDecorator.java +++ b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-10.0/src/main/java11/datadog/trace/instrumentation/jetty10/JettyDecorator.java @@ -78,6 +78,11 @@ protected boolean isAppSecOnResponseSeparate() { return true; } + @Override + protected String getRequestHeader(final Request request, String key) { + return request.getHeader(key); + } + @Override protected BlockResponseFunction createBlockResponseFunction(Request request, Request connection) { return new JettyBlockResponseFunction(request); diff --git a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-11.0/src/main/java11/datadog/trace/instrumentation/jetty11/JettyDecorator.java b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-11.0/src/main/java11/datadog/trace/instrumentation/jetty11/JettyDecorator.java index 9b74d1f418e..014d32bb26a 100644 --- a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-11.0/src/main/java11/datadog/trace/instrumentation/jetty11/JettyDecorator.java +++ b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-11.0/src/main/java11/datadog/trace/instrumentation/jetty11/JettyDecorator.java @@ -78,6 +78,11 @@ protected boolean isAppSecOnResponseSeparate() { return true; } + @Override + protected String getRequestHeader(final Request request, String key) { + return request.getHeader(key); + } + public AgentSpan onResponse(AgentSpan span, HttpChannel channel) { Request request = channel.getRequest(); Response response = channel.getResponse(); diff --git a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-12.0/src/main/java17/datadog/trace/instrumentation/jetty12/JettyDecorator.java b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-12.0/src/main/java17/datadog/trace/instrumentation/jetty12/JettyDecorator.java index 979cb5d62a5..0450fa2b44e 100644 --- a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-12.0/src/main/java17/datadog/trace/instrumentation/jetty12/JettyDecorator.java +++ b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-12.0/src/main/java17/datadog/trace/instrumentation/jetty12/JettyDecorator.java @@ -83,6 +83,11 @@ protected int status(final Response response) { return response.getStatus(); } + @Override + protected String getRequestHeader(final Request request, String key) { + return request.getHeaders().get(key); + } + public AgentSpan onResponse(AgentSpan span, HttpChannelState channel) { Request request = channel.getRequest(); Response response = channel.getResponse(); diff --git a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-7.0/src/main/java/datadog/trace/instrumentation/jetty70/JettyDecorator.java b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-7.0/src/main/java/datadog/trace/instrumentation/jetty70/JettyDecorator.java index 4822293650c..7aa3f5a4c13 100644 --- a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-7.0/src/main/java/datadog/trace/instrumentation/jetty70/JettyDecorator.java +++ b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-7.0/src/main/java/datadog/trace/instrumentation/jetty70/JettyDecorator.java @@ -77,6 +77,11 @@ protected boolean isAppSecOnResponseSeparate() { return true; } + @Override + protected String getRequestHeader(final Request request, String key) { + return request.getHeader(key); + } + public AgentSpan onResponse(AgentSpan span, HttpConnection channel) { Request request = channel.getRequest(); Response response = channel.getResponse(); diff --git a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-7.6/src/main/java/datadog/trace/instrumentation/jetty76/JettyDecorator.java b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-7.6/src/main/java/datadog/trace/instrumentation/jetty76/JettyDecorator.java index f158ee1c418..b86623b8061 100644 --- a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-7.6/src/main/java/datadog/trace/instrumentation/jetty76/JettyDecorator.java +++ b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-7.6/src/main/java/datadog/trace/instrumentation/jetty76/JettyDecorator.java @@ -77,6 +77,11 @@ protected boolean isAppSecOnResponseSeparate() { return true; } + @Override + protected String getRequestHeader(final Request request, String key) { + return request.getHeader(key); + } + public AgentSpan onResponse(AgentSpan span, AbstractHttpConnection connection) { Request request = connection.getRequest(); Response response = connection.getResponse(); diff --git a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-9.0/src/main/java/datadog/trace/instrumentation/jetty9/JettyDecorator.java b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-9.0/src/main/java/datadog/trace/instrumentation/jetty9/JettyDecorator.java index 87b50fea628..6234b00dd5d 100644 --- a/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-9.0/src/main/java/datadog/trace/instrumentation/jetty9/JettyDecorator.java +++ b/dd-java-agent/instrumentation/jetty/jetty-server/jetty-server-9.0/src/main/java/datadog/trace/instrumentation/jetty9/JettyDecorator.java @@ -77,6 +77,11 @@ protected boolean isAppSecOnResponseSeparate() { return true; } + @Override + protected String getRequestHeader(final Request request, String key) { + return request.getHeader(key); + } + public AgentSpan onResponse(AgentSpan span, HttpChannel channel) { Request request = channel.getRequest(); Response response = channel.getResponse(); diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index ba66a01b6da..cec22bee967 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -27,14 +27,19 @@ import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; import datadog.trace.api.Config; +import datadog.trace.api.datastreams.AgentDataStreamsMonitoring; import datadog.trace.api.datastreams.DataStreamsContext; import datadog.trace.api.datastreams.DataStreamsTags; +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor; import datadog.trace.api.datastreams.StatsPoint; import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; +import datadog.trace.bootstrap.instrumentation.api.Tags; +import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Map; import net.bytebuddy.asm.Advice; import net.bytebuddy.matcher.ElementMatcher; @@ -44,6 +49,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.internals.Sender; +import org.apache.kafka.common.header.Header; import org.apache.kafka.common.record.RecordBatch; @AutoService(InstrumenterModule.class) @@ -178,6 +184,25 @@ record = if (TIME_IN_QUEUE_ENABLED) { setter.injectTimeInQueue(record.headers()); } + + // track transactions on produce + AgentDataStreamsMonitoring dataStreamsMonitoring = + AgentTracer.get().getDataStreamsMonitoring(); + List extractors = + dataStreamsMonitoring.getTransactionExtractorsByType( + DataStreamsTransactionExtractor.Type.KAFKA_PRODUCE_HEADERS); + if (extractors != null) { + for (DataStreamsTransactionExtractor extractor : extractors) { + Header header = record.headers().lastHeader(extractor.getValue()); + if (header != null && header.value() != null) { + String transactionId = new String(header.value(), StandardCharsets.UTF_8); + dataStreamsMonitoring.trackTransaction(transactionId, extractor.getName()); + span.setTag(Tags.DSM_TRANSACTION_ID, transactionId); + span.setTag(Tags.DSM_TRANSACTION_CHECKPOINT, extractor.getName()); + } + } + } + return activateSpan(span); } diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java index f634706ca5e..49bf03153d0 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java @@ -21,14 +21,20 @@ import datadog.context.propagation.Propagator; import datadog.context.propagation.Propagators; import datadog.trace.api.Config; +import datadog.trace.api.datastreams.AgentDataStreamsMonitoring; import datadog.trace.api.datastreams.DataStreamsContext; import datadog.trace.api.datastreams.DataStreamsTags; +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; +import datadog.trace.bootstrap.instrumentation.api.Tags; +import java.nio.charset.StandardCharsets; import java.util.Iterator; +import java.util.List; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,6 +134,23 @@ protected void startNewRecordSpan(ConsumerRecord val) { if (null != queueSpan) { queueSpan.finish(); } + + AgentDataStreamsMonitoring dataStreamsMonitoring = + AgentTracer.get().getDataStreamsMonitoring(); + List extractors = + dataStreamsMonitoring.getTransactionExtractorsByType( + DataStreamsTransactionExtractor.Type.KAFKA_CONSUME_HEADERS); + if (extractors != null) { + for (DataStreamsTransactionExtractor extractor : extractors) { + Header header = val.headers().lastHeader(extractor.getValue()); + if (header != null && header.value() != null) { + String transactionId = new String(header.value(), StandardCharsets.UTF_8); + dataStreamsMonitoring.trackTransaction(transactionId, extractor.getName()); + span.setTag(Tags.DSM_TRANSACTION_ID, transactionId); + span.setTag(Tags.DSM_TRANSACTION_CHECKPOINT, extractor.getName()); + } + } + } } } catch (final Exception e) { log.debug("Error starting new record span", e); diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java index 1d5e93031e6..bbf4a6361e5 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java @@ -16,16 +16,22 @@ import datadog.context.propagation.Propagator; import datadog.context.propagation.Propagators; import datadog.trace.api.Config; +import datadog.trace.api.datastreams.AgentDataStreamsMonitoring; import datadog.trace.api.datastreams.DataStreamsContext; import datadog.trace.api.datastreams.DataStreamsTags; +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; +import datadog.trace.bootstrap.instrumentation.api.Tags; import datadog.trace.instrumentation.kafka_common.StreamingContext; import datadog.trace.instrumentation.kafka_common.Utils; +import java.nio.charset.StandardCharsets; import java.util.Iterator; +import java.util.List; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,6 +134,23 @@ protected void startNewRecordSpan(ConsumerRecord val) { if (null != queueSpan) { queueSpan.finish(); } + + AgentDataStreamsMonitoring dataStreamsMonitoring = + AgentTracer.get().getDataStreamsMonitoring(); + List extractors = + dataStreamsMonitoring.getTransactionExtractorsByType( + DataStreamsTransactionExtractor.Type.KAFKA_CONSUME_HEADERS); + if (extractors != null) { + for (DataStreamsTransactionExtractor extractor : extractors) { + Header header = val.headers().lastHeader(extractor.getValue()); + if (header != null && header.value() != null) { + String transactionId = new String(header.value(), StandardCharsets.UTF_8); + dataStreamsMonitoring.trackTransaction(transactionId, extractor.getName()); + span.setTag(Tags.DSM_TRANSACTION_ID, transactionId); + span.setTag(Tags.DSM_TRANSACTION_CHECKPOINT, extractor.getName()); + } + } + } } } catch (final Exception e) { log.debug("Error starting new record span", e); diff --git a/dd-java-agent/instrumentation/restlet-2.2/src/main/java/datadog/trace/instrumentation/restlet/RestletDecorator.java b/dd-java-agent/instrumentation/restlet-2.2/src/main/java/datadog/trace/instrumentation/restlet/RestletDecorator.java index 58c37ffe5c2..627def26a91 100644 --- a/dd-java-agent/instrumentation/restlet-2.2/src/main/java/datadog/trace/instrumentation/restlet/RestletDecorator.java +++ b/dd-java-agent/instrumentation/restlet-2.2/src/main/java/datadog/trace/instrumentation/restlet/RestletDecorator.java @@ -67,4 +67,9 @@ protected int peerPort(final HttpExchange exchange) { protected int status(final HttpExchange exchange) { return exchange.getResponseCode(); } + + @Override + protected String getRequestHeader(final HttpExchange exchange, String key) { + return exchange.getRequestHeaders().getFirst(key); + } } diff --git a/dd-java-agent/instrumentation/servlet/javax-servlet/javax-servlet-2.2/src/main/java/datadog/trace/instrumentation/servlet2/Servlet2Decorator.java b/dd-java-agent/instrumentation/servlet/javax-servlet/javax-servlet-2.2/src/main/java/datadog/trace/instrumentation/servlet2/Servlet2Decorator.java index aab83420d51..b93877f73ed 100644 --- a/dd-java-agent/instrumentation/servlet/javax-servlet/javax-servlet-2.2/src/main/java/datadog/trace/instrumentation/servlet2/Servlet2Decorator.java +++ b/dd-java-agent/instrumentation/servlet/javax-servlet/javax-servlet-2.2/src/main/java/datadog/trace/instrumentation/servlet2/Servlet2Decorator.java @@ -69,6 +69,11 @@ protected int peerPort(final HttpServletRequest httpServletRequest) { return 0; } + @Override + protected String getRequestHeader(final HttpServletRequest request, String key) { + return request.getHeader(key); + } + @Override protected int status(final Integer status) { return status; diff --git a/dd-java-agent/instrumentation/servlet/javax-servlet/javax-servlet-3.0/src/main/java/datadog/trace/instrumentation/servlet3/Servlet3Decorator.java b/dd-java-agent/instrumentation/servlet/javax-servlet/javax-servlet-3.0/src/main/java/datadog/trace/instrumentation/servlet3/Servlet3Decorator.java index c153e2c747e..61a8d22a2b8 100644 --- a/dd-java-agent/instrumentation/servlet/javax-servlet/javax-servlet-3.0/src/main/java/datadog/trace/instrumentation/servlet3/Servlet3Decorator.java +++ b/dd-java-agent/instrumentation/servlet/javax-servlet/javax-servlet-3.0/src/main/java/datadog/trace/instrumentation/servlet3/Servlet3Decorator.java @@ -77,6 +77,11 @@ protected String requestedSessionId(final HttpServletRequest request) { return request.getRequestedSessionId(); } + @Override + protected String getRequestHeader(final HttpServletRequest request, String key) { + return request.getHeader(key); + } + @Override public AgentSpan onRequest( final AgentSpan span, diff --git a/dd-java-agent/instrumentation/spring/spring-webmvc/spring-webmvc-3.1/src/main/java/datadog/trace/instrumentation/springweb/SpringWebHttpServerDecorator.java b/dd-java-agent/instrumentation/spring/spring-webmvc/spring-webmvc-3.1/src/main/java/datadog/trace/instrumentation/springweb/SpringWebHttpServerDecorator.java index e4e759e2cc2..52ce8855048 100644 --- a/dd-java-agent/instrumentation/spring/spring-webmvc/spring-webmvc-3.1/src/main/java/datadog/trace/instrumentation/springweb/SpringWebHttpServerDecorator.java +++ b/dd-java-agent/instrumentation/spring/spring-webmvc/spring-webmvc-3.1/src/main/java/datadog/trace/instrumentation/springweb/SpringWebHttpServerDecorator.java @@ -92,6 +92,11 @@ protected int status(final HttpServletResponse httpServletResponse) { return httpServletResponse.getStatus(); } + @Override + protected String getRequestHeader(final HttpServletRequest request, String key) { + return request.getHeader(key); + } + @Override public AgentSpan onRequest( final AgentSpan span, diff --git a/dd-java-agent/instrumentation/spring/spring-webmvc/spring-webmvc-6.0/src/main/java17/datadog/trace/instrumentation/springweb6/SpringWebHttpServerDecorator.java b/dd-java-agent/instrumentation/spring/spring-webmvc/spring-webmvc-6.0/src/main/java17/datadog/trace/instrumentation/springweb6/SpringWebHttpServerDecorator.java index 9811ab6c662..79b89c1aad5 100644 --- a/dd-java-agent/instrumentation/spring/spring-webmvc/spring-webmvc-6.0/src/main/java17/datadog/trace/instrumentation/springweb6/SpringWebHttpServerDecorator.java +++ b/dd-java-agent/instrumentation/spring/spring-webmvc/spring-webmvc-6.0/src/main/java17/datadog/trace/instrumentation/springweb6/SpringWebHttpServerDecorator.java @@ -86,6 +86,11 @@ protected String peerHostIP(final HttpServletRequest httpServletRequest) { return httpServletRequest.getRemoteAddr(); } + @Override + protected String getRequestHeader(final HttpServletRequest request, String key) { + return request.getHeader(key); + } + @Override protected int peerPort(final HttpServletRequest httpServletRequest) { return httpServletRequest.getRemotePort(); diff --git a/dd-java-agent/instrumentation/tomcat/tomcat-common/src/main/java/datadog/trace/instrumentation/tomcat/TomcatDecorator.java b/dd-java-agent/instrumentation/tomcat/tomcat-common/src/main/java/datadog/trace/instrumentation/tomcat/TomcatDecorator.java index 8beb6991b45..2ab1d6693f1 100644 --- a/dd-java-agent/instrumentation/tomcat/tomcat-common/src/main/java/datadog/trace/instrumentation/tomcat/TomcatDecorator.java +++ b/dd-java-agent/instrumentation/tomcat/tomcat-common/src/main/java/datadog/trace/instrumentation/tomcat/TomcatDecorator.java @@ -72,6 +72,11 @@ protected int peerPort(final Request request) { return request.getRemotePort(); } + @Override + protected String getRequestHeader(final Request request, String key) { + return request.getHeader(key); + } + @Override protected int status(final Response response) { int status = response.getStatus(); diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java index 2e441e2677b..2ecba77d301 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java @@ -88,6 +88,8 @@ public final class GeneralConfig { public static final String DATA_STREAMS_ENABLED = "data.streams.enabled"; public static final String DATA_STREAMS_BUCKET_DURATION_SECONDS = "data.streams.bucket_duration.seconds"; + public static final String DATA_STREAMS_TRANSACTION_EXTRACTORS = + "data.streams.transaction_extractors"; public static final String TELEMETRY_ENABLED = "instrumentation.telemetry.enabled"; public static final String TELEMETRY_HEARTBEAT_INTERVAL = "telemetry.heartbeat.interval"; diff --git a/dd-trace-api/src/main/java/datadog/trace/api/experimental/DataStreamsCheckpointer.java b/dd-trace-api/src/main/java/datadog/trace/api/experimental/DataStreamsCheckpointer.java index 574b9fdb2a0..6ee9365f336 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/experimental/DataStreamsCheckpointer.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/experimental/DataStreamsCheckpointer.java @@ -24,6 +24,12 @@ static DataStreamsCheckpointer get() { */ void setConsumeCheckpoint(String type, String source, DataStreamsContextCarrier carrier); + /** + * @param transactionId Transaction ID to track. + * @param checkpointName Unique checkpoint name. + */ + void trackTransaction(String transactionId, String checkpointName); + /** * @param type The type of the checkpoint, usually the streaming technology being used. Examples: * kafka, kinesis, sns etc. @@ -45,5 +51,8 @@ public void setConsumeCheckpoint( @Override public void setProduceCheckpoint( String type, String target, DataStreamsContextCarrier carrier) {} + + @Override + public void trackTransaction(String transactionId, String checkpointName) {} } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java index 05c213a9c18..33a624fe3b6 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java @@ -86,6 +86,7 @@ import datadog.trace.context.TraceScope; import datadog.trace.core.baggage.BaggagePropagator; import datadog.trace.core.datastreams.DataStreamsMonitoring; +import datadog.trace.core.datastreams.DataStreamsTransactionExtractors; import datadog.trace.core.datastreams.DefaultDataStreamsMonitoring; import datadog.trace.core.histogram.Histograms; import datadog.trace.core.monitor.HealthMetrics; @@ -679,6 +680,16 @@ private CoreTracer( } else { traceSamplingRules = TraceSamplingRules.deserialize(traceSamplingRulesJson); } + + DataStreamsTransactionExtractors dataStreamsTransactionExtractors; + String dataStreamsTransactionExtractorsJson = config.getDataStreamsTransactionExtractors(); + if (dataStreamsTransactionExtractorsJson == null) { + dataStreamsTransactionExtractors = DataStreamsTransactionExtractors.EMPTY; + } else { + dataStreamsTransactionExtractors = + DataStreamsTransactionExtractors.deserialize(dataStreamsTransactionExtractorsJson); + } + // Get initial Span Sampling Rules from config String spanSamplingRulesJson = config.getSpanSamplingRules(); String spanSamplingRulesFile = config.getSpanSamplingRulesFile(); @@ -708,6 +719,7 @@ private CoreTracer( .setSpanSamplingRules(spanSamplingRules.getRules()) .setTraceSamplingRules(traceSamplingRules.getRules(), traceSamplingRulesJson) .setTracingTags(config.getMergedSpanTags()) + .setDataStreamsTransactionExtractors(dataStreamsTransactionExtractors.getExtractors()) .apply(); this.logs128bTraceIdEnabled = Config.get().isLogs128bitTraceIdEnabled(); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/StatusLogger.java b/dd-trace-core/src/main/java/datadog/trace/core/StatusLogger.java index d7507503b88..a8a3e9e4de8 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/StatusLogger.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/StatusLogger.java @@ -156,6 +156,8 @@ public void toJson(JsonWriter writer, Config config) throws IOException { } writer.name("data_streams_enabled"); writer.value(config.isDataStreamsEnabled()); + writer.name("data_streams_transaction_extractors"); + writer.value(config.getDataStreamsTransactionExtractors()); writer.endObject(); } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/TracingConfigPoller.java b/dd-trace-core/src/main/java/datadog/trace/core/TracingConfigPoller.java index ac88784ac79..1466f1f4fb7 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/TracingConfigPoller.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/TracingConfigPoller.java @@ -31,6 +31,7 @@ import datadog.trace.api.debugger.DebuggerConfigBridge; import datadog.trace.api.debugger.DebuggerConfigUpdate; import datadog.trace.api.sampling.SamplingRule; +import datadog.trace.core.datastreams.DataStreamsTransactionExtractors; import datadog.trace.logging.GlobalLogLevelSwitcher; import datadog.trace.logging.LogLevel; import java.io.ByteArrayInputStream; @@ -95,7 +96,12 @@ final class Updater implements ProductListener { private final JsonAdapter TRACE_SAMPLING_RULE; { - Moshi MOSHI = new Moshi.Builder().add(new TracingSamplingRulesAdapter()).build(); + Moshi MOSHI = + new Moshi.Builder() + .add(new TracingSamplingRulesAdapter()) + .add(new DataStreamsTransactionExtractors.DataStreamsTransactionExtractorsAdapter()) + .add(new DataStreamsTransactionExtractors.DataStreamsTransactionExtractorAdapter()) + .build(); CONFIG_OVERRIDES_ADAPTER = MOSHI.adapter(ConfigOverrides.class); LIB_CONFIG_ADAPTER = MOSHI.adapter(LibConfig.class); TRACE_SAMPLING_RULE = MOSHI.adapter(TracingSamplingRule.class); @@ -238,6 +244,10 @@ void applyConfigOverrides(LibConfig libConfig) { maybeOverride(builder::setServiceMapping, libConfig.serviceMapping); maybeOverride(builder::setHeaderTags, libConfig.headerTags); + if (null != libConfig.dataStreamsTransactionExtractors) { + builder.setDataStreamsTransactionExtractors( + libConfig.dataStreamsTransactionExtractors.getExtractors()); + } if (null != libConfig.tracingSamplingRules) { builder.setTraceSamplingRules( @@ -406,6 +416,8 @@ static final class LibConfig { @Json(name = "live_debugging_enabled") public Boolean liveDebuggingEnabled; + @Json(name = "data_streams_transaction_extractors") + public DataStreamsTransactionExtractors dataStreamsTransactionExtractors; /** * Merges a list of LibConfig objects by taking the first non-null value for each field. * @@ -454,6 +466,9 @@ public static LibConfig mergeLibConfigs(List configs) { if (merged.tracingSamplingRules == null) { merged.tracingSamplingRules = config.tracingSamplingRules; } + if (merged.dataStreamsTransactionExtractors == null) { + merged.dataStreamsTransactionExtractors = config.dataStreamsTransactionExtractors; + } if (merged.dynamicInstrumentationEnabled == null) { merged.dynamicInstrumentationEnabled = config.dynamicInstrumentationEnabled; } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamsTransactionExtractors.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamsTransactionExtractors.java new file mode 100644 index 00000000000..1e028b2a743 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamsTransactionExtractors.java @@ -0,0 +1,149 @@ +package datadog.trace.core.datastreams; + +import com.squareup.moshi.FromJson; +import com.squareup.moshi.JsonAdapter; +import com.squareup.moshi.JsonReader; +import com.squareup.moshi.Moshi; +import com.squareup.moshi.ToJson; +import com.squareup.moshi.Types; +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor; +import java.io.IOException; +import java.lang.reflect.ParameterizedType; +import java.util.Collections; +import java.util.List; +import okio.BufferedSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DataStreamsTransactionExtractors { + public static final DataStreamsTransactionExtractors EMPTY = + new DataStreamsTransactionExtractors("[]", Collections.emptyList()); + private static final Logger log = LoggerFactory.getLogger(DataStreamsTransactionExtractors.class); + private static final Moshi MOSHI = + new Moshi.Builder() + .add(new DataStreamsTransactionExtractors.DataStreamsTransactionExtractorAdapter()) + .build(); + private static final ParameterizedType LIST_OF_RULES = + Types.newParameterizedType(List.class, DataStreamsTransactionExtractorImpl.class); + public static final JsonAdapter> LIST_OF_RULES_ADAPTER = + MOSHI.adapter(LIST_OF_RULES); + + private final List extractors; + private final String json; + + public DataStreamsTransactionExtractors( + String json, List extractors) { + this.extractors = Collections.unmodifiableList(extractors); + this.json = json; + } + + public static DataStreamsTransactionExtractors deserialize(String json) { + try { + return new DataStreamsTransactionExtractors(json, LIST_OF_RULES_ADAPTER.fromJson(json)); + } catch (Throwable ex) { + log.error("Couldn't parse Data Streams Extractors from JSON: {}", json, ex); + } + + return EMPTY; + } + + public List getExtractors() { + return extractors; + } + + private static final class JsonDataStreamsTransactionExtractor { + private static final JsonAdapter jsonAdapter = + MOSHI.adapter(JsonDataStreamsTransactionExtractor.class); + String name; + String type; + String value; + + @Override + public String toString() { + return jsonAdapter.toJson(this); + } + } + + public static final class DataStreamsTransactionExtractorsAdapter { + @FromJson + DataStreamsTransactionExtractors fromJson( + JsonReader reader, JsonAdapter> parser) + throws IOException { + if (reader.peek() == JsonReader.Token.NULL) { + return reader.nextNull(); + } + try (BufferedSource source = reader.nextSource()) { + String json = source.readUtf8(); + return new DataStreamsTransactionExtractors(json, parser.fromJson(json)); + } + } + + @ToJson + String toJson(DataStreamsTransactionExtractors extractors) { + return extractors.json; + } + } + + public static final class DataStreamsTransactionExtractorAdapter { + private static DataStreamsTransactionExtractor create( + JsonDataStreamsTransactionExtractor jsonExtractor) { + + DataStreamsTransactionExtractor.Type type; + try { + type = DataStreamsTransactionExtractor.Type.valueOf(jsonExtractor.type); + } catch (Throwable ex) { + type = DataStreamsTransactionExtractor.Type.UNKNOWN; + } + + return new DataStreamsTransactionExtractorImpl(jsonExtractor.name, type, jsonExtractor.value); + } + + @FromJson + DataStreamsTransactionExtractor fromJson(JsonDataStreamsTransactionExtractor jsonExtractor) { + return create(jsonExtractor); + } + + @ToJson + JsonDataStreamsTransactionExtractor toJson(DataStreamsTransactionExtractor extractor) { + throw new UnsupportedOperationException(); + } + } + + public static final class DataStreamsTransactionExtractorImpl + implements DataStreamsTransactionExtractor { + private final String name; + private final DataStreamsTransactionExtractor.Type type; + private final String value; + + public DataStreamsTransactionExtractorImpl( + final String name, final DataStreamsTransactionExtractor.Type type, final String value) { + this.name = name; + this.type = type; + this.value = value; + } + + public String getName() { + return name; + } + + public DataStreamsTransactionExtractor.Type getType() { + return type; + } + + public String getValue() { + return value; + } + + @Override + public String toString() { + return "DataStreamsTransactionExtractorImpl{" + + "name='" + + name + + "', type='" + + type.name() + + "', value='" + + value + + "'}"; + } + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index c86b9402081..34d0bb95d42 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -16,7 +16,15 @@ import datadog.context.propagation.Propagator; import datadog.trace.api.Config; import datadog.trace.api.TraceConfig; -import datadog.trace.api.datastreams.*; +import datadog.trace.api.datastreams.Backlog; +import datadog.trace.api.datastreams.DataStreamsContext; +import datadog.trace.api.datastreams.DataStreamsTags; +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor; +import datadog.trace.api.datastreams.InboxItem; +import datadog.trace.api.datastreams.NoopPathwayContext; +import datadog.trace.api.datastreams.PathwayContext; +import datadog.trace.api.datastreams.StatsPoint; +import datadog.trace.api.datastreams.TransactionInfo; import datadog.trace.api.experimental.DataStreamsContextCarrier; import datadog.trace.api.time.TimeSource; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; @@ -36,6 +44,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; import org.jctools.queues.MpscArrayQueue; import org.slf4j.Logger; @@ -45,6 +54,7 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even private static final Logger log = LoggerFactory.getLogger(DefaultDataStreamsMonitoring.class); static final long FEATURE_CHECK_INTERVAL_NANOS = TimeUnit.MINUTES.toNanos(5); + static final List NO_EXTRACTORS = Collections.emptyList(); private static final StatsPoint REPORT = new StatsPoint(DataStreamsTags.EMPTY, 0, 0, 0, 0, 0, 0, 0, null); @@ -67,6 +77,12 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even private volatile boolean configSupportsDataStreams = false; private final ConcurrentHashMap schemaSamplers; private static final ThreadLocal serviceNameOverride = new ThreadLocal<>(); + private final ReentrantReadWriteLock extractorsLock = new ReentrantReadWriteLock(); + + // contains a list of active extractors by type + private static final Map< + DataStreamsTransactionExtractor.Type, List> + extractorsByType = new HashMap<>(); public DefaultDataStreamsMonitoring( Config config, @@ -182,6 +198,27 @@ public void clearThreadServiceName() { serviceNameOverride.remove(); } + @Override + public void trackTransaction(String transactionId, String checkpointName) { + inbox.offer( + new TransactionInfo(transactionId, timeSource.getCurrentTimeNanos(), checkpointName)); + } + + @Override + public List getTransactionExtractorsByType( + DataStreamsTransactionExtractor.Type extractorType) { + if (!supportsDataStreams) { + return NO_EXTRACTORS; + } + + extractorsLock.readLock().lock(); + try { + return extractorsByType.getOrDefault(extractorType, NO_EXTRACTORS); + } finally { + extractorsLock.readLock().unlock(); + } + } + private static String getThreadServiceName() { return serviceNameOverride.get(); } @@ -213,6 +250,7 @@ public void mergePathwayContextIntoSpan(AgentSpan span, DataStreamsContextCarrie } } + @Override public void trackBacklog(DataStreamsTags tags, long value) { inbox.offer(new Backlog(tags, value, timeSource.getCurrentTimeNanos(), getThreadServiceName())); } @@ -358,6 +396,10 @@ public void run() { StatsBucket statsBucket = getStatsBucket(backlog.getTimestampNanos(), backlog.getServiceNameOverride()); statsBucket.addBacklog(backlog); + } else if (payload instanceof TransactionInfo) { + TransactionInfo transactionInfo = (TransactionInfo) payload; + StatsBucket statsBucket = getStatsBucket(transactionInfo.getTimestamp(), ""); + statsBucket.addTransaction(transactionInfo); } } } catch (Exception e) { @@ -430,9 +472,33 @@ public void onEvent(EventType eventType, String message) { } } + private void updateExtractorsFromConfig() { + extractorsLock.writeLock().lock(); + try { + extractorsByType.clear(); + List extractors = + traceConfigSupplier.get().getDataStreamsTransactionExtractors(); + if (extractors == null) { + return; + } + for (DataStreamsTransactionExtractor extractor : extractors) { + List list = + extractorsByType.computeIfAbsent(extractor.getType(), k -> new LinkedList<>()); + list.add(extractor); + log.debug("Added data streams transaction extractor: {}", extractor); + } + } finally { + extractorsLock.writeLock().unlock(); + } + } + private void checkDynamicConfig() { configSupportsDataStreams = traceConfigSupplier.get().isDataStreamsEnabled(); supportsDataStreams = agentSupportsDataStreams && configSupportsDataStreams; + + if (supportsDataStreams) { + updateExtractorsFromConfig(); + } } private void checkFeatures() { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java index 0df8e7291d7..03128f559ca 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java @@ -10,6 +10,7 @@ import datadog.trace.api.ProcessTags; import datadog.trace.api.WellKnownTags; import datadog.trace.api.datastreams.DataStreamsTags; +import datadog.trace.api.datastreams.TransactionInfo; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.common.metrics.Sink; import java.util.Collection; @@ -37,6 +38,9 @@ public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter private static final byte[] BACKLOG_TAGS = "Tags".getBytes(ISO_8859_1); private static final byte[] PRODUCTS_MASK = "ProductMask".getBytes(ISO_8859_1); private static final byte[] PROCESS_TAGS = "ProcessTags".getBytes(ISO_8859_1); + private static final byte[] TRANSACTIONS = "Transactions".getBytes(ISO_8859_1); + private static final byte[] TRANSACTION_CHECKPOINT_IDS = + "TransactionCheckpointIds".getBytes(ISO_8859_1); private static final int INITIAL_CAPACITY = 512 * 1024; @@ -93,7 +97,7 @@ public void writePayload(Collection data, String serviceNameOverrid /* 2 */ writer.writeUTF8(SERVICE); - if (serviceNameOverride != null) { + if (serviceNameOverride != null && !serviceNameOverride.isEmpty()) { writer.writeUTF8(serviceNameOverride.getBytes(ISO_8859_1)); } else { writer.writeUTF8(wellKnownTags.getService()); @@ -121,7 +125,8 @@ public void writePayload(Collection data, String serviceNameOverrid for (StatsBucket bucket : data) { boolean hasBacklogs = !bucket.getBacklogs().isEmpty(); - writer.startMap(3 + (hasBacklogs ? 1 : 0)); + boolean hasTransactions = !bucket.getTransactions().isEmpty(); + writer.startMap(3 + (hasBacklogs ? 1 : 0) + (hasTransactions ? 2 : 0)); /* 1 */ writer.writeUTF8(START); @@ -139,6 +144,14 @@ public void writePayload(Collection data, String serviceNameOverrid /* 4 */ writeBacklogs(bucket.getBacklogs(), writer); } + + if (hasTransactions) { + /* 5 */ + writer.writeUTF8(TRANSACTIONS); + writer.writeBinary(bucket.getTransactions().getData()); + writer.writeUTF8(TRANSACTION_CHECKPOINT_IDS); + writer.writeBinary(TransactionInfo.getCheckpointIdCacheBytes()); + } } /* 8 */ diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java index c61550d0e3e..e9031219934 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java @@ -3,6 +3,7 @@ import datadog.trace.api.datastreams.Backlog; import datadog.trace.api.datastreams.DataStreamsTags; import datadog.trace.api.datastreams.StatsPoint; +import datadog.trace.api.datastreams.TransactionInfo; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -12,6 +13,7 @@ public class StatsBucket { private final long bucketDurationNanos; private final Map hashToGroup = new HashMap<>(); private final Map backlogs = new HashMap<>(); + private final TransactionContainer transactions = new TransactionContainer(1024); public StatsBucket(long startTimeNanos, long bucketDurationNanos) { this.startTimeNanos = startTimeNanos; @@ -40,6 +42,10 @@ public void addBacklog(Backlog backlog) { (k, v) -> (v == null) ? backlog.getValue() : Math.max(v, backlog.getValue())); } + public void addTransaction(TransactionInfo transaction) { + transactions.add(transaction); + } + public long getStartTimeNanos() { return startTimeNanos; } @@ -55,4 +61,8 @@ public Collection getGroups() { public Collection> getBacklogs() { return backlogs.entrySet(); } + + public TransactionContainer getTransactions() { + return transactions; + } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/TransactionContainer.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/TransactionContainer.java new file mode 100644 index 00000000000..5ba0ff88f36 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/TransactionContainer.java @@ -0,0 +1,43 @@ +package datadog.trace.core.datastreams; + +import datadog.trace.api.datastreams.TransactionInfo; +import java.util.Arrays; + +public class TransactionContainer { + // we store data as an array of bytes, since the number of object can be significant + private byte[] data; + private int size; + + public TransactionContainer(Integer initialSizeBytes) { + this.data = new byte[initialSizeBytes]; + } + + public void add(TransactionInfo transactionInfo) { + // check if we need to resize + byte[] transactionBytes = transactionInfo.getBytes(); + + // resize buffer if needed + if (data.length - size < transactionBytes.length) { + byte[] resized = new byte[data.length * 2]; + System.arraycopy(data, 0, resized, 0, size); + data = resized; + } + + // add data + System.arraycopy(transactionBytes, 0, data, size, transactionBytes.length); + size += transactionBytes.length; + } + + public boolean isEmpty() { + return size == 0; + } + + public boolean clear() { + size = 0; + return true; + } + + public byte[] getData() { + return Arrays.copyOf(data, size); + } +} diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/TracingConfigPollerTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/TracingConfigPollerTest.groovy index ff5b0ea4e25..6038861d689 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/TracingConfigPollerTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/TracingConfigPollerTest.groovy @@ -8,12 +8,11 @@ import datadog.remoteconfig.Product import datadog.remoteconfig.state.ParsedConfigKey import datadog.remoteconfig.state.ProductListener import datadog.trace.core.test.DDCoreSpecification +import java.nio.charset.StandardCharsets import okhttp3.HttpUrl import okhttp3.OkHttpClient import spock.lang.Timeout -import java.nio.charset.StandardCharsets - @Timeout(10) class TracingConfigPollerTest extends DDCoreSpecification { @@ -169,7 +168,14 @@ class TracingConfigPollerTest extends DDCoreSpecification { "tag_name": "custom.header" } ], - "tracing_sampling_rate": 1.3 + "tracing_sampling_rate": 1.3, + "data_streams_transaction_extractors": [ + { + "name": "test", + "type": "type", + "value": "value" + } + ] } } """.getBytes(StandardCharsets.UTF_8), null) @@ -183,6 +189,10 @@ class TracingConfigPollerTest extends DDCoreSpecification { tracer.captureTraceConfig().traceSampleRate == 1.0 // should be clamped to 1.0 tracer.captureTraceConfig().requestHeaderTags == ["x-custom-header": "custom.header"] tracer.captureTraceConfig().responseHeaderTags == ["x-custom-header": "custom.header"] + tracer.captureTraceConfig().getDataStreamsTransactionExtractors().size() == 1 + tracer.captureTraceConfig().getDataStreamsTransactionExtractors()[0].name == "test" + tracer.captureTraceConfig().getDataStreamsTransactionExtractors()[0].type == "type" + tracer.captureTraceConfig().getDataStreamsTransactionExtractors()[0].value == "value" when: // Remove service level config diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsTransactionExtractorsTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsTransactionExtractorsTest.groovy new file mode 100644 index 00000000000..db547814c00 --- /dev/null +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsTransactionExtractorsTest.groovy @@ -0,0 +1,22 @@ +package datadog.trace.core.datastreams + +import datadog.trace.core.test.DDCoreSpecification + +class DataStreamsTransactionExtractorsTest extends DDCoreSpecification { + def "Deserialize from json"() { + when: + def list = DataStreamsTransactionExtractors.deserialize("""[ + {"name": "extractor", "type": "http_request_header", "value": "transaction_id"}, + {"name": "second_extractor", "type": "http_response_header", "value": "transaction_id"} + ]""") + def extractors = list.getExtractors() + then: + extractors.size() == 2 + extractors[0].getName() == "extractor" + extractors[0].getType() == "http_request_header" + extractors[0].getValue() == "transaction_id" + extractors[1].getName() == "second_extractor" + extractors[1].getType() == "http_response_header" + extractors[1].getValue() == "transaction_id" + } +} diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/TransactionContainerTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/TransactionContainerTest.groovy new file mode 100644 index 00000000000..a7593acb19e --- /dev/null +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/TransactionContainerTest.groovy @@ -0,0 +1,49 @@ +package datadog.trace.core.datastreams + +import datadog.trace.api.datastreams.TransactionInfo +import datadog.trace.core.test.DDCoreSpecification + +class TransactionContainerTest extends DDCoreSpecification { + def "test with no resize"() { + given: + TransactionInfo.resetCache() + def container = new TransactionContainer(1024) + container.add(new TransactionInfo("1", 1, "1")) + container.add(new TransactionInfo("2", 2, "2")) + def data = container.getData() + + expect: + data.size() == 22 + data == new byte[] { + 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 49, 2, 0, 0, 0, 0, 0, 0, 0, 2, 1, 50 + } + } + + def "test with with resize"() { + given: + TransactionInfo.resetCache() + def container = new TransactionContainer(10) + container.add(new TransactionInfo("1", 1, "1")) + container.add(new TransactionInfo("2", 2, "2")) + def data = container.getData() + + expect: + data.size() == 22 + data == new byte[] { + 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 49, 2, 0, 0, 0, 0, 0, 0, 0, 2, 1, 50 + } + } + + def "test checkpoint map"() { + given: + TransactionInfo.resetCache() + new TransactionInfo("1", 1, "1") + new TransactionInfo("2", 2, "2") + def data = TransactionInfo.getCheckpointIdCacheBytes() + expect: + data.size() == 6 + data == new byte[] { + 1, 1, 49, 2, 1, 50 + } + } +} diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index 70a40347e1f..152beee5627 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -357,6 +357,7 @@ import static datadog.trace.api.config.GeneralConfig.DATA_JOBS_PARSE_SPARK_PLAN_ENABLED; import static datadog.trace.api.config.GeneralConfig.DATA_STREAMS_BUCKET_DURATION_SECONDS; import static datadog.trace.api.config.GeneralConfig.DATA_STREAMS_ENABLED; +import static datadog.trace.api.config.GeneralConfig.DATA_STREAMS_TRANSACTION_EXTRACTORS; import static datadog.trace.api.config.GeneralConfig.DOGSTATSD_ARGS; import static datadog.trace.api.config.GeneralConfig.DOGSTATSD_HOST; import static datadog.trace.api.config.GeneralConfig.DOGSTATSD_NAMED_PIPE; @@ -1225,6 +1226,7 @@ public static String getHostName() { private final boolean dataStreamsEnabled; private final float dataStreamsBucketDurationSeconds; + private final String dataStreamsTransactionExtractors; private final boolean serviceDiscoveryEnabled; @@ -2726,6 +2728,8 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment()) dataStreamsBucketDurationSeconds = configProvider.getFloat( DATA_STREAMS_BUCKET_DURATION_SECONDS, DEFAULT_DATA_STREAMS_BUCKET_DURATION); + dataStreamsTransactionExtractors = + configProvider.getString(DATA_STREAMS_TRANSACTION_EXTRACTORS); azureAppServices = configProvider.getBoolean(AZURE_APP_SERVICES, false); traceAgentPath = configProvider.getString(TRACE_AGENT_PATH); @@ -4557,6 +4561,10 @@ public float getDataStreamsBucketDurationSeconds() { return dataStreamsBucketDurationSeconds; } + public String getDataStreamsTransactionExtractors() { + return dataStreamsTransactionExtractors; + } + public long getDataStreamsBucketDurationNanoseconds() { // Rounds to the nearest millisecond before converting to nanos int milliseconds = Math.round(dataStreamsBucketDurationSeconds * 1000); diff --git a/internal-api/src/main/java/datadog/trace/api/DynamicConfig.java b/internal-api/src/main/java/datadog/trace/api/DynamicConfig.java index fb14e8370aa..4a33193b2ae 100644 --- a/internal-api/src/main/java/datadog/trace/api/DynamicConfig.java +++ b/internal-api/src/main/java/datadog/trace/api/DynamicConfig.java @@ -14,6 +14,7 @@ import static datadog.trace.util.ConfigStrings.normalizedHeaderTag; import static datadog.trace.util.ConfigStrings.trim; +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor; import datadog.trace.api.sampling.SamplingRule.SpanSamplingRule; import datadog.trace.api.sampling.SamplingRule.TraceSamplingRule; import java.util.Collection; @@ -111,6 +112,7 @@ public final class Builder { Double traceSampleRate; String preferredServiceName; + List dataStreamsTransactionExtractors; Builder() {} @@ -134,6 +136,7 @@ public final class Builder { this.tracingTags = snapshot.tracingTags; this.preferredServiceName = snapshot.preferredServiceName; + this.dataStreamsTransactionExtractors = snapshot.dataStreamsTransactionExtractors; } public Builder setRuntimeMetricsEnabled(boolean runtimeMetricsEnabled) { @@ -151,6 +154,12 @@ public Builder setDataStreamsEnabled(boolean dataStreamsEnabled) { return this; } + public Builder setDataStreamsTransactionExtractors( + List dataStreamsTransactionExtractors) { + this.dataStreamsTransactionExtractors = dataStreamsTransactionExtractors; + return this; + } + public Builder setServiceMapping(Map serviceMapping) { return setServiceMapping(serviceMapping.entrySet()); } @@ -324,6 +333,7 @@ public static class Snapshot implements TraceConfig { final Map tracingTags; final String preferredServiceName; + final List dataStreamsTransactionExtractors; protected Snapshot(DynamicConfig.Builder builder, Snapshot oldSnapshot) { @@ -345,6 +355,7 @@ protected Snapshot(DynamicConfig.Builder builder, Snapshot oldSnapshot) { this.tracingTags = nullToEmpty(builder.tracingTags); this.preferredServiceName = builder.preferredServiceName; + this.dataStreamsTransactionExtractors = builder.dataStreamsTransactionExtractors; } private static Map nullToEmpty(Map mapping) { @@ -415,6 +426,11 @@ public List getTraceSamplingRules() { return traceSamplingRules; } + @Override + public List getDataStreamsTransactionExtractors() { + return dataStreamsTransactionExtractors; + } + @Override public Map getTracingTags() { return tracingTags; diff --git a/internal-api/src/main/java/datadog/trace/api/TraceConfig.java b/internal-api/src/main/java/datadog/trace/api/TraceConfig.java index f07d3f56195..3a022a08e7b 100644 --- a/internal-api/src/main/java/datadog/trace/api/TraceConfig.java +++ b/internal-api/src/main/java/datadog/trace/api/TraceConfig.java @@ -1,5 +1,6 @@ package datadog.trace.api; +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor; import datadog.trace.api.sampling.SamplingRule.SpanSamplingRule; import datadog.trace.api.sampling.SamplingRule.TraceSamplingRule; import java.util.List; @@ -46,4 +47,11 @@ public interface TraceConfig { * @return The tracer sampler Trace Sampling Rules, or an empty collection if no rule is defined. */ List getTraceSamplingRules(); + + /** + * Get DSM transaction extractors. + * + * @return List of Data Streams Transactions extractors. + */ + List getDataStreamsTransactionExtractors(); } diff --git a/internal-api/src/main/java/datadog/trace/api/WellKnownTags.java b/internal-api/src/main/java/datadog/trace/api/WellKnownTags.java index f01e8d154ef..fa00fea11cb 100644 --- a/internal-api/src/main/java/datadog/trace/api/WellKnownTags.java +++ b/internal-api/src/main/java/datadog/trace/api/WellKnownTags.java @@ -49,4 +49,21 @@ public UTF8BytesString getVersion() { public UTF8BytesString getLanguage() { return language; } + + public String toString() { + return "WellKnownTags{" + + "runtimeId=" + + runtimeId + + ", hostname=" + + hostname + + ", env=" + + env + + ", service=" + + service + + ", version=" + + version + + ", language=" + + language + + "}"; + } } diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java b/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java index b7c51bd36ec..3165a0f30ba 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java @@ -5,7 +5,8 @@ import datadog.trace.bootstrap.instrumentation.api.Schema; import datadog.trace.bootstrap.instrumentation.api.SchemaIterator; -public interface AgentDataStreamsMonitoring extends DataStreamsCheckpointer { +public interface AgentDataStreamsMonitoring + extends DataStreamsCheckpointer, DataStreamsTransactionTracker { void trackBacklog(DataStreamsTags tags, long value); /** diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsTransactionExtractor.java b/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsTransactionExtractor.java new file mode 100644 index 00000000000..bf0b8fde6b3 --- /dev/null +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsTransactionExtractor.java @@ -0,0 +1,22 @@ +package datadog.trace.api.datastreams; + +public interface DataStreamsTransactionExtractor { + enum Type { + UNKNOWN, + /** HTTP_OUT_HEADERS targets outgoing HTTP requests */ + HTTP_OUT_HEADERS, + /** HTTP_IN_HEADERS targets incoming HTTP requests */ + HTTP_IN_HEADERS, + /** KAFKA_CONSUME_HEADERS targets headers from consumed messages (after consume) */ + KAFKA_CONSUME_HEADERS, + /** KAFKA_CONSUME_HEADERS targets headers from produced messages (before produce) */ + KAFKA_PRODUCE_HEADERS + } + + /** getName returns transaction extractor name */ + String getName(); + /** getType returns transaction extractor type */ + Type getType(); + /** getType returns transaction extractor value */ + String getValue(); +} diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsTransactionTracker.java b/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsTransactionTracker.java new file mode 100644 index 00000000000..7107d62bc84 --- /dev/null +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/DataStreamsTransactionTracker.java @@ -0,0 +1,11 @@ +package datadog.trace.api.datastreams; + +import java.util.List; + +public interface DataStreamsTransactionTracker { + /** trackTransaction used to emit "seen" even for transactions */ + void trackTransaction(String transactionId, String checkpointName); + /** extractorsByType returns the list of extractors */ + List getTransactionExtractorsByType( + DataStreamsTransactionExtractor.Type extractorType); +} diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java b/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java index f5cdcb0c82f..ad2519550cd 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java @@ -4,6 +4,7 @@ import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.Schema; import datadog.trace.bootstrap.instrumentation.api.SchemaIterator; +import java.util.List; public class NoopDataStreamsMonitoring implements AgentDataStreamsMonitoring { public static final NoopDataStreamsMonitoring INSTANCE = new NoopDataStreamsMonitoring(); @@ -46,6 +47,15 @@ public void setThreadServiceName(String serviceName) {} @Override public void clearThreadServiceName() {} + @Override + public void trackTransaction(String transactionId, String checkpointName) {} + + @Override + public List getTransactionExtractorsByType( + DataStreamsTransactionExtractor.Type extractorType) { + return null; + } + @Override public void setConsumeCheckpoint(String type, String source, DataStreamsContextCarrier carrier) {} diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/TransactionInfo.java b/internal-api/src/main/java/datadog/trace/api/datastreams/TransactionInfo.java new file mode 100644 index 00000000000..4615f4d9b55 --- /dev/null +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/TransactionInfo.java @@ -0,0 +1,94 @@ +package datadog.trace.api.datastreams; + +import datadog.trace.api.Pair; +import datadog.trace.api.cache.DDCache; +import datadog.trace.api.cache.DDCaches; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +public final class TransactionInfo implements InboxItem { + private static final DDCache CACHE = DDCaches.newFixedSizeCache(64); + private static final AtomicInteger COUNTER = new AtomicInteger(); + + private final String id; + private final long timestamp; + private final int checkpointId; + + public TransactionInfo(String id, Long timestamp, String checkpoint) { + this.id = id; + this.timestamp = timestamp; + this.checkpointId = CACHE.computeIfAbsent(checkpoint, k -> COUNTER.incrementAndGet()); + } + + public String getId() { + return id; + } + + public Long getTimestamp() { + return timestamp; + } + + public Integer getCheckpointId() { + return checkpointId; + } + + public byte[] getBytes() { + byte[] idBytes = id.getBytes(); + byte[] result = new byte[idBytes.length + 10]; + // up to 1 byte for checkpoint id + result[0] = (byte) (checkpointId); + // 8 bytes for timestamp + result[1] = (byte) (timestamp >> 56); + result[2] = (byte) (timestamp >> 48); + result[3] = (byte) (timestamp >> 40); + result[4] = (byte) (timestamp >> 32); + result[5] = (byte) (timestamp >> 24); + result[6] = (byte) (timestamp >> 16); + result[7] = (byte) (timestamp >> 8); + result[8] = (byte) (timestamp); + // id size, up to 256 bytes + result[9] = (byte) (idBytes.length); + // add id bytes + System.arraycopy(idBytes, 0, result, 10, idBytes.length); + return result; + } + + static void resetCache() { + CACHE.clear(); + COUNTER.set(0); + } + + public static byte[] getCheckpointIdCacheBytes() { + // get all values + List> pairs = new LinkedList<>(); + CACHE.visit( + (key, value) -> { + pairs.add(Pair.of(key, value)); + }); + + // serialize + byte[] result = new byte[1024]; + int index = 0; + for (Pair pair : pairs) { + byte[] keyBytes = pair.getLeft().getBytes(); + // resize the buffer if needed + if (result.length - index <= keyBytes.length + 2) { + byte[] resized = new byte[result.length * 2]; + System.arraycopy(result, 0, resized, 0, result.length); + result = resized; + } + + result[index] = pair.getRight().byteValue(); + index++; + result[index] = (byte) (keyBytes.length); + index++; + + System.arraycopy(keyBytes, 0, result, index, keyBytes.length); + index += keyBytes.length; + } + + return Arrays.copyOf(result, index); + } +} diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java index 59405a0aef1..bcfbf0dbff7 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java @@ -6,6 +6,7 @@ import datadog.trace.api.EndpointTracker; import datadog.trace.api.TraceConfig; import datadog.trace.api.datastreams.AgentDataStreamsMonitoring; +import datadog.trace.api.datastreams.DataStreamsTransactionExtractor; import datadog.trace.api.datastreams.NoopDataStreamsMonitoring; import datadog.trace.api.experimental.DataStreamsCheckpointer; import datadog.trace.api.gateway.CallbackProvider; @@ -802,5 +803,10 @@ public List getSpanSamplingRules() { public List getTraceSamplingRules() { return Collections.emptyList(); } + + @Override + public List getDataStreamsTransactionExtractors() { + return null; + } } } diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/Tags.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/Tags.java index 3eaa1e292cc..20629daf051 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/Tags.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/Tags.java @@ -174,4 +174,6 @@ public class Tags { public static final String LLMOBS_TOOL_SPAN_KIND = "tool"; public static final String LLMOBS_EMBEDDING_SPAN_KIND = "embedding"; public static final String LLMOBS_RETRIEVAL_SPAN_KIND = "retrieval"; + public static final String DSM_TRANSACTION_ID = "dsm.transaction.id"; + public static final String DSM_TRANSACTION_CHECKPOINT = "dsm.transaction.checkpoint"; } diff --git a/internal-api/src/test/groovy/datadog/trace/api/datastreams/TransactionInfoTest.groovy b/internal-api/src/test/groovy/datadog/trace/api/datastreams/TransactionInfoTest.groovy new file mode 100644 index 00000000000..c82b6a839b2 --- /dev/null +++ b/internal-api/src/test/groovy/datadog/trace/api/datastreams/TransactionInfoTest.groovy @@ -0,0 +1,61 @@ +package datadog.trace.api.datastreams + +import datadog.trace.api.Pair +import spock.lang.Specification + +class TransactionInfoTest extends Specification { + + def "test checkpoint id cache serialization multiple"() { + given: + TransactionInfo.resetCache() + def controlSize = 10 + // generate multiple transaction ids to trigger cache updates + for (int i = 0; i < controlSize; i++) { + new TransactionInfo("id " + i, i, "checkpoint " + i) + } + + def items = new LinkedList>() + // get cache data + def data = TransactionInfo.getCheckpointIdCacheBytes() + def i = 0 + while (i < data.size()) { + def id = data[i] + i++ + + def size = data[i] + i++ + + def str = new String(data, i, size) + i += size + + items.add(Pair.of(str, id) as Pair) + } + + expect: + items.size() == controlSize + + for (def item in items) { + item.left == "Checkpoint " + item.right + } + } + + def "test checkpoint id cache serialization"() { + given: + TransactionInfo.resetCache() + new TransactionInfo("id", 1, "checkpoint") + def bytes = TransactionInfo.getCheckpointIdCacheBytes() + expect: + bytes.size() == 12 + bytes == new byte[] {1, 10, 99, 104, 101, 99, 107, 112, 111, 105, 110, 116} + } + + def "test transaction id serialization"() { + given: + TransactionInfo.resetCache() + def test = new TransactionInfo("id", 1, "checkpoint") + def bytes = test.getBytes() + expect: + bytes.size() == 12 + bytes == new byte[] {1, 0, 0, 0, 0, 0, 0, 0, 1, 2, 105, 100} + } +} diff --git a/metadata/supported-configurations.json b/metadata/supported-configurations.json index 8a90076eeac..31c0ba6ba5b 100644 --- a/metadata/supported-configurations.json +++ b/metadata/supported-configurations.json @@ -141,6 +141,7 @@ "DD_DATA_JOBS_OPENLINEAGE_TIMEOUT_ENABLED": ["A"], "DD_DATA_STREAMS_BUCKET_DURATION_SECONDS": ["A"], "DD_DATA_STREAMS_ENABLED": ["A"], + "DD_DATA_STREAMS_TRANSACTION_EXTRACTORS": ["A"], "DD_DBM_INJECT_SQL_BASEHASH": ["A"], "DD_DBM_ALWAYS_APPEND_SQL_COMMENT": ["A"], "DD_DBM_PROPAGATION_MODE": ["A"],