@@ -11,24 +11,28 @@ def set_dsm_context(event, event_source):
1111
1212
1313def _dsm_set_sqs_context (event ):
14- from ddtrace .data_streams import set_consume_checkpoint
15-
1614 records = event .get ("Records" )
1715 if records is None :
1816 return
1917
2018 for record in records :
21- try :
22- arn = record .get ("eventSourceARN" , "" )
23- context_json = _get_dsm_context_from_lambda (record )
24- if not context_json :
25- logger .debug ("DataStreams skipped lambda message: %r" , record )
26- return
27-
28- carrier_get = _create_carrier_get (context_json )
29- set_consume_checkpoint ("sqs" , arn , carrier_get )
30- except Exception as e :
31- logger .error (f"Unable to set dsm context: { e } " )
19+ arn = record .get ("eventSourceARN" , "" )
20+ _set_dsm_context_for_record (record , "sqs" , arn )
21+
22+
23+ def _set_dsm_context_for_record (record , type , arn ):
24+ from ddtrace .data_streams import set_consume_checkpoint
25+
26+ try :
27+ context_json = _get_dsm_context_from_lambda (record )
28+ if not context_json :
29+ logger .debug ("DataStreams skipped lambda message: %r" , record )
30+ return
31+
32+ carrier_get = _create_carrier_get (context_json )
33+ set_consume_checkpoint (type , arn , carrier_get , manual_checkpoint = False )
34+ except Exception as e :
35+ logger .error (f"Unable to set dsm context: { e } " )
3236
3337
3438def _get_dsm_context_from_lambda (message ):
0 commit comments