Skip to content

Commit 37cd59d

Browse files
authored
Merge pull request #209 from botify-labs/bugfix/200/try-to-use-less-memory
Bugfix/200/try to use less memory
2 parents 09ee246 + 371f03c commit 37cd59d

File tree

7 files changed

+80
-46
lines changed

7 files changed

+80
-46
lines changed

simpleflow/command.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,21 @@
2626
from simpleflow.utils import json_dumps
2727
from simpleflow import __version__
2828

29+
if False:
30+
from typing import Text, Type
31+
from simpleflow import Workflow
32+
from swf.models import WorkflowType
33+
34+
2935
logger = logging.getLogger(__name__)
3036

3137

3238
def get_workflow(clspath):
39+
# type: (Text) -> Type[Workflow]
3340
"""
3441
Import a workflow class.
3542
:param clspath: class path
36-
:type clspath: str
3743
:return:
38-
:rtype: simpleflow.workflow.Workflow
3944
"""
4045
modname, clsname = clspath.rsplit('.', 1)
4146
module = __import__(modname, fromlist=['*'])
@@ -78,19 +83,17 @@ def cli(ctx, header, format):
7883
ctx.params['header'] = header
7984

8085

81-
def get_workflow_type(domain_name, workflow):
86+
def get_workflow_type(domain_name, workflow_class):
87+
# type: (Text, Type[Workflow]) -> WorkflowType
8288
"""
83-
Get or create the given workflow.
89+
Get or create the given workflow on SWF.
8490
:param domain_name:
85-
:type domain_name: str
86-
:param workflow:
87-
:type workflow: simpleflow.workflow.Workflow
91+
:param workflow_class:
8892
:return:
89-
:rtype: swf.models.WorkflowType
9093
"""
9194
domain = swf.models.Domain(domain_name)
9295
query = swf.querysets.WorkflowTypeQuerySet(domain)
93-
return query.get_or_create(workflow.name, workflow.version)
96+
return query.get_or_create(workflow_class.name, workflow_class.version)
9497

9598

9699
def load_input(input_fp):
@@ -164,24 +167,24 @@ def start_workflow(workflow,
164167
input,
165168
input_file,
166169
local):
167-
workflow_definition = get_workflow(workflow)
170+
workflow_class = get_workflow(workflow)
168171

169172
wf_input = get_or_load_input(input_file, input)
170173

171174
if local:
172175
from .local import Executor
173176

174-
Executor(workflow_definition).run(wf_input)
177+
Executor(workflow_class).run(wf_input)
175178

176179
return
177180

178181
if not domain:
179182
raise ValueError('*domain* must be set when not running in local mode')
180183

