Commit 44fe94e

Authored by trask, Copilot, and otelbot[bot]
Add configurable OpenTelemetry kafka-clients interceptors (#14929)
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: trask <218610+trask@users.noreply.github.com>
Co-authored-by: otelbot <197425009+otelbot@users.noreply.github.com>
1 parent b78d354 commit 44fe94e

21 files changed: 1101 additions and 227 deletions

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/README.md

Lines changed: 18 additions & 7 deletions
@@ -31,20 +31,31 @@ There are two options for capturing traces, either using interceptors or wrappin
 
 #### Using interceptors
 
-The Kafka clients API provides a way to "intercept" messages before they are sent to the brokers as well as messages received from the broker before being passed to the application.
-The OpenTelemetry instrumented Kafka library provides two interceptors to be configured to add tracing information automatically.
-The interceptor class has to be set in the properties bag used to create the Kafka client.
+The Kafka clients API provides a way to intercept messages before they are sent to the brokers as well as messages received from the broker before being passed to the application.
 
-Use the `TracingProducerInterceptor` for the producer in order to create a "send" span automatically, each time a message is sent.
+To intercept messages and emit telemetry for a `KafkaProducer`:
 
 ```java
-props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
+KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);
+
+Map<String, Object> props = new HashMap<>();
+props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+props.putAll(telemetry.producerInterceptorConfigProperties());
+
+Producer<String, String> producer = new KafkaProducer<>(props);
 ```
 
-Use the `TracingConsumerInterceptor` for the consumer in order to create a "receive" span automatically, each time a message is received.
+To intercept messages and emit telemetry for a `KafkaConsumer`:
 
 ```java
-props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
+KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);
+
+Map<String, Object> props = new HashMap<>();
+props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
+props.putAll(telemetry.consumerInterceptorConfigProperties());
+
+Consumer<String, String> consumer = new KafkaConsumer<>(props);
 ```
 
 #### Wrapping clients
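
The hunk above ends at the `#### Wrapping clients` section, which documents the alternative to interceptors: decorating clients with `KafkaTelemetry.wrap(...)` (the `wrap` methods are kept by this commit; see `KafkaTelemetry.java` below). A minimal sketch of that alternative, assuming the same illustrative bootstrap server as the README snippets, with an illustrative topic and record, and omitting serializer settings for brevity:

```java
KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);

Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// wrap(...) returns a decorated Producer that emits a span for each sent message
Producer<String, String> producer = telemetry.wrap(new KafkaProducer<>(props));
producer.send(new ProducerRecord<>("my-topic", "key", "value"));
```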

instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts

Lines changed: 15 additions & 10 deletions
@@ -22,27 +22,32 @@ tasks {
     systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
   }
 
-  val testReceiveSpansDisabled by registering(Test::class) {
+  test {
+    filter {
+      excludeTestsMatching("*Deprecated*")
+    }
+  }
+
+  val testDeprecated by registering(Test::class) {
     testClassesDirs = sourceSets.test.get().output.classesDirs
     classpath = sourceSets.test.get().runtimeClasspath
     filter {
-      includeTestsMatching("InterceptorsSuppressReceiveSpansTest")
-      includeTestsMatching("WrapperSuppressReceiveSpansTest")
+      includeTestsMatching("*DeprecatedInterceptorsTest")
     }
-    include("**/InterceptorsSuppressReceiveSpansTest.*", "**/WrapperSuppressReceiveSpansTest.*")
+    systemProperty("otel.instrumentation.messaging.experimental.receive-telemetry.enabled", "true")
+    systemProperty("otel.instrumentation.messaging.experimental.capture-headers", "Test-Message-Header")
   }
 
-  test {
+  val testDeprecatedSuppressReceiveSpans by registering(Test::class) {
+    testClassesDirs = sourceSets.test.get().output.classesDirs
+    classpath = sourceSets.test.get().runtimeClasspath
     filter {
-      excludeTestsMatching("InterceptorsSuppressReceiveSpansTest")
-      excludeTestsMatching("WrapperSuppressReceiveSpansTest")
+      includeTestsMatching("*DeprecatedInterceptorsSuppressReceiveSpansTest")
     }
-    jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
-    systemProperty("otel.instrumentation.messaging.experimental.capture-headers", "Test-Message-Header")
   }
 
   check {
-    dependsOn(testReceiveSpansDisabled)
+    dependsOn(testDeprecated, testDeprecatedSuppressReceiveSpans)
   }
 }
 
instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java

Lines changed: 76 additions & 151 deletions
@@ -5,61 +5,47 @@
 
 package io.opentelemetry.instrumentation.kafkaclients.v2_6;
 
-import static java.util.logging.Level.WARNING;
-
 import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.context.Context;
-import io.opentelemetry.context.Scope;
-import io.opentelemetry.context.propagation.TextMapPropagator;
-import io.opentelemetry.context.propagation.TextMapSetter;
 import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
-import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
 import io.opentelemetry.instrumentation.api.internal.Timer;
 import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContext;
 import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil;
-import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaHeadersSetter;
 import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProcessRequest;
 import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest;
 import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaReceiveRequest;
-import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaUtil;
 import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.MetricsReporterList;
 import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetryMetricsReporter;
 import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.OpenTelemetrySupplier;
-import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.TracingList;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.KafkaConsumerTelemetry;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.KafkaConsumerTelemetrySupplier;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.KafkaProducerTelemetry;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.KafkaProducerTelemetrySupplier;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.OpenTelemetryConsumerInterceptor;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.internal.OpenTelemetryProducerInterceptor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Proxy;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Future;
-import java.util.function.BiFunction;
-import java.util.logging.Logger;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.metrics.MetricsReporter;
 
 public final class KafkaTelemetry {
-  private static final Logger logger = Logger.getLogger(KafkaTelemetry.class.getName());
-
-  private static final TextMapSetter<Headers> SETTER = KafkaHeadersSetter.INSTANCE;
 
   private final OpenTelemetry openTelemetry;
-  private final Instrumenter<KafkaProducerRequest, RecordMetadata> producerInstrumenter;
-  private final Instrumenter<KafkaReceiveRequest, Void> consumerReceiveInstrumenter;
-  private final Instrumenter<KafkaProcessRequest, Void> consumerProcessInstrumenter;
-  private final boolean producerPropagationEnabled;
+  private final KafkaProducerTelemetry producerTelemetry;
+  private final KafkaConsumerTelemetry consumerTelemetry;
 
   KafkaTelemetry(
       OpenTelemetry openTelemetry,
@@ -68,10 +54,13 @@ public final class KafkaTelemetry {
       Instrumenter<KafkaProcessRequest, Void> consumerProcessInstrumenter,
       boolean producerPropagationEnabled) {
     this.openTelemetry = openTelemetry;
-    this.producerInstrumenter = producerInstrumenter;
-    this.consumerReceiveInstrumenter = consumerReceiveInstrumenter;
-    this.consumerProcessInstrumenter = consumerProcessInstrumenter;
-    this.producerPropagationEnabled = producerPropagationEnabled;
+    this.producerTelemetry =
+        new KafkaProducerTelemetry(
+            openTelemetry.getPropagators().getTextMapPropagator(),
+            producerInstrumenter,
+            producerPropagationEnabled);
+    this.consumerTelemetry =
+        new KafkaConsumerTelemetry(consumerReceiveInstrumenter, consumerProcessInstrumenter);
   }
 
   /** Returns a new {@link KafkaTelemetry} configured with the given {@link OpenTelemetry}. */
@@ -86,8 +75,14 @@ public static KafkaTelemetryBuilder builder(OpenTelemetry openTelemetry) {
     return new KafkaTelemetryBuilder(openTelemetry);
   }
 
-  private TextMapPropagator propagator() {
-    return openTelemetry.getPropagators().getTextMapPropagator();
+  // this method can be removed when the deprecated TracingProducerInterceptor is removed
+  KafkaProducerTelemetry getProducerTelemetry() {
+    return producerTelemetry;
+  }
+
+  // this method can be removed when the deprecated TracingProducerInterceptor is removed
+  KafkaConsumerTelemetry getConsumerTelemetry() {
+    return consumerTelemetry;
   }
 
   /** Returns a decorated {@link Producer} that emits spans for each sent message. */
@@ -109,7 +104,8 @@ public <K, V> Producer<K, V> wrap(Producer<K, V> producer) {
                       && method.getParameterTypes()[1] == Callback.class
                   ? (Callback) args[1]
                   : null;
-          return buildAndInjectSpan(record, producer, callback, producer::send);
+          return producerTelemetry.buildAndInjectSpan(
+              record, producer, callback, producer::send);
         }
         try {
           return method.invoke(producer, args);
@@ -138,35 +134,19 @@ public <K, V> Consumer<K, V> wrap(Consumer<K, V> consumer) {
           // ConsumerRecords<K, V> poll(Duration duration)
           if ("poll".equals(method.getName()) && result instanceof ConsumerRecords) {
             ConsumerRecords<K, V> consumerRecords = (ConsumerRecords<K, V>) result;
-            Context receiveContext = buildAndFinishSpan(consumerRecords, consumer, timer);
+            Context receiveContext =
+                consumerTelemetry.buildAndFinishSpan(consumerRecords, consumer, timer);
             if (receiveContext == null) {
               receiveContext = Context.current();
             }
             KafkaConsumerContext consumerContext =
                 KafkaConsumerContextUtil.create(receiveContext, consumer);
-            result = addTracing(consumerRecords, consumerContext);
+            result = consumerTelemetry.addTracing(consumerRecords, consumerContext);
           }
           return result;
         });
   }
 
-  <K, V> ConsumerRecords<K, V> addTracing(
-      ConsumerRecords<K, V> consumerRecords, KafkaConsumerContext consumerContext) {
-    if (consumerRecords.isEmpty()) {
-      return consumerRecords;
-    }
-
-    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
-    for (TopicPartition partition : consumerRecords.partitions()) {
-      List<ConsumerRecord<K, V>> list = consumerRecords.records(partition);
-      if (list != null && !list.isEmpty()) {
-        list = TracingList.wrap(list, consumerProcessInstrumenter, () -> true, consumerContext);
-      }
-      records.put(partition, list);
-    }
-    return new ConsumerRecords<>(records);
-  }
-
   /**
    * Produces a set of kafka client config properties (consumer or producer) to register a {@link
    * MetricsReporter} that records metrics to an {@code openTelemetry} instance. Add these resulting
@@ -208,113 +188,58 @@ <K, V> ConsumerRecords<K, V> addTracing(
   }
 
   /**
-   * Build and inject span into record.
+   * Returns configuration properties that can be used to enable OpenTelemetry instrumentation via
+   * {@code OpenTelemetryProducerInterceptor}. Add these resulting properties to the configuration
+   * map used to initialize a {@link org.apache.kafka.clients.producer.KafkaProducer}.
+   *
+   * <p>Example usage:
+   *
+   * <pre>{@code
+   * // KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);
+   * // Map<String, Object> config = new HashMap<>();
+   * // config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
+   * // config.putAll(telemetry.producerInterceptorConfigProperties());
+   * // try (KafkaProducer<?, ?> producer = new KafkaProducer<>(config)) { ... }
+   * }</pre>
    *
-   * @param record the producer record to inject span info.
+   * @return the kafka producer interceptor config properties
    */
-  <K, V> void buildAndInjectSpan(ProducerRecord<K, V> record, String clientId) {
-    Context parentContext = Context.current();
-
-    KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId);
-    if (!producerInstrumenter.shouldStart(parentContext, request)) {
-      return;
-    }
-
-    Context context = producerInstrumenter.start(parentContext, request);
-    if (producerPropagationEnabled) {
-      try {
-        propagator().inject(context, record.headers(), SETTER);
-      } catch (Throwable t) {
-        // it can happen if headers are read only (when record is sent second time)
-        logger.log(WARNING, "failed to inject span context. sending record second time?", t);
-      }
-    }
-    producerInstrumenter.end(context, request, null, null);
+  public Map<String, ?> producerInterceptorConfigProperties() {
+    Map<String, Object> config = new HashMap<>();
+    config.put(
+        ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
+        OpenTelemetryProducerInterceptor.class.getName());
+    config.put(
+        OpenTelemetryProducerInterceptor.CONFIG_KEY_KAFKA_PRODUCER_TELEMETRY_SUPPLIER,
+        new KafkaProducerTelemetrySupplier(producerTelemetry));
+    return Collections.unmodifiableMap(config);
   }
 
   /**
-   * Build and inject span into record.
+   * Returns configuration properties that can be used to enable OpenTelemetry instrumentation via
+   * {@code OpenTelemetryConsumerInterceptor}. Add these resulting properties to the configuration
+   * map used to initialize a {@link org.apache.kafka.clients.consumer.KafkaConsumer}.
+   *
+   * <p>Example usage:
+   *
+   * <pre>{@code
+   * // KafkaTelemetry telemetry = KafkaTelemetry.create(openTelemetry);
+   * // Map<String, Object> config = new HashMap<>();
+   * // config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
+   * // config.putAll(telemetry.consumerInterceptorConfigProperties());
+   * // try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(config)) { ... }
+   * }</pre>
    *
-   * @param record the producer record to inject span info.
-   * @param callback the producer send callback
-   * @return send function's result
+   * @return the kafka consumer interceptor config properties
    */
-  <K, V> Future<RecordMetadata> buildAndInjectSpan(
-      ProducerRecord<K, V> record,
-      Producer<K, V> producer,
-      Callback callback,
-      BiFunction<ProducerRecord<K, V>, Callback, Future<RecordMetadata>> sendFn) {
-    Context parentContext = Context.current();
-
-    KafkaProducerRequest request = KafkaProducerRequest.create(record, producer);
-    if (!producerInstrumenter.shouldStart(parentContext, request)) {
-      return sendFn.apply(record, callback);
-    }
-
-    Context context = producerInstrumenter.start(parentContext, request);
-    propagator().inject(context, record.headers(), SETTER);
-
-    try (Scope ignored = context.makeCurrent()) {
-      return sendFn.apply(record, new ProducerCallback(callback, parentContext, context, request));
-    }
-  }
-
-  private <K, V> Context buildAndFinishSpan(
-      ConsumerRecords<K, V> records, Consumer<K, V> consumer, Timer timer) {
-    return buildAndFinishSpan(
-        records, KafkaUtil.getConsumerGroup(consumer), KafkaUtil.getClientId(consumer), timer);
-  }
-
-  <K, V> Context buildAndFinishSpan(
-      ConsumerRecords<K, V> records, String consumerGroup, String clientId, Timer timer) {
-    if (records.isEmpty()) {
-      return null;
-    }
-    Context parentContext = Context.current();
-    KafkaReceiveRequest request = KafkaReceiveRequest.create(records, consumerGroup, clientId);
-    Context context = null;
-    if (consumerReceiveInstrumenter.shouldStart(parentContext, request)) {
-      context =
-          InstrumenterUtil.startAndEnd(
-              consumerReceiveInstrumenter,
-              parentContext,
-              request,
-              null,
-              null,
-              timer.startTime(),
-              timer.now());
-    }
-
-    // we're returning the context of the receive span so that process spans can use it as
-    // parent context even though the span has ended
-    // this is the suggested behavior according to the spec batch receive scenario:
-    // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/messaging-spans.md#batch-receiving
-    return context;
-  }
-
-  private class ProducerCallback implements Callback {
-    private final Callback callback;
-    private final Context parentContext;
-    private final Context context;
-    private final KafkaProducerRequest request;
-
-    ProducerCallback(
-        Callback callback, Context parentContext, Context context, KafkaProducerRequest request) {
-      this.callback = callback;
-      this.parentContext = parentContext;
-      this.context = context;
-      this.request = request;
-    }
-
-    @Override
-    public void onCompletion(RecordMetadata metadata, Exception exception) {
-      producerInstrumenter.end(context, request, metadata, exception);
-
-      if (callback != null) {
-        try (Scope ignored = parentContext.makeCurrent()) {
-          callback.onCompletion(metadata, exception);
-        }
-      }
-    }
+  public Map<String, ?> consumerInterceptorConfigProperties() {
+    Map<String, Object> config = new HashMap<>();
+    config.put(
+        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+        OpenTelemetryConsumerInterceptor.class.getName());
+    config.put(
+        OpenTelemetryConsumerInterceptor.CONFIG_KEY_KAFKA_CONSUMER_TELEMETRY_SUPPLIER,
+        new KafkaConsumerTelemetrySupplier(consumerTelemetry));
+    return Collections.unmodifiableMap(config);
   }
 }
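
Note that the maps returned by `producerInterceptorConfigProperties()` and `consumerInterceptorConfigProperties()` carry live supplier objects, not just strings, so they must be passed to the `KafkaProducer`/`KafkaConsumer` constructor programmatically rather than through a flat properties file. Kafka hands the full config map to each interceptor's `configure(Map)` callback, which is how an interceptor can get hold of a telemetry instance. A minimal illustrative sketch of that mechanism, using a hypothetical interceptor class and config key (the real `OpenTelemetryProducerInterceptor` internals are not part of this diff):

```java
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical example, not the actual OpenTelemetryProducerInterceptor.
public class ExampleTelemetryAwareInterceptor<K, V> implements ProducerInterceptor<K, V> {

  private Object telemetry; // the real interceptor would hold a producer telemetry object here

  @Override
  public void configure(Map<String, ?> configs) {
    // The non-string supplier placed into the config map by producerInterceptorConfigProperties()
    // arrives here unchanged, because KafkaProducer passes its config map to its interceptors.
    this.telemetry = configs.get("example.telemetry.supplier"); // hypothetical config key
  }

  @Override
  public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
    // The real interceptor would start a span and inject context into record.headers() here.
    return record;
  }

  @Override
  public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}

  @Override
  public void close() {}
}
```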
