Skip to content

Commit a7e617d

Browse files
committed
Implement dd-trace logic
1 parent e030fa0 commit a7e617d

File tree

4 files changed

+178
-54
lines changed

4 files changed

+178
-54
lines changed

datadog_lambda/constants.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,20 @@ class SamplingPriority(object):
1414

1515
# Datadog trace headers
1616
class TraceHeader(object):
17-
TRACE_ID = 'x-datadog-trace-id'
18-
PARENT_ID = 'x-datadog-parent-id'
19-
SAMPLING_PRIORITY = 'x-datadog-sampling-priority'
17+
TRACE_ID = "x-datadog-trace-id"
18+
PARENT_ID = "x-datadog-parent-id"
19+
SAMPLING_PRIORITY = "x-datadog-sampling-priority"
2020

2121

2222
# X-Ray subsegment to save Datadog trace metadata
2323
class XraySubsegment(object):
24-
NAME = 'datadog-metadata'
25-
KEY = 'trace'
26-
NAMESPACE = 'datadog'
24+
NAME = "datadog-metadata"
25+
KEY = "trace"
26+
NAMESPACE = "datadog"
27+
28+
29+
# Source of datadog context
30+
class Source(object):
31+
XRAY = "xray"
32+
EVENT = "event"
33+
DDTRACE = "ddtrace"

datadog_lambda/trace_wrapper.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import sys
2+
from datadog_lambda.constants import SamplingPriority, TraceHeader, Source
3+
4+
5+
class TraceWrapper:
6+
"""
7+
TraceWrapper wraps dd-trace, to make this library usable when
8+
dd-trace hasn't been installed/initialised
9+
"""
10+
11+
def __init__(self):
12+
self._tracer = None
13+
14+
def extract(self, event):
15+
tracer = self._load_tracer()
16+
if not tracer:
17+
return None
18+
return _propagator.extract(event)
19+
20+
def start_span(self, name, **kwargs):
21+
tracer = self._load_tracer()
22+
if not tracer:
23+
return None
24+
25+
return self._tracer.start_span(name, **kwargs)
26+
27+
def _load_tracer(self):
28+
if not TraceWrapper.tracer_enabled():
29+
return None
30+
try:
31+
if not self._tracer:
32+
from ddtrace import tracer
33+
from ddtrace.propagation.http import HTTPPropagator
34+
35+
self._tracer = tracer
36+
self._propagator = HTTPPropagator()
37+
except:
38+
pass
39+
return self._tracer
40+
41+
@property
42+
def trace_context(self):
43+
tracer = self._load_tracer()
44+
if not tracer:
45+
return None
46+
span = tracer.current_span()
47+
if not span:
48+
return None
49+
50+
parent_id = span.context.span_id
51+
trace_id = span.context.trace_id
52+
return {
53+
"parent_id": parent_id,
54+
"trace_id": trace_id,
55+
"sampling_priority": SamplingPriority.AUTO_KEEP,
56+
"source": Source.DDTRACE,
57+
}
58+
59+
@staticmethod
60+
def tracer_enabled():
61+
mods = sys.modules.keys()
62+
# Check whether user has imported ddtrace
63+
return "ddtrace" in mods
64+
65+
66+
trace_wrapper = TraceWrapper()

datadog_lambda/tracing.py

Lines changed: 67 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
SamplingPriority,
1414
TraceHeader,
1515
XraySubsegment,
16+
Source,
1617
)
18+
from datadog_lambda.trace_wrapper import trace_wrapper
1719

1820
logger = logging.getLogger(__name__)
1921

@@ -38,8 +40,32 @@ def _convert_xray_sampling(xray_sampled):
3840
"""
3941
Convert X-Ray sampled (True/False) to its Datadog counterpart.
4042
"""
41-
return str(SamplingPriority.USER_KEEP) if xray_sampled \
43+
return (
44+
str(SamplingPriority.USER_KEEP)
45+
if xray_sampled
4246
else str(SamplingPriority.USER_REJECT)
47+
)
48+
49+
50+
def _get_xray_trace_context():
51+
if not is_lambda_context():
52+
return None
53+
54+
xray_trace_entity = xray_recorder.get_trace_entity() # xray (sub)segment
55+
return {
56+
"trace-id": _convert_xray_trace_id(xray_trace_entity.trace_id),
57+
"parent-id": _convert_xray_entity_id(xray_trace_entity.id),
58+
"sampling-priority": _convert_xray_sampling(xray_trace_entity.sampled),
59+
"source": Source.XRAY,
60+
}
61+
62+
63+
def _context_obj_to_headers(obj):
64+
return {
65+
TraceHeader.TRACE_ID: obj.get("trace_id"),
66+
TraceHeader.PARENT_ID: obj.get("parent_id"),
67+
TraceHeader.SAMPLING_PRIORITY: obj.get("sampling_priority"),
68+
}
4369

