@@ -249,15 +249,10 @@ def extract_context_from_sqs_or_sns_event_or_context(
249249 logger .debug ("Failed extracting context as EventBridge to SQS." )
250250
251251 context = None
252- records = (
253- event .get ("Records" , [])
254- if config .data_streams_enabled
255- else [event .get ("Records" )[0 ]]
256- )
257- for idx , record in enumerate (records ):
252+ for idx , record in enumerate (event .get ("Records" , [])):
258253 try :
259254 source_arn = record .get ("eventSourceARN" , "" )
260- dsm_data = None
255+ dd_ctx = None
261256
262257 # logic to deal with SNS => SQS event
263258 if "body" in record :
@@ -311,8 +306,10 @@ def extract_context_from_sqs_or_sns_event_or_context(
311306 )
312307 if idx == 0 :
313308 context = propagator .extract (dd_data )
314- dsm_data = dd_data
315- else :
309+ if not config .data_streams_enabled :
310+ break
311+ dd_ctx = dd_data
312+ elif idx == 0 :
316313 # Handle case where trace context is injected into attributes.AWSTraceHeader
317314 # example:Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
318315 attrs = event .get ("Records" )[0 ].get ("attributes" )
@@ -329,17 +326,18 @@ def extract_context_from_sqs_or_sns_event_or_context(
329326 logger .debug (
330327 "Found dd-trace injected trace context from AWSTraceHeader"
331328 )
332- if idx == 0 :
333- context = Context (
334- trace_id = int (trace_id_parts [2 ][8 :], 16 ),
335- span_id = int (x_ray_context ["parent_id" ], 16 ),
336- sampling_priority = float (x_ray_context ["sampled" ]),
337- )
329+ context = Context (
330+ trace_id = int (trace_id_parts [2 ][8 :], 16 ),
331+ span_id = int (x_ray_context ["parent_id" ], 16 ),
332+ sampling_priority = float (x_ray_context ["sampled" ]),
333+ )
334+ if not config .data_streams_enabled :
335+ break
338336 except Exception as e :
339337 logger .debug ("The trace extractor returned with error %s" , e )
340338
341339 # Set DSM checkpoint once per record
342- _dsm_set_checkpoint (dsm_data , event_type , source_arn )
340+ _dsm_set_checkpoint (dd_ctx , event_type , source_arn )
343341
344342 return context if context else extract_context_from_lambda_context (lambda_context )
345343
@@ -402,25 +400,15 @@ def extract_context_from_kinesis_event(event, lambda_context):
402400 Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
403401 """
404402 source_arn = ""
405- records = (
406- [get_first_record (event )]
407- if not config .data_streams_enabled
408- else event .get ("Records" )
409- )
403+
410404 context = None
411- for idx , record in enumerate (records ):
412- dsm_data = None
405+ for idx , record in enumerate (event . get ( "Records" , []) ):
406+ dd_ctx = None
413407 try :
414408 source_arn = record .get ("eventSourceARN" , "" )
415409 kinesis = record .get ("kinesis" )
416410 if not kinesis :
417- context = (
418- extract_context_from_lambda_context (lambda_context )
419- if idx == 0
420- else context
421- )
422- _dsm_set_checkpoint (None , "kinesis" , source_arn )
423- continue
411+ return extract_context_from_lambda_context (lambda_context )
424412 data = kinesis .get ("data" )
425413 if data :
426414 import base64
@@ -433,10 +421,11 @@ def extract_context_from_kinesis_event(event, lambda_context):
433421 if dd_ctx :
434422 if idx == 0 :
435423 context = propagator .extract (dd_ctx )
436- dsm_data = dd_ctx
424+ if not config .data_streams_enabled :
425+ break
437426 except Exception as e :
438427 logger .debug ("The trace extractor returned with error %s" , e )
439- _dsm_set_checkpoint (dsm_data , "kinesis" , source_arn )
428+ _dsm_set_checkpoint (dd_ctx , "kinesis" , source_arn )
440429 return context if context else extract_context_from_lambda_context (lambda_context )
441430
442431
0 commit comments