Skip to content

Commit a05c507

Browse files
committed
Add LambdaFunctionTask
Currently SWF only; needs to be moved upward. And with some rough edges... Signed-off-by: Yves Bastide <yves@botify.com>
1 parent 8759d17 commit a05c507

File tree

4 files changed

+146
-5
lines changed

4 files changed

+146
-5
lines changed

examples/lambda.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from simpleflow import Workflow, futures
2+
from simpleflow.lambda_function import LambdaFunction
3+
from simpleflow.swf.task import LambdaFunctionTask
4+
5+
6+
class LambdaWorkflow(Workflow):
7+
name = 'basic'
8+
version = 'example'
9+
task_list = 'example'
10+
# lambda_role = ''
11+
12+
def run(self):
13+
future = self.submit(
14+
LambdaFunctionTask(
15+
LambdaFunction(
16+
'hello-world-python',
17+
idempotent=True,
18+
),
19+
8,
20+
foo='bar',
21+
)
22+
)
23+
print(futures.wait(future))

simpleflow/lambda_function.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from simpleflow.base import Submittable
2+
3+
4+
class LambdaFunction(Submittable):
5+
def __init__(self,
6+
name,
7+
start_to_close_timeout=None,
8+
idempotent=None):
9+
self.name = name
10+
self.start_to_close_timeout = start_to_close_timeout
11+
self.idempotent = idempotent

simpleflow/swf/executor.py

Lines changed: 75 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from simpleflow.signal import WaitForSignal
2626
from simpleflow.swf import constants
2727
from simpleflow.swf.helpers import swf_identity
28-
from simpleflow.swf.task import ActivityTask, WorkflowTask, SignalTask, MarkerTask, SwfTask
28+
from simpleflow.swf.task import ActivityTask, WorkflowTask, SignalTask, MarkerTask, SwfTask, LambdaFunctionTask
2929
from simpleflow.utils import (
3030
hex_hash,
3131
issubclass_,
@@ -321,7 +321,7 @@ def _get_future_from_child_workflow_event(self, event):
321321
future.set_finished(json_loads_or_raw(event['result']))
322322
elif state == 'failed':
323323
future.set_exception(exceptions.TaskFailed(
324-
name=event['id'],
324+
name=event['name'],
325325
reason=event['reason'],
326326
details=event.get('details'),
327327
))
@@ -339,6 +339,43 @@ def _get_future_from_child_workflow_event(self, event):
339339

340340
return future
341341

342+
def _get_future_from_lambda_function_event(self, event):
343+
"""
344+
345+
:param event: child workflow event
346+
:type event: dict[str, Any]
347+
:return:
348+
:rtype: futures.Future
349+
"""
350+
future = futures.Future()
351+
state = event['state']
352+
353+
if state == 'scheduled':
354+
pass
355+
elif state == 'schedule_failed':
356+
logger.info('failed to schedule {}: {}'.format(
357+
event['name'],
358+
event['cause'],
359+
))
360+
return None
361+
elif state == 'started':
362+
future.set_running()
363+
elif state == 'completed':
364+
future.set_finished(json_loads_or_raw(event['result']))
365+
elif state == 'failed':
366+
future.set_exception(exceptions.TaskFailed(
367+
name=event['name'],
368+
reason=event['reason'],
369+
details=event.get('details'),
370+
))
371+
elif state == 'timed_out':
372+
future.set_exception(exceptions.TimeoutError(
373+
event['timeout_type'],
374+
None,
375+
))
376+
377+
return future
378+
342379
def _get_future_from_marker_event(self, a_task, event):
343380
"""Maps a marker event to a Future with the corresponding
344381
state.
@@ -447,6 +484,19 @@ def find_child_workflow_event(self, a_task, history):
447484
"""
448485
return history.child_workflows.get(a_task.id)
449486

487+
def find_lambda_function_event(self, a_task, history):
488+
"""
489+
Get the event corresponding to a lambda function, if any.
490+
491+
:param a_task:
492+
:type a_task: LambdaFunctionTask
493+
:param history:
494+
:type history: simpleflow.history.History
495+
:return:
496+
:rtype: Optional[dict]
497+
"""
498+
return history.lambda_functions.get(a_task.id)
499+
450500
def find_signal_event(self, a_task, history):
451501
"""
452502
Get the event corresponding to a signal, if any.
@@ -496,6 +546,7 @@ def find_marker_event(self, a_task, history):
496546
WorkflowTask: find_child_workflow_event,
497547
SignalTask: find_signal_event,
498548
MarkerTask: find_marker_event,
549+
LambdaFunctionTask: find_lambda_function_event,
499550
}
500551

501552
def find_event(self, a_task, history):
@@ -569,12 +620,30 @@ def resume_child_workflow(self, a_task, event):
569620

570621
return future
571622

623+
def resume_lambda_function(self, a_task, event):
624+
"""
625+
Resume a child workflow.
626+
627+
:param a_task:
628+
:type a_task: LambdaTask
629+
:param event:
630+
:type event: dict
631+
:return:
632+
:rtype: simpleflow.futures.Future
633+
"""
634+
future = self._get_future_from_lambda_function_event(event)
635+
636+
if future.finished and future.exception:
637+
raise future.exception
638+
639+
return future
640+
572641
def schedule_task(self, a_task, task_list=None):
573642
"""
574643
Let a task schedule itself.
575644
If too many decisions are in flight, add a timer decision and raise ExecutionBlocked.
576645
:param a_task:
577-
:type a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask
646+
:type a_task: SwfTask
578647
:param task_list:
579648
:type task_list: Optional[str]
580649
:raise: exceptions.ExecutionBlocked if too many decisions waiting
@@ -640,6 +709,7 @@ def _add_start_timer_decision(self, id):
640709
'signal': get_future_from_signal_event,
641710
'external_workflow': get_future_from_external_workflow_event,
642711
'marker': _get_future_from_marker_event,
712+
'lambda_function': resume_lambda_function,
643713
}
644714

