 import os
 import json
 import time
-import base64
 import logging

-from botocore.exceptions import ClientError
-import boto3
-from datadog import api, initialize, statsd
-from datadog.threadstats import ThreadStats
 from datadog_lambda.extension import should_use_extension
 from datadog_lambda.tags import get_enhanced_metrics_tags, tag_dd_lambda_layer
-
-KMS_ENCRYPTION_CONTEXT_KEY = "LambdaFunctionName"
-ENHANCED_METRICS_NAMESPACE_PREFIX = "aws.lambda.enhanced"
+from datadog_lambda.api import init_api

 logger = logging.getLogger(__name__)

 lambda_stats = None

+init_api()

-class StatsWriter:
-    def distribution(self, metric_name, value, tags=[], timestamp=None):
-        raise NotImplementedError()
-
-    def flush(self):
-        raise NotImplementedError()
-
-    def stop(self):
-        raise NotImplementedError()
-
-
-class StatsDWriter(StatsWriter):
-    """
-    Writes distribution metrics using StatsD protocol
-    """
-
-    def __init__(self):
-        options = {"statsd_host": "127.0.0.1", "statsd_port": 8125}
-        initialize(**options)
-
-    def distribution(self, metric_name, value, tags=[], timestamp=None):
-        statsd.distribution(metric_name, value, tags=tags)
-
-    def flush(self):
-        pass
-
-    def stop(self):
-        pass
-
+if should_use_extension:
+    from datadog_lambda.statsd_writer import StatsDWriter

-class ThreadStatsWriter(StatsWriter):
-    """
-    Writes distribution metrics using the ThreadStats class
-    """
-
-    def __init__(self, flush_in_thread):
-        self.thread_stats = ThreadStats(compress_payload=True)
-        self.thread_stats.start(flush_in_thread=flush_in_thread)
-
-    def distribution(self, metric_name, value, tags=[], timestamp=None):
-        self.thread_stats.distribution(
-            metric_name, value, tags=tags, timestamp=timestamp
-        )
-
-    def flush(self):
-        """Flush distributions from ThreadStats to Datadog.
-        Modified based on `datadog.threadstats.base.ThreadStats.flush()`,
-        to gain better control over exception handling.
-        """
-        _, dists = self.thread_stats._get_aggregate_metrics_and_dists(float("inf"))
-        count_dists = len(dists)
-        if not count_dists:
-            logger.debug("No distributions to flush. Continuing.")
+    lambda_stats = StatsDWriter()
+else:
+    # Periodic flushing in a background thread is NOT guaranteed to succeed
+    # and can lead to data loss. When disabled, metrics are only flushed at
+    # the end of the invocation. To make metrics submitted from a long-running
+    # Lambda function available sooner, consider using the Datadog Lambda extension.
+    from datadog_lambda.thread_stats_writer import ThreadStatsWriter

-        self.thread_stats.flush_count += 1
-        logger.debug(
-            "Flush #%s sending %s distributions",
-            self.thread_stats.flush_count,
-            count_dists,
-        )
-        try:
-            self.thread_stats.reporter.flush_distributions(dists)
-        except Exception as e:
-            # The nature of the root issue https://bugs.python.org/issue41345 is complex,
-            # but comprehensive tests suggest that it is safe to retry on this specific error.
-            if isinstance(
-                e, api.exceptions.ClientError
-            ) and "RemoteDisconnected" in str(e):
-                logger.debug(
-                    "Retry flush #%s due to RemoteDisconnected",
-                    self.thread_stats.flush_count,
-                )
-                try:
-                    self.thread_stats.reporter.flush_distributions(dists)
-                except Exception:
-                    logger.debug(
-                        "Flush #%s failed after retry",
-                        self.thread_stats.flush_count,
-                        exc_info=True,
-                    )
-            else:
-                logger.debug(
-                    "Flush #%s failed", self.thread_stats.flush_count, exc_info=True
-                )
-
-    def stop(self):
-        self.thread_stats.stop()
-
-
-def init_lambda_stats():
-    global lambda_stats
-    if should_use_extension:
-        lambda_stats = StatsDWriter()
-    else:
-        # Periodic flushing in a background thread is NOT guaranteed to succeed
-        # and can lead to data loss. When disabled, metrics are only flushed at
-        # the end of the invocation. To make metrics submitted from a long-running
-        # Lambda function available sooner, consider using the Datadog Lambda extension.
-        flush_in_thread = os.environ.get("DD_FLUSH_IN_THREAD", "").lower() == "true"
-        lambda_stats = ThreadStatsWriter(flush_in_thread)
+    flush_in_thread = os.environ.get("DD_FLUSH_IN_THREAD", "").lower() == "true"
+    lambda_stats = ThreadStatsWriter(flush_in_thread)


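Since this refactor moves writer selection to import time, opting into periodic background flushing must happen before the module is first imported. A minimal sketch, assuming no extension is present; setting the variable in code is only for illustration, as in practice it would live in the function's environment configuration:

```python
import os

# Hypothetical opt-in: must be set before datadog_lambda.metric is imported,
# because the DD_FLUSH_IN_THREAD check above runs at import time. Background
# flushes trade fresher metrics for possible data loss on flush failures.
os.environ["DD_FLUSH_IN_THREAD"] = "true"

from datadog_lambda.metric import lambda_stats  # ThreadStatsWriter(True)
```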
 def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=False):
@@ -138,16 +45,24 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=False):
     Otherwise, the metrics will be submitted to the Datadog API
     periodically and at the end of the function execution in a
     background thread.
+
+    Note that if the extension is present, it overrides the DD_FLUSH_TO_LOG
+    value and the layer always sends metrics to the extension.
     """
-    global lambda_stats
     flush_to_logs = os.environ.get("DD_FLUSH_TO_LOG", "").lower() == "true"
     tags = tag_dd_lambda_layer(tags)

-    if flush_to_logs or (force_async and not should_use_extension):
-        write_metric_point_to_stdout(metric_name, value, timestamp=timestamp, tags=tags)
-    else:
-        logger.debug("Sending metric %s to Datadog via lambda layer", metric_name)
+    if should_use_extension:
         lambda_stats.distribution(metric_name, value, tags=tags, timestamp=timestamp)
+    else:
+        if flush_to_logs or force_async:
+            write_metric_point_to_stdout(
+                metric_name, value, timestamp=timestamp, tags=tags
+            )
+        else:
+            lambda_stats.distribution(
+                metric_name, value, tags=tags, timestamp=timestamp
+            )


 def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]):
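A minimal usage sketch of the resulting routing; the metric name, value, and tags are hypothetical:

```python
from datadog_lambda.metric import lambda_metric

def handler(event, context):
    # With the extension present: submitted through the StatsD writer.
    # Without it: written to stdout for the log forwarder when
    # DD_FLUSH_TO_LOG=true or force_async=True, otherwise buffered and
    # flushed to the Datadog API at the end of the invocation.
    lambda_metric("my.custom.metric", 12.45, tags=["env:dev"])
    return {"statusCode": 200}
```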
@@ -168,7 +83,6 @@ def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]):


 def flush_stats():
-    global lambda_stats
     lambda_stats.flush()


@@ -217,74 +131,3 @@ def submit_errors_metric(lambda_context):
         lambda_context (dict): Lambda context dict passed to the function by AWS
     """
     submit_enhanced_metric("errors", lambda_context)
-
-
-def decrypt_kms_api_key(kms_client, ciphertext):
-    """
-    Decodes and deciphers the base64-encoded ciphertext given as a parameter using KMS.
-    For this to work properly, the Lambda function must have the appropriate IAM permissions.
-
-    Args:
-        kms_client: The KMS client to use for decryption
-        ciphertext (string): The base64-encoded ciphertext to decrypt
-    """
-    decoded_bytes = base64.b64decode(ciphertext)
-
-    """
-    When the API key is encrypted using the AWS console, the function name is added as an
-    encryption context. When the API key is encrypted using the AWS CLI, no encryption context
-    is added. We need to try decrypting the API key both with and without the encryption context.
-    """
-    # Try without encryption context, in case API key was encrypted using the AWS CLI
-    function_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME")
-    try:
-        plaintext = kms_client.decrypt(CiphertextBlob=decoded_bytes)[
-            "Plaintext"
-        ].decode("utf-8")
-    except ClientError:
-        logger.debug(
-            "Failed to decrypt ciphertext without encryption context, "
-            "retrying with encryption context"
-        )
-        # Try with encryption context, in case API key was encrypted using the AWS Console
-        plaintext = kms_client.decrypt(
-            CiphertextBlob=decoded_bytes,
-            EncryptionContext={
-                KMS_ENCRYPTION_CONTEXT_KEY: function_name,
-            },
-        )["Plaintext"].decode("utf-8")
-
-    return plaintext
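For reference, a hedged sketch of calling the removed helper; the ciphertext is a made-up placeholder, and the function's role is assumed to allow kms:Decrypt:

```python
import boto3

kms_client = boto3.client("kms")
# "AQICAH..." stands in for a base64-encoded ciphertext produced either by
# `aws kms encrypt` (no encryption context) or by the AWS console (which
# adds the function name as encryption context); both paths are tried above.
api_key = decrypt_kms_api_key(kms_client, "AQICAH...")
```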
-
-
-# Set API Key
-if not api._api_key:
-    DD_API_KEY_SECRET_ARN = os.environ.get("DD_API_KEY_SECRET_ARN", "")
-    DD_API_KEY_SSM_NAME = os.environ.get("DD_API_KEY_SSM_NAME", "")
-    DD_KMS_API_KEY = os.environ.get("DD_KMS_API_KEY", "")
-    DD_API_KEY = os.environ.get("DD_API_KEY", os.environ.get("DATADOG_API_KEY", ""))
-
-    if DD_API_KEY_SECRET_ARN:
-        api._api_key = boto3.client("secretsmanager").get_secret_value(
-            SecretId=DD_API_KEY_SECRET_ARN
-        )["SecretString"]
-    elif DD_API_KEY_SSM_NAME:
-        api._api_key = boto3.client("ssm").get_parameter(
-            Name=DD_API_KEY_SSM_NAME, WithDecryption=True
-        )["Parameter"]["Value"]
-    elif DD_KMS_API_KEY:
-        kms_client = boto3.client("kms")
-        api._api_key = decrypt_kms_api_key(kms_client, DD_KMS_API_KEY)
-    else:
-        api._api_key = DD_API_KEY
-
-    logger.debug("Setting DATADOG_API_KEY of length %d", len(api._api_key))
-
-# Set DATADOG_HOST, to send data to a non-default Datadog datacenter
-api._api_host = os.environ.get(
-    "DATADOG_HOST", "https://api." + os.environ.get("DD_SITE", "datadoghq.com")
-)
-logger.debug("Setting DATADOG_HOST to %s", api._api_host)
-
-# Unmute exceptions from datadog api client, so we can catch and handle them
-api._mute = False
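The removed block resolves the API key from, in order, DD_API_KEY_SECRET_ARN, DD_API_KEY_SSM_NAME, DD_KMS_API_KEY, then DD_API_KEY (or DATADOG_API_KEY), and the API host from DATADOG_HOST with a DD_SITE fallback. A small sketch of the host fallback, using a hypothetical EU site:

```python
import os

os.environ["DD_SITE"] = "datadoghq.eu"  # hypothetical non-default site
host = os.environ.get(
    "DATADOG_HOST", "https://api." + os.environ.get("DD_SITE", "datadoghq.com")
)
assert host == "https://api.datadoghq.eu"  # an explicit DATADOG_HOST would win
```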