@@ -13,51 +13,50 @@ def set_dsm_context(event, event_source):
1313
1414
1515def _dsm_set_context_helper (
16- event , service_type , arn_extractor , payload_size_calculator
16+ record , service_type , arn , payload_size
1717):
1818 """
1919 Common helper function for setting DSM context.
2020
2121 Args:
2222 event: The Lambda event containing records
2323 service_type: The service type string (example: sqs', 'sns')
24- arn_extractor: Function to extract the ARN from the record
25- payload_size_calculator: Function to calculate payload size
24+ arn: ARN from the record
25+ payload_size: payload size of the record
2626 """
2727 from datadog_lambda .wrapper import format_err_with_traceback
2828 from ddtrace .internal .datastreams import data_streams_processor
2929 from ddtrace .internal .datastreams .processor import DsmPathwayCodec
3030
31- records = event .get ("Records" )
32- if records is None :
33- return
3431 processor = data_streams_processor ()
3532
36- for record in records :
37- try :
38- arn = arn_extractor (record )
39- context_json = _get_dsm_context_from_lambda (record )
40- payload_size = payload_size_calculator (record , context_json )
41-
42- ctx = DsmPathwayCodec .decode (context_json , processor )
43- ctx .set_checkpoint (
44- ["direction:in" , f"topic:{ arn } " , f"type:{ service_type } " ],
45- payload_size = payload_size ,
46- )
47- except Exception as e :
48- logger .error (format_err_with_traceback (e ))
33+ try :
34+ context_json = _get_dsm_context_from_lambda (record )
35+
36+ ctx = DsmPathwayCodec .decode (context_json , processor )
37+ ctx .set_checkpoint (
38+ ["direction:in" , f"topic:{ arn } " , f"type:{ service_type } " ],
39+ payload_size = payload_size ,
40+ )
41+ except Exception as e :
42+ logger .error (format_err_with_traceback (e ))
4943
5044
5145def _dsm_set_sqs_context (event ):
5246 from ddtrace .internal .datastreams .botocore import calculate_sqs_payload_size
47+ from datadog_lambda .wrapper import format_err_with_traceback
5348
54- def sqs_payload_calculator (record , context_json ):
55- return calculate_sqs_payload_size (record )
56-
57- def sqs_arn_extractor (record ):
58- return record .get ("eventSourceARN" , "" )
49+ records = event .get ("Records" )
50+ if records is None :
51+ return
5952
60- _dsm_set_context_helper (event , "sqs" , sqs_arn_extractor , sqs_payload_calculator )
53+ for record in records :
54+ try :
55+ arn = record .get ("eventSourceARN" , "" )
56+ payload_size = calculate_sqs_payload_size (record )
57+ _dsm_set_context_helper (record , "sqs" , arn , payload_size )
58+ except Exception as e :
59+ logger .error (format_err_with_traceback (e ))
6160
6261
6362def _get_dsm_context_from_lambda (message ):
0 commit comments