diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 000000000..8bef74d47 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,8 @@ +[*] +end_of_line = lf +insert_final_newline = true + +[*.py] +indent_style = space +indent_size = 4 +trim_trailing_whitespace = true diff --git a/examples/lambda.py b/examples/lambda.py new file mode 100644 index 000000000..f2b8e588e --- /dev/null +++ b/examples/lambda.py @@ -0,0 +1,38 @@ +from simpleflow import Workflow +from simpleflow.lambda_function import LambdaFunction +from simpleflow.swf.task import LambdaFunctionTask + +""" +The lambda function is: + +from __future__ import print_function + +import json + +print('Loading function') + + +def lambda_handler(event, context): + print("Received event: " + json.dumps(event, indent=2)) + return 42 +""" + + +class LambdaWorkflow(Workflow): + name = 'basic' + version = 'example' + task_list = 'example' + lambda_role = 'arn:aws:iam::111111000000:role/swf-lambda' # optional, overridable (--lambda-role) + + def run(self): + future = self.submit( + LambdaFunctionTask( + LambdaFunction( + 'hello-world-python', + idempotent=True, + ), + 8, + foo='bar', + ) + ) + print(future.result) diff --git a/simpleflow/command.py b/simpleflow/command.py index df3316894..1d0b31729 100644 --- a/simpleflow/command.py +++ b/simpleflow/command.py @@ -159,6 +159,9 @@ def transform_input(wf_input): type=comma_separated_list, required=False, help='Tags for the workflow execution.') +@click.option('--lambda-role', + required=False, + help='Lambda role.') @click.option('--decision-tasks-timeout', required=False, help='Timeout for the decision tasks.') @@ -183,6 +186,7 @@ def start_workflow(workflow, task_list, execution_timeout, tags, + lambda_role, decision_tasks_timeout, input, input_file, @@ -210,6 +214,7 @@ def start_workflow(workflow, execution_timeout=execution_timeout, input=wf_input, tag_list=tags, + lambda_role=lambda_role or workflow_class.lambda_role, decision_tasks_timeout=decision_tasks_timeout, ) print('{workflow_id} {run_id}'.format( @@ -481,6 +486,9 @@ def create_unique_task_list(workflow_id=''): type=comma_separated_list, required=False, help='Tags identifying the workflow execution.') +@click.option('--lambda-role', + required=False, + help='Lambda role.') @click.option('--decision-tasks-timeout', required=False, help='Decision tasks timeout.') @@ -515,6 +523,7 @@ def standalone(context, workflow_id, execution_timeout, tags, + lambda_role, decision_tasks_timeout, input, input_file, @@ -617,6 +626,7 @@ def standalone(context, task_list, execution_timeout, tags, + lambda_role, decision_tasks_timeout, format.input(wf_input), None, diff --git a/simpleflow/history.py b/simpleflow/history.py index 69f4b2ae5..252405ceb 100644 --- a/simpleflow/history.py +++ b/simpleflow/history.py @@ -26,6 +26,8 @@ class History(object): :type _markers: collections.OrderedDict[str, list[dict[str, Any]]] :ivar _timers: timer events :type _timers: dict[str, dict[str, Any]]] + :ivar _lambda_functions: activity events + :type _lambda_functions: collections.OrderedDict[str, dict[str, Any]] :ivar _tasks: ordered list of tasks/etc :type _tasks: list[dict[str, Any]] """ @@ -40,6 +42,7 @@ def __init__(self, history): self._signaled_workflows = collections.defaultdict(list) self._markers = collections.OrderedDict() self._timers = {} + self._lambda_functions = collections.OrderedDict() self._tasks = [] self._cancel_requested = None self._cancel_failed = None @@ -119,6 +122,14 @@ def cancel_failed_decision_task_completed_event_id(self): """ return self._cancel_failed['decision_task_completed_event_id'] if self._cancel_failed else None + @property + def lambda_functions(self): + """ + :return: lambda_functions + :rtype: collections.OrderedDict[str, dict[str, Any]] + """ + return self._lambda_functions + @property def signaled_workflows(self): """ @@ -639,6 +650,84 @@ def parse_decision_event(self, events, event): if event.state == 'completed': self.completed_decision_id = event.id + def parse_lambda_function_event(self, events, event): + """ + Parse a lambda function event. + :param events: + :param event: + """ + def get_lambda(): + scheduled_event_id = events[event.scheduled_event_id - 1] + return self._lambda_functions[scheduled_event_id.lambda_id] + + if event.state == 'scheduled': + lambda_function = { + 'type': 'lambda_function', + 'id': event.lambda_id, + 'name': event.lambda_name, + 'input': event.input, + 'state': event.state, + 'start_to_close_timeout': getattr(event, 'start_to_close_timeout', None), + 'scheduled_id': event.id, + 'scheduled_timestamp': event.timestamp, + } + self._lambda_functions[event.lambda_id] = lambda_function + elif event.state == 'schedule_failed': + lambda_function = { + 'type': 'lambda_function', + 'id': event.lambda_id, + 'name': event.lambda_name, + 'state': event.state, + 'schedule_failed_id': event.id, + 'schedule_failed_timestamp': event.timestamp, + } + self._lambda_functions[event.lambda_id] = lambda_function + elif event.state == 'started': + lambda_function = get_lambda() + lambda_function.update({ + 'state': event.state, + 'started_id': event.id, + 'started_timestamp': event.timestamp, + }) + elif event.state == 'start_failed': + lambda_function = get_lambda() + lambda_function.update({ + 'state': event.state, + 'cause': event.cause, + 'message': getattr(event, 'message', ''), + 'start_failed_id': event.id, + 'start_failed_timestamp': event.timestamp, + 'retry': lambda_function.get('retry', -1) + 1, + }) + elif event.state == 'completed': + lambda_function = get_lambda() + lambda_function.update({ + 'state': event.state, + 'result': getattr(event, 'result', None), + 'completed_id': event.id, + 'completed_timestamp': event.timestamp, + }) + elif event.state == 'failed': + lambda_function = get_lambda() + lambda_function.update({ + 'state': event.state, + 'reason': getattr(event, 'reason', ''), + 'details': getattr(event, 'details', ''), + 'failed_id': event.id, + 'failed_timestamp': event.timestamp, + 'retry': lambda_function.get('retry', -1) + 1, + }) + elif event.state == 'timed_out': + lambda_function = get_lambda() + lambda_function.update({ + 'state': event.state, + 'timeout_type': getattr(event, 'timeout_type', 'START_TO_CLOSE'), + 'timeout_value': lambda_function['start_to_close_timeout'], + 'timed_out_id': event.id, + 'timed_out_timestamp': event.timestamp, + 'retry': lambda_function.get('retry', -1) + 1, + }) + TYPE_TO_PARSER = { 'ActivityTask': parse_activity_event, 'DecisionTask': parse_decision_event, @@ -647,6 +736,7 @@ def parse_decision_event(self, events, event): 'ExternalWorkflowExecution': parse_external_workflow_event, 'Marker': parse_marker_event, 'Timer': parse_timer_event, + 'LambdaFunction': parse_lambda_function_event, } def parse(self): diff --git a/simpleflow/lambda_function.py b/simpleflow/lambda_function.py new file mode 100644 index 000000000..fba9a896d --- /dev/null +++ b/simpleflow/lambda_function.py @@ -0,0 +1,14 @@ +from simpleflow.base import Submittable + + +class LambdaFunction(Submittable): + def __init__(self, + name, + start_to_close_timeout=None, + idempotent=None, + is_python_function=True, + ): + self.name = name + self.start_to_close_timeout = start_to_close_timeout + self.idempotent = idempotent + self.is_python_function = is_python_function diff --git a/simpleflow/swf/executor.py b/simpleflow/swf/executor.py index fa6a61019..56acdd666 100644 --- a/simpleflow/swf/executor.py +++ b/simpleflow/swf/executor.py @@ -37,6 +37,7 @@ MarkerTask, TimerTask, CancelTimerTask, + LambdaFunctionTask, ) from simpleflow.utils import ( hex_hash, @@ -383,6 +384,51 @@ def _get_future_from_child_workflow_event(self, event): return future + def _get_future_from_lambda_function_event(self, event): + """ + + :param event: child workflow event + :type event: dict[str, Any] + :return: + :rtype: futures.Future + """ + future = futures.Future() + state = event['state'] + + if state == 'scheduled': + pass + elif state == 'schedule_failed': + logger.info('failed to schedule {}: {}'.format( + event['name'], + event['cause'], + )) + return None + elif state == 'started': + future.set_running() + elif state == 'completed': + future.set_finished(format.decode(event['result'])) + elif state == 'failed': + future.set_exception(exceptions.TaskFailed( + name=event['id'], + reason=event['reason'], + details=event.get('details'), + )) + elif state == 'start_failed': + future.set_exception(exceptions.TaskFailed( + name=event['id'], + reason=event['cause'], + details=event.get('message'), + )) + elif state == 'timed_out': + future.set_exception(exceptions.TimeoutError( + event['timeout_type'], + None, + )) + else: + logger.warning('Unknown state: %s', state) + + return future + def _get_future_from_marker_event(self, a_task, event): """Maps a marker event to a Future with the corresponding state. @@ -520,6 +566,19 @@ def find_child_workflow_event(self, a_task, history): """ return history.child_workflows.get(a_task.id) + def find_lambda_function_event(self, a_task, history): + """ + Get the event corresponding to a lambda function, if any. + + :param a_task: + :type a_task: LambdaFunctionTask + :param history: + :type history: simpleflow.history.History + :return: + :rtype: Optional[dict] + """ + return history.lambda_functions.get(a_task.id) + def find_signal_event(self, a_task, history): """ Get the event corresponding to a signal, if any. @@ -593,6 +652,7 @@ def find_timer_event(self, a_task, history): MarkerTask: find_marker_event, TimerTask: find_timer_event, CancelTimerTask: find_timer_event, + LambdaFunctionTask: find_lambda_function_event, } def find_event(self, a_task, history): @@ -794,12 +854,30 @@ def find_timer_associated_with(self, event, swf_task): def get_retry_task_timer_id(swf_task): return '__simpleflow_task_{}'.format(str(swf_task.id)) + def resume_lambda_function(self, a_task, event): + """ + Resume a child workflow. + + :param a_task: + :type a_task: LambdaTask + :param event: + :type event: dict + :return: + :rtype: simpleflow.futures.Future + """ + future = self._get_future_from_lambda_function_event(event) + + if future.finished and future.exception: + raise future.exception + + return future + def schedule_task(self, a_task, task_list=None): """ Let a task schedule itself. If too many decisions are in flight, add a timer decision and raise ExecutionBlocked. :param a_task: - :type a_task: ActivityTask | WorkflowTask | SignalTask | MarkerTask + :type a_task: ActivityTask | WorkflowTask | SignalTask [ MarkerTask [ TimerTask | CancelTimerTask | LambdaFunctionTask # noqa :param task_list: :type task_list: Optional[str] :raise: exceptions.ExecutionBlocked if too many decisions waiting @@ -862,6 +940,7 @@ def _add_start_timer_decision(self, id, timeout=0): 'external_workflow': get_future_from_external_workflow_event, 'marker': _get_future_from_marker_event, 'timer': _get_future_from_timer_event, + 'lambda_function': resume_lambda_function, } def resume(self, a_task, *args, **kwargs): @@ -1391,8 +1470,9 @@ def get_event_details(self, event_type, event_name): def handle_cancel_requested(self): decision = swf.models.decision.WorkflowExecutionDecision() - is_current_decision = self._history.completed_decision_id < self._history.cancel_requested_id - should_cancel = self._workflow.should_cancel(self._history) + history = self._history + is_current_decision = history.completed_decision_id < history.cancel_requested_id + should_cancel = self._workflow.should_cancel(history) if not should_cancel: return None # ignore cancel if is_current_decision: diff --git a/simpleflow/swf/task.py b/simpleflow/swf/task.py index d2ddaa74b..291f87a29 100644 --- a/simpleflow/swf/task.py +++ b/simpleflow/swf/task.py @@ -5,6 +5,11 @@ from simpleflow import task, Workflow from simpleflow.utils import json_dumps + +if False: + from simpleflow.lambda_function import LambdaFunction + + logger = logging.getLogger(__name__) @@ -369,3 +374,37 @@ def schedule(self, *args, **kwargs): id=self.timer_id, ) return [decision] + + +class LambdaFunctionTask(task.Task, SwfTask): + + def __init__(self, lambda_function, *args, **kwargs): + # type: (LambdaFunction) -> None + self.lambda_function = lambda_function + self.args = self.resolve_args(*args) + self.kwargs = self.resolve_kwargs(**kwargs) + self.idempotent = lambda_function.idempotent + self.id = None + + @property + def name(self): + return self.lambda_function.name + + def schedule(self, *args, **kwargs): + if self.lambda_function.is_python_function: + input = { + 'args': self.args, + 'kwargs': self.kwargs, + } + else: + input = self.kwargs or self.args + + decision = swf.models.decision.LambdaFunctionDecision( + 'schedule', + id=self.id, + name=self.name, + input=input, + start_to_close_timeout=str(self.lambda_function.start_to_close_timeout) + if self.lambda_function.start_to_close_timeout else None, + ) + return [decision] diff --git a/simpleflow/workflow.py b/simpleflow/workflow.py index 011af6168..a17b51526 100644 --- a/simpleflow/workflow.py +++ b/simpleflow/workflow.py @@ -31,6 +31,7 @@ class Workflow(Submittable): version = None task_list = None task_priority = None + lambda_role = None retry = 0 raises_on_failure = True diff --git a/swf/models/__init__.py b/swf/models/__init__.py index bb5c904ff..6d7c65079 100644 --- a/swf/models/__init__.py +++ b/swf/models/__init__.py @@ -5,8 +5,8 @@ # # See the file LICENSE for copying permission. -from swf.models.base import BaseModel # NOQA -from swf.models.activity import ActivityType, ActivityTask # NOQA -from swf.models.domain import Domain # NOQA -from swf.models.workflow import WorkflowType, WorkflowExecution # NOQA -from swf.models.history import History # NOQA +from .base import BaseModel # NOQA +from .activity import ActivityType, ActivityTask # NOQA +from .domain import Domain # NOQA +from .workflow import WorkflowType, WorkflowExecution # NOQA +from .history import History # NOQA diff --git a/swf/models/decision/__init__.py b/swf/models/decision/__init__.py index 4ad1c3d14..fd8f5299e 100644 --- a/swf/models/decision/__init__.py +++ b/swf/models/decision/__init__.py @@ -5,10 +5,11 @@ # # See the file LICENSE for copying permission. -from swf.models.decision.marker import MarkerDecision # NOQA -from swf.models.decision.task import ActivityTaskDecision # NOQA -from swf.models.decision.timer import TimerDecision # NOQA -from swf.models.decision.workflow import ( # NOQA +from .lambda_function import LambdaFunctionDecision # NOQA +from .marker import MarkerDecision # NOQA +from .task import ActivityTaskDecision # NOQA +from .timer import TimerDecision # NOQA +from .workflow import ( # NOQA WorkflowExecutionDecision, ChildWorkflowExecutionDecision, ExternalWorkflowExecutionDecision, diff --git a/swf/models/decision/lambda_function.py b/swf/models/decision/lambda_function.py new file mode 100644 index 000000000..d77052d28 --- /dev/null +++ b/swf/models/decision/lambda_function.py @@ -0,0 +1,38 @@ +# -*- coding:utf-8 -*- + +# Copyright (c) 2016, Botify +# +# See the file LICENSE for copying permission. + +from simpleflow.utils import json_dumps +from swf.models.decision.base import Decision, decision_action + + +class LambdaFunctionDecision(Decision): + _base_type = 'LambdaFunction' + + @decision_action + def schedule(self, id, name, input=None, start_to_close_timeout=None): + """Schedule lambda function decision builder + + :param id: id of the Lambda function + :type id: str + + :param name: name of the Lambda function to schedule + :type name: str + + :param input: input provided to the activity task + :type input: Optional[dict] + + :param start_to_close_timeout: timeout, 1-300 seconds. Default: 300 + :type start_to_close_timeout: Optional[str] + """ + if input is not None: + input = json_dumps(input) + + self.update_attributes({ + 'id': id, + 'name': name, + 'input': input, + 'startToCloseTimeout': start_to_close_timeout, + }) diff --git a/swf/models/decision/workflow.py b/swf/models/decision/workflow.py index 8960741f6..986415c00 100644 --- a/swf/models/decision/workflow.py +++ b/swf/models/decision/workflow.py @@ -61,8 +61,9 @@ def terminate(self, reason=None, details=None): def continue_as_new(self, child_policy=None, execution_timeout=None, task_timeout=None, input=None, tag_list=None, task_list=None, + lambda_role=None, workflow_type_version=None): - """Coninue as new workflow execution decision builder + """Continue as new workflow execution decision builder :param child_policy: specifies the policy to use for the child workflow executions of the new execution :type child_policy: CHILD_POLICIES.{TERMINATE | REQUEST_CANCEL | ABANDON} @@ -79,6 +80,9 @@ def continue_as_new(self, child_policy=None, :param task_list: task list name :type task_list: str + :param lambda_role: Lambda role + :type lambda_role: str + :param task_timeout: maximum duration of decision tasks for the new workflow execution :type task_timeout: str @@ -95,6 +99,7 @@ def continue_as_new(self, child_policy=None, 'input': input, 'tagList': tag_list, 'taskList': task_list, + 'lambdaRole': lambda_role, 'workflowTypeVersion': workflow_type_version, }) @@ -105,7 +110,7 @@ class ChildWorkflowExecutionDecision(Decision): @decision_action def start(self, workflow_type, workflow_id, child_policy=CHILD_POLICIES.TERMINATE, execution_timeout='300', task_timeout='300', control=None, - input=None, tag_list=None, task_list=None): + input=None, tag_list=None, task_list=None, lambda_role=None): """Child workflow execution decision builder :param workflow_type: workflow type to start @@ -121,6 +126,12 @@ def start(self, workflow_type, workflow_id, child_policy=CHILD_POLICIES.TERMINAT :param execution_timeout: specifies the total duration for this workflow execution :type execution_timeout: str + :param task_timeout: maximum duration of decision tasks for the child workflow execution + :type task_timeout: str + + :param control: data attached to the event that can be used by the decider in subsequent workflow tasks + :type control: Optional[str] + :param input: The input provided to the child workflow execution :type input: Optional[dict] @@ -130,8 +141,8 @@ def start(self, workflow_type, workflow_id, child_policy=CHILD_POLICIES.TERMINAT :param task_list: task list name :type task_list: str - :param task_timeout: maximum duration of decision tasks for the child workflow execution - :type task_timeout: str + :param lambda_role: workflow lambda role + :type lambda_role: Optional[str] :param control: data attached to the event that can be used by the decider in subsequent workflow tasks @@ -153,6 +164,7 @@ def start(self, workflow_type, workflow_id, child_policy=CHILD_POLICIES.TERMINAT 'taskList': { 'name': task_list, }, + 'lambdaRole': lambda_role, 'workflowId': workflow_id, 'workflowType': { 'name': workflow_type.name, diff --git a/swf/models/event/__init__.py b/swf/models/event/__init__.py index b3bd3ffdd..90a0a2772 100644 --- a/swf/models/event/__init__.py +++ b/swf/models/event/__init__.py @@ -5,10 +5,11 @@ # # See the file LICENSE for copying permission. -from swf.models.event.base import Event # NOQA -from swf.models.event.compiler import CompiledEvent # NOQA -from swf.models.event.factory import EventFactory, CompiledEventFactory # NOQA -from swf.models.event.task import DecisionTaskEvent, ActivityTaskEvent # NOQA -from swf.models.event.workflow import WorkflowExecutionEvent # NOQA -from swf.models.event.marker import MarkerEvent # NOQA -from swf.models.event.timer import TimerEvent # NOQA +from .base import Event # NOQA +from .compiler import CompiledEvent # NOQA +from .factory import EventFactory, CompiledEventFactory # NOQA +from .task import DecisionTaskEvent, ActivityTaskEvent # NOQA +from .workflow import WorkflowExecutionEvent # NOQA +from .lambda_function import LambdaFunctionEvent # NOQA +from .marker import MarkerEvent # NOQA +from .timer import TimerEvent # NOQA diff --git a/swf/models/event/base.py b/swf/models/event/base.py index 5cc47bf91..8a20f0949 100644 --- a/swf/models/event/base.py +++ b/swf/models/event/base.py @@ -47,6 +47,7 @@ class Event(object): _name = None _attributes_key = None _attributes = None + _attribute_mapping = {} # dict for remapping Attributes items colliding with id, name... excluded_attributes = ( 'eventId', @@ -109,4 +110,6 @@ def process_attributes(self): """Processes the event raw_data attributes_key elements and sets current instance attributes accordingly""" for key, value in iteritems(self.raw[self._attributes_key]): - setattr(self, camel_to_underscore(key), value) + name = camel_to_underscore(key) + name = self._attribute_mapping.get(name) or name + setattr(self, name, value) diff --git a/swf/models/event/factory.py b/swf/models/event/factory.py index ed2c0cf1b..6e5a5a763 100644 --- a/swf/models/event/factory.py +++ b/swf/models/event/factory.py @@ -33,6 +33,11 @@ CompiledMarkerEvent ) +from swf.models.event.lambda_function import ( + LambdaFunctionEvent, + CompiledLambdaFunctionEvent +) + from swf.utils import camel_to_underscore, decapitalize @@ -66,6 +71,10 @@ 'event': TimerEvent, 'compiled': CompiledTimerEvent, }), + ('LambdaFunction', { + 'event': LambdaFunctionEvent, + 'compiled': CompiledLambdaFunctionEvent, + }), ]) diff --git a/swf/models/event/lambda_function.py b/swf/models/event/lambda_function.py new file mode 100644 index 000000000..3bb0cb860 --- /dev/null +++ b/swf/models/event/lambda_function.py @@ -0,0 +1,39 @@ +# -*- coding:utf-8 -*- + +# Copyright (c) 2016, Botify +# +# See the file LICENSE for copying permission. + +from swf.models.event.base import Event +from swf.models.event.compiler import CompiledEvent + + +class LambdaFunctionEvent(Event): + _type = 'LambdaFunction' + _attribute_mapping = { + 'id': 'lambda_id', + 'name': 'lambda_name', + } + + +class CompiledLambdaFunctionEvent(CompiledEvent): + _type = 'LambdaFunction' + states = ( + 'scheduled', + 'schedule_failed', + 'start_failed', + 'started', + 'completed', + 'failed', + 'timed_out', + ) + + transitions = { + 'scheduled': ('schedule_failed', 'timed_out', 'start_failed', 'started'), + 'schedule_failed': ('scheduled', 'timed_out'), + 'started': ('failed', 'timed_out', 'completed'), + 'failed': ('scheduled', 'timed_out'), + 'timed_out': ('scheduled'), + } + + initial_state = 'scheduled' diff --git a/swf/models/history/builder.py b/swf/models/history/builder.py index 28061b6b9..4d12eadc5 100644 --- a/swf/models/history/builder.py +++ b/swf/models/history/builder.py @@ -43,7 +43,7 @@ class History(swf.models.History): Helper class to build a history to simulate the execution of a workflow. """ - def __init__(self, workflow, input=None, tag_list=None): + def __init__(self, workflow, input=None, tag_list=None, lambda_role=None): """ Bootstrap a history with the first events added by SWF. @@ -53,6 +53,8 @@ def __init__(self, workflow, input=None, tag_list=None): :type input: dict :param tag_list: string of tags (beware not a list) :type tag_list: str + :param lambda_role: lambda role + :type lambda_role: Optional[str] """ self._workflow = workflow @@ -72,6 +74,7 @@ def __init__(self, workflow, input=None, tag_list=None): "executionStartToCloseTimeout": workflow.execution_timeout, "input": json_dumps(input if input else {}), + "lambdaRole": lambda_role, "workflowType": { "name": workflow.name, "version": workflow.version @@ -370,7 +373,9 @@ def add_child_workflow_start_initiated(self, input=None, control=None, tag_list=None, - task_start_to_close_timeout=0): + task_start_to_close_timeout=0, + lambda_role=None, + ): if control is None: control = {} @@ -384,6 +389,7 @@ def add_child_workflow_start_initiated(self, 'decisionTaskCompletedEventId': 76, 'executionStartToCloseTimeout': '432000', 'input': json_dumps(input) if input is not None else '{}', + 'lambdaRole': lambda_role, 'tagList': tag_list, 'taskList': {'name': task_list}, 'taskStartToCloseTimeout': task_start_to_close_timeout, @@ -588,13 +594,17 @@ def add_child_workflow(self, task_list=None, input=None, result=None, - control=None): + control=None, + lambda_role=None, + ): self.add_child_workflow_start_initiated( workflow, workflow_id=workflow_id, task_list=task_list, input=input, - control=control) + control=control, + lambda_role=lambda_role, + ) if last_state not in CHILD_WORKFLOW_STATES: raise ValueError('last_state "{}" not supported for ' diff --git a/swf/models/workflow.py b/swf/models/workflow.py index b68b56632..fd2db733e 100644 --- a/swf/models/workflow.py +++ b/swf/models/workflow.py @@ -45,42 +45,43 @@ class WorkflowExecutionDoesNotExist(DoesNotExistError): class WorkflowType(BaseModel): """Simple Workflow Type wrapper - :param domain: Domain the workflow type should be registered in - :type domain: swf.models.Domain + :ivar domain: Domain the workflow type should be registered in + :type domain: swf.models.Domain - :param name: name of the workflow type - :type name: str + :ivar name: name of the workflow type + :type name: str - :param version: workflow type version - :type version: str + :ivar version: workflow type version + :type version: str - :param status: workflow type status - :type status: swf.core.ConnectedSWFObject.{REGISTERED, DEPRECATED} + :ivar status: workflow type status + :type status: swf.core.ConnectedSWFObject.{REGISTERED, DEPRECATED} - :param creation_date: creation date of the current WorkflowType (timestamp) - :type creation_date: float + :ivar creation_date: creation date of the current WorkflowType (timestamp) + :type creation_date: float - :param deprecation_date: deprecation date of WorkflowType (timestamp) - :type deprecation_date: float + :ivar deprecation_date: deprecation date of WorkflowType (timestamp) + :type deprecation_date: float - :param task_list: task list to use for scheduling decision tasks for executions + :ivar task_list: task list to use for scheduling decision tasks for executions of this workflow type - :type task_list: str + :type task_list: str - :param child_policy: policy to use for the child workflow executions + :ivar child_policy: policy to use for the child workflow executions when a workflow execution of this type is terminated - :type child_policy: CHILD_POLICIES.{TERMINATE | - REQUEST_CANCEL | - ABANDON} + :type child_policy: CHILD_POLICIES - :param execution_timeout: maximum duration for executions of this workflow type - :type execution_timeout: str + :ivar execution_timeout: maximum duration for executions of this workflow type + :type execution_timeout: str - :param decision_tasks_timeout: maximum duration of decision tasks for this workflow type - :type decision_tasks_timeout: str + :ivar decision_tasks_timeout: maximum duration of decision tasks for this workflow type + :type decision_tasks_timeout: str - :param description: Textual description of the workflow type - :type description: str + :ivar description: Textual description of the workflow type + :type description: str + + :ivar lambda_role: Lambda role + :type lambda_role: str """ __slots__ = [ 'domain', @@ -94,6 +95,7 @@ class WorkflowType(BaseModel): 'execution_timeout', 'decision_tasks_timeout', 'description', + 'lambda_role', ] def __init__(self, domain, name, version, @@ -104,7 +106,9 @@ def __init__(self, domain, name, version, child_policy=CHILD_POLICIES.TERMINATE, execution_timeout='300', decision_tasks_timeout='300', - description=None, *args, **kwargs): + description=None, + lambda_role=None, + *args, **kwargs): self.domain = domain self.name = name self.version = version @@ -116,13 +120,14 @@ def __init__(self, domain, name, version, self.execution_timeout = execution_timeout self.decision_tasks_timeout = decision_tasks_timeout self.description = description + self.lambda_role = lambda_role # Explicitly call child_policy setter # to validate input value self.set_child_policy(child_policy) # immutable decorator rebinds class name, - # so have to use generice self.__class__ + # so have to use generic self.__class__ super(self.__class__, self).__init__(*args, **kwargs) def set_child_policy(self, policy): @@ -162,8 +167,9 @@ def _diff(self): ('deprecation_date', self.deprecation_date, workflow_info['deprecationDate']), ('task_list', self.task_list, workflow_config['defaultTaskList']['name']), ('child_policy', self.child_policy, workflow_config['defaultChildPolicy']), + ('lambda_role', self.lambda_role, workflow_config.get('defaultLambdaRole')), ('execution_timeout', self.execution_timeout, workflow_config['defaultExecutionStartToCloseTimeout']), - ('decision_tasks_timout', self.decision_tasks_timeout, workflow_config['defaultTaskStartToCloseTimeout']), + ('decision_tasks_timeout', self.decision_tasks_timeout, workflow_config['defaultTaskStartToCloseTimeout']), ('description', self.description, workflow_info['description']), ) @@ -189,7 +195,7 @@ def exists(self): def save(self): """Creates the workflow type amazon side""" try: - self.connection.register_workflow_type( + self.custom_register_workflow_type( self.domain.name, self.name, self.version, @@ -197,6 +203,7 @@ def save(self): default_child_policy=str(self.child_policy), default_execution_start_to_close_timeout=str(self.execution_timeout), default_task_start_to_close_timeout=str(self.decision_tasks_timeout), + default_lambda_role=self.lambda_role, description=self.description ) except SWFTypeAlreadyExistsError: @@ -220,34 +227,37 @@ def upstream(self): def start_execution(self, workflow_id=None, task_list=None, child_policy=None, execution_timeout=None, - input=None, tag_list=None, decision_tasks_timeout=None): + input=None, tag_list=None, decision_tasks_timeout=None, + lambda_role=None, + ): """Starts a Workflow execution of current workflow type :param workflow_id: The user defined identifier associated with the workflow execution - :type workflow_id: String + :type workflow_id: str :param task_list: task list to use for scheduling decision tasks for execution of this workflow - :type task_list: String + :type task_list: str :param child_policy: policy to use for the child workflow executions of this workflow execution. - :type child_policy: CHILD_POLICIES.{TERMINATE | - REQUEST_CANCEL | - ABANDON} + :type child_policy: CHILD_POLICIES | str :param execution_timeout: maximum duration for the workflow execution - :type execution_timeout: String + :type execution_timeout: str :param input: Input of the workflow execution :type input: dict :param tag_list: Tags associated with the workflow execution - :type tag_list: String or list of strings or None + :type tag_list: Optional[str | list[str] :param decision_tasks_timeout: maximum duration of decision tasks for this workflow execution - :type decision_tasks_timeout: String + :type decision_tasks_timeout: str + + :param lambda_role: Lambda role. + :type lambda_role: Optional[str] """ workflow_id = workflow_id or '%s-%s-%i' % (self.name, self.version, time.time()) task_list = task_list or self.task_list @@ -263,7 +273,7 @@ def start_execution(self, workflow_id=None, task_list=None, if tag_list and len(tag_list) > 5: raise ValueError("You cannot have more than 5 tags in StartWorkflowExecution.") - run_id = self.connection.start_workflow_execution( + run_id = self.custom_start_workflow_execution( self.domain.name, workflow_id, self.name, @@ -272,12 +282,201 @@ def start_execution(self, workflow_id=None, task_list=None, child_policy=child_policy, execution_start_to_close_timeout=execution_timeout, input=format.input(input), + lambda_role=lambda_role, tag_list=tag_list, task_start_to_close_timeout=decision_tasks_timeout, )['runId'] return WorkflowExecution(self.domain, workflow_id, run_id=run_id) + def custom_register_workflow_type(self, domain, name, version, + task_list=None, + default_child_policy=None, + default_execution_start_to_close_timeout=None, + default_task_start_to_close_timeout=None, + default_lambda_role=None, + description=None): + """ + Registers a new workflow type and its configuration settings + in the specified domain. + + :type domain: string + :param domain: The name of the domain in which to register + the workflow type. + + :type name: string + :param name: The name of the workflow type. + + :type version: string + :param version: The version of the workflow type. + + :type task_list: list of name, version of tasks + :param task_list: If set, specifies the default task list to use + for scheduling decision tasks for executions of this workflow + type. This default is used only if a task list is not provided + when starting the execution through the StartWorkflowExecution + Action or StartChildWorkflowExecution Decision. + + :type default_child_policy: string + + :param default_child_policy: If set, specifies the default + policy to use for the child workflow executions when a + workflow execution of this type is terminated, by calling the + TerminateWorkflowExecution action explicitly or due to an + expired timeout. This default can be overridden when starting + a workflow execution using the StartWorkflowExecution action + or the StartChildWorkflowExecution Decision. The supported + child policies are: + + * TERMINATE: the child executions will be terminated. + + * REQUEST_CANCEL: a request to cancel will be attempted + for each child execution by recording a + WorkflowExecutionCancelRequested event in its + history. It is up to the decider to take appropriate + actions when it receives an execution history with this + event. + + * ABANDON: no action will be taken. The child executions + will continue to run.no docs + + :type default_execution_start_to_close_timeout: string + :param default_execution_start_to_close_timeout: If set, + specifies the default maximum duration for executions of this + workflow type. You can override this default when starting an + execution through the StartWorkflowExecution Action or + StartChildWorkflowExecution Decision. + + :type default_task_start_to_close_timeout: string + :param default_task_start_to_close_timeout: If set, specifies + the default maximum duration of decision tasks for this + workflow type. This default can be overridden when starting a + workflow execution using the StartWorkflowExecution action or + the StartChildWorkflowExecution Decision. + + :param default_lambda_role: Default Lambda role + :type default_lambda_role: str + + :type description: string + :param description: Textual description of the workflow type. + + :raises: SWFTypeAlreadyExistsError, SWFLimitExceededError, + UnknownResourceFault, SWFOperationNotPermittedError + """ + return self.connection.json_request('RegisterWorkflowType', { + 'domain': domain, + 'name': name, + 'version': version, + 'defaultTaskList': {'name': task_list}, + 'defaultChildPolicy': default_child_policy, + 'defaultExecutionStartToCloseTimeout': default_execution_start_to_close_timeout, + 'defaultLambdaRole': default_lambda_role, + 'defaultTaskStartToCloseTimeout': default_task_start_to_close_timeout, + 'description': description, + }) + + def custom_start_workflow_execution(self, domain, workflow_id, + workflow_name, workflow_version, + task_list=None, child_policy=None, + execution_start_to_close_timeout=None, + input=None, tag_list=None, + task_start_to_close_timeout=None, + lambda_role=None): + """ + Starts an execution of the workflow type in the specified + domain using the provided workflowId and input data. + + :type domain: string + :param domain: The name of the domain in which the workflow + execution is created. + + :type workflow_id: string + :param workflow_id: The user defined identifier associated with + the workflow execution. You can use this to associate a + custom identifier with the workflow execution. You may + specify the same identifier if a workflow execution is + logically a restart of a previous execution. You cannot + have two open workflow executions with the same workflowId + at the same time. + + :type workflow_name: string + :param workflow_name: The name of the workflow type. + + :type workflow_version: string + :param workflow_version: The version of the workflow type. + + :type task_list: string + :param task_list: The task list to use for the decision tasks + generated for this workflow execution. This overrides the + defaultTaskList specified when registering the workflow type. + + :type child_policy: string + :param child_policy: If set, specifies the policy to use for the + child workflow executions of this workflow execution if it + is terminated, by calling the TerminateWorkflowExecution + action explicitly or due to an expired timeout. This policy + overrides the default child policy specified when registering + the workflow type using RegisterWorkflowType. The supported + child policies are: + + * TERMINATE: the child executions will be terminated. + * REQUEST_CANCEL: a request to cancel will be attempted + for each child execution by recording a + WorkflowExecutionCancelRequested event in its history. + It is up to the decider to take appropriate actions + when it receives an execution history with this event. + * ABANDON: no action will be taken. The child executions + will continue to run. + + :type execution_start_to_close_timeout: string + :param execution_start_to_close_timeout: The total duration for + this workflow execution. This overrides the + defaultExecutionStartToCloseTimeout specified when + registering the workflow type. + + :type input: string + :param input: The input for the workflow + execution. This is a free form string which should be + meaningful to the workflow you are starting. This input is + made available to the new workflow execution in the + WorkflowExecutionStarted history event. + + :type tag_list: list :param tag_list: The list of tags to + associate with the workflow execution. You can specify a + maximum of 5 tags. You can list workflow executions with a + specific tag by calling list_open_workflow_executions or + list_closed_workflow_executions and specifying a TagFilter. + + :type task_start_to_close_timeout: string + :param task_start_to_close_timeout: Specifies the maximum duration of + decision tasks for this workflow execution. This parameter + overrides the defaultTaskStartToCloseTimeout specified when + registering the workflow type using register_workflow_type. + + :param lambda_role: Lambda role. + :type lambda_role: str + + :raises: UnknownResourceFault, TypeDeprecatedFault, + SWFWorkflowExecutionAlreadyStartedError, SWFLimitExceededError, + SWFOperationNotPermittedError, DefaultUndefinedFault + """ + return self.connection.json_request('StartWorkflowExecution', { + 'domain': domain, + 'workflowId': workflow_id, + 'workflowType': { + 'name': workflow_name, + 'version': workflow_version + }, + 'taskList': {'name': task_list}, + 'childPolicy': child_policy, + 'executionStartToCloseTimeout': execution_start_to_close_timeout, + 'input': input, + 'lambdaRole': lambda_role, + 'tagList': tag_list, + 'taskStartToCloseTimeout': task_start_to_close_timeout, + + }) + def __repr__(self): return '<{} domain={} name={} version={} status={}>'.format( self.__class__.__name__, @@ -339,6 +538,7 @@ class WorkflowExecution(BaseModel): 'close_status', 'execution_timeout', 'input', + 'lambda_role', 'tag_list', 'decision_tasks_timeout', 'close_timestamp', @@ -363,6 +563,7 @@ def __init__(self, domain, workflow_id, run_id=None, latest_activity_task_timestamp=None, open_counts=None, parent=None, + lambda_role=None, *args, **kwargs): Domain.check(domain) self.domain = domain @@ -375,6 +576,7 @@ def __init__(self, domain, workflow_id, run_id=None, self.close_status = close_status self.execution_timeout = execution_timeout self.input = input + self.lambda_role = lambda_role self.tag_list = tag_list or [] self.decision_tasks_timeout = decision_tasks_timeout self.close_timestamp = close_timestamp @@ -386,7 +588,7 @@ def __init__(self, domain, workflow_id, run_id=None, self.parent = parent or {} # so we can query keys in any case # immutable decorator rebinds class name, - # so have to use generice self.__class__ + # so have to use generic self.__class__ super(self.__class__, self).__init__(*args, **kwargs) def _diff(self): @@ -418,6 +620,7 @@ def _diff(self): ('status', self.status, execution_info['executionStatus']), ('task_list', self.task_list, execution_config['taskList']['name']), ('child_policy', self.child_policy, execution_config['childPolicy']), + ('lambda_role', self.lambda_role, execution_config.get('lambdaRole')), ('execution_timeout', self.execution_timeout, execution_config['executionStartToCloseTimeout']), ('tag_list', self.tag_list, execution_info.get('tagList')), ('decision_tasks_timeout', self.decision_tasks_timeout, execution_config['taskStartToCloseTimeout']), diff --git a/tests/integration/cassettes/test_lambda.yaml b/tests/integration/cassettes/test_lambda.yaml new file mode 100644 index 000000000..65d622e32 --- /dev/null +++ b/tests/integration/cassettes/test_lambda.yaml @@ -0,0 +1,87 @@ +interactions: +- request: + body: '{"domain": "TestDomain", "workflowType": {"name": "basic", "version": "example"}}' + headers: + Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar'] + Content-Encoding: [amz-1.0] + Content-Length: ['81'] + Content-Type: [application/json; charset=UTF-8] + Host: [swf.us-east-1.amazonaws.com] + User-Agent: [Boto/2.47.0 Python/3.6.1 Linux/4.10.0-21-generic] + X-Amz-Date: [20170610T183557Z] + X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.DescribeWorkflowType] + method: POST + uri: https://swf.us-east-1.amazonaws.com/ + response: + body: {string: '{"configuration":{"defaultChildPolicy":"TERMINATE","defaultExecutionStartToCloseTimeout":"300","defaultTaskList":{"name":"None"},"defaultTaskStartToCloseTimeout":"300"},"typeInfo":{"creationDate":1.488630704586E9,"status":"REGISTERED","workflowType":{"name":"basic","version":"example"}}}'} + headers: + Content-Length: ['288'] + Content-Type: [application/json] + x-amzn-RequestId: [a4ff48f5-4e0b-11e7-a675-552ffed790c9] + status: {code: 200, message: OK} +- request: + body: '{"domain": "TestDomain", "workflowId": "test-simpleflow-workflow", "workflowType": + {"name": "basic", "version": "example"}, "taskList": {"name": "test-simpleflow-workflow-7d4d6b0513764d3ba38c60c4061483ba"}, + "childPolicy": "TERMINATE", "input": "{\"args\":[],\"kwargs\":{}}", "lambdaRole": + "arn:aws:iam::111111000000:role/swf-lambda"}' + headers: + Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar'] + Content-Encoding: [amz-1.0] + Content-Length: ['333'] + Content-Type: [application/json; charset=UTF-8] + Host: [swf.us-east-1.amazonaws.com] + User-Agent: [Boto/2.47.0 Python/3.6.1 Linux/4.10.0-21-generic] + X-Amz-Date: [20170610T183557Z] + X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.StartWorkflowExecution] + method: POST + uri: https://swf.us-east-1.amazonaws.com/ + response: + body: {string: '{"runId":"22Yr7TxK3TwETHtk/p2S2dWiEyzwD8e9bjDaOKtBXy1wE="}'} + headers: + Content-Length: ['58'] + Content-Type: [application/json] + x-amzn-RequestId: [a54bbb7e-4e0b-11e7-818d-3f4a87b50f20] + status: {code: 200, message: OK} +- request: + body: '{"domain": "TestDomain", "execution": {"runId": "22Yr7TxK3TwETHtk/p2S2dWiEyzwD8e9bjDaOKtBXy1wE=", + "workflowId": "test-simpleflow-workflow"}}' + headers: + Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar'] + Content-Encoding: [amz-1.0] + Content-Length: ['140'] + Content-Type: [application/json; charset=UTF-8] + Host: [swf.us-east-1.amazonaws.com] + User-Agent: [Boto/2.47.0 Python/3.6.1 Linux/4.10.0-21-generic] + X-Amz-Date: [20170610T183600Z] + X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.DescribeWorkflowExecution] + method: POST + uri: https://swf.us-east-1.amazonaws.com/ + response: + body: {string: '{"executionConfiguration":{"childPolicy":"TERMINATE","executionStartToCloseTimeout":"300","lambdaRole":"arn:aws:iam::111111000000:role/swf-lambda","taskList":{"name":"test-simpleflow-workflow-7d4d6b0513764d3ba38c60c4061483ba"},"taskStartToCloseTimeout":"300"},"executionInfo":{"cancelRequested":false,"closeStatus":"COMPLETED","closeTimestamp":1.497119759876E9,"execution":{"runId":"22Yr7TxK3TwETHtk/p2S2dWiEyzwD8e9bjDaOKtBXy1wE=","workflowId":"test-simpleflow-workflow"},"executionStatus":"CLOSED","startTimestamp":1.497119758313E9,"workflowType":{"name":"basic","version":"example"}},"openCounts":{"openActivityTasks":0,"openChildWorkflowExecutions":0,"openDecisionTasks":0,"openLambdaFunctions":0,"openTimers":0}}'} + headers: + Content-Length: ['716'] + Content-Type: [application/json] + x-amzn-RequestId: [a6d36cb2-4e0b-11e7-8e25-3b3c40344394] + status: {code: 200, message: OK} +- request: + body: '{"domain": "TestDomain", "execution": {"runId": "22Yr7TxK3TwETHtk/p2S2dWiEyzwD8e9bjDaOKtBXy1wE=", + "workflowId": "test-simpleflow-workflow"}}' + headers: + Authorization: ['AWS4-HMAC-SHA256 Credential=1234AB/20160823/us-east-1/swf/aws4_request,SignedHeaders=host;x-amz-date;x-amz-target,Signature=foobar'] + Content-Encoding: [amz-1.0] + Content-Length: ['140'] + Content-Type: [application/json; charset=UTF-8] + Host: [swf.us-east-1.amazonaws.com] + User-Agent: [Boto/2.47.0 Python/3.6.1 Linux/4.10.0-21-generic] + X-Amz-Date: [20170610T183701Z] + X-Amz-Target: [com.amazonaws.swf.service.model.SimpleWorkflowService.GetWorkflowExecutionHistory] + method: POST + uri: https://swf.us-east-1.amazonaws.com/ + response: + body: {string: '{"events":[{"eventId":1,"eventTimestamp":1.497119758313E9,"eventType":"WorkflowExecutionStarted","workflowExecutionStartedEventAttributes":{"childPolicy":"TERMINATE","executionStartToCloseTimeout":"300","input":"{\"args\":[],\"kwargs\":{}}","lambdaRole":"arn:aws:iam::111111000000:role/swf-lambda","parentInitiatedEventId":0,"taskList":{"name":"test-simpleflow-workflow-7d4d6b0513764d3ba38c60c4061483ba"},"taskStartToCloseTimeout":"300","workflowType":{"name":"basic","version":"example"}}},{"decisionTaskScheduledEventAttributes":{"startToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-7d4d6b0513764d3ba38c60c4061483ba"}},"eventId":2,"eventTimestamp":1.497119758313E9,"eventType":"DecisionTaskScheduled"},{"decisionTaskStartedEventAttributes":{"identity":"{\"hostname\":\"zeb-P5Q-E\",\"pid\":6614,\"user\":\"zeb\"}","scheduledEventId":2},"eventId":3,"eventTimestamp":1.497119758358E9,"eventType":"DecisionTaskStarted"},{"decisionTaskCompletedEventAttributes":{"scheduledEventId":2,"startedEventId":3},"eventId":4,"eventTimestamp":1.497119758915E9,"eventType":"DecisionTaskCompleted"},{"eventId":5,"eventTimestamp":1.497119758915E9,"eventType":"LambdaFunctionScheduled","lambdaFunctionScheduledEventAttributes":{"decisionTaskCompletedEventId":4,"id":"hello-world-python-5ced6d5d9fae58fd0a7eb0d3908cabed","input":"{\"args\":[8],\"kwargs\":{\"foo\":\"bar\"}}","name":"hello-world-python"}},{"eventId":6,"eventTimestamp":1.497119758941E9,"eventType":"LambdaFunctionStarted","lambdaFunctionStartedEventAttributes":{"scheduledEventId":5}},{"eventId":7,"eventTimestamp":1.497119759328E9,"eventType":"LambdaFunctionCompleted","lambdaFunctionCompletedEventAttributes":{"result":"42","scheduledEventId":5,"startedEventId":6}},{"decisionTaskScheduledEventAttributes":{"startToCloseTimeout":"300","taskList":{"name":"test-simpleflow-workflow-7d4d6b0513764d3ba38c60c4061483ba"}},"eventId":8,"eventTimestamp":1.497119759328E9,"eventType":"DecisionTaskScheduled"},{"decisionTaskStartedEventAttributes":{"identity":"{\"hostname\":\"zeb-P5Q-E\",\"pid\":6613,\"user\":\"zeb\"}","scheduledEventId":8},"eventId":9,"eventTimestamp":1.497119759353E9,"eventType":"DecisionTaskStarted"},{"decisionTaskCompletedEventAttributes":{"scheduledEventId":8,"startedEventId":9},"eventId":10,"eventTimestamp":1.497119759876E9,"eventType":"DecisionTaskCompleted"},{"eventId":11,"eventTimestamp":1.497119759876E9,"eventType":"WorkflowExecutionCompleted","workflowExecutionCompletedEventAttributes":{"decisionTaskCompletedEventId":10,"result":"null"}}]}'} + headers: + Content-Length: ['2535'] + Content-Type: [application/json] + x-amzn-RequestId: [cb1ece9f-4e0b-11e7-8c8a-29c4fe87cb8e] + status: {code: 200, message: OK} +version: 1 diff --git a/tests/integration/test_lambda.py b/tests/integration/test_lambda.py new file mode 100644 index 000000000..e112ec3cf --- /dev/null +++ b/tests/integration/test_lambda.py @@ -0,0 +1,17 @@ +from sure import expect + +from tests.integration import VCRIntegrationTest, vcr + + +class TestLambda(VCRIntegrationTest): + @vcr.use_cassette + def test_lambda(self): + events = self.run_standalone('tests.integration.workflow.LambdaWorkflow') + expect(len(events)).should.equal(11) + expect(events[0]['workflowExecutionStartedEventAttributes']).should.have.key('lambdaRole') + expect(events[4]['eventType']).should.equal('LambdaFunctionScheduled') + expect(events[4]['lambdaFunctionScheduledEventAttributes']['name']).should.equal('hello-world-python') + expect(events[5]['eventType']).should.equal('LambdaFunctionStarted') + expect(events[6]['eventType']).should.equal('LambdaFunctionCompleted') + expect(events[6]['lambdaFunctionCompletedEventAttributes']['result']).should.equal('42') + print(events) diff --git a/tests/integration/test_task_list.py b/tests/integration/test_task_list.py index d37ae282f..d61d4c7a9 100644 --- a/tests/integration/test_task_list.py +++ b/tests/integration/test_task_list.py @@ -56,6 +56,7 @@ def test_not_standalone(self): input='[]', input_file=None, local=False, + lambda_role=None, ) while True: time.sleep(1) diff --git a/tests/integration/workflow.py b/tests/integration/workflow.py index d49c2f639..9d887049b 100644 --- a/tests/integration/workflow.py +++ b/tests/integration/workflow.py @@ -10,6 +10,8 @@ from simpleflow.canvas import Chain, Group from simpleflow.constants import HOUR, MINUTE from simpleflow.swf.utils import get_workflow_execution +from simpleflow.lambda_function import LambdaFunction +from simpleflow.swf.task import LambdaFunctionTask from simpleflow.task import ActivityTask @@ -154,6 +156,26 @@ def run(self): return future.result +class LambdaWorkflow(Workflow): + name = 'basic' + version = 'example' + task_list = 'example' + lambda_role = 'arn:aws:iam::111111000000:role/swf-lambda' + + def run(self): + future = self.submit( + LambdaFunctionTask( + LambdaFunction( + 'hello-world-python', + idempotent=True, + ), + 8, + foo='bar', + ) + ) + print(future.result) + + class TimerWorkflow(Workflow): name = 'example' version = 'example' diff --git a/tests/test_swf/models/test_workflow.py b/tests/test_swf/models/test_workflow.py index f6de03f94..30e8a566a 100644 --- a/tests/test_swf/models/test_workflow.py +++ b/tests/test_swf/models/test_workflow.py @@ -187,13 +187,13 @@ def test_workflow_type_changes_with_identical_workflow_type(self): self.assertLength(diffs, 0) def test_save_already_existing_type(self): - with patch.object(self.wt.connection, 'register_workflow_type') as mock: + with patch.object(self.wt, 'custom_register_workflow_type') as mock: with self.assertRaises(AlreadyExistsError): mock.side_effect = SWFTypeAlreadyExistsError(400, "mocked exception") self.wt.save() def test_save_with_response_error(self): - with patch.object(self.wt.connection, 'register_workflow_type') as mock: + with patch.object(self.wt, 'custom_register_workflow_type') as mock: with self.assertRaises(DoesNotExistError): mock.side_effect = SWFResponseError( 400, diff --git a/tests/test_swf/querysets/test_workflow.py b/tests/test_swf/querysets/test_workflow.py index 22192948d..553b39563 100644 --- a/tests/test_swf/querysets/test_workflow.py +++ b/tests/test_swf/querysets/test_workflow.py @@ -14,7 +14,7 @@ WorkflowExecutionQuerySet from swf.utils import datetime_timestamp, past_day -from ..mocks.workflow import mock_describe_workflow_type,\ +from tests.test_swf.mocks.workflow import mock_describe_workflow_type,\ mock_list_workflow_types,\ mock_list_open_workflow_executions,\ mock_list_closed_workflow_executions @@ -110,7 +110,7 @@ def test_get_or_create_non_existent_workflow_type(self): with patch.object(Layer1, 'describe_workflow_type') as mock: mock.side_effect = DoesNotExistError("Mocked exception") - with patch.object(Layer1, 'register_workflow_type', mock_describe_workflow_type): + with patch.object(WorkflowType, 'custom_register_workflow_type', mock_describe_workflow_type): workflow_type = self.wtq.get_or_create("TestDomain", "testversion") self.assertIsInstance(workflow_type, WorkflowType) @@ -145,9 +145,8 @@ def test_filter_with_registered_status(self): self.assertEqual(wt.status, REGISTERED) def test_create_workflow_type(self): - with patch.object(Layer1, 'register_workflow_type'): + with patch.object(WorkflowType, 'custom_register_workflow_type'): new_wt = self.wtq.create( - self.domain, "TestWorkflowType", "0.test", )