2121logger = logging .getLogger (__name__ )
2222
2323
class StatsWriter:
    """Interface for writing distribution metrics to Datadog.

    Concrete subclasses must override all three methods.
    """

    def distribution(self, metric_name, value, tags=None, timestamp=None):
        """Record a single distribution metric point.

        Note: the default for ``tags`` is ``None`` rather than a mutable
        ``[]`` default, which would be shared across calls.
        """
        raise NotImplementedError()

    def flush(self):
        """Submit any buffered metrics to Datadog."""
        raise NotImplementedError()

    def stop(self):
        """Release any resources (e.g. background threads) held by the writer."""
        raise NotImplementedError()
35+ class StatsDWriter (StatsWriter ):
2536 """
26- Wraps StatsD calls, to give an identical interface to ThreadStats
37+ Writes distribution metrics using StatsD protocol
2738 """
2839
2940 def __init__ (self ):
@@ -33,21 +44,82 @@ def __init__(self):
3344 def distribution (self , metric_name , value , tags = [], timestamp = None ):
3445 statsd .distribution (metric_name , value , tags = tags )
3546
    def flush(self):
        # No-op: the StatsD client transmits each metric point as it is
        # recorded, so there is nothing buffered to flush here.
        pass
49+
    def stop(self):
        # No-op: the StatsD writer holds no background thread or other
        # resource that needs shutting down.
        pass
3852
3953
class ThreadStatsWriter(StatsWriter):
    """
    Writes distribution metrics using the ThreadStats class
    """

    def __init__(self, flush_in_thread):
        # compress_payload shrinks submitted payloads; flush_in_thread
        # controls whether ThreadStats flushes periodically in a
        # background thread (see module-level caveats where this is set).
        self.thread_stats = ThreadStats(compress_payload=True)
        self.thread_stats.start(flush_in_thread=flush_in_thread)

    def distribution(self, metric_name, value, tags=None, timestamp=None):
        """Buffer a distribution metric point in ThreadStats.

        Default for ``tags`` is ``None`` (ThreadStats' own default)
        instead of a shared mutable ``[]``.
        """
        self.thread_stats.distribution(
            metric_name, value, tags=tags, timestamp=timestamp
        )

    def flush(self):
        """Flush distributions from ThreadStats to Datadog.

        Modified based on `datadog.threadstats.base.ThreadStats.flush()`,
        to gain better control over exception handling.
        """
        _, dists = self.thread_stats._get_aggregate_metrics_and_dists(float("inf"))
        count_dists = len(dists)
        if not count_dists:
            logger.debug("No distributions to flush. Continuing.")

        self.thread_stats.flush_count += 1
        logger.debug(
            "Flush #%s sending %s distributions",
            self.thread_stats.flush_count,
            count_dists,
        )
        try:
            self.thread_stats.reporter.flush_distributions(dists)
        except Exception as e:
            # The nature of the root issue https://bugs.python.org/issue41345 is complex,
            # but comprehensive tests suggest that it is safe to retry on this specific error.
            if isinstance(
                e, api.exceptions.ClientError
            ) and "RemoteDisconnected" in str(e):
                logger.debug(
                    "Retry flush #%s due to RemoteDisconnected",
                    self.thread_stats.flush_count,
                )
                try:
                    self.thread_stats.reporter.flush_distributions(dists)
                except Exception:
                    logger.debug(
                        "Flush #%s failed after retry",
                        self.thread_stats.flush_count,
                        exc_info=True,
                    )
            else:
                logger.debug(
                    "Flush #%s failed", self.thread_stats.flush_count, exc_info=True
                )

    def stop(self):
        """Stop the ThreadStats background flush thread."""
        self.thread_stats.stop()
111+
112+
# Select the stats writer for this runtime. Both branches assign a
# StatsWriter-compatible object, so callers can use the same interface
# regardless of whether the Datadog Lambda extension is present.
lambda_stats = None
if should_use_extension:
    # The extension receives metrics over the StatsD protocol and takes
    # care of aggregation and forwarding to Datadog.
    lambda_stats = StatsDWriter()
else:
    # Periodical flushing in a background thread is NOT guaranteed to succeed
    # and leads to data loss. When disabled, metrics are only flushed at the
    # end of invocation. To make metrics submitted from a long-running Lambda
    # function available sooner, consider using the Datadog Lambda extension.
    flush_in_thread = os.environ.get("DD_FLUSH_IN_THREAD", "").lower() == "true"
    lambda_stats = ThreadStatsWriter(flush_in_thread)
51123
52124
53125def lambda_metric (metric_name , value , timestamp = None , tags = None , force_async = False ):
@@ -74,8 +146,7 @@ def lambda_metric(metric_name, value, timestamp=None, tags=None, force_async=Fal
74146
75147
76148def write_metric_point_to_stdout (metric_name , value , timestamp = None , tags = []):
77- """Writes the specified metric point to standard output
78- """
149+ """Writes the specified metric point to standard output"""
79150 logger .debug (
80151 "Sending metric %s value %s to Datadog via log forwarder" , metric_name , value
81152 )
@@ -91,40 +162,8 @@ def write_metric_point_to_stdout(metric_name, value, timestamp=None, tags=[]):
91162 )
92163
93164
94- def flush_thread_stats ():
95- """"Flush distributions from ThreadStats to Datadog.
96-
97- Modified based on `datadog.threadstats.base.ThreadStats.flush()`,
98- to gain better control over exception handling.
99- """
100- _ , dists = lambda_stats ._get_aggregate_metrics_and_dists (float ("inf" ))
101- count_dists = len (dists )
102- if not count_dists :
103- logger .debug ("No distributions to flush. Continuing." )
104-
105- lambda_stats .flush_count += 1
106- logger .debug (
107- "Flush #%s sending %s distributions" , lambda_stats .flush_count , count_dists
108- )
109- try :
110- lambda_stats .reporter .flush_distributions (dists )
111- except Exception as e :
112- # The nature of the root issue https://bugs.python.org/issue41345 is complex,
113- # but comprehensive tests suggest that it is safe to retry on this specific error.
114- if isinstance (e , api .exceptions .ClientError ) and "RemoteDisconnected" in str (e ):
115- logger .debug (
116- "Retry flush #%s due to RemoteDisconnected" , lambda_stats .flush_count
117- )
118- try :
119- lambda_stats .reporter .flush_distributions (dists )
120- except Exception :
121- logger .debug (
122- "Flush #%s failed after retry" ,
123- lambda_stats .flush_count ,
124- exc_info = True ,
125- )
126- else :
127- logger .debug ("Flush #%s failed" , lambda_stats .flush_count , exc_info = True )
def flush_stats():
    """Flush buffered distribution metrics to Datadog via the active writer."""
    lambda_stats.flush()
128167
129168
130169def are_enhanced_metrics_enabled ():
0 commit comments