Skip to content

Commit 235659b

Browse files
fixes
1 parent c066b8f commit 235659b

File tree

2 files changed

+132
-26
lines changed

2 files changed

+132
-26
lines changed

datadog_lambda/dsm.py

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,27 @@ def _dsm_set_sqs_context(event):
1717

1818
for record in records:
1919
arn = record.get("eventSourceARN", "")
20-
_set_dsm_context_for_record(record, "sqs", arn)
20+
try:
21+
context_json = _get_dsm_context_from_sqs_lambda(record)
22+
if not context_json:
23+
return
24+
_set_dsm_context_for_record(context_json, "sqs", arn)
2125

26+
except Exception as e:
27+
logger.error(f"Unable to set dsm context: {e}")
2228

23-
def _set_dsm_context_for_record(record, type, arn):
24-
from ddtrace.data_streams import set_consume_checkpoint
2529

26-
try:
27-
context_json = _get_dsm_context_from_lambda(record)
28-
if not context_json:
29-
logger.debug("DataStreams skipped lambda message: %r", record)
30-
return
30+
def _set_dsm_context_for_record(context_json, type, arn):
31+
from ddtrace.data_streams import set_consume_checkpoint
3132

32-
carrier_get = _create_carrier_get(context_json)
33-
set_consume_checkpoint(type, arn, carrier_get, manual_checkpoint=False)
34-
except Exception as e:
35-
logger.error(f"Unable to set dsm context: {e}")
33+
carrier_get = _create_carrier_get(context_json)
34+
set_consume_checkpoint(type, arn, carrier_get, manual_checkpoint=False)
3635

3736

38-
def _get_dsm_context_from_lambda(message):
37+
def _get_dsm_context_from_sqs_lambda(message):
3938
"""
40-
Lambda-specific message formats:
41-
- message.messageAttributes._datadog.stringValue (SQS -> lambda)
39+
Lambda-specific message shape for SQS -> Lambda:
40+
- message.messageAttributes._datadog.stringValue
4241
"""
4342
context_json = None
4443
message_attributes = message.get("messageAttributes")
@@ -53,8 +52,10 @@ def _get_dsm_context_from_lambda(message):
5352
datadog_attr = message_attributes["_datadog"]
5453

5554
if "stringValue" in datadog_attr:
56-
# SQS -> lambda
5755
context_json = json.loads(datadog_attr["stringValue"])
56+
if not isinstance(context_json, dict):
57+
logger.debug("DataStreams did not handle lambda message: %r", message)
58+
return None
5859
else:
5960
logger.debug("DataStreams did not handle lambda message: %r", message)
6061

tests/test_dsm.py

Lines changed: 115 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
from datadog_lambda.dsm import (
66
set_dsm_context,
77
_dsm_set_sqs_context,
8-
_get_dsm_context_from_lambda,
8+
_get_dsm_context_from_sqs_lambda,
9+
_create_carrier_get,
910
)
1011
from datadog_lambda.trigger import EventTypes, _EventSource
1112

@@ -20,8 +21,12 @@ def setUp(self):
2021
self.mock_set_consume_checkpoint = patcher.start()
2122
self.addCleanup(patcher.stop)
2223

23-
patcher = patch("datadog_lambda.dsm._get_dsm_context_from_lambda")
24-
self.mock_get_dsm_context_from_lambda = patcher.start()
24+
patcher = patch("datadog_lambda.dsm._get_dsm_context_from_sqs_lambda")
25+
self.mock_get_dsm_context_from_sqs_lambda = patcher.start()
26+
self.addCleanup(patcher.stop)
27+
28+
patcher = patch("datadog_lambda.dsm.logger")
29+
self.mock_logger = patcher.start()
2530
self.addCleanup(patcher.stop)
2631

2732
def test_non_sqs_event_source_does_nothing(self):
@@ -104,7 +109,7 @@ def test_sqs_multiple_records_process_each_record(self):
104109
]
105110
}
106111