4470

4571
def extract_dd_trace_context(event):
@@ -54,33 +80,32 @@ def extract_dd_trace_context(event):
5480
the correct context.
5581
"""
5682
global dd_trace_context
57-
headers = event.get('headers', {})
83+
headers = event.get("headers", {})
5884
lowercase_headers = {k.lower(): v for k, v in headers.items()}
5985

6086
trace_id = lowercase_headers.get(TraceHeader.TRACE_ID)
6187
parent_id = lowercase_headers.get(TraceHeader.PARENT_ID)
6288
sampling_priority = lowercase_headers.get(TraceHeader.SAMPLING_PRIORITY)
6389
if trace_id and parent_id and sampling_priority:
64-
logger.debug('Extracted Datadog trace context from headers')
90+
logger.debug("Extracted Datadog trace context from headers")
6591
dd_trace_context = {
66-
'trace-id': trace_id,
67-
'parent-id': parent_id,
68-
'sampling-priority': sampling_priority,
92+
"trace-id": trace_id,
93+
"parent-id": parent_id,
94+
"sampling-priority": sampling_priority,
95+
"source": Source.EVENT,
6996
}
7097
xray_recorder.begin_subsegment(XraySubsegment.NAME)
7198
subsegment = xray_recorder.current_subsegment()
7299
subsegment.put_metadata(
73-
XraySubsegment.KEY,
74-
dd_trace_context,
75-
XraySubsegment.NAMESPACE
100+
XraySubsegment.KEY, dd_trace_context, XraySubsegment.NAMESPACE
76101
)
77102
xray_recorder.end_subsegment()
78103
else:
79104
# AWS Lambda runtime caches global variables between invocations,
80105
# reset to avoid using the context from the last invocation.
81-
dd_trace_context = {}
82-
83-
logger.debug('extracted dd trace context %s', dd_trace_context)
106+
dd_trace_context = _get_xray_trace_context()
107+
logger.debug("extracted dd trace context %s", dd_trace_context)
108+
return dd_trace_context
84109

85110

86111
def get_dd_trace_context():
@@ -95,30 +120,25 @@ def get_dd_trace_context():
95120
automatically, but this function can be used to manually inject the trace
96121
context to an outgoing request.
97122
"""
98-
if not is_lambda_context():
99-
logger.debug('get_dd_trace_context is only supported in LambdaContext')
100-
return {}
101-
102123
global dd_trace_context
103-
xray_trace_entity = xray_recorder.get_trace_entity() # xray (sub)segment
104-
if dd_trace_context:
105-
return {
106-
TraceHeader.TRACE_ID:
107-
dd_trace_context['trace-id'],
108-
TraceHeader.PARENT_ID: _convert_xray_entity_id(
109-
xray_trace_entity.id),
110-
TraceHeader.SAMPLING_PRIORITY:
111-
dd_trace_context['sampling-priority'],
112-
}
113-
else:
114-
return {
115-
TraceHeader.TRACE_ID: _convert_xray_trace_id(
116-
xray_trace_entity.trace_id),
117-
TraceHeader.PARENT_ID: _convert_xray_entity_id(
118-
xray_trace_entity.id),
119-
TraceHeader.SAMPLING_PRIORITY: _convert_xray_sampling(
120-
xray_trace_entity.sampled),
121-
}
124+
125+
if not dd_trace_context:
126+
return None
127+
trace_context = _context_obj_to_headers(dd_trace_context)
128+
datadog_context = trace_wrapper.trace_context
129+
if datadog_context:
130+
logger.debug("get_dd_trace_context using dd-trace context")
131+
return datadog_context
132+
try:
133+
xray_context = _get_xray_trace_context() # xray (sub)segment
134+
if xray_context:
135+
trace_context[TraceHeader.PARENT_ID] = xray_context["parent_id"]
136+
except Exception as e:
137+
logger.debug(
138+
"get_dd_trace_context couldn't read from segment from x-ray, with error %s"
139+
% e
140+
)
141+
return trace_context
122142

123143

