@@ -623,116 +623,113 @@ def updated_get_datastreams_context(message):
623623
624624 return context_json
625625
626+ # Step 1: Create a message with some context in the message attributes
627+ from ddtrace .internal .datastreams .processor import DataStreamsProcessor
628+
629+ processor_instance = DataStreamsProcessor ()
630+
626631 with patch (
627632 "ddtrace.internal.datastreams.botocore.get_datastreams_context" ,
628633 updated_get_datastreams_context ,
634+ ), patch (
635+ "ddtrace.internal.datastreams.data_streams_processor" ,
636+ return_value = processor_instance ,
629637 ):
630638
631- # Step 1: Create a message with some context in the message attributes
632-
633- from ddtrace .internal .datastreams .processor import DataStreamsProcessor
634-
635- processor_instance = DataStreamsProcessor ()
636-
637- with patch (
638- "ddtrace.internal.datastreams.processor.DataStreamsProcessor" ,
639- return_value = processor_instance ,
640- ):
641-
642- parent_ctx = processor_instance .new_pathway ()
643-
644- parent_ctx .set_checkpoint (
645- ["direction:out" , "topic:upstream-topic" , "type:sqs" ],
646- now_sec = 1640995200.0 ,
647- payload_size = 512 ,
648- )
649- parent_hash = parent_ctx .hash
650- encoded_parent_context = parent_ctx .encode_b64 ()
651-
652- sqs_event = {
653- "Records" : [
654- {
655- "eventSource" : "aws:sqs" ,
656- "eventSourceARN" : "arn:aws:sqs:us-east-1:123456789012:test" ,
657- "Body" : "test message body" ,
658- "messageAttributes" : {
659- "_datadog" : {
660- "stringValue" : json .dumps (
661- {
662- "dd-pathway-ctx-base64" : encoded_parent_context
663- }
664- )
665- }
666- },
667- }
668- ]
669- }
670-
671- # Step 2: Call the handler
672- @wrapper .datadog_lambda_wrapper
673- def lambda_handler (event , context ):
674- return {"statusCode" : 200 , "body" : "processed" }
675-
676- result = lambda_handler (sqs_event , get_mock_context ())
677- self .assertEqual (result ["statusCode" ], 200 )
678-
679- # New context set after handler call
680- current_ctx = processor_instance ._current_context .value
681- self .assertIsNotNone (
682- current_ctx ,
683- "Data streams context should be set after processing SQS message" ,
684- )
685-
686- # Step 3: Check that hash in this context is the child of the hash you passed
687- # Step 4: Check that the right checkpoint was produced during call to handler
688- # The buckets hold the aggregated stats for all checkpoints
689- found_sqs_checkpoint = False
690- for bucket_time , bucket in processor_instance ._buckets .items ():
691- for aggr_key , stats in bucket .pathway_stats .items ():
692- edge_tags_str , hash_value , parent_hash_recorded = aggr_key
693- edge_tags = edge_tags_str .split ("," )
694-
695- if (
696- "direction:in" in edge_tags
697- and "topic:arn:aws:sqs:us-east-1:123456789012:test"
698- in edge_tags
699- and "type:sqs" in edge_tags
700- ):
701- found_sqs_checkpoint = True
702-
703- # EXPLICIT PARENT-CHILD HASH RELATIONSHIP TEST
704- self .assertEqual (
705- parent_hash_recorded ,
706- parent_hash ,
707- f"Parent hash must be preserved: "
708- f"expected { parent_hash } , got { parent_hash_recorded } " ,
709- )
710- self .assertEqual (
711- hash_value ,
712- current_ctx .hash ,
713- f"Child hash must match current context: "
714- f"expected { current_ctx .hash } , got { hash_value } " ,
715- )
716- self .assertNotEqual (
717- hash_value ,
718- parent_hash_recorded ,
719- f"Child hash ({ hash_value } ) must be different from "
720- f"parent hash ({ parent_hash_recorded } ) - proves parent-child" ,
721- )
722- self .assertGreaterEqual (
723- stats .payload_size .count ,
724- 1 ,
725- "Should have one payload size measurement" ,
726- )
727-
728- break
729-
730- self .assertTrue (
731- found_sqs_checkpoint ,
732- "Should have found SQS consumption checkpoint in processor stats" ,
733- )
734-
735- processor_instance .shutdown (timeout = 0.1 )
639+ parent_ctx = processor_instance .new_pathway ()
640+
641+ parent_ctx .set_checkpoint (
642+ ["direction:out" , "topic:upstream-topic" , "type:sqs" ],
643+ now_sec = 1640995200.0 ,
644+ payload_size = 512 ,
645+ )
646+ parent_hash = parent_ctx .hash
647+ encoded_parent_context = parent_ctx .encode_b64 ()
648+
649+ sqs_event = {
650+ "Records" : [
651+ {
652+ "eventSource" : "aws:sqs" ,
653+ "eventSourceARN" : "arn:aws:sqs:us-east-1:123456789012:test" ,
654+ "Body" : "test message body" ,
655+ "messageAttributes" : {
656+ "_datadog" : {
657+ "stringValue" : json .dumps (
658+ {
659+ "dd-pathway-ctx-base64" : encoded_parent_context
660+ }
661+ )
662+ }
663+ },
664+ }
665+ ]
666+ }
667+
668+ # Step 2: Call the handler
669+ @wrapper .datadog_lambda_wrapper
670+ def lambda_handler (event , context ):
671+ return {"statusCode" : 200 , "body" : "processed" }
672+
673+ result = lambda_handler (sqs_event , get_mock_context ())
674+ self .assertEqual (result ["statusCode" ], 200 )
675+
676+ # New context set after handler call
677+ current_ctx = processor_instance ._current_context .value
678+ self .assertIsNotNone (
679+ current_ctx ,
680+ "Data streams context should be set after processing SQS message" ,
681+ )
682+
683+ # Step 3: Check that hash in this context is the child of the hash you passed
684+ # Step 4: Check that the right checkpoint was produced during call to handler
685+ # The buckets hold the aggregated stats for all checkpoints
686+ found_sqs_checkpoint = False
687+ for bucket_time , bucket in processor_instance ._buckets .items ():
688+ for aggr_key , stats in bucket .pathway_stats .items ():
689+ edge_tags_str , hash_value , parent_hash_recorded = aggr_key
690+ edge_tags = edge_tags_str .split ("," )
691+
692+ if (
693+ "direction:in" in edge_tags
694+ and "topic:arn:aws:sqs:us-east-1:123456789012:test"
695+ in edge_tags
696+ and "type:sqs" in edge_tags
697+ ):
698+ found_sqs_checkpoint = True
699+
700+ # EXPLICIT PARENT-CHILD HASH RELATIONSHIP TEST
701+ self .assertEqual (
702+ parent_hash_recorded ,
703+ parent_hash ,
704+ f"Parent hash must be preserved: "
705+ f"expected { parent_hash } , got { parent_hash_recorded } " ,
706+ )
707+ self .assertEqual (
708+ hash_value ,
709+ current_ctx .hash ,
710+ f"Child hash must match current context: "
711+ f"expected { current_ctx .hash } , got { hash_value } " ,
712+ )
713+ self .assertNotEqual (
714+ hash_value ,
715+ parent_hash_recorded ,
716+ f"Child hash ({ hash_value } ) must be different from "
717+ f"parent hash ({ parent_hash_recorded } ) - proves parent-child" ,
718+ )
719+ self .assertGreaterEqual (
720+ stats .payload_size .count ,
721+ 1 ,
722+ "Should have one payload size measurement" ,
723+ )
724+
725+ break
726+
727+ self .assertTrue (
728+ found_sqs_checkpoint ,
729+ "Should have found SQS consumption checkpoint in processor stats" ,
730+ )
731+
732+ processor_instance .shutdown (timeout = 0.1 )
736733
737734 @patch .dict (os .environ , {"DD_DATA_STREAMS_ENABLED" : "true" })
738735 @patch ("datadog_lambda.wrapper.set_dsm_context" )
0 commit comments