Skip to content

Commit 2b76c31

Browse files
committed
Add LambdaFunctionTask
Currently SWF only; needs to be moved upward. And with some rough edges... Signed-off-by: Yves Bastide <yves@botify.com>
1 parent 1def54b commit 2b76c31

File tree

4 files changed

+143
-2
lines changed

4 files changed

+143
-2
lines changed

examples/lambda.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from simpleflow import Workflow, futures
2+
from simpleflow.lambda_function import LambdaFunction
3+
from simpleflow.swf.task import LambdaFunctionTask
4+
5+
6+
class LambdaWorkflow(Workflow):
7+
name = 'basic'
8+
version = 'example'
9+
task_list = 'example'
10+
# lambda_role = ''
11+
12+
def run(self):
13+
future = self.submit(
14+
LambdaFunctionTask(
15+
LambdaFunction(
16+
'hello-world-python',
17+
idempotent=True,
18+
),
19+
8,
20+
foo='bar',
21+
)
22+
)
23+
print(futures.wait(future))

simpleflow/lambda_function.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from simpleflow.base import Submittable
2+
3+
4+
class LambdaFunction(Submittable):
5+
def __init__(self,
6+
name,
7+
start_to_close_timeout=None,
8+
idempotent=None):
9+
self.name = name
10+
self.start_to_close_timeout = start_to_close_timeout
11+
self.idempotent = idempotent

simpleflow/swf/executor.py

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
MarkerTask,
3737
TimerTask,
3838
CancelTimerTask,
39+
LambdaFunctionTask,
3940
)
4041
from simpleflow.utils import (
4142
hex_hash,
@@ -342,7 +343,7 @@ def _get_future_from_child_workflow_event(self, event):
342343
future.set_finished(json_loads_or_raw(event['result']))
343344
elif state == 'failed':
344345
future.set_exception(exceptions.TaskFailed(
345-
name=event['id'],
346+
name=event['name'],
346347
reason=event['reason'],
347348
details=event.get('details'),
348349
))
@@ -360,6 +361,43 @@ def _get_future_from_child_workflow_event(self, event):
360361

361362
return future
362363

364+
def _get_future_from_lambda_function_event(self, event):
365+
"""
366+
367+
:param event: child workflow event
368+
:type event: dict[str, Any]
369+
:return:
370+
:rtype: futures.Future
371+
"""
372+
future = futures.Future()
373+
state = event['state']
374+
375+
if state == 'scheduled':
376+
pass
377+
elif state == 'schedule_failed':
378+
logger.info('failed to schedule {}: {}'.format(
379+
event['name'],
380+
event['cause'],
381+
))
382+
return None
383+
elif state == 'started':
384+
future.set_running()
385+
elif state == 'completed':
386+
future.set_finished(json_loads_or_raw(event['result']))
387+
elif state == 'failed':
388+
future.set_exception(exceptions.TaskFailed(
389+
name=event['name'],
390+
reason=event['reason'],
391+
details=event.get('details'),
392+
))
393+
elif state == 'timed_out':
394+
future.set_exception(exceptions.TimeoutError(
395+
event['timeout_type'],
396+
None,
397+
))
398+
399+
return future
400+
363401
def _get_future_from_marker_event(self, a_task, event):
364402
"""Maps a marker event to a Future with the corresponding
365403
state.
@@ -497,6 +535,19 @@ def find_child_workflow_event(self, a_task, history):
497535
"""
498536
return history.child_workflows.get(a_task.id)
499537

538+
def find_lambda_function_event(self, a_task, history):
539+
"""
540+
Get the event corresponding to a lambda function, if any.
541+
542+
:param a_task:
543+
:type a_task: LambdaFunctionTask
544+
:param history:
545+
:type history: simpleflow.history.History
546+
:return:
547+
:rtype: Optional[dict]
548+
"""
549+
return history.lambda_functions.get(a_task.id)
550+
500551
def find_signal_event(self, a_task, history):
501552
"""
502553
Get the event corresponding to a signal, if any.
@@ -570,6 +621,7 @@ def find_timer_event(self, a_task, history):
570621
MarkerTask: find_marker_event,
571622
TimerTask: find_timer_event,
572623
CancelTimerTask: find_timer_event,
624+
LambdaFunctionTask: find_lambda_function_event,
573625
}
574626

575627
def find_event(self, a_task, history):
@@ -644,12 +696,30 @@ def resume_child_workflow(self, a_task, event):
644696

645697
return future
646698

699+
def resume_lambda_function(self, a_task, event):
700+
"""
701+
Resume a child workflow.
702+
703+
:param a_task:
704+
:type a_task: LambdaTask
705+
:param event:
706+
:type event: dict
707+
:return:
708+
:rtype: simpleflow.futures.Future
709+
"""
710+
future = self._get_future_from_lambda_function_event(event)
711+
712+
if future.finished and future.exception:
713+
raise future.exception
714+
715+
return future
716+
647717
def schedule_task(self, a_task, task_list=None):
648718
"""
649719
Let a task schedule itself.
650720
If too many decisions are in flight, add a timer decision and raise ExecutionBlocked.
651721
:param a_task:
652-
:type a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask
722+
:type a_task: SwfTask
653723
:param task_list:
654724
:type task_list: Optional[str]
655725
:raise: exceptions.ExecutionBlocked if too many decisions waiting
@@ -710,6 +780,7 @@ def _add_start_timer_decision(self, id):
710780
'external_workflow': get_future_from_external_workflow_event,
711781
'marker': _get_future_from_marker_event,
712782
'timer': _get_future_from_timer_event,
783+
'lambda_function': resume_lambda_function,
713784
}
714785

715786
def resume(self, a_task, *args, **kwargs):

simpleflow/swf/task.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@
55
from simpleflow import task
66
from simpleflow.utils import json_dumps
77

8+
9+
if False:
10+
from simpleflow.lambda_function import LambdaFunction
11+
12+
813
logger = logging.getLogger(__name__)
914

1015

@@ -351,3 +356,34 @@ def schedule(self, *args, **kwargs):
351356
id=self.timer_id,
352357
)
353358
return [decision]
359+
360+
361+
class LambdaFunctionTask(task.Task, SwfTask):
362+
363+
def __init__(self, lambda_function, *args, **kwargs):
364+
# type: (LambdaFunction) -> None
365+
self.lambda_function = lambda_function
366+
self.args = self.resolve_args(*args)
367+
self.kwargs = self.resolve_kwargs(**kwargs)
368+
self.idempotent = lambda_function.idempotent
369+
self.id = None
370+
371+
@property
372+
def name(self):
373+
return self.lambda_function.name
374+
375+
def schedule(self, *args, **kwargs):
376+
input = {
377+
'args': self.args,
378+
'kwargs': self.kwargs,
379+
}
380+
381+
decision = swf.models.decision.LambdaFunctionDecision(
382+
'schedule',
383+
id=self.id,
384+
name=self.name,
385+
input=input,
386+
start_to_close_timeout=str(self.lambda_function.start_to_close_timeout)
387+
if self.lambda_function.start_to_close_timeout else None,
388+
)
389+
return [decision]

0 commit comments

Comments
 (0)