Skip to content

Commit 1d49933

Browse files
committed
Move karafka's DataStreams handling inside the msg span
1 parent ba29a96 commit 1d49933

File tree

1 file changed

+16
-16
lines changed

1 file changed

+16
-16
lines changed

lib/datadog/tracing/contrib/karafka/patcher.rb

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,25 +34,25 @@ def each(&block)
3434
Karafka.extract(headers)
3535
end
3636

37-
if Datadog::DataStreams.enabled?
38-
begin
39-
headers = if message.metadata.respond_to?(:raw_headers)
40-
message.metadata.raw_headers
41-
else
42-
message.metadata.headers
37+
Tracing.trace(Ext::SPAN_MESSAGE_CONSUME, continue_from: trace_digest) do |span, trace|
38+
if Datadog::DataStreams.enabled?
39+
begin
40+
headers = if message.metadata.respond_to?(:raw_headers)
41+
message.metadata.raw_headers
42+
else
43+
message.metadata.headers
44+
end
45+
46+
Datadog::DataStreams.set_consume_checkpoint(
47+
type: 'kafka',
48+
source: message.topic,
49+
auto_instrumentation: true
50+
) { |key| headers[key] }
51+
rescue => e
52+
Datadog.logger.debug("Error setting DSM checkpoint: #{e.class}: #{e}")
4353
end
44-
45-
Datadog::DataStreams.set_consume_checkpoint(
46-
type: 'kafka',
47-
source: message.topic,
48-
auto_instrumentation: true
49-
) { |key| headers[key] }
50-
rescue => e
51-
Datadog.logger.debug("Error setting DSM checkpoint: #{e.class}: #{e}")
5254
end
53-
end
5455

55-
Tracing.trace(Ext::SPAN_MESSAGE_CONSUME, continue_from: trace_digest) do |span, trace|
5656
span.set_tag(Ext::TAG_OFFSET, message.metadata.offset)
5757
span.set_tag(Contrib::Ext::Messaging::TAG_DESTINATION, message.topic)
5858
span.set_tag(Contrib::Ext::Messaging::TAG_SYSTEM, Ext::TAG_SYSTEM)

0 commit comments

Comments
 (0)