Skip to content

Commit 246a01f

Browse files
committed
Move Swf specific future semantic in swf.futures
1 parent 1c5869c commit 246a01f

File tree

4 files changed

+52
-16
lines changed

4 files changed

+52
-16
lines changed

simpleflow/executor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def get_actual_value(value):
1515
1616
"""
1717
if isinstance(value, futures.Future):
18-
return futures.get_result_or_raise(value)
18+
return value.result()
1919
return value
2020

2121

simpleflow/swf/executor.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@
1010

1111
from simpleflow import (
1212
executor,
13-
futures,
1413
exceptions,
1514
constants,
1615
)
1716
from simpleflow.activity import Activity
1817
from simpleflow.workflow import Workflow
1918
from simpleflow.history import History
2019
from simpleflow.swf.task import ActivityTask, WorkflowTask
20+
from . import futures
2121

2222
logger = logging.getLogger(__name__)
2323

@@ -181,15 +181,15 @@ def resume_activity(self, task, event):
181181
if not future: # Task in history does not count.
182182
return None
183183

184-
if not future.finished: # Still pending or running...
184+
if not future.finished(): # Still pending or running...
185185
return future
186186

187-
if future.exception is None: # Result available!
187+
if future.exception() is None: # Result available!
188188
return future
189189

190190
if event.get('retry', 0) == task.activity.retry: # No more retry!
191191
if task.activity.raises_on_failure:
192-
raise exceptions.TaskException(task, future.exception)
192+
raise exceptions.TaskException(task, future.exception())
193193
return future # with future.exception set.
194194

195195
# Otherwise retry the task by scheduling it again.

simpleflow/swf/futures.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from simpleflow import exceptions
2+
from simpleflow.futures import (
3+
FINISHED,
4+
CANCELLED,
5+
PENDING,
6+
RUNNING,
7+
Future as BaseFuture
8+
)
9+
10+
11+
class Future(BaseFuture):
12+
"""Future with special Simple Workflow semantics
13+
"""
14+
@classmethod
15+
def wait(cls):
16+
raise exceptions.ExecutionBlocked
17+
18+
def result(self, timeout=None):
19+
with self._condition:
20+
if self._state != FINISHED:
21+
return self.wait()
22+
# TODO what happen if cancelled ???
23+
return self._result
24+
25+
def exception(self, timeout=None):
26+
with self._condition:
27+
if self._state != FINISHED:
28+
return self.wait()
29+
return self._exception
30+
31+
def cancel(self):
32+
with self._condition:
33+
if self._state == FINISHED:
34+
return False
35+
self._state = CANCELLED
36+
return True

tests/test_dataflow.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class TestDefinitionWithInput(TestWorkflow):
7575
"""
7676
def run(self, a):
7777
b = self.submit(increment, a)
78-
return b.result
78+
return b.result()
7979

8080

8181
def test_workflow_with_input():
@@ -122,7 +122,7 @@ def run(self):
122122

123123
b = self.submit(double, a)
124124

125-
return b.result
125+
return b.result()
126126

127127

128128
def test_workflow_with_two_tasks():
@@ -237,7 +237,7 @@ def run(self, *args, **kwargs):
237237
a = self.submit(increment, 1)
238238
b = self.submit(increment, a)
239239

240-
return b.result
240+
return b.result()
241241

242242

243243
def test_workflow_with_same_task_called_two_times():
@@ -303,7 +303,7 @@ def run(self, *args, **kwargs):
303303
a = self.submit(increment, 1)
304304
a = self.submit(double, a)
305305

306-
return a.result
306+
return a.result()
307307

308308

309309
def test_workflow_reuse_same_future():
@@ -364,7 +364,7 @@ def run(self, *args, **kwargs):
364364
b = self.submit(double, a)
365365
c = self.submit(increment, a)
366366

367-
return (b.result, c.result)
367+
return b.result(), c.result()
368368

369369

370370
def test_workflow_with_two_tasks_same_future():
@@ -490,7 +490,7 @@ class TestDefinitionRetryActivity(TestWorkflow):
490490
def run(self, *args, **kwargs):
491491
a = self.submit(increment_retry, 7)
492492

493-
return a.result
493+
return a.result()
494494

495495

496496
def test_workflow_retry_activity():
@@ -593,7 +593,7 @@ class TestDefinitionChildWorkflow(TestWorkflow):
593593
"""
594594
def run(self, x):
595595
y = self.submit(TestDefinition, x)
596-
return y.result
596+
return y.result()
597597

598598

599599
def test_workflow_with_child_workflow():
@@ -725,7 +725,7 @@ def run(self):
725725
if result.exception:
726726
self.fail('error')
727727

728-
return result.result
728+
return result.result()
729729

730730

731731
def test_workflow_failed_from_definition():
@@ -771,7 +771,7 @@ class TestDefinitionActivityRaisesOnFailure(OnFailureMixin, TestWorkflow):
771771
772772
"""
773773
def run(self):
774-
return self.submit(raise_on_failure).result
774+
return self.submit(raise_on_failure).result()
775775

776776

777777
def test_workflow_activity_raises_on_failure():
@@ -807,7 +807,7 @@ def test_workflow_activity_raises_on_failure():
807807

808808
class TestOnFailureDefinition(OnFailureMixin, TestWorkflow):
809809
def run(self):
810-
if self.submit(raise_error).exception:
810+
if self.submit(raise_error).exception():
811811
self.fail('FAIL')
812812

813813

@@ -846,7 +846,7 @@ def run(self):
846846
b = self.submit(increment, 2)
847847
c = self.submit(double, b)
848848

849-
return [a.result, b.result, c.result]
849+
return [a.result(), b.result(), c.result()]
850850

851851

852852
def test_multiple_scheduled_activities():

0 commit comments

Comments
 (0)