@@ -25,6 +25,8 @@ class History(object):
2525 :type _markers: collections.OrderedDict[str, list[dict[str, Any]]]
2626 :ivar _timers: timer events
2727 :type _timers: dict[str, dict[str, Any]]]
28+ :ivar _lambda_functions: activity events
29+ :type _lambda_functions: collections.OrderedDict[str, dict[str, Any]]
2830 :ivar _tasks: ordered list of tasks/etc
2931 :type _tasks: list[dict[str, Any]]
3032 """
@@ -39,6 +41,7 @@ def __init__(self, history):
3941 self ._signaled_workflows = collections .defaultdict (list )
4042 self ._markers = collections .OrderedDict ()
4143 self ._timers = {}
44+ self ._lambda_functions = collections .OrderedDict ()
4245 self ._tasks = []
4346
4447 @property
@@ -83,6 +86,13 @@ def signals(self):
8386 return self ._signals
8487
8588 @property
89+ def lambda_functions (self ):
90+ """
91+ :return: lambda_functions
92+ :rtype: collections.OrderedDict[str, dict[str, Any]]
93+ """
94+ return self ._lambda_functions
95+
8696 def signaled_workflows (self ):
8797 """
8898 :return: signaled workflows
@@ -510,13 +520,82 @@ def parse_timer_event(self, events, event):
510520 timer ['cancel_failed_event_id' ] = event .id
511521 timer ['cancel_failed_event_timestamp' ] = event .timestamp
512522
523+ def parse_lambda_function_event (self , events , event ):
524+ """
525+ Parse a lambda function event.
526+ :param events:
527+ :param event:
528+ """
529+ def get_lambda ():
530+ scheduled_event_id = events [event .scheduled_event_id - 1 ]
531+ return self ._lambda_functions [scheduled_event_id .lambda_id ]
532+
533+ if event .state == 'scheduled' :
534+ lambda_function = {
535+ 'type' : 'lambda_function' ,
536+ 'id' : event .lambda_id ,
537+ 'name' : event .lambda_name ,
538+ 'input' : event .input ,
539+ 'state' : event .state ,
540+ 'start_to_close_timeout' : getattr (event , 'start_to_close_timeout' , None ),
541+ 'scheduled_id' : event .id ,
542+ 'scheduled_timestamp' : event .timestamp ,
543+ }
544+ self ._lambda_functions [event .lambda_id ] = lambda_function
545+ elif event .state == 'schedule_failed' :
546+ lambda_function = {
547+ 'type' : 'lambda_function' ,
548+ 'id' : event .lambda_id ,
549+ 'name' : event .lambda_name ,
550+ 'state' : event .state ,
551+ 'schedule_failed_id' : event .id ,
552+ 'schedule_failed_timestamp' : event .timestamp ,
553+ }
554+ self ._lambda_functions [event .lambda_id ] = lambda_function
555+ elif event .state == 'started' :
556+ lambda_function = get_lambda ()
557+ lambda_function .update ({
558+ 'state' : event .state ,
559+ 'started_id' : event .id ,
560+ 'started_timestamp' : event .timestamp ,
561+ })
562+ elif event .state == 'completed' :
563+ lambda_function = get_lambda ()
564+ lambda_function .update ({
565+ 'state' : event .state ,
566+ 'result' : getattr (event , 'result' , None ),
567+ 'completed_id' : event .id ,
568+ 'completed_timestamp' : event .timestamp ,
569+ })
570+ elif event .state == 'failed' :
571+ lambda_function = get_lambda ()
572+ lambda_function .update ({
573+ 'state' : event .state ,
574+ 'reason' : getattr (event , 'reason' , '' ),
575+ 'details' : getattr (event , 'details' , '' ),
576+ 'failed_id' : event .id ,
577+ 'failed_timestamp' : event .timestamp ,
578+ 'retry' : lambda_function .get ('retry' , - 1 ) + 1 ,
579+ })
580+ elif event .state == 'timed_out' :
581+ lambda_function = get_lambda ()
582+ lambda_function .update ({
583+ 'state' : event .state ,
584+ 'timeout_type' : getattr (event , 'timeout_type' , 'START_TO_CLOSE' ),
585+ 'timeout_value' : lambda_function ['start_to_close_timeout' ],
586+ 'timed_out_id' : event .id ,
587+ 'timed_out_timestamp' : event .timestamp ,
588+ 'retry' : lambda_function .get ('retry' , - 1 ) + 1 ,
589+ })
590+
513591 TYPE_TO_PARSER = {
514592 'ActivityTask' : parse_activity_event ,
515593 'ChildWorkflowExecution' : parse_child_workflow_event ,
516594 'WorkflowExecution' : parse_workflow_event ,
517595 'ExternalWorkflowExecution' : parse_external_workflow_event ,
518596 'Marker' : parse_marker_event ,
519597 'Timer' : parse_timer_event ,
598+ 'LambdaFunction' : parse_lambda_function_event ,
520599 }
521600
522601 def parse (self ):
0 commit comments