Skip to content

Commit 7507357

Browse files
test fixes
1 parent e6a8b4e commit 7507357

File tree

2 files changed

+176
-194
lines changed

2 files changed

+176
-194
lines changed

tests/test_dsm.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
import unittest
2+
from unittest.mock import patch, MagicMock
3+
4+
from datadog_lambda.dsm import set_dsm_context, _dsm_set_sqs_context
5+
from datadog_lambda.trigger import EventTypes
6+
7+
8+
class TestDsmContext(unittest.TestCase):
9+
def test_non_sqs_event_source_does_nothing(self):
10+
"""Test that non-SQS event sources don't trigger DSM context setting"""
11+
event = {"Records": [{"body": "test"}]}
12+
13+
mock_event_source = MagicMock()
14+
mock_event_source.equals.return_value = False # Not SQS
15+
16+
with patch("datadog_lambda.dsm._dsm_set_sqs_context") as mock_sqs_context:
17+
set_dsm_context(event, mock_event_source)
18+
19+
mock_event_source.equals.assert_called_once_with(EventTypes.SQS)
20+
mock_sqs_context.assert_not_called()
21+
22+
def test_event_with_no_records_does_nothing(self):
23+
"""Test that events where Records is None don't trigger DSM processing"""
24+
events_with_no_records = [
25+
{},
26+
{"Records": None},
27+
{"someOtherField": "value"},
28+
]
29+
30+
for event in events_with_no_records:
31+
with patch(
32+
"ddtrace.internal.datastreams.data_streams_processor"
33+
) as mock_processor:
34+
_dsm_set_sqs_context(event)
35+
36+
mock_processor.assert_not_called()
37+
38+
def test_sqs_event_triggers_dsm_sqs_context(self):
39+
"""Test that SQS event sources trigger the SQS-specific DSM context function"""
40+
sqs_event = {
41+
"Records": [
42+
{
43+
"eventSource": "aws:sqs",
44+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue",
45+
"body": "Hello from SQS!",
46+
}
47+
]
48+
}
49+
50+
mock_event_source = MagicMock()
51+
mock_event_source.equals.return_value = True
52+
53+
with patch("datadog_lambda.dsm._dsm_set_sqs_context") as mock_sqs_context:
54+
set_dsm_context(sqs_event, mock_event_source)
55+
56+
mock_sqs_context.assert_called_once_with(sqs_event)
57+
58+
def test_multiple_records_process_each_record(self):
59+
"""Test that each record in an SQS event gets processed individually"""
60+
multi_record_event = {
61+
"Records": [
62+
{
63+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue1",
64+
"body": "Message 1",
65+
},
66+
{
67+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue2",
68+
"body": "Message 2",
69+
},
70+
{
71+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue3",
72+
"body": "Message 3",
73+
},
74+
]
75+
}
76+
77+
mock_processor = MagicMock()
78+
mock_context = MagicMock()
79+
80+
with patch(
81+
"ddtrace.internal.datastreams.data_streams_processor",
82+
return_value=mock_processor,
83+
):
84+
with patch(
85+
"ddtrace.internal.datastreams.botocore.get_datastreams_context",
86+
return_value={},
87+
):
88+
with patch(
89+
"ddtrace.internal.datastreams.botocore.calculate_sqs_payload_size",
90+
return_value=100,
91+
):
92+
with patch(
93+
"ddtrace.internal.datastreams.processor.DsmPathwayCodec.decode",
94+
return_value=mock_context,
95+
):
96+
_dsm_set_sqs_context(multi_record_event)
97+
98+
assert mock_context.set_checkpoint.call_count == 3
99+
100+
calls = mock_context.set_checkpoint.call_args_list
101+
expected_arns = [
102+
"arn:aws:sqs:us-east-1:123456789012:queue1",
103+
"arn:aws:sqs:us-east-1:123456789012:queue2",
104+
"arn:aws:sqs:us-east-1:123456789012:queue3",
105+
]
106+
107+
for i, call in enumerate(calls):
108+
args, kwargs = call
109+
tags = args[0]
110+
self.assertIn("direction:in", tags)
111+
self.assertIn(f"topic:{expected_arns[i]}", tags)
112+
self.assertIn("type:sqs", tags)
113+
self.assertEqual(kwargs["payload_size"], 100)

tests/test_wrapper.py

Lines changed: 63 additions & 194 deletions
Original file line numberDiff line numberDiff line change
@@ -563,200 +563,69 @@ def return_type_test(event, context):
563563
self.assertEqual(result, test_result)
564564
self.assertFalse(MockPrintExc.called)
565565

