Skip to content

Commit 8759d17

Browse files
committed
History: parse lambda function events
Signed-off-by: Yves Bastide <yves@botify.com>
1 parent fe9ee7d commit 8759d17

File tree

1 file changed

+79
-0
lines changed

1 file changed

+79
-0
lines changed

simpleflow/history.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ class History(object):
2323
:type _signals: collections.OrderedDict[str, dict[str, Any]]
2424
:ivar _markers: marker events
2525
:type _markers: collections.OrderedDict[str, list[dict[str, Any]]]
26+
:ivar _lambda_functions: activity events
27+
:type _lambda_functions: collections.OrderedDict[str, dict[str, Any]]
2628
:ivar _tasks: ordered list of tasks/etc
2729
:type _tasks: list[dict[str, Any]]
2830
"""
@@ -36,6 +38,7 @@ def __init__(self, history):
3638
self._signals = collections.OrderedDict()
3739
self._signaled_workflows = collections.defaultdict(list)
3840
self._markers = collections.OrderedDict()
41+
self._lambda_functions = collections.OrderedDict()
3942
self._tasks = []
4043

4144
@property
@@ -80,6 +83,13 @@ def signals(self):
8083
return self._signals
8184

8285
@property
86+
def lambda_functions(self):
87+
"""
88+
:return: lambda_functions
89+
:rtype: collections.OrderedDict[str, dict[str, Any]]
90+
"""
91+
return self._lambda_functions
92+
8393
def signaled_workflows(self):
8494
"""
8595
:return: signaled workflows
@@ -469,12 +479,81 @@ def parse_marker_event(self, events, event):
469479
}
470480
self._markers.setdefault(event.marker_name, []).append(marker)
471481

482+
def parse_lambda_function_event(self, events, event):
483+
"""
484+
Parse a lambda function event.
485+
:param events:
486+
:param event:
487+
"""
488+
def get_lambda():
489+
scheduled_event_id = events[event.scheduled_event_id - 1]
490+
return self._lambda_functions[scheduled_event_id.lambda_id]
491+
492+
if event.state == 'scheduled':
493+
lambda_function = {
494+
'type': 'lambda_function',
495+
'id': event.lambda_id,
496+
'name': event.lambda_name,
497+
'input': event.input,
498+
'state': event.state,
499+
'start_to_close_timeout': getattr(event, 'start_to_close_timeout', None),
500+
'scheduled_id': event.id,
501+
'scheduled_timestamp': event.timestamp,
502+
}
503+
self._lambda_functions[event.lambda_id] = lambda_function
504+
elif event.state == 'schedule_failed':
505+
lambda_function = {
506+
'type': 'lambda_function',
507+
'id': event.lambda_id,
508+
'name': event.lambda_name,
509+
'state': event.state,
510+
'schedule_failed_id': event.id,
511+
'schedule_failed_timestamp': event.timestamp,
512+
}
513+
self._lambda_functions[event.lambda_id] = lambda_function
514+
elif event.state == 'started':
515+
lambda_function = get_lambda()
516+
lambda_function.update({
517+
'state': event.state,
518+
'started_id': event.id,
519+
'started_timestamp': event.timestamp,
520+
})
521+
elif event.state == 'completed':
522+
lambda_function = get_lambda()
523+
lambda_function.update({
524+
'state': event.state,
525+
'result': getattr(event, 'result', None),
526+
'completed_id': event.id,
527+
'completed_timestamp': event.timestamp,
528+
})
529+
elif event.state == 'failed':
530+
lambda_function = get_lambda()
531+
lambda_function.update({
532+
'state': event.state,
533+
'reason': getattr(event, 'reason', ''),
534+
'details': getattr(event, 'details', ''),
535+
'failed_id': event.id,
536+
'failed_timestamp': event.timestamp,
537+
'retry': lambda_function.get('retry', -1) + 1,
538+
})
539+
elif event.state == 'timed_out':
540+
lambda_function = get_lambda()
541+
lambda_function.update({
542+
'state': event.state,
543+
'timeout_type': getattr(event, 'timeout_type', 'START_TO_CLOSE'),
544+
'timeout_value': lambda_function['start_to_close_timeout'],
545+
'timed_out_id': event.id,
546+
'timed_out_timestamp': event.timestamp,
547+
'retry': lambda_function.get('retry', -1) + 1,
548+
})
549+
472550
TYPE_TO_PARSER = {
473551
'ActivityTask': parse_activity_event,
474552
'ChildWorkflowExecution': parse_child_workflow_event,
475553
'WorkflowExecution': parse_workflow_event,
476554
'ExternalWorkflowExecution': parse_external_workflow_event,
477555
'Marker': parse_marker_event,
556+
'LambdaFunction': parse_lambda_function_event,
478557
}
479558

480559
def parse(self):

0 commit comments

Comments
 (0)