11import unittest
2+ import base64
23import json
34from unittest .mock import patch
45
56from datadog_lambda .dsm import (
67 set_dsm_context ,
78 _dsm_set_sqs_context ,
9+ _dsm_set_sns_context ,
810 _get_dsm_context_from_lambda ,
911)
1012from datadog_lambda .trigger import EventTypes , _EventSource
@@ -22,6 +24,12 @@ def setUp(self):
2224
2325 patcher = patch ("datadog_lambda.dsm._get_dsm_context_from_lambda" )
2426 self .mock_get_dsm_context_from_lambda = patcher .start ()
27+ patcher = patch ("datadog_lambda.dsm._dsm_set_sns_context" )
28+ self .mock_dsm_set_sns_context = patcher .start ()
29+ self .addCleanup (patcher .stop )
30+
31+ patcher = patch ("ddtrace.internal.datastreams.data_streams_processor" )
32+ self .mock_data_streams_processor = patcher .start ()
2533 self .addCleanup (patcher .stop )
2634
2735 def test_non_sqs_event_source_does_nothing (self ):
@@ -135,6 +143,123 @@ def test_sqs_multiple_records_process_each_record(self):
135143 pathway_ctx = carrier_get_func ("dd-pathway-ctx-base64" )
136144 self .assertEqual (pathway_ctx , expected_contexts [i ])
137145
146+ def test_sns_event_with_no_records_does_nothing (self ):
147+ """Test that events where Records is None don't trigger DSM processing"""
148+ events_with_no_records = [
149+ {},
150+ {"Records" : None },
151+ {"someOtherField" : "value" },
152+ ]
153+
154+ for event in events_with_no_records :
155+ _dsm_set_sns_context (event )
156+ self .mock_set_consume_checkpoint .assert_not_called ()
157+
158+ def test_sns_event_triggers_dsm_sns_context (self ):
159+ """Test that SNS event sources trigger the SNS-specific DSM context function"""
160+ sns_event = {
161+ "Records" : [
162+ {
163+ "EventSource" : "aws:sns" ,
164+ "Sns" : {
165+ "TopicArn" : "arn:aws:sns:us-east-1:123456789012:my-topic" ,
166+ "Message" : "Hello from SNS!" ,
167+ },
168+ }
169+ ]
170+ }
171+
172+ event_source = _EventSource (EventTypes .SNS )
173+ set_dsm_context (sns_event , event_source )
174+
175+ self .mock_dsm_set_sns_context .assert_called_once_with (sns_event )
176+
177+ def test_sns_multiple_records_process_each_record (self ):
178+ """Test that each record in an SNS event gets processed individually"""
179+ multi_record_event = {
180+ "Records" : [
181+ {
182+ "EventSource" : "aws:sns" ,
183+ "Sns" : {
184+ "TopicArn" : "arn:aws:sns:us-east-1:123456789012:topic1" ,
185+ "Message" : "Message 1" ,
186+ "MessageAttributes" : {
187+ "_datadog" : {
188+ "Type" : "Binary" ,
189+ "Value" : base64 .b64encode (
190+ json .dumps ({"dd-pathway-ctx-base64" : "context1" })
191+ .encode ("utf-8" )
192+ ).decode ("utf-8" )
193+ }
194+ },
195+ }
196+ },
197+ {
198+ "EventSource" : "aws:sns" ,
199+ "Sns" : {
200+ "TopicArn" : "arn:aws:sns:us-east-1:123456789012:topic2" ,
201+ "Message" : "Message 2" ,
202+ "MessageAttributes" : {
203+ "_datadog" : {
204+ "Type" : "Binary" ,
205+ "Value" : base64 .b64encode (
206+ json .dumps ({"dd-pathway-ctx-base64" : "context2" })
207+ .encode ("utf-8" )
208+ ).decode ("utf-8" )
209+ }
210+ },
211+ }
212+ },
213+ {
214+ "EventSource" : "aws:sns" ,
215+ "Sns" : {
216+ "TopicArn" : "arn:aws:sns:us-east-1:123456789012:topic3" ,
217+ "Message" : "Message 3" ,
218+ "MessageAttributes" : {
219+ "_datadog" : {
220+ "Type" : "Binary" ,
221+ "Value" : base64 .b64encode (
222+ json .dumps ({"dd-pathway-ctx-base64" : "context3" })
223+ .encode ("utf-8" )
224+ ).decode ("utf-8" )
225+ }
226+ },
227+ }
228+ },
229+ ]
230+ }
231+
232+ self .mock_get_dsm_context_from_lambda .side_effect = [
233+ {"dd-pathway-ctx-base64" : "context1" },
234+ {"dd-pathway-ctx-base64" : "context2" },
235+ {"dd-pathway-ctx-base64" : "context3" },
236+ ]
237+
238+ _dsm_set_sns_context (multi_record_event )
239+
240+ self .assertEqual (self .mock_set_consume_checkpoint .call_count , 3 )
241+
242+ calls = self .mock_set_consume_checkpoint .call_args_list
243+ expected_arns = [
244+ "arn:aws:sns:us-east-1:123456789012:topic1" ,
245+ "arn:aws:sns:us-east-1:123456789012:topic2" ,
246+ "arn:aws:sns:us-east-1:123456789012:topic3" ,
247+ ]
248+ expected_contexts = ["context1" , "context2" , "context3" ]
249+
250+ for i , call in enumerate (calls ):
251+ args , kwargs = call
252+ service_type = args [0 ]
253+ arn = args [1 ]
254+ carrier_get_func = args [2 ]
255+
256+ self .assertEqual (service_type , "sns" )
257+
258+ self .assertEqual (arn , expected_arns [i ])
259+
260+ pathway_ctx = carrier_get_func ("dd-pathway-ctx-base64" )
261+ self .assertEqual (pathway_ctx , expected_contexts [i ])
262+
138263
139264class TestGetDSMContext (unittest .TestCase ):
140265 def test_sqs_to_lambda_string_value_format (self ):
@@ -183,6 +308,43 @@ def test_sqs_to_lambda_string_value_format(self):
183308 assert result ["x-datadog-parent-id" ] == "321987654"
184309 assert result ["dd-pathway-ctx" ] == "test-pathway-ctx"
185310
311+ def test_sns_to_lambda_format (self ):
312+ """Test format: message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)"""
313+ trace_context = {
314+ "x-datadog-trace-id" : "111111111" ,
315+ "x-datadog-parent-id" : "222222222" ,
316+ "dd-pathway-ctx" : "test-pathway-ctx" ,
317+ }
318+ binary_data = base64 .b64encode (
319+ json .dumps (trace_context ).encode ("utf-8" )
320+ ).decode ("utf-8" )
321+
322+ sns_lambda_record = {
323+ "EventSource" : "aws:sns" ,
324+ "EventSubscriptionArn" : (
325+ "arn:aws:sns:us-east-1:123456789012:sns-topic:12345678-1234-1234-1234-123456789012"
326+ ),
327+ "Sns" : {
328+ "Type" : "Notification" ,
329+ "MessageId" : "95df01b4-ee98-5cb9-9903-4c221d41eb5e" ,
330+ "TopicArn" : "arn:aws:sns:us-east-1:123456789012:sns-topic" ,
331+ "Subject" : "Test Subject" ,
332+ "Message" : "Hello from SNS!" ,
333+ "Timestamp" : "2023-01-01T12:00:00.000Z" ,
334+ "MessageAttributes" : {
335+ "_datadog" : {"Type" : "Binary" , "Value" : binary_data }
336+ },
337+ },
338+ }
339+
340+ result = _get_dsm_context_from_lambda (sns_lambda_record )
341+
342+ assert result is not None
343+ assert result == trace_context
344+ assert result ["x-datadog-trace-id" ] == "111111111"
345+ assert result ["x-datadog-parent-id" ] == "222222222"
346+ assert result ["dd-pathway-ctx" ] == "test-pathway-ctx"
347+
186348 def test_no_message_attributes (self ):
187349 """Test message without MessageAttributes returns None."""
188350 message = {
0 commit comments