77 set_dsm_context ,
88 _dsm_set_sqs_context ,
99 _dsm_set_sns_context ,
10+ _dsm_set_kinesis_context ,
1011 _get_dsm_context_from_lambda ,
1112)
1213from datadog_lambda .trigger import EventTypes , _EventSource
@@ -22,6 +23,10 @@ def setUp(self):
2223 self .mock_dsm_set_sns_context = patcher .start ()
2324 self .addCleanup (patcher .stop )
2425
26+ patcher = patch ("datadog_lambda.dsm._dsm_set_kinesis_context" )
27+ self .mock_dsm_set_kinesis_context = patcher .start ()
28+ self .addCleanup (patcher .stop )
29+
2530 patcher = patch ("ddtrace.internal.datastreams.data_streams_processor" )
2631 self .mock_data_streams_processor = patcher .start ()
2732 self .addCleanup (patcher .stop )
@@ -45,6 +50,13 @@ def setUp(self):
4550 self .mock_calculate_sns_payload_size .return_value = 150
4651 self .addCleanup (patcher .stop )
4752
53+ patcher = patch (
54+ "ddtrace.internal.datastreams.botocore.calculate_kinesis_payload_size"
55+ )
56+ self .mock_calculate_kinesis_payload_size = patcher .start ()
57+ self .mock_calculate_kinesis_payload_size .return_value = 200
58+ self .addCleanup (patcher .stop )
59+
4860 patcher = patch ("ddtrace.internal.datastreams.processor.DsmPathwayCodec.decode" )
4961 self .mock_dsm_pathway_codec_decode = patcher .start ()
5062 self .addCleanup (patcher .stop )
@@ -207,6 +219,88 @@ def test_sns_multiple_records_process_each_record(self):
207219 self .assertIn ("type:sns" , tags )
208220 self .assertEqual (kwargs ["payload_size" ], 150 )
209221
222+ def test_kinesis_event_with_no_records_does_nothing (self ):
223+ """Test that events where Records is None don't trigger DSM processing"""
224+ events_with_no_records = [
225+ {},
226+ {"Records" : None },
227+ {"someOtherField" : "value" },
228+ ]
229+
230+ for event in events_with_no_records :
231+ _dsm_set_kinesis_context (event )
232+ self .mock_data_streams_processor .assert_not_called ()
233+
234+ def test_kinesis_event_triggers_dsm_kinesis_context (self ):
235+ """Test that Kinesis event sources trigger the Kinesis-specific DSM context function"""
236+ kinesis_event = {
237+ "Records" : [
238+ {
239+ "eventSource" : "aws:kinesis" ,
240+ "eventSourceARN" : "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream" ,
241+ "kinesis" : {
242+ "data" : "SGVsbG8gZnJvbSBLaW5lc2lzIQ==" ,
243+ "partitionKey" : "partition-key" ,
244+ },
245+ }
246+ ]
247+ }
248+
249+ event_source = _EventSource (EventTypes .KINESIS )
250+ set_dsm_context (kinesis_event , event_source )
251+
252+ self .mock_dsm_set_kinesis_context .assert_called_once_with (kinesis_event )
253+
254+ def test_kinesis_multiple_records_process_each_record (self ):
255+ """Test that each record in a Kinesis event gets processed individually"""
256+ multi_record_event = {
257+ "Records" : [
258+ {
259+ "eventSourceARN" : "arn:aws:kinesis:us-east-1:123456789012:stream/stream1" ,
260+ "kinesis" : {
261+ "data" : "TWVzc2FnZSAx" ,
262+ "partitionKey" : "partition-1" ,
263+ },
264+ },
265+ {
266+ "eventSourceARN" : "arn:aws:kinesis:us-east-1:123456789012:stream/stream2" ,
267+ "kinesis" : {
268+ "data" : "TWVzc2FnZSAy" ,
269+ "partitionKey" : "partition-2" ,
270+ },
271+ },
272+ {
273+ "eventSourceARN" : "arn:aws:kinesis:us-east-1:123456789012:stream/stream3" ,
274+ "kinesis" : {
275+ "data" : "TWVzc2FnZSAz" ,
276+ "partitionKey" : "partition-3" ,
277+ },
278+ },
279+ ]
280+ }
281+
282+ mock_context = MagicMock ()
283+ self .mock_dsm_pathway_codec_decode .return_value = mock_context
284+
285+ _dsm_set_kinesis_context (multi_record_event )
286+
287+ self .assertEqual (mock_context .set_checkpoint .call_count , 3 )
288+
289+ calls = mock_context .set_checkpoint .call_args_list
290+ expected_arns = [
291+ "arn:aws:kinesis:us-east-1:123456789012:stream/stream1" ,
292+ "arn:aws:kinesis:us-east-1:123456789012:stream/stream2" ,
293+ "arn:aws:kinesis:us-east-1:123456789012:stream/stream3" ,
294+ ]
295+
296+ for i , call in enumerate (calls ):
297+ args , kwargs = call
298+ tags = args [0 ]
299+ self .assertIn ("direction:in" , tags )
300+ self .assertIn (f"topic:{ expected_arns [i ]} " , tags )
301+ self .assertIn ("type:kinesis" , tags )
302+ self .assertEqual (kwargs ["payload_size" ], 200 )
303+
210304
211305class TestGetDSMContext (unittest .TestCase ):
212306 def test_sqs_to_lambda_string_value_format (self ):
0 commit comments