Skip to content

Commit 7d21380

Browse files
feat: add a more explicit MetricsHandler process for routing metrics
1 parent ee1249a commit 7d21380

File tree

2 files changed

+74
-29
lines changed

2 files changed

+74
-29
lines changed

datadog_lambda/metric.py

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
# This product includes software developed at Datadog (https://www.datadoghq.com/).
44
# Copyright 2019 Datadog, Inc.
55

6+
import enum
67
import logging
78
import os
89
import time
@@ -15,25 +16,44 @@
1516

1617
logger = logging.getLogger(__name__)
1718

18-
lambda_stats = None
1919

20-
flush_in_thread = os.environ.get("DD_FLUSH_IN_THREAD", "").lower() == "true"
20+
class MetricsHandler(enum.Enum):
21+
EXTENSION = "extension"
22+
FORWARDER = "forwarder"
23+
DATADOG_API = "datadog_api"
24+
25+
26+
def _select_metrics_handler():
27+
if should_use_extension:
28+
return MetricsHandler.EXTENSION
29+
if os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true":
30+
return MetricsHandler.FORWARDER
31+
return MetricsHandler.DATADOG_API
32+
2133

22-
if should_use_extension:
34+
metrics_handler = _select_metrics_handler()
35+
logger.debug("identified primary metrics handler as %s", metrics_handler)
36+
37+
38+
lambda_stats = None
39+
if metrics_handler == MetricsHandler.EXTENSION:
2340
from datadog_lambda.statsd_writer import StatsDWriter
2441

2542
lambda_stats = StatsDWriter()
26-
else:
43+
44+
elif metrics_handler == MetricsHandler.DATADOG_API:
2745
# Periodical flushing in a background thread is NOT guaranteed to succeed
2846
# and leads to data loss. When disabled, metrics are only flushed at the
2947
# end of invocation. To make metrics submitted from a long-running Lambda
3048
# function available sooner, consider using the Datadog Lambda extension.
3149
from datadog_lambda.api import init_api
3250
from datadog_lambda.thread_stats_writer import ThreadStatsWriter
3351

52+
flush_in_thread = os.environ.get("DD_FLUSH_IN_THREAD", "").lower() == "true"
3453
init_api()
3554
lambda_stats = ThreadStatsWriter(flush_in_thread)
3655

