@@ -25,6 +25,8 @@ class History(object):
2525 :type _markers: collections.OrderedDict[str, list[dict[str, Any]]]
2626 :ivar _timers: timer events
2727 :type _timers: dict[str, dict[str, Any]]]
28+ :ivar _lambda_functions: activity events
29+ :type _lambda_functions: collections.OrderedDict[str, dict[str, Any]]
2830 :ivar _tasks: ordered list of tasks/etc
2931 :type _tasks: list[dict[str, Any]]
3032 """
@@ -39,6 +41,7 @@ def __init__(self, history):
3941 self ._signaled_workflows = collections .defaultdict (list )
4042 self ._markers = collections .OrderedDict ()
4143 self ._timers = {}
44+ self ._lambda_functions = collections .OrderedDict ()
4245 self ._tasks = []
4346 self ._cancel_requested = None
4447 self ._cancel_failed = None
@@ -119,6 +122,13 @@ def cancel_failed_decision_task_completed_event_id(self):
119122 return self ._cancel_failed ['decision_task_completed_event_id' ] if self ._cancel_failed else None
120123
121124 @property
125+ def lambda_functions (self ):
126+ """
127+ :return: lambda_functions
128+ :rtype: collections.OrderedDict[str, dict[str, Any]]
129+ """
130+ return self ._lambda_functions
131+
122132 def signaled_workflows (self ):
123133 """
124134 :return: signaled workflows
@@ -598,6 +608,74 @@ def parse_decision_event(self, events, event):
598608 if event .state == 'completed' :
599609 self .completed_decision_id = event .id
600610
611+ def parse_lambda_function_event (self , events , event ):
612+ """
613+ Parse a lambda function event.
614+ :param events:
615+ :param event:
616+ """
617+ def get_lambda ():
618+ scheduled_event_id = events [event .scheduled_event_id - 1 ]
619+ return self ._lambda_functions [scheduled_event_id .lambda_id ]
620+
621+ if event .state == 'scheduled' :
622+ lambda_function = {
623+ 'type' : 'lambda_function' ,
624+ 'id' : event .lambda_id ,
625+ 'name' : event .lambda_name ,
626+ 'input' : event .input ,
627+ 'state' : event .state ,
628+ 'start_to_close_timeout' : getattr (event , 'start_to_close_timeout' , None ),
629+ 'scheduled_id' : event .id ,
630+ 'scheduled_timestamp' : event .timestamp ,
631+ }
632+ self ._lambda_functions [event .lambda_id ] = lambda_function
633+ elif event .state == 'schedule_failed' :
634+ lambda_function = {
635+ 'type' : 'lambda_function' ,
636+ 'id' : event .lambda_id ,
637+ 'name' : event .lambda_name ,
638+ 'state' : event .state ,
639+ 'schedule_failed_id' : event .id ,
640+ 'schedule_failed_timestamp' : event .timestamp ,
641+ }
642+ self ._lambda_functions [event .lambda_id ] = lambda_function
643+ elif event .state == 'started' :
644+ lambda_function = get_lambda ()
645+ lambda_function .update ({
646+ 'state' : event .state ,
647+ 'started_id' : event .id ,
648+ 'started_timestamp' : event .timestamp ,
649+ })
650+ elif event .state == 'completed' :
651+ lambda_function = get_lambda ()
652+ lambda_function .update ({
653+ 'state' : event .state ,
654+ 'result' : getattr (event , 'result' , None ),
655+ 'completed_id' : event .id ,
656+ 'completed_timestamp' : event .timestamp ,
657+ })
658+ elif event .state == 'failed' :
659+ lambda_function = get_lambda ()
660+ lambda_function .update ({
661+ 'state' : event .state ,
662+ 'reason' : getattr (event , 'reason' , '' ),
663+ 'details' : getattr (event , 'details' , '' ),
664+ 'failed_id' : event .id ,
665+ 'failed_timestamp' : event .timestamp ,
666+ 'retry' : lambda_function .get ('retry' , - 1 ) + 1 ,
667+ })
668+ elif event .state == 'timed_out' :
669+ lambda_function = get_lambda ()
670+ lambda_function .update ({
671+ 'state' : event .state ,
672+ 'timeout_type' : getattr (event , 'timeout_type' , 'START_TO_CLOSE' ),
673+ 'timeout_value' : lambda_function ['start_to_close_timeout' ],
674+ 'timed_out_id' : event .id ,
675+ 'timed_out_timestamp' : event .timestamp ,
676+ 'retry' : lambda_function .get ('retry' , - 1 ) + 1 ,
677+ })
678+
601679 TYPE_TO_PARSER = {
602680 'ActivityTask' : parse_activity_event ,
603681 'DecisionTask' : parse_decision_event ,
@@ -606,6 +684,7 @@ def parse_decision_event(self, events, event):
606684 'ExternalWorkflowExecution' : parse_external_workflow_event ,
607685 'Marker' : parse_marker_event ,
608686 'Timer' : parse_timer_event ,
687+ 'LambdaFunction' : parse_lambda_function_event ,
609688 }
610689
611690 def parse (self ):
0 commit comments