Skip to content

Commit 1001763

Browse files
committed
WIP, does not compute
Signed-off-by: Yves Bastide <yves@botify.com>
1 parent 9c478db commit 1001763

File tree

4 files changed

+21
-23
lines changed

4 files changed

+21
-23
lines changed

simpleflow/executor.py

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33

44
from ._decorators import deprecated
55

6-
76
if False:
8-
from typing import Text, Type
7+
from typing import Type
98
from simpleflow import Workflow
109

1110
__all__ = ['Executor']
@@ -32,9 +31,6 @@ class Executor(object):
3231
- asynchronous
3332
- asynchronous with full replay
3433
35-
:ivar _workflow: the workflow
36-
:type _workflow: simpleflow.workflow.Workflow
37-
3834
"""
3935
__metaclass__ = abc.ABCMeta
4036

@@ -48,22 +44,22 @@ def __init__(self, workflow_class):
4844
as a program, the workflow, and an interpreter, the executor.
4945
5046
"""
51-
self._workflow = workflow_class(self)
47+
self._workflow_class = workflow_class
48+
self._workflow = None
5249

5350
@property
54-
def workflow(self):
55-
"""
56-
:return:
57-
:rtype: simpleflow.workflow.Workflow
58-
"""
59-
return self._workflow
51+
def workflow_class(self):
52+
return self._workflow_class
6053

6154
def run_workflow(self, *args, **kwargs):
6255
"""
6356
Runs the workflow definition.
6457
6558
"""
66-
workflow = self._workflow
59+
workflow = self.workflow_class(self)
60+
if False:
61+
assert isinstance(workflow, Workflow)
62+
self._workflow = workflow
6763
result = workflow.run(*args, **kwargs)
6864
return result
6965

@@ -84,7 +80,7 @@ def submit(self, submittable, *args, **kwargs):
8480
def map(self, callable, iterable):
8581
"""Submit *callable* with each of the items in ``*iterables``.
8682
87-
All items in ``*iterables`` must be serializable in JSON.
83+
All items in ``*iterable`` must be serializable in JSON.
8884
8985
"""
9086
return [self.submit(callable, argument) for

simpleflow/local/executor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,18 @@ class Executor(executor.Executor):
2424
"""
2525
def __init__(self, workflow_class):
2626
super(Executor, self).__init__(workflow_class)
27+
self.update_workflow_class()
2728
self.nb_activities = 0
2829
self.signals_sent = set()
2930

30-
@property
31-
def _workflow_class(self):
31+
def update_workflow_class(self):
3232
"""
3333
Returns the workflow class with all the needed attributes for
3434
swf.models.history.builder.History()
3535
This allows to get a SWF-compatible history in local executions so that
3636
the metrology feature works correctly.
3737
"""
38-
cls = self._workflow.__class__
38+
cls = self._workflow_class
3939
for attr in ("decision_tasks_timeout", "execution_timeout", ):
4040
if not hasattr(cls, attr):
4141
setattr(cls, attr, None)

simpleflow/swf/executor.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
from simpleflow.base import Submittable
2222
from simpleflow.history import History
2323
from simpleflow.signal import WaitForSignal
24-
from simpleflow.swf import constants
2524
from simpleflow.swf.helpers import swf_identity
2625
from simpleflow.swf.task import ActivityTask, WorkflowTask, SignalTask
2726
from simpleflow.task import (
@@ -151,7 +150,6 @@ class Executor(executor.Executor):
151150
152151
:ivar domain: domain
153152
:type domain: swf.models.domain.Domain
154-
:ivar workflow: workflow
155153
:ivar task_list: task list
156154
:type task_list: Optional[str]
157155
:ivar repair_with: previous history to use for repairing
@@ -175,6 +173,7 @@ def __init__(self, domain, workflow_class, task_list=None, repair_with=None,
175173
self.force_activities = None
176174
self.reset()
177175

176+
# noinspection PyAttributeOutsideInit
178177
def reset(self):
179178
"""
180179
Clears the state of the execution.
@@ -187,6 +186,8 @@ def reset(self):
187186
self._decisions = []
188187
self._tasks = TaskRegistry()
189188
self._idempotent_tasks_to_submit = set()
189+
self._execution = None
190+
self.current_priority = None
190191

191192
def _make_task_id(self, a_task, *args, **kwargs):
192193
"""

simpleflow/swf/process/decider/base.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,21 +62,22 @@ def __init__(self, workflow_executors, domain, task_list, nb_retries=3,
6262
:type workflow_executors: list[simpleflow.swf.executor.Executor]
6363
6464
"""
65+
print(ex._workflow for ex in workflow_executors)
6566
self._workflow_name = '{}'.format(','.join(
6667
[
67-
ex.workflow.name for ex in workflow_executors
68+
ex.workflow_class.name for ex in workflow_executors
6869
]))
6970

7071
# Maps a workflow's name to its definition.
7172
# Used to dispatch a decision task to the corresponding workflow.
7273
self._workflow_executors = {
73-
executor.workflow.name: executor for executor in workflow_executors
74+
executor.workflow_class.name: executor for executor in workflow_executors
7475
}
7576

7677
if task_list:
7778
self.task_list = task_list
7879
else:
79-
self.task_list = workflow_executors[0].workflow.task_list
80+
self.task_list = workflow_executors[0].workflow_class.task_list
8081
# If not passed explicitly, all executors must use the same task list
8182
# else it's probably a mistake so we raise an error.
8283
self._check_all_task_lists_identical()
@@ -106,7 +107,7 @@ def _check_all_domains_identical(self):
106107

107108
def _check_all_task_lists_identical(self):
108109
for ex in self._workflow_executors.values():
109-
if ex.workflow.task_list != self.task_list:
110+
if ex.workflow_class.task_list != self.task_list:
110111
raise ValueError(
111112
'all workflows must have the same task list '
112113
'"{}" unless you specify it explicitly'.format(

0 commit comments

Comments
 (0)