56+
3757
enhanced_metrics_enabled = (
3858
os.environ.get("DD_ENHANCED_METRICS", "true").lower() == "true"
3959
)
@@ -44,16 +64,19 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=Fal
4464
Submit a data point to Datadog distribution metrics.
4565
https://docs.datadoghq.com/graphing/metrics/distributions/
4666
47-
When DD_FLUSH_TO_LOG is True, write metric to log, and
48-
wait for the Datadog Log Forwarder Lambda function to submit
49-
the metrics asynchronously.
67+
If the Datadog Lambda Extension is present, metrics are submitted to its
68+
dogstatsd endpoint.
69+
70+
When DD_FLUSH_TO_LOG is True or force_async is True, write metric to log,
71+
and wait for the Datadog Log Forwarder Lambda function to submit the
72+
metrics asynchronously.
5073
5174
Otherwise, the metrics will be submitted to the Datadog API
5275
periodically and at the end of the function execution in a
5376
background thread.
5477
55-
Note that if the extension is present, it will override the DD_FLUSH_TO_LOG value
56-
and always use the layer to send metrics to the extension
78+
Note that if the extension is present, it will override the DD_FLUSH_TO_LOG
79+
value and always use the layer to send metrics to the extension
5780
"""
5881
if not metric_name or not isinstance(metric_name, str):
5982
logger.warning(
@@ -71,11 +94,10 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=Fal
7194
)
7295
return
7396

74-
flush_to_logs = os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true"
7597
tags = [] if tags is None else list(tags)
7698
tags.append(dd_lambda_layer_tag)
7799

78-
if should_use_extension:
100+
if metrics_handler == MetricsHandler.EXTENSION:
79101
if timestamp is not None:
80102
if isinstance(timestamp, datetime):
81103
timestamp = int(timestamp.timestamp())
@@ -94,15 +116,20 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=Fal
94116
)
95117
lambda_stats.distribution(metric_name, value, tags=tags, timestamp=timestamp)
96118

119+
elif force_async or (metrics_handler == MetricsHandler.FORWARDER):
120+
write_metric_point_to_stdout(metric_name, value, timestamp=timestamp, tags=tags)
121+
122+
elif metrics_handler == MetricsHandler.DATADOG_API:
123+
lambda_stats.distribution(metric_name, value, tags=tags, timestamp=timestamp)
124+
97125
else:
98-
if flush_to_logs or force_async:
99-
write_metric_point_to_stdout(
100-
metric_name, value, timestamp=timestamp, tags=tags
101-
)
102-
else:
103-
lambda_stats.distribution(
104-
metric_name, value, tags=tags, timestamp=timestamp
105-
)
126+
# This should be qutie impossible, but let's at least log a message if
127+
# it somehow happens.
128+
logger.debug(
129+
"Metric %s cannot be submitted because the metrics handler is not configured: %s",
130+
metric_name,
131+
metrics_handler,
132+
)
106133

107134

108135
def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=None):

tests/test_metric.py

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,12 @@
77
from datadog.api.exceptions import ClientError
88

99
from datadog_lambda.api import KMS_ENCRYPTION_CONTEXT_KEY, decrypt_kms_api_key
10-
from datadog_lambda.metric import flush_stats, lambda_metric
10+
from datadog_lambda.metric import (
11+
MetricsHandler,
12+
_select_metrics_handler,
13+
flush_stats,
14+
lambda_metric,
15+
)
1116
from datadog_lambda.tags import dd_lambda_layer_tag
1217
from datadog_lambda.thread_stats_writer import ThreadStatsWriter
1318

@@ -34,15 +39,31 @@ def test_lambda_metric_tagged_with_dd_lambda_layer(self):
3439

3540
# let's fake that the extension is present, this should override DD_FLUSH_TO_LOG
3641
@patch("datadog_lambda.metric.should_use_extension", True)
37-
def test_lambda_metric_flush_to_log_with_extension(self):
42+
def test_select_metrics_handler_extension_despite_flush_to_logs(self):
3843
os.environ["DD_FLUSH_TO_LOG"] = "True"
44+
self.assertEqual(MetricsHandler.EXTENSION, _select_metrics_handler())
45+
del os.environ["DD_FLUSH_TO_LOG"]
46+
47+
@patch("datadog_lambda.metric.should_use_extension", False)
48+
def test_select_metrics_handler_forwarder_when_flush_to_logs(self):
49+
os.environ["DD_FLUSH_TO_LOG"] = "True"
50+
self.assertEqual(MetricsHandler.FORWARDER, _select_metrics_handler())
51+
del os.environ["DD_FLUSH_TO_LOG"]
52+
53+
@patch("datadog_lambda.metric.should_use_extension", False)
54+
def test_select_metrics_handler_dd_api_fallback(self):
55+
os.environ["DD_FLUSH_TO_LOG"] = "False"
56+
self.assertEqual(MetricsHandler.DATADOG_API, _select_metrics_handler())
57+
del os.environ["DD_FLUSH_TO_LOG"]
58+
59+
@patch("datadog_lambda.metric.metrics_handler", MetricsHandler.EXTENSION)
60+
def test_lambda_metric_flush_to_log_with_extension(self):
3961
lambda_metric("test", 1)
4062
self.mock_metric_lambda_stats.distribution.assert_has_calls(
4163
[call("test", 1, timestamp=None, tags=[dd_lambda_layer_tag])]
4264
)
43-
del os.environ["DD_FLUSH_TO_LOG"]
4465

45-
@patch("datadog_lambda.metric.should_use_extension", True)
66+
@patch("datadog_lambda.metric.metrics_handler", MetricsHandler.EXTENSION)
4667
def test_lambda_metric_timestamp_with_extension(self):
4768
delta = timedelta(minutes=1)
4869
timestamp = int((datetime.now() - delta).timestamp())
@@ -52,7 +73,7 @@ def test_lambda_metric_timestamp_with_extension(self):
5273
[call("test_timestamp", 1, timestamp=timestamp, tags=[dd_lambda_layer_tag])]
5374
)
5475

55-
@patch("datadog_lambda.metric.should_use_extension", True)
76+
@patch("datadog_lambda.metric.metrics_handler", MetricsHandler.EXTENSION)
5677
def test_lambda_metric_datetime_with_extension(self):
5778
delta = timedelta(minutes=1)
5879
timestamp = datetime.now() - delta
@@ -69,22 +90,19 @@ def test_lambda_metric_datetime_with_extension(self):
6990
]
7091
)
7192

72-
@patch("datadog_lambda.metric.should_use_extension", True)
93+
@patch("datadog_lambda.metric.metrics_handler", MetricsHandler.EXTENSION)
7394
def test_lambda_metric_invalid_timestamp_with_extension(self):
7495
delta = timedelta(hours=5)
7596
timestamp = int((datetime.now() - delta).timestamp())
7697

7798
lambda_metric("test_timestamp", 1, timestamp)
7899
self.mock_metric_lambda_stats.distribution.assert_not_called()
79100

101+
@patch("datadog_lambda.metric.metrics_handler", MetricsHandler.FORWARDER)
80102
def test_lambda_metric_flush_to_log(self):
81-
os.environ["DD_FLUSH_TO_LOG"] = "True"
82-
83103
lambda_metric("test", 1)
84104
self.mock_metric_lambda_stats.distribution.assert_not_called()
85105

86-
del os.environ["DD_FLUSH_TO_LOG"]
87-
88106
@patch("datadog_lambda.metric.logger.warning")
89107
def test_lambda_metric_invalid_metric_name_none(self, mock_logger_warning):
90108
lambda_metric(None, 1)

0 commit comments

Comments
 (0)