181-
workflow_type = get_workflow_type(domain, workflow_definition)
184+
workflow_type = get_workflow_type(domain, workflow_class)
182185
execution = workflow_type.start_execution(
183186
workflow_id=workflow_id,
184-
task_list=task_list or workflow_definition.task_list,
187+
task_list=task_list or workflow_class.task_list,
185188
execution_timeout=execution_timeout,
186189
input=wf_input,
187190
tag_list=tags,

simpleflow/executor.py

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33

44
from ._decorators import deprecated
55

6+
if False:
7+
from typing import Type
8+
from simpleflow import Workflow
9+
610
__all__ = ['Executor']
711

812

@@ -27,13 +31,11 @@ class Executor(object):
2731
- asynchronous
2832
- asynchronous with full replay
2933
30-
:ivar _workflow: the workflow
31-
:type _workflow: simpleflow.workflow.Workflow
32-
3334
"""
3435
__metaclass__ = abc.ABCMeta
3536

36-
def __init__(self, workflow):
37+
def __init__(self, workflow_class):
38+
# type: (Type[Workflow]) -> None
3739
"""
3840
Binds the workflow's definition.
3941
@@ -42,23 +44,30 @@ def __init__(self, workflow):
4244
as a program, the workflow, and an interpreter, the executor.
4345
4446
"""
45-
self._workflow = workflow(self)
47+
self._workflow_class = workflow_class
48+
self._workflow = None
49+
50+
@property
51+
def workflow_class(self):
52+
return self._workflow_class
4653

4754
@property
4855
def workflow(self):
49-
"""
50-
:return:
51-
:rtype: simpleflow.workflow.Workflow
52-
"""
5356
return self._workflow
5457

58+
def create_workflow(self):
59+
if self._workflow is None:
60+
workflow = self._workflow_class(self)
61+
if False:
62+
assert isinstance(workflow, Workflow)
63+
self._workflow = workflow
64+
5565
def run_workflow(self, *args, **kwargs):
5666
"""
5767
Runs the workflow definition.
5868
5969
"""
60-
workflow = self._workflow
61-
result = workflow.run(*args, **kwargs)
70+
result = self._workflow.run(*args, **kwargs)
6271
return result
6372

6473
@abc.abstractmethod
@@ -78,7 +87,7 @@ def submit(self, submittable, *args, **kwargs):
7887
def map(self, callable, iterable):
7988
"""Submit *callable* with each of the items in ``*iterables``.
8089
81-
All items in ``*iterables`` must be serializable in JSON.
90+
All items in ``*iterable`` must be serializable in JSON.
8291
8392
"""
8493
return [self.submit(callable, argument) for

simpleflow/local/executor.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,20 @@ class Executor(executor.Executor):
2222
Executes all tasks synchronously in a single local process.
2323
2424
"""
25-
def __init__(self, workflow):
26-
super(Executor, self).__init__(workflow)
25+
def __init__(self, workflow_class):
26+
super(Executor, self).__init__(workflow_class)
27+
self.update_workflow_class()
2728
self.nb_activities = 0
2829
self.signals_sent = set()
2930

30-
@property
31-
def _workflow_class(self):
31+
def update_workflow_class(self):
3232
"""
3333
Returns the workflow class with all the needed attributes for
3434
swf.models.history.builder.History()
3535
This makes it possible to get a SWF-compatible history in local executions so that
3636
the metrology feature works correctly.
3737
"""
38-
cls = self._workflow.__class__
38+
cls = self._workflow_class
3939
for attr in ("decision_tasks_timeout", "execution_timeout", ):
4040
if not hasattr(cls, attr):
4141
setattr(cls, attr, None)
@@ -105,12 +105,15 @@ def run(self, input=None):
105105
input = {}
106106
args = input.get('args', ())
107107
kwargs = input.get('kwargs', {})
108+
self.create_workflow()
108109

109110
self.initialize_history(input)
110111

111112
self.before_replay()
112113
result = self.run_workflow(*args, **kwargs)
113114

115+
# Hack: self._history must be available to the callback as a
116+
# simpleflow.history.History, not a swf.models.history.builder.History
114117
self._history = History(self._history)
115118
self._history.parse()
116119
self.after_replay()

simpleflow/swf/executor.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
from simpleflow.base import Submittable
2222
from simpleflow.history import History
2323
from simpleflow.signal import WaitForSignal
24-
from simpleflow.swf import constants
2524
from simpleflow.swf.helpers import swf_identity
2625
from simpleflow.swf.task import ActivityTask, WorkflowTask, SignalTask
2726
from simpleflow.task import (
@@ -151,7 +150,6 @@ class Executor(executor.Executor):
151150
152151
:ivar domain: domain
153152
:type domain: swf.models.domain.Domain
154-
:ivar workflow: workflow
155153
:ivar task_list: task list
156154
:type task_list: Optional[str]
157155
:ivar repair_with: previous history to use for repairing
@@ -161,9 +159,9 @@ class Executor(executor.Executor):
161159
162160
"""
163161

164-
def __init__(self, domain, workflow, task_list=None, repair_with=None,
162+
def __init__(self, domain, workflow_class, task_list=None, repair_with=None,
165163
force_activities=None):
166-
super(Executor, self).__init__(workflow)
164+
super(Executor, self).__init__(workflow_class)
167165
self._history = None
168166
self._execution_context = {}
169167
self.domain = domain
@@ -175,6 +173,7 @@ def __init__(self, domain, workflow, task_list=None, repair_with=None,
175173
self.force_activities = None
176174
self.reset()
177175

176+
# noinspection PyAttributeOutsideInit
178177
def reset(self):
179178
"""
180179
Clears the state of the execution.
@@ -187,6 +186,9 @@ def reset(self):
187186
self._decisions = []
188187
self._tasks = TaskRegistry()
189188
self._idempotent_tasks_to_submit = set()
189+
self._execution = None
190+
self.current_priority = None
191+
self.create_workflow()
190192

191193
def _make_task_id(self, a_task, *args, **kwargs):
192194
"""
@@ -783,6 +785,7 @@ def replay(self, decision_response):
783785
len(self._decisions),
784786
))
785787
self.after_replay()
788+
self.decref_workflow()
786789
return self._decisions, {}
787790
except exceptions.TaskException as err:
788791
reason = 'Workflow execution error in task {}: "{}"'.format(
@@ -799,6 +802,7 @@ def replay(self, decision_response):
799802
details=swf.format.details(details),
800803
)
801804
self.after_closed()
805+
self.decref_workflow()
802806
return [decision], {}
803807

804808
except Exception as err:
@@ -819,15 +823,23 @@ def replay(self, decision_response):
819823
details=swf.format.details(details),
820824
)
821825
self.after_closed()
826+
self.decref_workflow()
822827
return [decision], {}
823828

824829
self.after_replay()
825830
decision = swf.models.decision.WorkflowExecutionDecision()
826831
decision.complete(result=swf.format.result(json_dumps(result)))
827832
self.on_completed()
828833
self.after_closed()
834+
self.decref_workflow()
829835
return [decision], {}
830836

837+
def decref_workflow(self):
838+
"""
839+
Set the `_workflow` ivar to None in the hope of reducing memory consumption.
840+
"""
841+
self._workflow = None
842+
831843
def before_replay(self):
832844
return self._workflow.before_replay(self._history)
833845

simpleflow/swf/process/decider/base.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,19 +64,19 @@ def __init__(self, workflow_executors, domain, task_list, nb_retries=3,
6464
"""
6565
self._workflow_name = '{}'.format(','.join(
6666
[
67-
ex.workflow.name for ex in workflow_executors
67+
ex.workflow_class.name for ex in workflow_executors
6868
]))
6969

7070
# Maps a workflow's name to its definition.
7171
# Used to dispatch a decision task to the corresponding workflow.
7272
self._workflow_executors = {
73-
executor.workflow.name: executor for executor in workflow_executors
73+
executor.workflow_class.name: executor for executor in workflow_executors
7474
}
7575

7676
if task_list:
7777
self.task_list = task_list
7878
else:
79-
self.task_list = workflow_executors[0].workflow.task_list
79+
self.task_list = workflow_executors[0].workflow_class.task_list
8080
# If not passed explicitly, all executors must use the same task list
8181
# else it's probably a mistake so we raise an error.
8282
self._check_all_task_lists_identical()
@@ -106,7 +106,7 @@ def _check_all_domains_identical(self):
106106

107107
def _check_all_task_lists_identical(self):
108108
for ex in self._workflow_executors.values():
109-
if ex.workflow.task_list != self.task_list:
109+
if ex.workflow_class.task_list != self.task_list:
110110
raise ValueError(
111111
'all workflows must have the same task list '
112112
'"{}" unless you specify it explicitly'.format(

simpleflow/swf/task.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ def schedule(self, domain, task_list=None, **kwargs):
3232
:param task_list:
3333
:type task_list: Optional[str]
3434
:param kwargs:
35-
:type kwargs: dict
3635
:return:
3736
:rtype: list[swf.models.decision.Decision]
3837
"""

tests/test_simpleflow/test_dataflow.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
from swf.responses import Response
1919

2020
from simpleflow import (
21-
Workflow,
2221
futures,
2322
)
2423
from simpleflow.task import ActivityTask
@@ -40,6 +39,9 @@
4039
)
4140

4241

42+
# Note: tests checking the Executor.workflow instance must patch `decref_workflow`.
43+
44+
4345
def check_task_scheduled_decision(decision, task):
4446
"""
4547
Asserts that *decision* schedules *task*.
@@ -218,7 +220,8 @@ def run(self, a):
218220

219221

220222
@mock_swf
221-
def test_workflow_with_before_replay():
223+
@patch.object(Executor, 'decref_workflow')
224+
def test_workflow_with_before_replay(mock_decref_workflow):
222225
workflow = ATestDefinitionWithBeforeReplay
223226
executor = Executor(DOMAIN, workflow)
224227

@@ -248,7 +251,8 @@ def run(self, a):
248251

249252

250253
@mock_swf
251-
def test_workflow_with_after_replay():
254+
@patch.object(Executor, 'decref_workflow')
255+
def test_workflow_with_after_replay(mock_decref_workflow):
252256
workflow = ATestDefinitionWithAfterReplay
253257
executor = Executor(DOMAIN, workflow)
254258

@@ -277,7 +281,8 @@ def run(self, a):
277281

278282

279283
@mock_swf
280-
def test_workflow_with_after_closed():
284+
@patch.object(Executor, 'decref_workflow')
285+
def test_workflow_with_after_closed(mock_decref_workflow):
281286
workflow = ATestDefinitionWithAfterClosed
282287
executor = Executor(DOMAIN, workflow)
283288

@@ -1068,7 +1073,8 @@ def run(self):
10681073

10691074

10701075
@mock_swf
1071-
def test_workflow_failed_from_definition():
1076+
@patch.object(Executor, 'decref_workflow')
1077+
def test_workflow_failed_from_definition(mock_decref_workflow):
10721078
workflow = ATestDefinitionFailWorkflow
10731079
executor = Executor(DOMAIN, workflow)
10741080
history = builder.History(workflow)
@@ -1110,7 +1116,8 @@ def run(self):
11101116

11111117

11121118
@mock_swf
1113-
def test_workflow_activity_raises_on_failure():
1119+
@patch.object(Executor, 'decref_workflow')
1120+
def test_workflow_activity_raises_on_failure(mock_decref_workflow):
11141121
workflow = ATestDefinitionActivityRaisesOnFailure
11151122
executor = Executor(DOMAIN, workflow)
11161123
history = builder.History(workflow)
@@ -1148,7 +1155,8 @@ def run(self):
11481155

11491156

11501157
@mock_swf
1151-
def test_on_failure_callback():
1158+
@patch.object(Executor, 'decref_workflow')
1159+
def test_on_failure_callback(mock_decref_workflow):
11521160
workflow = ATestOnFailureDefinition
11531161
executor = Executor(DOMAIN, workflow)
11541162
history = builder.History(workflow)

0 commit comments

Comments
 (0)