@@ -563,200 +563,69 @@ def return_type_test(event, context):
563563 self .assertEqual (result , test_result )
564564 self .assertFalse (MockPrintExc .called )
565565
566- @patch .dict (os .environ , {"DD_DATA_STREAMS_ENABLED" : "true" })
567- def test_datadog_lambda_wrapper_dsm_sqs_context_pathway_verification (self ):
568- from datadog_lambda .trigger import _EventSource , EventTypes
569-
570- sqs_event_source = _EventSource (EventTypes .SQS )
571- self .mock_extract_dd_trace_context .return_value = ({}, None , sqs_event_source )
572-
573- with patch (
574- "ddtrace.internal.datastreams.processor.get_connection"
575- ) as mock_get_connection :
576-
577- mock_conn = unittest .mock .MagicMock ()
578- mock_response = unittest .mock .MagicMock ()
579- mock_response .status = 200
580- mock_conn .getresponse .return_value = mock_response
581- mock_get_connection .return_value = mock_conn
582-
583- def updated_get_datastreams_context (message ):
584- """
585- Updated version that handles the correct message formats
586- """
587- import base64
588- import json
589-
590- context_json = None
591- message_body = message
592- try :
593- body = message .get ("Body" )
594- if body :
595- message_body = json .loads (body )
596- except (ValueError , TypeError ):
597- pass
598-
599- message_attributes = message_body .get (
600- "MessageAttributes"
601- ) or message_body .get ("messageAttributes" )
602- if not message_attributes :
603- return None
604-
605- if "_datadog" not in message_attributes :
606- return None
607-
608- datadog_attr = message_attributes ["_datadog" ]
609-
610- if message_body .get ("Type" ) == "Notification" :
611- if datadog_attr .get ("Type" ) == "Binary" :
612- context_json = json .loads (
613- base64 .b64decode (datadog_attr ["Value" ]).decode ()
614- )
615- elif "StringValue" in datadog_attr :
616- context_json = json .loads (datadog_attr ["StringValue" ])
617- elif "stringValue" in datadog_attr :
618- context_json = json .loads (datadog_attr ["stringValue" ])
619- elif "BinaryValue" in datadog_attr :
620- context_json = json .loads (datadog_attr ["BinaryValue" ].decode ())
621- else :
622- print (f"DEBUG: Unhandled datadog_attr format: { datadog_attr } " )
623-
624- return context_json
625-
626- # Step 1: Create a message with some context in the message attributes
627- from ddtrace .internal .datastreams .processor import DataStreamsProcessor
628-
629- processor_instance = DataStreamsProcessor ()
630-
631- with patch (
632- "ddtrace.internal.datastreams.botocore.get_datastreams_context" ,
633- updated_get_datastreams_context ,
634- ), patch (
635- "ddtrace.internal.datastreams.data_streams_processor" ,
636- return_value = processor_instance ,
637- ):
638-
639- parent_ctx = processor_instance .new_pathway ()
640-
641- parent_ctx .set_checkpoint (
642- ["direction:out" , "topic:upstream-topic" , "type:sqs" ],
643- now_sec = 1640995200.0 ,
644- payload_size = 512 ,
645- )
646- parent_hash = parent_ctx .hash
647- encoded_parent_context = parent_ctx .encode_b64 ()
648-
649- sqs_event = {
650- "Records" : [
651- {
652- "eventSource" : "aws:sqs" ,
653- "eventSourceARN" : "arn:aws:sqs:us-east-1:123456789012:test" ,
654- "Body" : "test message body" ,
655- "messageAttributes" : {
656- "_datadog" : {
657- "stringValue" : json .dumps (
658- {
659- "dd-pathway-ctx-base64" : encoded_parent_context
660- }
661- )
662- }
663- },
664- }
665- ]
666- }
667-
668- # Step 2: Call the handler
669- @wrapper .datadog_lambda_wrapper
670- def lambda_handler (event , context ):
671- return {"statusCode" : 200 , "body" : "processed" }
672-
673- result = lambda_handler (sqs_event , get_mock_context ())
674- self .assertEqual (result ["statusCode" ], 200 )
675-
676- # New context set after handler call
677- current_ctx = processor_instance ._current_context .value
678- self .assertIsNotNone (
679- current_ctx ,
680- "Data streams context should be set after processing SQS message" ,
681- )
682-
683- # Step 3: Check that hash in this context is the child of the hash you passed
684- # Step 4: Check that the right checkpoint was produced during call to handler
685- # The buckets hold the aggregated stats for all checkpoints
686- found_sqs_checkpoint = False
687- for bucket_time , bucket in processor_instance ._buckets .items ():
688- for aggr_key , stats in bucket .pathway_stats .items ():
689- edge_tags_str , hash_value , parent_hash_recorded = aggr_key
690- edge_tags = edge_tags_str .split ("," )
691-
692- if (
693- "direction:in" in edge_tags
694- and "topic:arn:aws:sqs:us-east-1:123456789012:test"
695- in edge_tags
696- and "type:sqs" in edge_tags
697- ):
698- found_sqs_checkpoint = True
699-
700- # EXPLICIT PARENT-CHILD HASH RELATIONSHIP TEST
701- self .assertEqual (
702- parent_hash_recorded ,
703- parent_hash ,
704- f"Parent hash must be preserved: "
705- f"expected { parent_hash } , got { parent_hash_recorded } " ,
706- )
707- self .assertEqual (
708- hash_value ,
709- current_ctx .hash ,
710- f"Child hash must match current context: "
711- f"expected { current_ctx .hash } , got { hash_value } " ,
712- )
713- self .assertNotEqual (
714- hash_value ,
715- parent_hash_recorded ,
716- f"Child hash ({ hash_value } ) must be different from "
717- f"parent hash ({ parent_hash_recorded } ) - proves parent-child" ,
718- )
719- self .assertGreaterEqual (
720- stats .payload_size .count ,
721- 1 ,
722- "Should have one payload size measurement" ,
723- )
724-
725- break
726-
727- self .assertTrue (
728- found_sqs_checkpoint ,
729- "Should have found SQS consumption checkpoint in processor stats" ,
730- )
731-
732- processor_instance .shutdown (timeout = 0.1 )
733-
734- @patch .dict (os .environ , {"DD_DATA_STREAMS_ENABLED" : "true" })
735- @patch ("datadog_lambda.wrapper.set_dsm_context" )
736- def test_set_dsm_context_called_when_enabled (self , mock_set_dsm_context ):
737- @wrapper .datadog_lambda_wrapper
738- def lambda_handler (event , context ):
739- return {"statusCode" : 200 , "body" : "processed" }
740-
741- lambda_event = {}
742- lambda_handler (lambda_event , get_mock_context ())
743-
744- mock_set_dsm_context .assert_called_once ()
745-
746- @patch ("datadog_lambda.wrapper.set_dsm_context" )
747- def test_set_dsm_context_not_called_when_disabled (self , mock_set_dsm_context ):
748- # Ensure DD_DATA_STREAMS_ENABLED is not in environment
749- if "DD_DATA_STREAMS_ENABLED" in os .environ :
750- del os .environ ["DD_DATA_STREAMS_ENABLED" ]
751-
752- @wrapper .datadog_lambda_wrapper
753- def lambda_handler (event , context ):
754- return {"statusCode" : 200 , "body" : "processed" }
755-
756- lambda_event = {}
757- lambda_handler (lambda_event , get_mock_context ())
758-
759- mock_set_dsm_context .assert_not_called ()
566+ def test_set_dsm_context_called_when_DSM_and_tracing_enabled (self ):
567+ env_vars = {"DD_DATA_STREAMS_ENABLED" : "true" }
568+ with patch .dict (os .environ , env_vars ):
569+ with patch ("datadog_lambda.wrapper.dd_tracing_enabled" , True ):
570+ with patch (
571+ "datadog_lambda.wrapper.set_dsm_context"
572+ ) as set_dsm_context_patch :
573+
574+ @wrapper .datadog_lambda_wrapper
575+ def lambda_handler (event , context ):
576+ return "ok"
577+
578+ result = lambda_handler ({}, get_mock_context ())
579+ assert result == "ok"
580+ assert set_dsm_context_patch .called_once ()
581+
582+ def test_set_dsm_context_not_called_when_only_DSM_enabled (self ):
583+ env_vars = {"DD_DATA_STREAMS_ENABLED" : "true" }
584+ with patch .dict (os .environ , env_vars ):
585+ with patch ("datadog_lambda.wrapper.dd_tracing_enabled" , False ):
586+ with patch (
587+ "datadog_lambda.wrapper.set_dsm_context"
588+ ) as set_dsm_context_patch :
589+
590+ @wrapper .datadog_lambda_wrapper
591+ def lambda_handler (event , context ):
592+ return "ok"
593+
594+ result = lambda_handler ({}, get_mock_context ())
595+ assert result == "ok"
596+ assert set_dsm_context_patch .call_count == 0
597+
598+ def test_set_dsm_context_not_called_when_only_tracing_enabled (self ):
599+ env_vars = {"DD_DATA_STREAMS_ENABLED" : "false" }
600+ with patch .dict (os .environ , env_vars ):
601+ with patch ("datadog_lambda.wrapper.dd_tracing_enabled" , True ):
602+ with patch (
603+ "datadog_lambda.wrapper.set_dsm_context"
604+ ) as set_dsm_context_patch :
605+
606+ @wrapper .datadog_lambda_wrapper
607+ def lambda_handler (event , context ):
608+ return "ok"
609+
610+ result = lambda_handler ({}, get_mock_context ())
611+ assert result == "ok"
612+ assert set_dsm_context_patch .call_count == 0
613+
614+ def test_set_dsm_context_not_called_when_tracing_and_DSM_disabled (self ):
615+ env_vars = {"DD_DATA_STREAMS_ENABLED" : "false" }
616+ with patch .dict (os .environ , env_vars ):
617+ with patch ("datadog_lambda.wrapper.dd_tracing_enabled" , True ):
618+ with patch (
619+ "datadog_lambda.wrapper.set_dsm_context"
620+ ) as set_dsm_context_patch :
621+
622+ @wrapper .datadog_lambda_wrapper
623+ def lambda_handler (event , context ):
624+ return "ok"
625+
626+ result = lambda_handler ({}, get_mock_context ())
627+ assert result == "ok"
628+ assert set_dsm_context_patch .call_count == 0
760629
761630
762631class TestLambdaDecoratorSettings (unittest .TestCase ):
0 commit comments