107-
self.mock_get_dsm_context_from_lambda.side_effect = [
112+
self.mock_get_dsm_context_from_sqs_lambda.side_effect = [
108113
{"dd-pathway-ctx-base64": "context1"},
109114
{"dd-pathway-ctx-base64": "context2"},
110115
{"dd-pathway-ctx-base64": "context3"},
@@ -129,14 +134,44 @@ def test_sqs_multiple_records_process_each_record(self):
129134
carrier_get_func = args[2]
130135

131136
self.assertEqual(service_type, "sqs")
132-
133137
self.assertEqual(arn, expected_arns[i])
134138

135139
pathway_ctx = carrier_get_func("dd-pathway-ctx-base64")
140+
136141
self.assertEqual(pathway_ctx, expected_contexts[i])
137142

143+
def test_set_context_exception_handled(self):
144+
"""Test that exceptions in _get_dsm_context_from_sqs_lambda are properly handled"""
145+
# Make _get_dsm_context_from_sqs_lambda raise an exception
146+
self.mock_get_dsm_context_from_sqs_lambda.side_effect = Exception(
147+
"JSON parsing error"
148+
)
138149

139-
class TestGetDSMContext(unittest.TestCase):
150+
event = {
151+
"Records": [
152+
{
153+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue",
154+
"body": "Test message",
155+
"messageAttributes": {
156+
"_datadog": {
157+
"stringValue": "invalid json",
158+
"dataType": "String",
159+
}
160+
},
161+
}
162+
]
163+
}
164+
165+
_dsm_set_sqs_context(event)
166+
167+
self.mock_logger.error.assert_called_once_with(
168+
"Unable to set dsm context: JSON parsing error"
169+
)
170+
171+
self.mock_set_consume_checkpoint.assert_not_called()
172+
173+
174+
class TestGetDSMContextFromSQS(unittest.TestCase):
140175
def test_sqs_to_lambda_string_value_format(self):
141176
"""Test format: message.messageAttributes._datadog.stringValue (SQS -> lambda)"""
142177
trace_context = {
@@ -175,22 +210,53 @@ def test_sqs_to_lambda_string_value_format(self):
175210
"awsRegion": "us-east-2",
176211
}
177212

178-
result = _get_dsm_context_from_lambda(lambda_record)
213+
result = _get_dsm_context_from_sqs_lambda(lambda_record)
179214

180215
assert result is not None
181216
assert result == trace_context
182217
assert result["x-datadog-trace-id"] == "789123456"
183218
assert result["x-datadog-parent-id"] == "321987654"
184219
assert result["dd-pathway-ctx"] == "test-pathway-ctx"
185220

221+
def test_sqs_record_context_not_dict(self):
222+
"""Test if that context is not a dict, get_dsm_context_from_sqs_lambda returns None"""
223+
224+
message_string = {
225+
"messageId": "test-message-id",
226+
"messageAttributes": {
227+
"_datadog": {
228+
"stringValue": '"just a string"',
229+
"dataType": "String",
230+
}
231+
},
232+
}
233+
234+
result = _get_dsm_context_from_sqs_lambda(message_string)
235+
236+
assert result is None
237+
238+
message_array = {
239+
"messageId": "test-message-id",
240+
"messageAttributes": {
241+
"_datadog": {
242+
"stringValue": '["array", "values"]',
243+
"dataType": "String",
244+
}
245+
},
246+
}
247+
248+
result = _get_dsm_context_from_sqs_lambda(message_array)
249+
250+
assert result is None
251+
186252
def test_no_message_attributes(self):
187253
"""Test message without MessageAttributes returns None."""
188254
message = {
189255
"messageId": "test-message-id",
190256
"body": "Test message without attributes",
191257
}
192258

193-
result = _get_dsm_context_from_lambda(message)
259+
result = _get_dsm_context_from_sqs_lambda(message)
194260

195261
assert result is None
196262

@@ -204,7 +270,7 @@ def test_no_datadog_attribute(self):
204270
},
205271
}
206272

207-
result = _get_dsm_context_from_lambda(message)
273+
result = _get_dsm_context_from_sqs_lambda(message)
208274

209275
assert result is None
210276

@@ -215,6 +281,45 @@ def test_empty_datadog_attribute(self):
215281
"messageAttributes": {"_datadog": {}},
216282
}
217283

218-
result = _get_dsm_context_from_lambda(message)
284+
result = _get_dsm_context_from_sqs_lambda(message)
219285

220286
assert result is None
287+
288+
289+
class TestCarrierGet(unittest.TestCase):
290+
def test_carrier_get_returns_correct_values(self):
291+
"""Test that carrier_get function returns correct values from context_json"""
292+
context_json = {
293+
"x-datadog-trace-id": "789123456",
294+
"x-datadog-parent-id": "321987654",
295+
"dd-pathway-ctx": "test-pathway-ctx",
296+
"custom-header": "custom-value",
297+
}
298+
299+
carrier_get = _create_carrier_get(context_json)
300+
301+
assert carrier_get("x-datadog-trace-id") == "789123456"
302+
assert carrier_get("x-datadog-parent-id") == "321987654"
303+
assert carrier_get("dd-pathway-ctx") == "test-pathway-ctx"
304+
assert carrier_get("custom-header") == "custom-value"
305+
assert carrier_get("non-existent-key") is None
306+
307+
def test_carrier_get_with_empty_context(self):
308+
"""Test carrier_get with empty context_json"""
309+
context_json = {}
310+
311+
carrier_get = _create_carrier_get(context_json)
312+
313+
assert carrier_get("any-key") is None
314+
assert carrier_get("x-datadog-trace-id") is None
315+
316+
def test_carrier_get_function_closure(self):
317+
"""Test that each carrier_get function has its own closure"""
318+
context_json_1 = {"key": "value1"}
319+
context_json_2 = {"key": "value2"}
320+
321+
carrier_get_1 = _create_carrier_get(context_json_1)
322+
carrier_get_2 = _create_carrier_get(context_json_2)
323+
324+
assert carrier_get_1("key") == "value1"
325+
assert carrier_get_2("key") == "value2"

0 commit comments

Comments
 (0)