Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import datadog.trace.api.InstrumenterConfig;
import datadog.trace.api.ProductActivation;
import datadog.trace.api.appsec.HttpClientRequest;
import datadog.trace.api.datastreams.AgentDataStreamsMonitoring;
import datadog.trace.api.datastreams.DataStreamsTransactionExtractor;
import datadog.trace.api.gateway.BlockResponseFunction;
import datadog.trace.api.gateway.Flow;
import datadog.trace.api.gateway.RequestContext;
Expand All @@ -27,6 +29,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import org.slf4j.Logger;
Expand Down Expand Up @@ -70,6 +73,22 @@ protected boolean shouldSetResourceName() {

public AgentSpan onRequest(final AgentSpan span, final REQUEST request) {
if (request != null) {
// apply extractors if any (disabled if DSM is off)
AgentDataStreamsMonitoring dataStreamsMonitoring =
AgentTracer.get().getDataStreamsMonitoring();
List<DataStreamsTransactionExtractor> extractorList =
dataStreamsMonitoring.getTransactionExtractorsByType(
DataStreamsTransactionExtractor.Type.HTTP_OUT_HEADERS);
if (extractorList != null) {
for (DataStreamsTransactionExtractor extractor : extractorList) {
String transactionId = getRequestHeader(request, extractor.getValue());
if (transactionId != null && !transactionId.isEmpty()) {
dataStreamsMonitoring.trackTransaction(transactionId, extractor.getName());
span.setTag(Tags.DSM_TRANSACTION_ID, transactionId);
span.setTag(Tags.DSM_TRANSACTION_CHECKPOINT, extractor.getName());
}
}
}

String method = method(request);
span.setTag(Tags.HTTP_METHOD, method);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import datadog.context.propagation.Propagators;
import datadog.trace.api.Config;
import datadog.trace.api.DDTags;
import datadog.trace.api.datastreams.AgentDataStreamsMonitoring;
import datadog.trace.api.datastreams.DataStreamsTransactionExtractor;
import datadog.trace.api.function.TriConsumer;
import datadog.trace.api.function.TriFunction;
import datadog.trace.api.gateway.BlockResponseFunction;
Expand All @@ -39,6 +41,7 @@
import datadog.trace.bootstrap.instrumentation.decorator.http.ClientIpAddressResolver;
import java.net.InetAddress;
import java.util.BitSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -95,6 +98,10 @@ public abstract class HttpServerDecorator<REQUEST, CONNECTION, RESPONSE, REQUEST

protected abstract int status(RESPONSE response);

protected String getRequestHeader(REQUEST request, String key) {
return null;
}

protected String requestedSessionId(REQUEST request) {
return null;
}
Expand Down Expand Up @@ -326,6 +333,25 @@ public AgentSpan onRequest(
span.setRequestBlockingAction((RequestBlockingAction) flow.getAction());
}

if (request != null) {
// apply extractors if any (disabled if DSM is off)
AgentDataStreamsMonitoring dataStreamsMonitoring =
AgentTracer.get().getDataStreamsMonitoring();
List<DataStreamsTransactionExtractor> extractorList =
dataStreamsMonitoring.getTransactionExtractorsByType(
DataStreamsTransactionExtractor.Type.HTTP_IN_HEADERS);
if (extractorList != null) {
for (DataStreamsTransactionExtractor extractor : extractorList) {
String transactionId = getRequestHeader(request, extractor.getValue());
if (transactionId != null) {
dataStreamsMonitoring.trackTransaction(transactionId, extractor.getName());
span.setTag(Tags.DSM_TRANSACTION_ID, transactionId);
span.setTag(Tags.DSM_TRANSACTION_CHECKPOINT, extractor.getName());
}
}
}
}

return span;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,11 @@ class HttpServerDecoratorTest extends ServerDecoratorTest {
protected int status(Map m) {
return m.status == null ? 0 : m.status
}

@Override
protected String getRequestHeader(Map map, String key) {
return map.getOrDefault(key, null)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
package datadog.trace.agent.test

import static datadog.communication.http.OkHttpUtils.buildHttpClient
import static datadog.trace.api.ConfigDefaults.DEFAULT_AGENT_HOST
import static datadog.trace.api.ConfigDefaults.DEFAULT_AGENT_TIMEOUT
import static datadog.trace.api.ConfigDefaults.DEFAULT_TRACE_AGENT_PORT
import static datadog.trace.api.config.DebuggerConfig.DYNAMIC_INSTRUMENTATION_SNAPSHOT_URL
import static datadog.trace.api.config.DebuggerConfig.DYNAMIC_INSTRUMENTATION_VERIFY_BYTECODE
import static datadog.trace.api.config.TraceInstrumentationConfig.CODE_ORIGIN_FOR_SPANS_ENABLED
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious
import static datadog.trace.util.AgentThreadFactory.AgentThread.TASK_SCHEDULER

import ch.qos.logback.classic.Level
import ch.qos.logback.classic.util.ContextInitializer
import com.datadog.debugger.agent.ClassesToRetransformFinder
Expand Down Expand Up @@ -33,6 +43,7 @@ import datadog.trace.api.TraceConfig
import datadog.trace.api.config.GeneralConfig
import datadog.trace.api.config.TracerConfig
import datadog.trace.api.datastreams.AgentDataStreamsMonitoring
import datadog.trace.api.datastreams.DataStreamsTransactionExtractor
import datadog.trace.api.sampling.SamplingRule
import datadog.trace.api.time.SystemTimeSource
import datadog.trace.bootstrap.ActiveSubsystems
Expand All @@ -56,6 +67,13 @@ import de.thetaphi.forbiddenapis.SuppressForbidden
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import groovy.transform.stc.ClosureParams
import groovy.transform.stc.SimpleType
import java.lang.instrument.ClassFileTransformer
import java.lang.instrument.Instrumentation
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import net.bytebuddy.agent.ByteBuddyAgent
import net.bytebuddy.agent.builder.AgentBuilder
import net.bytebuddy.description.type.TypeDescription
Expand All @@ -69,24 +87,6 @@ import org.slf4j.LoggerFactory
import org.spockframework.mock.MockUtil
import spock.lang.Shared

import java.lang.instrument.ClassFileTransformer
import java.lang.instrument.Instrumentation
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger

import static datadog.communication.http.OkHttpUtils.buildHttpClient
import static datadog.trace.api.ConfigDefaults.DEFAULT_AGENT_HOST
import static datadog.trace.api.ConfigDefaults.DEFAULT_AGENT_TIMEOUT
import static datadog.trace.api.ConfigDefaults.DEFAULT_TRACE_AGENT_PORT
import static datadog.trace.api.config.DebuggerConfig.DYNAMIC_INSTRUMENTATION_SNAPSHOT_URL
import static datadog.trace.api.config.DebuggerConfig.DYNAMIC_INSTRUMENTATION_VERIFY_BYTECODE
import static datadog.trace.api.config.TraceInstrumentationConfig.CODE_ORIGIN_FOR_SPANS_ENABLED
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious
import static datadog.trace.util.AgentThreadFactory.AgentThread.TASK_SCHEDULER

/**
* A specification that automatically applies instrumentation and exposes a global trace
* writer.
Expand Down Expand Up @@ -250,6 +250,10 @@ abstract class InstrumentationSpecification extends DDSpecification implements A
List<? extends SamplingRule.TraceSamplingRule> getTraceSamplingRules() {
return null
}

List<DataStreamsTransactionExtractor> getDataStreamsTransactionExtractors() {
return null
}
}

boolean originalAppSecRuntimeValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ protected int status(final HttpResponse httpResponse) {
return httpResponse.status().intValue();
}

@Override
protected String getRequestHeader(HttpRequest httpRequest, String key) {
Optional<akka.http.javadsl.model.HttpHeader> header = httpRequest.getHeader(key);
return header.map(HttpHeader::value).orElse(null);
}

@Override
protected boolean isAppSecOnResponseSeparate() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ protected boolean isAppSecOnResponseSeparate() {
return true;
}

@Override
protected String getRequestHeader(final Request request, String key) {
return request.getHeader(key);
}

@Override
protected BlockResponseFunction createBlockResponseFunction(Request request, Request connection) {
return new JettyBlockResponseFunction(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ protected boolean isAppSecOnResponseSeparate() {
return true;
}

@Override
protected String getRequestHeader(final Request request, String key) {
return request.getHeader(key);
}

public AgentSpan onResponse(AgentSpan span, HttpChannel channel) {
Request request = channel.getRequest();
Response response = channel.getResponse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ protected int status(final Response response) {
return response.getStatus();
}

@Override
protected String getRequestHeader(final Request request, String key) {
return request.getHeaders().get(key);
}

public AgentSpan onResponse(AgentSpan span, HttpChannelState channel) {
Request request = channel.getRequest();
Response response = channel.getResponse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ protected boolean isAppSecOnResponseSeparate() {
return true;
}

@Override
protected String getRequestHeader(final Request request, String key) {
return request.getHeader(key);
}

public AgentSpan onResponse(AgentSpan span, HttpConnection channel) {
Request request = channel.getRequest();
Response response = channel.getResponse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ protected boolean isAppSecOnResponseSeparate() {
return true;
}

@Override
protected String getRequestHeader(final Request request, String key) {
return request.getHeader(key);
}

public AgentSpan onResponse(AgentSpan span, AbstractHttpConnection connection) {
Request request = connection.getRequest();
Response response = connection.getResponse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ protected boolean isAppSecOnResponseSeparate() {
return true;
}

@Override
protected String getRequestHeader(final Request request, String key) {
return request.getHeader(key);
}

public AgentSpan onResponse(AgentSpan span, HttpChannel<?> channel) {
Request request = channel.getRequest();
Response response = channel.getResponse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,19 @@
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import datadog.trace.api.datastreams.AgentDataStreamsMonitoring;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.DataStreamsTags;
import datadog.trace.api.datastreams.DataStreamsTransactionExtractor;
import datadog.trace.api.datastreams.StatsPoint;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
import datadog.trace.bootstrap.instrumentation.api.Tags;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.matcher.ElementMatcher;
Expand All @@ -44,6 +49,7 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.RecordBatch;

@AutoService(InstrumenterModule.class)
Expand Down Expand Up @@ -178,6 +184,25 @@ record =
if (TIME_IN_QUEUE_ENABLED) {
setter.injectTimeInQueue(record.headers());
}

// track transactions on produce
AgentDataStreamsMonitoring dataStreamsMonitoring =
AgentTracer.get().getDataStreamsMonitoring();
List<DataStreamsTransactionExtractor> extractors =
dataStreamsMonitoring.getTransactionExtractorsByType(
DataStreamsTransactionExtractor.Type.KAFKA_PRODUCE_HEADERS);
if (extractors != null) {
for (DataStreamsTransactionExtractor extractor : extractors) {
Header header = record.headers().lastHeader(extractor.getValue());
if (header != null && header.value() != null) {
String transactionId = new String(header.value(), StandardCharsets.UTF_8);
dataStreamsMonitoring.trackTransaction(transactionId, extractor.getName());
span.setTag(Tags.DSM_TRANSACTION_ID, transactionId);
span.setTag(Tags.DSM_TRANSACTION_CHECKPOINT, extractor.getName());
}
}
}

return activateSpan(span);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,20 @@
import datadog.context.propagation.Propagator;
import datadog.context.propagation.Propagators;
import datadog.trace.api.Config;
import datadog.trace.api.datastreams.AgentDataStreamsMonitoring;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.DataStreamsTags;
import datadog.trace.api.datastreams.DataStreamsTransactionExtractor;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
import datadog.trace.bootstrap.instrumentation.api.Tags;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -128,6 +134,23 @@ protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {
if (null != queueSpan) {
queueSpan.finish();
}

AgentDataStreamsMonitoring dataStreamsMonitoring =
AgentTracer.get().getDataStreamsMonitoring();
List<DataStreamsTransactionExtractor> extractors =
dataStreamsMonitoring.getTransactionExtractorsByType(
DataStreamsTransactionExtractor.Type.KAFKA_CONSUME_HEADERS);
if (extractors != null) {
for (DataStreamsTransactionExtractor extractor : extractors) {
Header header = val.headers().lastHeader(extractor.getValue());
if (header != null && header.value() != null) {
String transactionId = new String(header.value(), StandardCharsets.UTF_8);
dataStreamsMonitoring.trackTransaction(transactionId, extractor.getName());
span.setTag(Tags.DSM_TRANSACTION_ID, transactionId);
span.setTag(Tags.DSM_TRANSACTION_CHECKPOINT, extractor.getName());
}
}
}
}
} catch (final Exception e) {
log.debug("Error starting new record span", e);
Expand Down
Loading
Loading