 import time
 from threading import Lock, Thread
 from typing import Dict, List, Union
+import concurrent.futures

 import attr
+import httpx

 from featureflags.models.metrics_data_metrics_type import \
     MetricsDataMetricsType
 from .models.unset import Unset
 from .sdk_logging_codes import info_metrics_thread_started, \
     info_metrics_success, warn_post_metrics_failed, \
-    info_metrics_thread_existed, info_metrics_target_exceeded
+    info_metrics_thread_existed, info_metrics_target_exceeded, \
+    warn_post_metrics_target_batch_failed, info_metrics_target_batch_success
 from .util import log

 FF_METRIC_TYPE = 'FFMETRICS'
@@ -62,7 +65,10 @@ def __init__(self, config: Config, client: AuthenticatedClient,
         self._client = client
         self._environment = environment
         self._data: Dict[str, AnalyticsEvent] = {}
-        self._target_data: Dict[str, MetricTargetData] = {}
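+        # Unique targets are buffered in batches: at most
+        # _max_number_of_batches batches of _max_batch_size targets each
+        # are held between flushes (200 * 1000 = 200,000 targets).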
+        self._target_data_batches: List[Dict[str, MetricTargetData]] = [{}]
+        self._max_number_of_batches = 200
+        self._max_batch_size = 1000
+        self._current_batch_index = 0
         self.max_target_data_exceeded = False

         self._running = False
@@ -89,37 +95,45 @@ def enqueue(self, target: Target, identifier: str,
                 event.count = 1
                 self._data[unique_evaluation_key] = event

-            # Store unique targets. If the target already exists
-            # just ignore it.
+            # Check if we're on our final batch - if we are and we've
+            # already exceeded the max batch size, just return early.
+            # Targets dropped here can still be registered on subsequent
+            # evaluations in a later flush interval.
+            if len(self._target_data_batches) >= self._max_number_of_batches:
+                if len(self._target_data_batches[
+                        self._current_batch_index]) >= \
+                        self._max_batch_size:
+                    if not self.max_target_data_exceeded:
+                        self.max_target_data_exceeded = True
+                        info_metrics_target_exceeded()
+                    return
+
             if event.target is not None and not event.target.anonymous:
                 unique_target_key = self.get_target_key(event)
-                if unique_target_key not in self._target_data:
-                    # Temporary workaround for FFM-8231 - limit max size of
-                    # target
-                    # metrics to 50k, which ff-server can process in around
-                    # 18 seconds. This possibly prevent some targets from
-                    # getting
-                    # registered and showing in the UI, but in theory, they
-                    # should get registered eventually on subsequent
-                    # evaluations.
-                    # We want to eventually use a batching solution
-                    # to avoid this.
-                    max_target_size = 50000
-                    if len(self._target_data) >= max_target_size:
-                        # Only log the info code once per interval
-                        if not self.max_target_data_exceeded:
-                            info_metrics_target_exceeded()
-                            self.max_target_data_exceeded = True
+
+                # Store unique targets. If the target already exists
+                # in any of the batches, don't continue processing it
+                for batch in self._target_data_batches:
+                    if unique_target_key in batch:
                         return
-                    target_name = event.target.name
-                    # If the target has no name use the identifier
-                    if not target_name:
-                        target_name = event.target.identifier
-                    self._target_data[unique_target_key] = MetricTargetData(
+
+                # If we've exceeded the max batch size for the current
+                # batch, then create a new batch and start using it.
+                if len(self._target_data_batches[
+                        self._current_batch_index]) >= self._max_batch_size:
+                    self._target_data_batches.append({})
+                    self._current_batch_index += 1
+
+                target_name = event.target.name
+                # If the target has no name use the identifier
+                if not target_name:
+                    target_name = event.target.identifier
+                self._target_data_batches[
+                    self._current_batch_index][unique_target_key] = \
+                    MetricTargetData(
                         identifier=event.target.identifier,
                         name=target_name,
                         attributes=event.target.attributes
                     )
+
         finally:
             self._lock.release()

@@ -177,35 +191,106 @@ def _send_data(self) -> None:
                     attributes=metric_attributes
                 )
                 metrics_data.append(md)
-            for _, unique_target in self._target_data.items():
-                target_attributes: List[KeyValue] = []
-                if not isinstance(unique_target.attributes, Unset):
-                    for key, value in unique_target.attributes.items():
-                        # Attribute values need to be sent as string to
-                        # ff-server so convert all values to strings.
-                        target_attributes.append(KeyValue(key, str(value)))
-                td = TargetData(
-                    identifier=unique_target.identifier,
-                    name=unique_target.name,
-                    attributes=target_attributes
-                )
-                target_data.append(td)
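+            # The first batch of targets is sent together with the metrics
+            # payload; any remaining batches are posted separately below.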
+            for _, unique_target in self._target_data_batches[0].items():
+                self.process_target(target_data, unique_target)
+
+            target_data_batches: List[List[TargetData]] = []
+            target_data_batch_index = 0
+            # We've already accounted for the first batch, so start processing
+            # from the second batch onwards
+            for batch in self._target_data_batches[1:]:
+                target_data_batches.append([])
+                for _, unique_target in batch.items():
+                    self.process_target(
+                        target_data_batches[target_data_batch_index],
+                        unique_target)
+                target_data_batch_index += 1
+
         finally:
             self._data = {}
-            self._target_data = {}
+            self._target_data_batches = [{}]
+            self._current_batch_index = 0
             self.max_target_data_exceeded = False
             self._lock.release()

         body: Metrics = Metrics(target_data=target_data,
                                 metrics_data=metrics_data)
-        response = post_metrics(client=self._client,
-                                environment=self._environment, json_body=body)
-        log.debug('Metrics server returns: %d', response.status_code)
-        if response.status_code >= 400:
-            warn_post_metrics_failed(response.status_code)
-            return
-        info_metrics_success()
-        return
+        try:
+            response = post_metrics(client=self._client,
+                                    environment=self._environment,
+                                    json_body=body)
+
+            log.debug('Metrics server returns: %d', response.status_code)
+            if response.status_code >= 400:
+                warn_post_metrics_failed(response.status_code)
+                return
+            if len(target_data_batches) > 0:
+                log.info('Sending %s target batches to metrics',
+                         len(target_data_batches))
+                unique_responses_codes = {}
+
+                # Process batches concurrently
+                with concurrent.futures.ThreadPoolExecutor() as executor:
+                    futures = []
+                    for batch in target_data_batches:
+                        # Staggering requests by 0.02 seconds means we send
+                        # at most 200 requests every four seconds, so that
+                        # the backend isn't hit too hard.
+                        time.sleep(0.02)
+                        future = executor.submit(
+                            self.process_target_data_batch,
+                            batch)
+                        futures.append(future)
+
+                    # Wait for all batches to complete
+                    concurrent.futures.wait(futures)
+
+                # Get unique status codes
+                for future in futures:
+                    status_code = future.result()
+                    if status_code in unique_responses_codes:
+                        unique_responses_codes[status_code] += 1
+                    else:
+                        unique_responses_codes[status_code] = 1
+
+                # Log the outcome for each unique status code
+                for unique_code, count in unique_responses_codes.items():
+                    if unique_code >= 400:
+                        warn_post_metrics_target_batch_failed(
+                            f'{count} batches received code {unique_code}')
+                    else:
+                        info_metrics_target_batch_success(
+                            f'{count} batches successful')
+
+            info_metrics_success()
+        except httpx.RequestError as ex:
+            warn_post_metrics_failed(ex)
+
+    def process_target_data_batch(self, target_data_batch):
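+        # Post a single batch of target data (with no metrics data) and
+        # return the HTTP status code so the caller can aggregate results.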
+        batch_request_body: Metrics = Metrics(
+            target_data=target_data_batch, metrics_data=[]
+        )
+        response = post_metrics(
+            client=self._client, environment=self._environment,
+            json_body=batch_request_body
+        )
+        return response.status_code
+
+    def process_target(self, target_data, unique_target):
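+        # Convert a stored MetricTargetData entry into the TargetData
+        # payload shape and append it to the given list.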
+        target_attributes: List[KeyValue] = []
+        if not isinstance(unique_target.attributes, Unset):
+            for key, value in unique_target.attributes.items():
+                # Attribute values need to be sent as string to
+                # ff-server so convert all values to strings.
+                target_attributes.append(KeyValue(key, str(value)))
+        td = TargetData(
+            identifier=unique_target.identifier,
+            name=unique_target.name,
+            attributes=target_attributes
+        )
+        target_data.append(td)

     def close(self) -> None:
         self._running = False