@@ -34,7 +34,7 @@ class BaseEventProcessor(ABC):
3434 """ Class encapsulating event processing. Override with your own implementation. """
3535
3636 @abc .abstractmethod
37- def process (user_event ):
37+ def process (self , user_event ):
3838 """ Method to provide intermediary processing stage within event production.
3939 Args:
4040 user_event: UserEvent instance that needs to be processed and dispatched.
@@ -45,33 +45,34 @@ def process(user_event):
4545class BatchEventProcessor (BaseEventProcessor ):
4646 """
4747 BatchEventProcessor is an implementation of the BaseEventProcessor that batches events.
48+
4849 The BatchEventProcessor maintains a single consumer thread that pulls events off of
4950 the blocking queue and buffers them for either a configured batch size or for a
5051 maximum duration before the resulting LogEvent is sent to the EventDispatcher.
5152 """
5253
5354 _DEFAULT_QUEUE_CAPACITY = 1000
5455 _DEFAULT_BATCH_SIZE = 10
55- _DEFAULT_FLUSH_INTERVAL = timedelta ( seconds = 30 )
56- _DEFAULT_TIMEOUT_INTERVAL = timedelta ( seconds = 5 )
56+ _DEFAULT_FLUSH_INTERVAL = 30
57+ _DEFAULT_TIMEOUT_INTERVAL = 5
5758 _SHUTDOWN_SIGNAL = object ()
5859 _FLUSH_SIGNAL = object ()
5960 LOCK = threading .Lock ()
6061
6162 def __init__ (self ,
6263 event_dispatcher ,
63- logger ,
64+ logger = None ,
6465 start_on_init = False ,
6566 event_queue = None ,
6667 batch_size = None ,
6768 flush_interval = None ,
6869 timeout_interval = None ,
6970 notification_center = None ):
70- """ EventProcessor init method to configure event batching.
71+ """ BatchEventProcessor init method to configure event batching.
7172
7273 Args:
7374 event_dispatcher: Provides a dispatch_event method which if given a URL and params sends a request to it.
74- logger: Provides a log method to log messages. By default nothing would be logged.
75+ logger: Optional component which provides a log method to log messages. By default nothing would be logged.
7576 start_on_init: Optional boolean param which starts the consumer thread if set to True.
7677 Default value is False.
7778 event_queue: Optional component which accumulates the events until dispacthed.
@@ -86,20 +87,28 @@ def __init__(self,
8687 self .event_dispatcher = event_dispatcher or default_event_dispatcher
8788 self .logger = _logging .adapt_logger (logger or _logging .NoOpLogger ())
8889 self .event_queue = event_queue or queue .Queue (maxsize = self ._DEFAULT_QUEUE_CAPACITY )
89- self .batch_size = batch_size if self ._validate_intantiation_props (batch_size , 'batch_size' ) \
90+ self .batch_size = batch_size if self ._validate_instantiation_props (batch_size ,
91+ 'batch_size' ,
92+ self ._DEFAULT_BATCH_SIZE ) \
9093 else self ._DEFAULT_BATCH_SIZE
9194 self .flush_interval = timedelta (seconds = flush_interval ) \
92- if self ._validate_intantiation_props (flush_interval , 'flush_interval' ) \
93- else self ._DEFAULT_FLUSH_INTERVAL
95+ if self ._validate_instantiation_props (flush_interval ,
96+ 'flush_interval' ,
97+ self ._DEFAULT_FLUSH_INTERVAL ) \
98+ else timedelta (self ._DEFAULT_FLUSH_INTERVAL )
9499 self .timeout_interval = timedelta (seconds = timeout_interval ) \
95- if self ._validate_intantiation_props (timeout_interval , 'timeout_interval' ) \
96- else self ._DEFAULT_TIMEOUT_INTERVAL
97- self .notification_center = notification_center
100+ if self ._validate_instantiation_props (timeout_interval ,
101+ 'timeout_interval' ,
102+ self ._DEFAULT_TIMEOUT_INTERVAL ) \
103+ else timedelta (self ._DEFAULT_TIMEOUT_INTERVAL )
104+
105+ self .notification_center = notification_center or _notification_center .NotificationCenter (self .logger )
98106 self ._current_batch = list ()
99107
100108 if not validator .is_notification_center_valid (self .notification_center ):
101109 self .logger .error (enums .Errors .INVALID_INPUT .format ('notification_center' ))
102- self .notification_center = _notification_center .NotificationCenter ()
110+ self .logger .debug ('Creating notification center for use.' )
111+ self .notification_center = _notification_center .NotificationCenter (self .logger )
103112
104113 self .executor = None
105114 if start_on_init is True :
@@ -110,13 +119,14 @@ def is_running(self):
110119 """ Property to check if consumer thread is alive or not. """
111120 return self .executor .isAlive () if self .executor else False
112121
113- def _validate_intantiation_props (self , prop , prop_name ):
122+ def _validate_instantiation_props (self , prop , prop_name , default_value ):
114123 """ Method to determine if instantiation properties like batch_size, flush_interval
115124 and timeout_interval are valid.
116125
117126 Args:
118127 prop: Property value that needs to be validated.
119128 prop_name: Property name.
129+ default_value: Default value for property.
120130
121131 Returns:
122132 False if property value is None or less than or equal to 0 or not a finite number.
@@ -132,7 +142,7 @@ def _validate_intantiation_props(self, prop, prop_name):
132142 is_valid = False
133143
134144 if is_valid is False :
135- self .logger .info ('Using default value for {}.' .format (prop_name ))
145+ self .logger .info ('Using default value {} for {}.' .format (default_value , prop_name ))
136146
137147 return is_valid
138148
@@ -213,11 +223,10 @@ def _flush_queue(self):
213223
214224 log_event = EventFactory .create_log_event (to_process_batch , self .logger )
215225
216- if self .notification_center is not None :
217- self .notification_center .send_notifications (
218- enums .NotificationTypes .LOG_EVENT ,
219- log_event
220- )
226+ self .notification_center .send_notifications (
227+ enums .NotificationTypes .LOG_EVENT ,
228+ log_event
229+ )
221230
222231 try :
223232 self .event_dispatcher .dispatch_event (log_event )
@@ -226,14 +235,17 @@ def _flush_queue(self):
226235
227236 def process (self , user_event ):
228237 """ Method to process the user_event by putting it in event_queue.
238+
229239 Args:
230240 user_event: UserEvent Instance.
231241 """
232242 if not isinstance (user_event , UserEvent ):
233243 self .logger .error ('Provided event is in an invalid format.' )
234244 return
235245
236- self .logger .debug ('Received user_event: ' + str (user_event ))
246+ self .logger .debug ('Received event of type {} for user {}.' .format (
247+ type (user_event ).__name__ , user_event .user_id )
248+ )
237249
238250 try :
239251 self .event_queue .put_nowait (user_event )
@@ -242,6 +254,7 @@ def process(self, user_event):
242254
243255 def _add_to_batch (self , user_event ):
244256 """ Method to append received user event to current batch.
257+
245258 Args:
246259 user_event: UserEvent Instance.
247260 """
@@ -261,9 +274,11 @@ def _add_to_batch(self, user_event):
261274
262275 def _should_split (self , user_event ):
263276 """ Method to check if current event batch should split into two.
277+
264278 Args:
265279 user_event: UserEvent Instance.
266- Return Value:
280+
281+ Returns:
267282 - True, if revision number and project_id of last event in current batch do not match received event's
268283 revision number and project id respectively.
269284 - False, otherwise.
@@ -311,30 +326,32 @@ def __init__(self, event_dispatcher, logger=None, notification_center=None):
311326 """
312327 self .event_dispatcher = event_dispatcher
313328 self .logger = _logging .adapt_logger (logger or _logging .NoOpLogger ())
314- self .notification_center = notification_center
329+ self .notification_center = notification_center or _notification_center . NotificationCenter ( self . logger )
315330
316331 if not validator .is_notification_center_valid (self .notification_center ):
317332 self .logger .error (enums .Errors .INVALID_INPUT .format ('notification_center' ))
318333 self .notification_center = _notification_center .NotificationCenter ()
319334
320335 def process (self , user_event ):
321336 """ Method to process the user_event by dispatching it.
337+
322338 Args:
323339 user_event: UserEvent Instance.
324340 """
325341 if not isinstance (user_event , UserEvent ):
326342 self .logger .error ('Provided event is in an invalid format.' )
327343 return
328344
329- self .logger .debug ('Received user_event: ' + str (user_event ))
345+ self .logger .debug ('Received event of type {} for user {}.' .format (
346+ type (user_event ).__name__ , user_event .user_id )
347+ )
330348
331349 log_event = EventFactory .create_log_event (user_event , self .logger )
332350
333- if self .notification_center is not None :
334- self .notification_center .send_notifications (
335- enums .NotificationTypes .LOG_EVENT ,
336- log_event
337- )
351+ self .notification_center .send_notifications (
352+ enums .NotificationTypes .LOG_EVENT ,
353+ log_event
354+ )
338355
339356 try :
340357 self .event_dispatcher .dispatch_event (log_event )
0 commit comments