Skip to content

Commit f8a7a37

Browse files
committed
Executor: add create_workflow
The `create_workflow` method must be called at the start of `Executor.run`. `run` can later set the `_workflow` instance to None, if this preserves memory... Signed-off-by: Yves Bastide <yves@botify.com>
1 parent 1001763 commit f8a7a37

File tree

5 files changed

+24
-7
lines changed

5 files changed

+24
-7
lines changed

simpleflow/executor.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,16 +51,23 @@ def __init__(self, workflow_class):
5151
def workflow_class(self):
5252
return self._workflow_class
5353

54+
@property
55+
def workflow(self):
56+
return self._workflow
57+
58+
def create_workflow(self):
59+
if self._workflow is None:
60+
workflow = self._workflow_class(self)
61+
if False:
62+
assert isinstance(workflow, Workflow)
63+
self._workflow = workflow
64+
5465
def run_workflow(self, *args, **kwargs):
5566
"""
5667
Runs the workflow definition.
5768
5869
"""
59-
workflow = self.workflow_class(self)
60-
if False:
61-
assert isinstance(workflow, Workflow)
62-
self._workflow = workflow
63-
result = workflow.run(*args, **kwargs)
70+
result = self._workflow.run(*args, **kwargs)
6471
return result
6572

6673
@abc.abstractmethod

simpleflow/local/executor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ def run(self, input=None):
105105
input = {}
106106
args = input.get('args', ())
107107
kwargs = input.get('kwargs', {})
108+
self.create_workflow()
108109

109110
self.initialize_history(input)
110111

simpleflow/swf/executor.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ def reset(self):
188188
self._idempotent_tasks_to_submit = set()
189189
self._execution = None
190190
self.current_priority = None
191+
self.create_workflow()
191192

192193
def _make_task_id(self, a_task, *args, **kwargs):
193194
"""
@@ -784,6 +785,7 @@ def replay(self, decision_response):
784785
len(self._decisions),
785786
))
786787
self.after_replay()
788+
self.decref_workflow()
787789
return self._decisions, {}
788790
except exceptions.TaskException as err:
789791
reason = 'Workflow execution error in task {}: "{}"'.format(
@@ -800,6 +802,7 @@ def replay(self, decision_response):
800802
details=swf.format.details(details),
801803
)
802804
self.after_closed()
805+
self.decref_workflow()
803806
return [decision], {}
804807

805808
except Exception as err:
@@ -820,15 +823,20 @@ def replay(self, decision_response):
820823
details=swf.format.details(details),
821824
)
822825
self.after_closed()
826+
self.decref_workflow()
823827
return [decision], {}
824828

825829
self.after_replay()
826830
decision = swf.models.decision.WorkflowExecutionDecision()
827831
decision.complete(result=swf.format.result(json_dumps(result)))
828832
self.on_completed()
829833
self.after_closed()
834+
self.decref_workflow()
830835
return [decision], {}
831836

837+
def decref_workflow(self):
838+
self._workflow = None
839+
832840
def before_replay(self):
833841
return self._workflow.before_replay(self._history)
834842

simpleflow/swf/task.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ def schedule(self, domain, task_list=None, **kwargs):
3232
:param task_list:
3333
:type task_list: Optional[str]
3434
:param kwargs:
35-
:type kwargs: dict
3635
:return:
3736
:rtype: list[swf.models.decision.Decision]
3837
"""

tests/test_simpleflow/test_dataflow.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
from swf.responses import Response
1919

2020
from simpleflow import (
21-
Workflow,
2221
futures,
2322
)
2423
from simpleflow.task import ActivityTask
@@ -40,6 +39,9 @@
4039
)
4140

4241

42+
Executor.decref_workflow = lambda _: None # FIXME use @patch?
43+
44+
4345
def check_task_scheduled_decision(decision, task):
4446
"""
4547
Asserts that *decision* schedules *task*.

0 commit comments

Comments
 (0)