@@ -2743,6 +2743,71 @@ def test_sqs_batch_processing(self):
27432743 carrier_get_2 = args_2 [2 ]
27442744 self .assertEqual (carrier_get_2 ("dd-pathway-ctx-base64" ), "record2" )
27452745
2746+ def test_sqs_batch_processing_with_invalid_record (self ):
2747+ dd_data_1 = {"dd-pathway-ctx-base64" : "record1" }
2748+ dd_data_3 = {"dd-pathway-ctx-base64" : "record3" }
2749+ dd_json_data_1 = json .dumps (dd_data_1 )
2750+ dd_json_data_3 = json .dumps (dd_data_3 )
2751+
2752+ event = {
2753+ "Records" : [
2754+ {
2755+ "eventSourceARN" : "arn:aws:sqs:us-east-1:123456789012:test-queue" ,
2756+ "messageAttributes" : {
2757+ "_datadog" : {
2758+ "dataType" : "String" ,
2759+ "stringValue" : dd_json_data_1 ,
2760+ }
2761+ },
2762+ "eventSource" : "aws:sqs" ,
2763+ },
2764+ {
2765+ "eventSourceARN" : "arn:aws:sqs:us-east-1:123456789012:test-queue" ,
2766+ "messageAttributes" : {
2767+ "_datadog" : {
2768+ "dataType" : "String" ,
2769+ "stringValue" : "invalid json" , # This will cause extraction to fail
2770+ }
2771+ },
2772+ "eventSource" : "aws:sqs" ,
2773+ },
2774+ {
2775+ "eventSourceARN" : "arn:aws:sqs:us-east-1:123456789012:test-queue" ,
2776+ "messageAttributes" : {
2777+ "_datadog" : {
2778+ "dataType" : "String" ,
2779+ "stringValue" : dd_json_data_3 ,
2780+ }
2781+ },
2782+ "eventSource" : "aws:sqs" ,
2783+ },
2784+ ]
2785+ }
2786+
2787+ extract_context_from_sqs_or_sns_event_or_context (
2788+ event , self .lambda_context , parse_event_source (event )
2789+ )
2790+
2791+ self .assertEqual (self .mock_checkpoint .call_count , 3 )
2792+
2793+ args_1 , _ = self .mock_checkpoint .call_args_list [0 ]
2794+ self .assertEqual (args_1 [0 ], "sqs" )
2795+ self .assertEqual (args_1 [1 ], "arn:aws:sqs:us-east-1:123456789012:test-queue" )
2796+ carrier_get_1 = args_1 [2 ]
2797+ self .assertEqual (carrier_get_1 ("dd-pathway-ctx-base64" ), "record1" )
2798+
2799+ args_2 , _ = self .mock_checkpoint .call_args_list [1 ]
2800+ self .assertEqual (args_2 [0 ], "sqs" )
2801+ self .assertEqual (args_2 [1 ], "arn:aws:sqs:us-east-1:123456789012:test-queue" )
2802+ carrier_get_2 = args_2 [2 ]
2803+ self .assertEqual (carrier_get_2 ("dd-pathway-ctx-base64" ), None )
2804+
2805+ args_3 , _ = self .mock_checkpoint .call_args_list [2 ]
2806+ self .assertEqual (args_3 [0 ], "sqs" )
2807+ self .assertEqual (args_3 [1 ], "arn:aws:sqs:us-east-1:123456789012:test-queue" )
2808+ carrier_get_3 = args_3 [2 ]
2809+ self .assertEqual (carrier_get_3 ("dd-pathway-ctx-base64" ), "record3" )
2810+
27462811 def test_sqs_source_arn_not_found (self ):
27472812 event = {
27482813 "Records" : [
@@ -3019,53 +3084,6 @@ def test_sns_data_streams_disabled(self):
30193084
30203085 self .mock_checkpoint .assert_not_called ()
30213086
3022- def test_sns_batch_processing (self ):
3023- dd_data_1 = {"dd-pathway-ctx-base64" : "record1" }
3024- dd_data_2 = {"dd-pathway-ctx-base64" : "record2" }
3025- dd_json_data_1 = json .dumps (dd_data_1 )
3026- dd_json_data_2 = json .dumps (dd_data_2 )
3027-
3028- event = {
3029- "Records" : [
3030- {
3031- "Sns" : {
3032- "TopicArn" : "arn:aws:sns:us-east-1:123456789012:test-topic" ,
3033- "MessageAttributes" : {
3034- "_datadog" : {"Type" : "String" , "Value" : dd_json_data_1 }
3035- },
3036- },
3037- "eventSource" : "aws:sns" ,
3038- },
3039- {
3040- "Sns" : {
3041- "TopicArn" : "arn:aws:sns:us-east-1:123456789012:test-topic" ,
3042- "MessageAttributes" : {
3043- "_datadog" : {"Type" : "String" , "Value" : dd_json_data_2 }
3044- },
3045- },
3046- "eventSource" : "aws:sns" ,
3047- },
3048- ]
3049- }
3050-
3051- extract_context_from_sqs_or_sns_event_or_context (
3052- event , self .lambda_context , parse_event_source (event )
3053- )
3054-
3055- self .assertEqual (self .mock_checkpoint .call_count , 2 )
3056-
3057- args_1 , _ = self .mock_checkpoint .call_args_list [0 ]
3058- self .assertEqual (args_1 [0 ], "sns" )
3059- self .assertEqual (args_1 [1 ], "arn:aws:sns:us-east-1:123456789012:test-topic" )
3060- carrier_get_1 = args_1 [2 ]
3061- self .assertEqual (carrier_get_1 ("dd-pathway-ctx-base64" ), "record1" )
3062-
3063- args_2 , _ = self .mock_checkpoint .call_args_list [1 ]
3064- self .assertEqual (args_2 [0 ], "sns" )
3065- self .assertEqual (args_2 [1 ], "arn:aws:sns:us-east-1:123456789012:test-topic" )
3066- carrier_get_2 = args_2 [2 ]
3067- self .assertEqual (carrier_get_2 ("dd-pathway-ctx-base64" ), "record2" )
3068-
30693087 # SNS -> SQS TESTS
30703088
30713089 def test_sns_to_sqs_context_propagated_string_value (self ):
@@ -3362,6 +3380,81 @@ def test_sns_to_sqs_batch_processing(self):
33623380 carrier_get_2 = args_2 [2 ]
33633381 self .assertEqual (carrier_get_2 ("dd-pathway-ctx-base64" ), "record2" )
33643382
3383+ def test_sns_to_sqs_batch_processing_with_invalid_record (self ):
3384+ dd_data_1 = {"dd-pathway-ctx-base64" : "record1" }
3385+ dd_data_3 = {"dd-pathway-ctx-base64" : "record3" }
3386+ dd_json_data_1 = json .dumps (dd_data_1 )
3387+ dd_json_data_3 = json .dumps (dd_data_3 )
3388+
3389+ sns_message_1 = {
3390+ "Type" : "Notification" ,
3391+ "TopicArn" : "arn:aws:sns:us-east-1:123456789012:test-topic" ,
3392+ "MessageAttributes" : {
3393+ "_datadog" : {"Type" : "String" , "Value" : dd_json_data_1 }
3394+ },
3395+ }
3396+ sns_message_2 = {
3397+ "Type" : "Notification" ,
3398+ "TopicArn" : "arn:aws:sns:us-east-1:123456789012:test-topic" ,
3399+ "MessageAttributes" : {
3400+ "_datadog" : {
3401+ "Type" : "String" ,
3402+ "Value" : "invalid json" ,
3403+ } # This will cause extraction to fail
3404+ },
3405+ }
3406+ sns_message_3 = {
3407+ "Type" : "Notification" ,
3408+ "TopicArn" : "arn:aws:sns:us-east-1:123456789012:test-topic" ,
3409+ "MessageAttributes" : {
3410+ "_datadog" : {"Type" : "String" , "Value" : dd_json_data_3 }
3411+ },
3412+ }
3413+
3414+ event = {
3415+ "Records" : [
3416+ {
3417+ "eventSourceARN" : "arn:aws:sqs:us-east-1:123456789012:test-queue" ,
3418+ "body" : json .dumps (sns_message_1 ),
3419+ "eventSource" : "aws:sqs" ,
3420+ },
3421+ {
3422+ "eventSourceARN" : "arn:aws:sqs:us-east-1:123456789012:test-queue" ,
3423+ "body" : json .dumps (sns_message_2 ),
3424+ "eventSource" : "aws:sqs" ,
3425+ },
3426+ {
3427+ "eventSourceARN" : "arn:aws:sqs:us-east-1:123456789012:test-queue" ,
3428+ "body" : json .dumps (sns_message_3 ),
3429+ "eventSource" : "aws:sqs" ,
3430+ },
3431+ ]
3432+ }
3433+
3434+ extract_context_from_sqs_or_sns_event_or_context (
3435+ event , self .lambda_context , parse_event_source (event )
3436+ )
3437+
3438+ self .assertEqual (self .mock_checkpoint .call_count , 3 )
3439+
3440+ args_1 , _ = self .mock_checkpoint .call_args_list [0 ]
3441+ self .assertEqual (args_1 [0 ], "sqs" )
3442+ self .assertEqual (args_1 [1 ], "arn:aws:sqs:us-east-1:123456789012:test-queue" )
3443+ carrier_get_1 = args_1 [2 ]
3444+ self .assertEqual (carrier_get_1 ("dd-pathway-ctx-base64" ), "record1" )
3445+
3446+ args_2 , _ = self .mock_checkpoint .call_args_list [1 ]
3447+ self .assertEqual (args_2 [0 ], "sqs" )
3448+ self .assertEqual (args_2 [1 ], "arn:aws:sqs:us-east-1:123456789012:test-queue" )
3449+ carrier_get_2 = args_2 [2 ]
3450+ self .assertEqual (carrier_get_2 ("dd-pathway-ctx-base64" ), None )
3451+
3452+ args_3 , _ = self .mock_checkpoint .call_args_list [2 ]
3453+ self .assertEqual (args_3 [0 ], "sqs" )
3454+ self .assertEqual (args_3 [1 ], "arn:aws:sqs:us-east-1:123456789012:test-queue" )
3455+ carrier_get_3 = args_3 [2 ]
3456+ self .assertEqual (carrier_get_3 ("dd-pathway-ctx-base64" ), "record3" )
3457+
33653458 def test_sns_to_sqs_source_arn_not_found (self ):
33663459 sns_notification = {
33673460 "Type" : "Notification" ,
@@ -3574,6 +3667,63 @@ def test_kinesis_batch_processing(self):
35743667 carrier_get_2 = args_2 [2 ]
35753668 self .assertEqual (carrier_get_2 ("dd-pathway-ctx-base64" ), "record2" )
35763669
3670+ def test_kinesis_batch_processing_with_invalid_record (self ):
3671+ dd_data_1 = {"dd-pathway-ctx-base64" : "record1" }
3672+ dd_data_3 = {"dd-pathway-ctx-base64" : "record3" }
3673+
3674+ kinesis_data_1 = {"_datadog" : dd_data_1 , "message" : "test1" }
3675+ kinesis_data_2 = {"invalid" : "data" }
3676+ kinesis_data_3 = {"_datadog" : dd_data_3 , "message" : "test3" }
3677+
3678+ encoded_data_1 = base64 .b64encode (json .dumps (kinesis_data_1 ).encode ()).decode ()
3679+ encoded_data_2 = base64 .b64encode (json .dumps (kinesis_data_2 ).encode ()).decode ()
3680+ encoded_data_3 = base64 .b64encode (json .dumps (kinesis_data_3 ).encode ()).decode ()
3681+
3682+ event = {
3683+ "Records" : [
3684+ {
3685+ "eventSourceARN" : "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream" ,
3686+ "kinesis" : {"data" : encoded_data_1 },
3687+ },
3688+ {
3689+ "eventSourceARN" : "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream" ,
3690+ "kinesis" : {"data" : encoded_data_2 },
3691+ },
3692+ {
3693+ "eventSourceARN" : "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream" ,
3694+ "kinesis" : {"data" : encoded_data_3 },
3695+ },
3696+ ]
3697+ }
3698+
3699+ extract_context_from_kinesis_event (event , self .lambda_context )
3700+
3701+ self .assertEqual (self .mock_checkpoint .call_count , 3 )
3702+
3703+ args_1 , _ = self .mock_checkpoint .call_args_list [0 ]
3704+ self .assertEqual (args_1 [0 ], "kinesis" )
3705+ self .assertEqual (
3706+ args_1 [1 ], "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream"
3707+ )
3708+ carrier_get_1 = args_1 [2 ]
3709+ self .assertEqual (carrier_get_1 ("dd-pathway-ctx-base64" ), "record1" )
3710+
3711+ args_2 , _ = self .mock_checkpoint .call_args_list [1 ]
3712+ self .assertEqual (args_2 [0 ], "kinesis" )
3713+ self .assertEqual (
3714+ args_2 [1 ], "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream"
3715+ )
3716+ carrier_get_2 = args_2 [2 ]
3717+ self .assertEqual (carrier_get_2 ("dd-pathway-ctx-base64" ), None )
3718+
3719+ args_3 , _ = self .mock_checkpoint .call_args_list [2 ]
3720+ self .assertEqual (args_3 [0 ], "kinesis" )
3721+ self .assertEqual (
3722+ args_3 [1 ], "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream"
3723+ )
3724+ carrier_get_3 = args_3 [2 ]
3725+ self .assertEqual (carrier_get_3 ("dd-pathway-ctx-base64" ), "record3" )
3726+
35773727 def test_kinesis_source_arn_not_found (self ):
35783728 kinesis_data = {"message" : "test" }
35793729 kinesis_data_str = json .dumps (kinesis_data )
0 commit comments