@@ -24,6 +24,19 @@ def _dsm_set_sqs_context(event):
2424 _set_dsm_context_for_record (record , "sqs" , arn )
2525
2626
27+ def _dsm_set_sns_context (event ):
28+ records = event .get ("Records" )
29+ if records is None :
30+ return
31+
32+ for record in records :
33+ sns_data = record .get ("Sns" )
34+ if not sns_data :
35+ return
36+ arn = sns_data .get ("TopicArn" , "" )
37+ _set_dsm_context_for_record (sns_data , "sns" , arn )
38+
39+
2740def _set_dsm_context_for_record (record , type , arn ):
2841 from ddtrace .data_streams import set_consume_checkpoint
2942
@@ -39,30 +52,6 @@ def _set_dsm_context_for_record(record, type, arn):
3952 logger .error (f"Unable to set dsm context: { e } " )
4053
4154
42- def _dsm_set_sns_context (event ):
43- from ddtrace .data_streams import set_consume_checkpoint
44-
45- records = event .get ("Records" )
46- if records is None :
47- return
48-
49- for record in records :
50- try :
51- sns_data = record .get ("Sns" )
52- if not sns_data :
53- return
54- arn = sns_data .get ("TopicArn" , "" )
55- context_json = _get_dsm_context_from_lambda (sns_data )
56- if not context_json :
57- logger .debug ("DataStreams skipped lambda message: %r" , record )
58- return
59-
60- carrier_get = _create_carrier_get (context_json )
61- set_consume_checkpoint ("sns" , arn , carrier_get )
62- except Exception as e :
63- logger .error (f"Unable to set dsm context: { e } " )
64-
65-
6655def _get_dsm_context_from_lambda (message ):
6756 """
6857 Lambda-specific message formats:
0 commit comments