Skip to content

Commit 83db961

Browse files
refactorings
1 parent 81a0e8e commit 83db961

File tree

3 files changed

+72
-33
lines changed

3 files changed

+72
-33
lines changed

datadog_lambda/dsm.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from datadog_lambda import logger
2+
from datadog_lambda.trigger import EventTypes
3+
4+
5+
def set_dsm_context(event, event_source):
6+
7+
if event_source.equals(EventTypes.SQS):
8+
_dsm_set_sqs_context(event)
9+
10+
11+
def _dsm_set_sqs_context(event):
12+
from datadog_lambda.wrapper import format_err_with_traceback
13+
14+
from ddtrace.internal.datastreams.processor import (
15+
DataStreamsProcessor as processor,
16+
DsmPathwayCodec,
17+
)
18+
from ddtrace.internal.datastreams.botocore import (
19+
get_datastreams_context,
20+
calculate_sqs_payload_size,
21+
)
22+
23+
records = event.get("Records", [])
24+
for record in records:
25+
try:
26+
queue_arn = record.get("eventSourceARN", "")
27+
28+
contextjson = get_datastreams_context(record)
29+
payload_size = calculate_sqs_payload_size(record)
30+
31+
ctx = DsmPathwayCodec.decode(contextjson, processor())
32+
ctx.set_checkpoint(
33+
["direction:in", "topic:" + queue_arn, "type:sqs"],
34+
payload_size=payload_size,
35+
)
36+
except Exception as e:
37+
logger.error(format_err_with_traceback(e))

datadog_lambda/wrapper.py

Lines changed: 3 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from importlib import import_module
1010
from time import time_ns
1111

12+
from datadog_lambda.dsm import set_dsm_context
1213
from datadog_lambda.extension import should_use_extension, flush_extension
1314
from datadog_lambda.cold_start import (
1415
set_cold_start,
@@ -291,39 +292,6 @@ def _inject_authorizer_span_headers(self, request_id):
291292
self.response["context"]["_datadog"] = datadog_data
292293

293294
def _before(self, event, context):
294-
def _dsm_set_sqs_context(record):
295-
try:
296-
queue_arn = record.get("eventSourceARN", "")
297-
298-
contextjson = get_datastreams_context(record)
299-
payload_size = calculate_sqs_payload_size(record)
300-
301-
ctx = DsmPathwayCodec.decode(contextjson, processor())
302-
ctx.set_checkpoint(
303-
["direction:in", "topic:" + queue_arn, "type:sqs"],
304-
payload_size=payload_size,
305-
)
306-
307-
except Exception as e:
308-
logger.error(format_err_with_traceback(e))
309-
310-
if self.data_streams_enabled:
311-
from ddtrace.internal.datastreams.processor import (
312-
DataStreamsProcessor as processor,
313-
DsmPathwayCodec,
314-
)
315-
from ddtrace.internal.datastreams.botocore import (
316-
get_datastreams_context,
317-
calculate_sqs_payload_size,
318-
)
319-
320-
if isinstance(event, dict) and "Records" in event and event["Records"]:
321-
sqs_records = [
322-
r for r in event["Records"] if r.get("eventSource") == "aws:sqs"
323-
]
324-
if sqs_records:
325-
for record in sqs_records:
326-
_dsm_set_sqs_context(record)
327295

328296
try:
329297
self.response = None
@@ -360,6 +328,8 @@ def _dsm_set_sqs_context(record):
360328
self.inferred_span = create_inferred_span(
361329
event, context, event_source, self.decode_authorizer_context
362330
)
331+
if self.data_streams_enabled:
332+
set_dsm_context(event, event_source)
363333
self.span = create_function_execution_span(
364334
context=context,
365335
function_name=self.function_name,

tests/test_wrapper.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,11 @@ def return_type_test(event, context):
565565

566566
@patch.dict(os.environ, {"DD_DATA_STREAMS_ENABLED": "true"})
567567
def test_datadog_lambda_wrapper_dsm_sqs_context_pathway_verification(self):
568+
from datadog_lambda.trigger import _EventSource, EventTypes
569+
570+
sqs_event_source = _EventSource(EventTypes.SQS)
571+
self.mock_extract_dd_trace_context.return_value = ({}, None, sqs_event_source)
572+
568573
with patch(
569574
"ddtrace.internal.datastreams.processor.get_connection"
570575
) as mock_get_connection:
@@ -729,6 +734,33 @@ def lambda_handler(event, context):
729734

730735
processor_instance.shutdown(timeout=0.1)
731736

737+
@patch.dict(os.environ, {"DD_DATA_STREAMS_ENABLED": "true"})
738+
@patch("datadog_lambda.wrapper.set_dsm_context")
739+
def test_set_dsm_context_called_when_enabled(self, mock_set_dsm_context):
740+
@wrapper.datadog_lambda_wrapper
741+
def lambda_handler(event, context):
742+
return {"statusCode": 200, "body": "processed"}
743+
744+
lambda_event = {}
745+
lambda_handler(lambda_event, get_mock_context())
746+
747+
mock_set_dsm_context.assert_called_once()
748+
749+
@patch("datadog_lambda.wrapper.set_dsm_context")
750+
def test_set_dsm_context_not_called_when_disabled(self, mock_set_dsm_context):
751+
# Ensure DD_DATA_STREAMS_ENABLED is not in environment
752+
if "DD_DATA_STREAMS_ENABLED" in os.environ:
753+
del os.environ["DD_DATA_STREAMS_ENABLED"]
754+
755+
@wrapper.datadog_lambda_wrapper
756+
def lambda_handler(event, context):
757+
return {"statusCode": 200, "body": "processed"}
758+
759+
lambda_event = {}
760+
lambda_handler(lambda_event, get_mock_context())
761+
762+
mock_set_dsm_context.assert_not_called()
763+
732764

733765
class TestLambdaDecoratorSettings(unittest.TestCase):
734766
def test_some_envs_should_depend_on_dd_tracing_enabled(self):

0 commit comments

Comments
 (0)