Skip to content

Commit cf737b0

Browse files
Refactored appraoch, decouple APM and DSM
1 parent 268c1e5 commit cf737b0

File tree

1 file changed

+143
-116
lines changed

1 file changed

+143
-116
lines changed

datadog_lambda/tracing.py

Lines changed: 143 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,30 @@
6868
LOWER_64_BITS = "LOWER_64_BITS"
6969

7070

71+
def _dsm_set_context_sqs_or_sns_event(event, event_type):
72+
for record in event.get("Records", [])[1:]:
73+
arn = _dsm_get_source_arn(record, event_type)
74+
if not arn:
75+
continue
76+
context = extract_context_from_sqs_or_sns_record(record)
77+
_dsm_set_checkpoint(context, event_type, arn)
78+
79+
80+
def _dsm_set_context_kinesis_event(event):
81+
for record in event.get("Records", [])[1:]:
82+
if (arn := record.get("eventSourceARN")) and (kinesis := record.get("kinesis")):
83+
context = extract_context_from_kinesis_record(kinesis)
84+
_dsm_set_checkpoint(context, "kinesis", arn)
85+
86+
87+
def _dsm_get_source_arn(record, event_type):
88+
return (
89+
record.get("Sns", {}).get("TopicArn")
90+
if event_type == "sns"
91+
else record.get("eventSourceARN")
92+
)
93+
94+
7195
def _dsm_set_checkpoint(context_json, event_type, arn):
7296
if not config.data_streams_enabled:
7397
return
@@ -237,7 +261,6 @@ def extract_context_from_sqs_or_sns_event_or_context(
237261
Falls back to lambda context if no trace data is found in the SQS message attributes.
238262
Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
239263
"""
240-
source_arn = ""
241264
event_type = "sqs" if event_source.equals(EventTypes.SQS) else "sns"
242265

243266
# EventBridge => SQS
@@ -248,100 +271,99 @@ def extract_context_from_sqs_or_sns_event_or_context(
248271
except Exception:
249272
logger.debug("Failed extracting context as EventBridge to SQS.")
250273

251-
context = None
252-
for idx, record in enumerate(event.get("Records", [])):
253-
try:
254-
source_arn = record.get("eventSourceARN", "")
255-
dd_ctx = None
256-
257-
# logic to deal with SNS => SQS event
258-
if "body" in record:
259-
body_str = record.get("body")
274+
try:
275+
first_record = event.get("Records")[0]
276+
source_arn = _dsm_get_source_arn(first_record, event_type)
277+
dd_data = extract_context_from_sqs_or_sns_record(first_record)
278+
if dd_data:
279+
if is_step_function_event(dd_data):
260280
try:
261-
body = json.loads(body_str)
262-
if body.get("Type", "") == "Notification" and "TopicArn" in body:
263-
logger.debug("Found SNS message inside SQS event")
264-
record = get_first_record(create_sns_event(body))
281+
return extract_context_from_step_functions(dd_data, None)
265282
except Exception:
266-
pass
267-
268-
msg_attributes = record.get("messageAttributes")
269-
if msg_attributes is None:
270-
sns_record = record.get("Sns") or {}
271-
# SNS->SQS event would extract SNS arn without this check
272-
if event_source.equals(EventTypes.SNS):
273-
source_arn = sns_record.get("TopicArn", "")
274-
msg_attributes = sns_record.get("MessageAttributes") or {}
275-
dd_payload = msg_attributes.get("_datadog")
276-
if dd_payload:
277-
# SQS uses dataType and binaryValue/stringValue
278-
# SNS uses Type and Value
279-
# fmt: off
280-
dd_json_data = None
281-
dd_json_data_type = dd_payload.get("Type") or dd_payload.get("dataType")
282-
if dd_json_data_type == "Binary":
283-
import base64
284-
285-
dd_json_data = dd_payload.get("binaryValue") or dd_payload.get("Value")
286-
if dd_json_data:
287-
dd_json_data = base64.b64decode(dd_json_data)
288-
elif dd_json_data_type == "String":
289-
dd_json_data = dd_payload.get("stringValue") or dd_payload.get("Value")
290-
# fmt: on
291-
else:
292283
logger.debug(
293-
"Datadog Lambda Python only supports extracting trace"
294-
"context from String or Binary SQS/SNS message attributes"
284+
"Failed to extract Step Functions context from SQS/SNS event."
295285
)
286+
context = propagator.extract(dd_data)
287+
_dsm_set_checkpoint(dd_data, event_type, source_arn)
288+
if config.data_streams_enabled:
289+
_dsm_set_context_sqs_or_sns_event(event, event_type)
290+
return context
291+
else:
292+
# Handle case where trace context is injected into attributes.AWSTraceHeader
293+
# example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
294+
attrs = event.get("Records")[0].get("attributes")
295+
if attrs:
296+
x_ray_header = attrs.get("AWSTraceHeader")
297+
if x_ray_header:
298+
x_ray_context = parse_xray_header(x_ray_header)
299+
trace_id_parts = x_ray_context.get("trace_id", "").split("-")
300+
if len(trace_id_parts) > 2 and trace_id_parts[2].startswith(
301+
DD_TRACE_JAVA_TRACE_ID_PADDING
302+
):
303+
# If it starts with eight 0's padding,
304+
# then this AWSTraceHeader contains Datadog injected trace context
305+
logger.debug(
306+
"Found dd-trace injected trace context from AWSTraceHeader"
307+
)
308+
return Context(
309+
trace_id=int(trace_id_parts[2][8:], 16),
310+
span_id=int(x_ray_context["parent_id"], 16),
311+
sampling_priority=float(x_ray_context["sampled"]),
312+
)
313+
# Still want to set a DSM checkpoint even if DSM context not propagated
314+
_dsm_set_checkpoint(None, event_type, source_arn)
315+
if config.data_streams_enabled:
316+
_dsm_set_context_sqs_or_sns_event(event, event_type)
317+
return extract_context_from_lambda_context(lambda_context)
318+
except Exception as e:
319+
logger.debug("The trace extractor returned with error %s", e)
320+
# Still want to set a DSM checkpoint even if DSM context not propagated
321+
_dsm_set_checkpoint(None, event_type, source_arn)
322+
if config.data_streams_enabled:
323+
_dsm_set_context_sqs_or_sns_event(event, event_type)
324+
return extract_context_from_lambda_context(lambda_context)
296325

297-
if dd_json_data:
298-
dd_data = json.loads(dd_json_data)
299-
300-
if idx == 0:
301-
if is_step_function_event(dd_data):
302-
try:
303-
return extract_context_from_step_functions(
304-
dd_data, None
305-
)
306-
except Exception:
307-
logger.debug(
308-
"Failed to extract Step Functions context from SQS/SNS event."
309-
)
310-
context = propagator.extract(dd_data)
311-
if not config.data_streams_enabled:
312-
break
313-
dd_ctx = dd_data
314-
elif idx == 0:
315-
# Handle case where trace context is injected into attributes.AWSTraceHeader
316-
# example:Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
317-
attrs = event.get("Records")[0].get("attributes")
318-
if attrs:
319-
x_ray_header = attrs.get("AWSTraceHeader")
320-
if x_ray_header:
321-
x_ray_context = parse_xray_header(x_ray_header)
322-
trace_id_parts = x_ray_context.get("trace_id", "").split("-")
323-
if len(trace_id_parts) > 2 and trace_id_parts[2].startswith(
324-
DD_TRACE_JAVA_TRACE_ID_PADDING
325-
):
326-
# If it starts with eight 0's padding,
327-
# then this AWSTraceHeader contains Datadog injected trace context
328-
logger.debug(
329-
"Found dd-trace injected trace context from AWSTraceHeader"
330-
)
331-
context = Context(
332-
trace_id=int(trace_id_parts[2][8:], 16),
333-
span_id=int(x_ray_context["parent_id"], 16),
334-
sampling_priority=float(x_ray_context["sampled"]),
335-
)
336-
if not config.data_streams_enabled:
337-
break
338-
except Exception as e:
339-
logger.debug("The trace extractor returned with error %s", e)
340326

341-
# Set DSM checkpoint once per record
342-
_dsm_set_checkpoint(dd_ctx, event_type, source_arn)
327+
def extract_context_from_sqs_or_sns_record(record):
328+
# logic to deal with SNS => SQS event
329+
if "body" in record:
330+
body_str = record.get("body")
331+
try:
332+
body = json.loads(body_str)
333+
if body.get("Type", "") == "Notification" and "TopicArn" in body:
334+
logger.debug("Found SNS message inside SQS event")
335+
record = get_first_record(create_sns_event(body))
336+
except Exception:
337+
pass
338+
339+
msg_attributes = record.get("messageAttributes")
340+
if msg_attributes is None:
341+
sns_record = record.get("Sns") or {}
342+
msg_attributes = sns_record.get("MessageAttributes") or {}
343+
dd_payload = msg_attributes.get("_datadog")
344+
if dd_payload:
345+
# SQS uses dataType and binaryValue/stringValue
346+
# SNS uses Type and Value
347+
dd_json_data = None
348+
dd_json_data_type = dd_payload.get("Type") or dd_payload.get("dataType")
349+
if dd_json_data_type == "Binary":
350+
import base64
351+
352+
dd_json_data = dd_payload.get("binaryValue") or dd_payload.get("Value")
353+
if dd_json_data:
354+
dd_json_data = base64.b64decode(dd_json_data)
355+
elif dd_json_data_type == "String":
356+
dd_json_data = dd_payload.get("stringValue") or dd_payload.get("Value")
357+
else:
358+
logger.debug(
359+
"Datadog Lambda Python only supports extracting trace"
360+
"context from String or Binary SQS/SNS message attributes"
361+
)
343362

344-
return context if context else extract_context_from_lambda_context(lambda_context)
363+
if dd_json_data:
364+
dd_data = json.loads(dd_json_data)
365+
return dd_data
366+
return None
345367

346368

347369
def _extract_context_from_eventbridge_sqs_event(event):
@@ -402,35 +424,40 @@ def extract_context_from_kinesis_event(event, lambda_context):
402424
Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
403425
"""
404426
source_arn = ""
427+
try:
428+
record = get_first_record(event)
429+
source_arn = record.get("eventSourceARN", "")
430+
kinesis = record.get("kinesis")
431+
if not kinesis:
432+
return extract_context_from_lambda_context(lambda_context)
433+
dd_ctx = extract_context_from_kinesis_record(kinesis)
434+
if dd_ctx:
435+
context = propagator.extract(dd_ctx)
436+
_dsm_set_checkpoint(dd_ctx, "kinesis", source_arn)
437+
if config.data_streams_enabled:
438+
_dsm_set_context_kinesis_event(event)
439+
return context
440+
except Exception as e:
441+
logger.debug("The trace extractor returned with error %s", e)
442+
# Still want to set a DSM checkpoint even if DSM context not propagated
443+
_dsm_set_checkpoint(None, "kinesis", source_arn)
444+
if config.data_streams_enabled:
445+
_dsm_set_context_kinesis_event(event)
446+
return extract_context_from_lambda_context(lambda_context)
405447

406-
context = None
407-
for idx, record in enumerate(event.get("Records", [])):
408-
dd_ctx = None
409-
try:
410-
source_arn = record.get("eventSourceARN", "")
411-
kinesis = record.get("kinesis")
412-
if not kinesis:
413-
if idx == 0:
414-
return extract_context_from_lambda_context(lambda_context)
415-
continue
416-
data = kinesis.get("data")
417-
if data:
418-
import base64
419-
420-
b64_bytes = data.encode("ascii")
421-
str_bytes = base64.b64decode(b64_bytes)
422-
data_str = str_bytes.decode("ascii")
423-
data_obj = json.loads(data_str)
424-
dd_ctx = data_obj.get("_datadog")
425-
if dd_ctx:
426-
if idx == 0:
427-
context = propagator.extract(dd_ctx)
428-
if not config.data_streams_enabled:
429-
break
430-
except Exception as e:
431-
logger.debug("The trace extractor returned with error %s", e)
432-
_dsm_set_checkpoint(dd_ctx, "kinesis", source_arn)
433-
return context if context else extract_context_from_lambda_context(lambda_context)
448+
449+
def extract_context_from_kinesis_record(record_kinesis_data):
450+
data = record_kinesis_data.get("data")
451+
if data:
452+
import base64
453+
454+
b64_bytes = data.encode("ascii")
455+
str_bytes = base64.b64decode(b64_bytes)
456+
data_str = str_bytes.decode("ascii")
457+
data_obj = json.loads(data_str)
458+
dd_ctx = data_obj.get("_datadog")
459+
return dd_ctx
460+
return None
434461

435462

436463
def _deterministic_sha256_hash(s: str, part: str) -> int:

0 commit comments

Comments
 (0)