566-
@patch.dict(os.environ, {"DD_DATA_STREAMS_ENABLED": "true"})
567-
def test_datadog_lambda_wrapper_dsm_sqs_context_pathway_verification(self):
568-
from datadog_lambda.trigger import _EventSource, EventTypes
569-
570-
sqs_event_source = _EventSource(EventTypes.SQS)
571-
self.mock_extract_dd_trace_context.return_value = ({}, None, sqs_event_source)
572-
573-
with patch(
574-
"ddtrace.internal.datastreams.processor.get_connection"
575-
) as mock_get_connection:
576-
577-
mock_conn = unittest.mock.MagicMock()
578-
mock_response = unittest.mock.MagicMock()
579-
mock_response.status = 200
580-
mock_conn.getresponse.return_value = mock_response
581-
mock_get_connection.return_value = mock_conn
582-
583-
def updated_get_datastreams_context(message):
584-
"""
585-
Updated version that handles the correct message formats
586-
"""
587-
import base64
588-
import json
589-
590-
context_json = None
591-
message_body = message
592-
try:
593-
body = message.get("Body")
594-
if body:
595-
message_body = json.loads(body)
596-
except (ValueError, TypeError):
597-
pass
598-
599-
message_attributes = message_body.get(
600-
"MessageAttributes"
601-
) or message_body.get("messageAttributes")
602-
if not message_attributes:
603-
return None
604-
605-
if "_datadog" not in message_attributes:
606-
return None
607-
608-
datadog_attr = message_attributes["_datadog"]
609-
610-
if message_body.get("Type") == "Notification":
611-
if datadog_attr.get("Type") == "Binary":
612-
context_json = json.loads(
613-
base64.b64decode(datadog_attr["Value"]).decode()
614-
)
615-
elif "StringValue" in datadog_attr:
616-
context_json = json.loads(datadog_attr["StringValue"])
617-
elif "stringValue" in datadog_attr:
618-
context_json = json.loads(datadog_attr["stringValue"])
619-
elif "BinaryValue" in datadog_attr:
620-
context_json = json.loads(datadog_attr["BinaryValue"].decode())
621-
else:
622-
print(f"DEBUG: Unhandled datadog_attr format: {datadog_attr}")
623-
624-
return context_json
625-
626-
# Step 1: Create a message with some context in the message attributes
627-
from ddtrace.internal.datastreams.processor import DataStreamsProcessor
628-
629-
processor_instance = DataStreamsProcessor()
630-
631-
with patch(
632-
"ddtrace.internal.datastreams.botocore.get_datastreams_context",
633-
updated_get_datastreams_context,
634-
), patch(
635-
"ddtrace.internal.datastreams.data_streams_processor",
636-
return_value=processor_instance,
637-
):
638-
639-
parent_ctx = processor_instance.new_pathway()
640-
641-
parent_ctx.set_checkpoint(
642-
["direction:out", "topic:upstream-topic", "type:sqs"],
643-
now_sec=1640995200.0,
644-
payload_size=512,
645-
)
646-
parent_hash = parent_ctx.hash
647-
encoded_parent_context = parent_ctx.encode_b64()
648-
649-
sqs_event = {
650-
"Records": [
651-
{
652-
"eventSource": "aws:sqs",
653-
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test",
654-
"Body": "test message body",
655-
"messageAttributes": {
656-
"_datadog": {
657-
"stringValue": json.dumps(
658-
{
659-
"dd-pathway-ctx-base64": encoded_parent_context
660-
}
661-
)
662-
}
663-
},
664-
}
665-
]
666-
}
667-
668-
# Step 2: Call the handler
669-
@wrapper.datadog_lambda_wrapper
670-
def lambda_handler(event, context):
671-
return {"statusCode": 200, "body": "processed"}
672-
673-
result = lambda_handler(sqs_event, get_mock_context())
674-
self.assertEqual(result["statusCode"], 200)
675-
676-
# New context set after handler call
677-
current_ctx = processor_instance._current_context.value
678-
self.assertIsNotNone(
679-
current_ctx,
680-
"Data streams context should be set after processing SQS message",
681-
)
682-
683-
# Step 3: Check that hash in this context is the child of the hash you passed
684-
# Step 4: Check that the right checkpoint was produced during call to handler
685-
# The buckets hold the aggregated stats for all checkpoints
686-
found_sqs_checkpoint = False
687-
for bucket_time, bucket in processor_instance._buckets.items():
688-
for aggr_key, stats in bucket.pathway_stats.items():
689-
edge_tags_str, hash_value, parent_hash_recorded = aggr_key
690-
edge_tags = edge_tags_str.split(",")
691-
692-
if (
693-
"direction:in" in edge_tags
694-
and "topic:arn:aws:sqs:us-east-1:123456789012:test"
695-
in edge_tags
696-
and "type:sqs" in edge_tags
697-
):
698-
found_sqs_checkpoint = True
699-
700-
# EXPLICIT PARENT-CHILD HASH RELATIONSHIP TEST
701-
self.assertEqual(
702-
parent_hash_recorded,
703-
parent_hash,
704-
f"Parent hash must be preserved: "
705-
f"expected {parent_hash}, got {parent_hash_recorded}",
706-
)
707-
self.assertEqual(
708-
hash_value,
709-
current_ctx.hash,
710-
f"Child hash must match current context: "
711-
f"expected {current_ctx.hash}, got {hash_value}",
712-
)
713-
self.assertNotEqual(
714-
hash_value,
715-
parent_hash_recorded,
716-
f"Child hash ({hash_value}) must be different from "
717-
f"parent hash ({parent_hash_recorded}) - proves parent-child",
718-
)
719-
self.assertGreaterEqual(
720-
stats.payload_size.count,
721-
1,
722-
"Should have one payload size measurement",
723-
)
724-
725-
break
726-
727-
self.assertTrue(
728-
found_sqs_checkpoint,
729-
"Should have found SQS consumption checkpoint in processor stats",
730-
)
731-
732-
processor_instance.shutdown(timeout=0.1)
733-
734-
@patch.dict(os.environ, {"DD_DATA_STREAMS_ENABLED": "true"})
735-
@patch("datadog_lambda.wrapper.set_dsm_context")
736-
def test_set_dsm_context_called_when_enabled(self, mock_set_dsm_context):
737-
@wrapper.datadog_lambda_wrapper
738-
def lambda_handler(event, context):
739-
return {"statusCode": 200, "body": "processed"}
740-
741-
lambda_event = {}
742-
lambda_handler(lambda_event, get_mock_context())
743-
744-
mock_set_dsm_context.assert_called_once()
745-
746-
@patch("datadog_lambda.wrapper.set_dsm_context")
747-
def test_set_dsm_context_not_called_when_disabled(self, mock_set_dsm_context):
748-
# Ensure DD_DATA_STREAMS_ENABLED is not in environment
749-
if "DD_DATA_STREAMS_ENABLED" in os.environ:
750-
del os.environ["DD_DATA_STREAMS_ENABLED"]
751-
752-
@wrapper.datadog_lambda_wrapper
753-
def lambda_handler(event, context):
754-
return {"statusCode": 200, "body": "processed"}
755-
756-
lambda_event = {}
757-
lambda_handler(lambda_event, get_mock_context())
758-
759-
mock_set_dsm_context.assert_not_called()
566+
def test_set_dsm_context_called_when_DSM_and_tracing_enabled(self):
567+
env_vars = {"DD_DATA_STREAMS_ENABLED": "true"}
568+
with patch.dict(os.environ, env_vars):
569+
with patch("datadog_lambda.wrapper.dd_tracing_enabled", True):
570+
with patch(
571+
"datadog_lambda.wrapper.set_dsm_context"
572+
) as set_dsm_context_patch:
573+
574+
@wrapper.datadog_lambda_wrapper
575+
def lambda_handler(event, context):
576+
return "ok"
577+
578+
result = lambda_handler({}, get_mock_context())
579+
assert result == "ok"
580+
assert set_dsm_context_patch.called_once()
581+
582+
def test_set_dsm_context_not_called_when_only_DSM_enabled(self):
583+
env_vars = {"DD_DATA_STREAMS_ENABLED": "true"}
584+
with patch.dict(os.environ, env_vars):
585+
with patch("datadog_lambda.wrapper.dd_tracing_enabled", False):
586+
with patch(
587+
"datadog_lambda.wrapper.set_dsm_context"
588+
) as set_dsm_context_patch:
589+
590+
@wrapper.datadog_lambda_wrapper
591+
def lambda_handler(event, context):
592+
return "ok"
593+
594+
result = lambda_handler({}, get_mock_context())
595+
assert result == "ok"
596+
assert set_dsm_context_patch.call_count == 0
597+
598+
def test_set_dsm_context_not_called_when_only_tracing_enabled(self):
599+
env_vars = {"DD_DATA_STREAMS_ENABLED": "false"}
600+
with patch.dict(os.environ, env_vars):
601+
with patch("datadog_lambda.wrapper.dd_tracing_enabled", True):
602+
with patch(
603+
"datadog_lambda.wrapper.set_dsm_context"
604+
) as set_dsm_context_patch:
605+
606+
@wrapper.datadog_lambda_wrapper
607+
def lambda_handler(event, context):
608+
return "ok"
609+
610+
result = lambda_handler({}, get_mock_context())
611+
assert result == "ok"
612+
assert set_dsm_context_patch.call_count == 0
613+
614+
def test_set_dsm_context_not_called_when_tracing_and_DSM_disabled(self):
615+
env_vars = {"DD_DATA_STREAMS_ENABLED": "false"}
616+
with patch.dict(os.environ, env_vars):
617+
with patch("datadog_lambda.wrapper.dd_tracing_enabled", True):
618+
with patch(
619+
"datadog_lambda.wrapper.set_dsm_context"
620+
) as set_dsm_context_patch:
621+
622+
@wrapper.datadog_lambda_wrapper
623+
def lambda_handler(event, context):
624+
return "ok"
625+
626+
result = lambda_handler({}, get_mock_context())
627+
assert result == "ok"
628+
assert set_dsm_context_patch.call_count == 0
760629

761630

762631
class TestLambdaDecoratorSettings(unittest.TestCase):

0 commit comments

Comments
 (0)