Skip to content

Commit f0ed294

Browse files
committed
Add LambdaFunctionTask
Currently SWF only; needs to be moved upward. And with some rough edges... Signed-off-by: Yves Bastide <yves@botify.com>
1 parent 73186e1 commit f0ed294

File tree

4 files changed

+143
-2
lines changed

4 files changed

+143
-2
lines changed

examples/lambda.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from simpleflow import Workflow, futures
2+
from simpleflow.lambda_function import LambdaFunction
3+
from simpleflow.swf.task import LambdaFunctionTask
4+
5+
6+
class LambdaWorkflow(Workflow):
7+
name = 'basic'
8+
version = 'example'
9+
task_list = 'example'
10+
# lambda_role = ''
11+
12+
def run(self):
13+
future = self.submit(
14+
LambdaFunctionTask(
15+
LambdaFunction(
16+
'hello-world-python',
17+
idempotent=True,
18+
),
19+
8,
20+
foo='bar',
21+
)
22+
)
23+
print(futures.wait(future))

simpleflow/lambda_function.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from simpleflow.base import Submittable
2+
3+
4+
class LambdaFunction(Submittable):
5+
def __init__(self,
6+
name,
7+
start_to_close_timeout=None,
8+
idempotent=None):
9+
self.name = name
10+
self.start_to_close_timeout = start_to_close_timeout
11+
self.idempotent = idempotent

simpleflow/swf/executor.py

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
MarkerTask,
3737
TimerTask,
3838
CancelTimerTask,
39+
LambdaFunctionTask,
3940
)
4041
from simpleflow.utils import (
4142
hex_hash,
@@ -333,7 +334,7 @@ def _get_future_from_child_workflow_event(self, event):
333334
future.set_finished(json_loads_or_raw(event['result']))
334335
elif state == 'failed':
335336
future.set_exception(exceptions.TaskFailed(
336-
name=event['id'],
337+
name=event['name'],
337338
reason=event['reason'],
338339
details=event.get('details'),
339340
))
@@ -351,6 +352,43 @@ def _get_future_from_child_workflow_event(self, event):
351352

352353
return future
353354

355+
def _get_future_from_lambda_function_event(self, event):
356+
"""
357+
358+
:param event: child workflow event
359+
:type event: dict[str, Any]
360+
:return:
361+
:rtype: futures.Future
362+
"""
363+
future = futures.Future()
364+
state = event['state']
365+
366+
if state == 'scheduled':
367+
pass
368+
elif state == 'schedule_failed':
369+
logger.info('failed to schedule {}: {}'.format(
370+
event['name'],
371+
event['cause'],
372+
))
373+
return None
374+
elif state == 'started':
375+
future.set_running()
376+
elif state == 'completed':
377+
future.set_finished(json_loads_or_raw(event['result']))
378+
elif state == 'failed':
379+
future.set_exception(exceptions.TaskFailed(
380+
name=event['name'],
381+
reason=event['reason'],
382+
details=event.get('details'),
383+
))
384+
elif state == 'timed_out':
385+
future.set_exception(exceptions.TimeoutError(
386+
event['timeout_type'],
387+
None,
388+
))
389+
390+
return future
391+
354392
def _get_future_from_marker_event(self, a_task, event):
355393
"""Maps a marker event to a Future with the corresponding
356394
state.
@@ -488,6 +526,19 @@ def find_child_workflow_event(self, a_task, history):
488526
"""
489527
return history.child_workflows.get(a_task.id)
490528

529+
def find_lambda_function_event(self, a_task, history):
530+
"""
531+
Get the event corresponding to a lambda function, if any.
532+
533+
:param a_task:
534+
:type a_task: LambdaFunctionTask
535+
:param history:
536+
:type history: simpleflow.history.History
537+
:return:
538+
:rtype: Optional[dict]
539+
"""
540+
return history.lambda_functions.get(a_task.id)
541+
491542
def find_signal_event(self, a_task, history):
492543
"""
493544
Get the event corresponding to a signal, if any.
@@ -561,6 +612,7 @@ def find_timer_event(self, a_task, history):
561612
MarkerTask: find_marker_event,
562613
TimerTask: find_timer_event,
563614
CancelTimerTask: find_timer_event,
615+
LambdaFunctionTask: find_lambda_function_event,
564616
}
565617

566618
def find_event(self, a_task, history):
@@ -635,12 +687,30 @@ def resume_child_workflow(self, a_task, event):
635687

636688
return future
637689

690+
def resume_lambda_function(self, a_task, event):
691+
"""
692+
Resume a child workflow.
693+
694+
:param a_task:
695+
:type a_task: LambdaTask
696+
:param event:
697+
:type event: dict
698+
:return:
699+
:rtype: simpleflow.futures.Future
700+
"""
701+
future = self._get_future_from_lambda_function_event(event)
702+
703+
if future.finished and future.exception:
704+
raise future.exception
705+
706+
return future
707+
638708
def schedule_task(self, a_task, task_list=None):
639709
"""
640710
Let a task schedule itself.
641711
If too many decisions are in flight, add a timer decision and raise ExecutionBlocked.
642712
:param a_task:
643-
:type a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask
713+
:type a_task: SwfTask
644714
:param task_list:
645715
:type task_list: Optional[str]
646716
:raise: exceptions.ExecutionBlocked if too many decisions waiting
@@ -701,6 +771,7 @@ def _add_start_timer_decision(self, id):
701771
'external_workflow': get_future_from_external_workflow_event,
702772
'marker': _get_future_from_marker_event,
703773
'timer': _get_future_from_timer_event,
774+
'lambda_function': resume_lambda_function,
704775
}
705776

706777
def resume(self, a_task, *args, **kwargs):

simpleflow/swf/task.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@
66
from simpleflow import task
77
from simpleflow.utils import json_dumps
88

9+
10+
if False:
11+
from simpleflow.lambda_function import LambdaFunction
12+
13+
914
logger = logging.getLogger(__name__)
1015

1116

@@ -308,3 +313,34 @@ def schedule(self, *args, **kwargs):
308313
id=self.timer_id,
309314
)
310315
return [decision]
316+
317+
318+
class LambdaFunctionTask(task.Task, SwfTask):
319+
320+
def __init__(self, lambda_function, *args, **kwargs):
321+
# type: (LambdaFunction) -> None
322+
self.lambda_function = lambda_function
323+
self.args = self.resolve_args(*args)
324+
self.kwargs = self.resolve_kwargs(**kwargs)
325+
self.idempotent = lambda_function.idempotent
326+
self.id = None
327+
328+
@property
329+
def name(self):
330+
return self.lambda_function.name
331+
332+
def schedule(self, *args, **kwargs):
333+
input = {
334+
'args': self.args,
335+
'kwargs': self.kwargs,
336+
}
337+
338+
decision = swf.models.decision.LambdaFunctionDecision(
339+
'schedule',
340+
id=self.id,
341+
name=self.name,
342+
input=input,
343+
start_to_close_timeout=str(self.lambda_function.start_to_close_timeout)
344+
if self.lambda_function.start_to_close_timeout else None,
345+
)
346+
return [decision]

0 commit comments

Comments
 (0)