645715
def resume(self, a_task, *args, **kwargs):
@@ -651,7 +721,7 @@ def resume(self, a_task, *args, **kwargs):
651721
If in repair mode, we may fake the task to repair from the previous history.
652722
653723
:param a_task:
654-
:type a_task: ActivityTask | WorkflowTask | SignalTask
724+
:type a_task: ActivityTask | WorkflowTask | SignalTask | LambdaTask
655725
:param args:
656726
:param args: list
657727
:type kwargs:
@@ -765,7 +835,7 @@ def submit(self, func, *args, **kwargs):
765835
try:
766836
# do not use directly "Submittable" here because we want to catch if
767837
# we don't have an instance from a class known to work under simpleflow.swf
768-
if isinstance(func, (ActivityTask, WorkflowTask, SignalTask, MarkerTask)):
838+
if isinstance(func, SwfTask):
769839
# no need to wrap it, already wrapped in the correct format
770840
a_task = func
771841
elif isinstance(func, Activity):

simpleflow/swf/task.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@
66
from simpleflow import task
77
from simpleflow.utils import json_dumps
88

9+
10+
if False:
11+
from simpleflow.lambda_function import LambdaFunction
12+
13+
914
logger = logging.getLogger(__name__)
1015

1116

@@ -248,3 +253,35 @@ def schedule(self, *args, **kwargs):
248253

249254
def get_json_details(self):
250255
return json_dumps(self.details) if self.details is not None else None
256+
257+
258+
class LambdaFunctionTask(task.Task, SwfTask):
259+
260+
def __init__(self, lambda_function, *args, **kwargs):
261+
# type: (LambdaFunction) -> None
262+
self.lambda_function = lambda_function
263+
self.args = self.resolve_args(*args)
264+
self.kwargs = self.resolve_kwargs(**kwargs)
265+
self.idempotent = lambda_function.idempotent
266+
self.id = None
267+
268+
@property
269+
def name(self):
270+
return self.lambda_function.name
271+
272+
def schedule(self, *args, **kwargs):
273+
input = {
274+
'args': self.args,
275+
'kwargs': self.kwargs,
276+
}
277+
278+
decision = swf.models.decision.LambdaFunctionDecision(
279+
'schedule',
280+
id=self.id,
281+
name=self.name,
282+
input=input,
283+
start_to_close_timeout=str(self.lambda_function.start_to_close_timeout) if self.lambda_function.start_to_close_timeout
284+
else None,
285+
)
286+
287+
return [decision]

0 commit comments

Comments
 (0)