124144
def set_correlation_ids():
@@ -130,16 +150,16 @@ def set_correlation_ids():
130150
TODO: Remove me when Datadog tracer is natively supported in Lambda.
131151
"""
132152
if not is_lambda_context():
133-
logger.debug('set_correlation_ids is only supported in LambdaContext')
153+
logger.debug("set_correlation_ids is only supported in LambdaContext")
134154
return
135155

136156
context = get_dd_trace_context()
137157

138-
span = tracer.trace('dummy.span')
158+
span = tracer.trace("dummy.span")
139159
span.trace_id = context[TraceHeader.TRACE_ID]
140160
span.span_id = context[TraceHeader.PARENT_ID]
141161

142-
logger.debug('correlation ids set')
162+
logger.debug("correlation ids set")
143163

144164

145165
def inject_correlation_ids():
@@ -153,17 +173,19 @@ def inject_correlation_ids():
153173
# Override the log format of the AWS provided LambdaLoggerHandler
154174
root_logger = logging.getLogger()
155175
for handler in root_logger.handlers:
156-
if handler.__class__.__name__ == 'LambdaLoggerHandler':
157-
handler.setFormatter(logging.Formatter(
158-
'[%(levelname)s]\t%(asctime)s.%(msecs)dZ\t%(aws_request_id)s\t'
159-
'[dd.trace_id=%(dd.trace_id)s dd.span_id=%(dd.span_id)s]\t%(message)s\n',
160-
'%Y-%m-%dT%H:%M:%S'
161-
))
176+
if handler.__class__.__name__ == "LambdaLoggerHandler":
177+
handler.setFormatter(
178+
logging.Formatter(
179+
"[%(levelname)s]\t%(asctime)s.%(msecs)dZ\t%(aws_request_id)s\t"
180+
"[dd.trace_id=%(dd.trace_id)s dd.span_id=%(dd.span_id)s]\t%(message)s\n",
181+
"%Y-%m-%dT%H:%M:%S",
182+
)
183+
)
162184

163185
# Patch `logging.Logger.makeRecord` to actually inject correlation ids
164186
patch(logging=True)
165187

166-
logger.debug('logs injection configured')
188+
logger.debug("logs injection configured")
167189

168190

169191
def is_lambda_context():

datadog_lambda/wrapper.py

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import logging
88
import traceback
99

10-
from datadog_lambda.cold_start import set_cold_start
10+
from datadog_lambda.cold_start import set_cold_start, is_cold_start
1111
from datadog_lambda.metric import (
1212
lambda_stats,
1313
submit_invocations_metric,
@@ -18,7 +18,10 @@
1818
extract_dd_trace_context,
1919
set_correlation_ids,
2020
inject_correlation_ids,
21+
get_dd_trace_context,
2122
)
23+
from datadog_lambda.trace_wrapper import trace_wrapper
24+
from datadog_lambda.constants import Source
2225

2326

2427
logger = logging.getLogger(__name__)
@@ -39,7 +42,6 @@ def my_lambda_handle(event, context):
3942

4043

4144
class _NoopDecorator(object):
42-
4345
def __init__(self, func):
4446
self.func = func
4547

@@ -82,6 +84,8 @@ def __init__(self, func):
8284
self.logs_injection = (
8385
os.environ.get("DD_LOGS_INJECTION", "true").lower() == "true"
8486
)
87+
self.handler_name = os.environ.get("_HANDLER", "handler")
88+
self.function_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME", "function")
8589

8690
# Inject trace correlation ids to logs
8791
if self.logs_injection:
@@ -106,10 +110,33 @@ def __call__(self, event, context, **kwargs):
106110

107111
def _before(self, event, context):
108112
try:
113+
109114
set_cold_start()
110115
submit_invocations_metric(context)
111116
# Extract Datadog trace context from incoming requests
112-
extract_dd_trace_context(event)
117+
dd_context = extract_dd_trace_context(event)
118+
span_context = None
119+
if dd_context["source"] == Source.EVENT:
120+
span_context = trace_wrapper.extract(dd_context)
121+
122+
tags = {}
123+
if context:
124+
tags = {
125+
"cold_start": is_cold_start(),
126+
"function_arn": context.invoked_function_arn,
127+
"request_id": context.aws_request_id,
128+
"resource_names": context.function_name,
129+
}
130+
args = {
131+
"service": self.function_name,
132+
"resource": self.handler_name,
133+
"span_type": "serverless",
134+
"child_of": span_context,
135+
}
136+
137+
self.span = trace_wrapper.start_span("aws.lambda", **args)
138+
if self.span:
139+
self.span.set_tags(tags)
113140

114141
# Set log correlation ids using extracted trace context
115142
set_correlation_ids()
@@ -119,6 +146,8 @@ def _before(self, event, context):
119146

120147
def _after(self, event, context):
121148
try:
149+
if self.span:
150+
self.span.finish()
122151
if not self.flush_to_log:
123152
lambda_stats.flush(float("inf"))
124153
logger.debug("datadog_lambda_wrapper _after() done")

0 commit comments

Comments
 (0)