33# This product includes software developed at Datadog (https://www.datadoghq.com/).
44# Copyright 2019 Datadog, Inc.
55
6+ import enum
7+ import logging
68import os
79import time
8- import logging
9- import ujson as json
1010from datetime import datetime , timedelta
1111
12+ import ujson as json
13+
1214from datadog_lambda .extension import should_use_extension
13- from datadog_lambda .tags import get_enhanced_metrics_tags , dd_lambda_layer_tag
15+ from datadog_lambda .fips import fips_mode_enabled
16+ from datadog_lambda .tags import dd_lambda_layer_tag , get_enhanced_metrics_tags
1417
1518logger = logging .getLogger (__name__ )
1619
17- lambda_stats = None
18- extension_thread_stats = None
1920
20- flush_in_thread = os .environ .get ("DD_FLUSH_IN_THREAD" , "" ).lower () == "true"
21+ class MetricsHandler (enum .Enum ):
22+ EXTENSION = "extension"
23+ FORWARDER = "forwarder"
24+ DATADOG_API = "datadog_api"
25+ NO_METRICS = "no_metrics"
26+
2127
22- if should_use_extension :
28+ def _select_metrics_handler ():
29+ if should_use_extension :
30+ return MetricsHandler .EXTENSION
31+ if os .environ .get ("DD_FLUSH_TO_LOG" , "" ).lower () == "true" :
32+ return MetricsHandler .FORWARDER
33+
34+ if fips_mode_enabled :
35+ logger .debug (
36+ "With FIPS mode enabled, the Datadog API metrics handler is unavailable."
37+ )
38+ return MetricsHandler .NO_METRICS
39+
40+ return MetricsHandler .DATADOG_API
41+
42+
43+ metrics_handler = _select_metrics_handler ()
44+ logger .debug ("identified primary metrics handler as %s" , metrics_handler )
45+
46+
47+ lambda_stats = None
48+ if metrics_handler == MetricsHandler .EXTENSION :
2349 from datadog_lambda .statsd_writer import StatsDWriter
2450
2551 lambda_stats = StatsDWriter ()
26- else :
52+
53+ elif metrics_handler == MetricsHandler .DATADOG_API :
2754 # Periodical flushing in a background thread is NOT guaranteed to succeed
2855 # and leads to data loss. When disabled, metrics are only flushed at the
2956 # end of invocation. To make metrics submitted from a long-running Lambda
3057 # function available sooner, consider using the Datadog Lambda extension.
31- from datadog_lambda .thread_stats_writer import ThreadStatsWriter
3258 from datadog_lambda .api import init_api
59+ from datadog_lambda .thread_stats_writer import ThreadStatsWriter
3360
61+ flush_in_thread = os .environ .get ("DD_FLUSH_IN_THREAD" , "" ).lower () == "true"
3462 init_api ()
3563 lambda_stats = ThreadStatsWriter (flush_in_thread )
3664
65+
3766enhanced_metrics_enabled = (
3867 os .environ .get ("DD_ENHANCED_METRICS" , "true" ).lower () == "true"
3968)
@@ -44,16 +73,19 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=Fal
4473 Submit a data point to Datadog distribution metrics.
4574 https://docs.datadoghq.com/graphing/metrics/distributions/
4675
47- When DD_FLUSH_TO_LOG is True, write metric to log, and
48- wait for the Datadog Log Forwarder Lambda function to submit
49- the metrics asynchronously.
76+ If the Datadog Lambda Extension is present, metrics are submitted to its
77+ dogstatsd endpoint.
78+
79+ When DD_FLUSH_TO_LOG is True or force_async is True, write metric to log,
80+ and wait for the Datadog Log Forwarder Lambda function to submit the
81+ metrics asynchronously.
5082
5183 Otherwise, the metrics will be submitted to the Datadog API
5284 periodically and at the end of the function execution in a
5385 background thread.
5486
55- Note that if the extension is present, it will override the DD_FLUSH_TO_LOG value
56- and always use the layer to send metrics to the extension
87+ Note that if the extension is present, it will override the DD_FLUSH_TO_LOG
88+ value and always use the layer to send metrics to the extension
5789 """
5890 if not metric_name or not isinstance (metric_name , str ):
5991 logger .warning (
@@ -71,56 +103,54 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=Fal
71103 )
72104 return
73105
74- flush_to_logs = os .environ .get ("DD_FLUSH_TO_LOG" , "" ).lower () == "true"
75106 tags = [] if tags is None else list (tags )
76107 tags .append (dd_lambda_layer_tag )
77108
78- if should_use_extension and timestamp is not None :
79- # The extension does not support timestamps for distributions so we create a
80- # a thread stats writer to submit metrics with timestamps to the API
81- timestamp_ceiling = int (
82- (datetime .now () - timedelta (hours = 4 )).timestamp ()
83- ) # 4 hours ago
84- if isinstance (timestamp , datetime ):
85- timestamp = int (timestamp .timestamp ())
86- if timestamp_ceiling > timestamp :
87- logger .warning (
88- "Timestamp %s is older than 4 hours, not submitting metric %s" ,
89- timestamp ,
90- metric_name ,
91- )
92- return
93- global extension_thread_stats
94- if extension_thread_stats is None :
95- from datadog_lambda .thread_stats_writer import ThreadStatsWriter
96- from datadog_lambda .api import init_api
97-
98- init_api ()
99- extension_thread_stats = ThreadStatsWriter (flush_in_thread )
100-
101- extension_thread_stats .distribution (
102- metric_name , value , tags = tags , timestamp = timestamp
103- )
104- return
109+ if metrics_handler == MetricsHandler .EXTENSION :
110+ if timestamp is not None :
111+ if isinstance (timestamp , datetime ):
112+ timestamp = int (timestamp .timestamp ())
113+
114+ timestamp_floor = int ((datetime .now () - timedelta (hours = 4 )).timestamp ())
115+ if timestamp < timestamp_floor :
116+ logger .warning (
117+ "Timestamp %s is older than 4 hours, not submitting metric %s" ,
118+ timestamp ,
119+ metric_name ,
120+ )
121+ return
105122
106- if should_use_extension :
107123 logger .debug (
108124 "Sending metric %s value %s to Datadog via extension" , metric_name , value
109125 )
110126 lambda_stats .distribution (metric_name , value , tags = tags , timestamp = timestamp )
127+
128+ elif force_async or (metrics_handler == MetricsHandler .FORWARDER ):
129+ write_metric_point_to_stdout (metric_name , value , timestamp = timestamp , tags = tags )
130+
131+ elif metrics_handler == MetricsHandler .DATADOG_API :
132+ lambda_stats .distribution (metric_name , value , tags = tags , timestamp = timestamp )
133+
134+ elif metrics_handler == MetricsHandler .NO_METRICS :
135+ logger .debug (
136+ "Metric %s cannot be submitted because the metrics handler is disabled" ,
137+ metric_name ,
138+ ),
139+
111140 else :
112- if flush_to_logs or force_async :
113- write_metric_point_to_stdout (
114- metric_name , value , timestamp = timestamp , tags = tags
115- )
116- else :
117- lambda_stats .distribution (
118- metric_name , value , tags = tags , timestamp = timestamp
119- )
141+ # This should be qutie impossible, but let's at least log a message if
142+ # it somehow happens.
143+ logger .debug (
144+ "Metric %s cannot be submitted because the metrics handler is not configured: %s" ,
145+ metric_name ,
146+ metrics_handler ,
147+ )
120148
121149
122- def write_metric_point_to_stdout (metric_name , value , timestamp = None , tags = [] ):
150+ def write_metric_point_to_stdout (metric_name , value , timestamp = None , tags = None ):
123151 """Writes the specified metric point to standard output"""
152+ tags = tags or []
153+
124154 logger .debug (
125155 "Sending metric %s value %s to Datadog via log forwarder" , metric_name , value
126156 )
@@ -138,19 +168,8 @@ def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]):
138168
139169
140170def flush_stats (lambda_context = None ):
141- lambda_stats .flush ()
142-
143- if extension_thread_stats is not None :
144- tags = None
145- if lambda_context is not None :
146- tags = get_enhanced_metrics_tags (lambda_context )
147- split_arn = lambda_context .invoked_function_arn .split (":" )
148- if len (split_arn ) > 7 :
149- # Get rid of the alias
150- split_arn .pop ()
151- arn = ":" .join (split_arn )
152- tags .append ("function_arn:" + arn )
153- extension_thread_stats .flush (tags )
171+ if lambda_stats is not None :
172+ lambda_stats .flush ()
154173
155174
156175def submit_enhanced_metric (metric_name , lambda_context ):
0 commit comments