Skip to content

Commit b6e7979

Browse files
committed
Move Swf specific future semantic in swf.futures
1 parent 6914edf commit b6e7979

File tree

4 files changed

+52
-16
lines changed

4 files changed

+52
-16
lines changed

simpleflow/executor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def get_actual_value(value):
1515
1616
"""
1717
if isinstance(value, futures.Future):
18-
return futures.get_result_or_raise(value)
18+
return value.result()
1919
return value
2020

2121

simpleflow/swf/executor.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@
88

99
from simpleflow import (
1010
executor,
11-
futures,
1211
exceptions,
1312
constants,
1413
)
1514
from simpleflow.activity import Activity
1615
from simpleflow.workflow import Workflow
1716
from simpleflow.history import History
1817
from simpleflow.swf.task import ActivityTask, WorkflowTask
18+
from . import futures
1919

2020
logger = logging.getLogger(__name__)
2121

@@ -179,15 +179,15 @@ def resume_activity(self, task, event):
179179
if not future: # Task in history does not count.
180180
return None
181181

182-
if not future.finished: # Still pending or running...
182+
if not future.finished(): # Still pending or running...
183183
return future
184184

185-
if future.exception is None: # Result available!
185+
if future.exception() is None: # Result available!
186186
return future
187187

188188
if event.get('retry', 0) == task.activity.retry: # No more retry!
189189
if task.activity.raises_on_failure:
190-
raise exceptions.TaskException(task, future.exception)
190+
raise exceptions.TaskException(task, future.exception())
191191
return future # with future.exception set.
192192

193193
# Otherwise retry the task by scheduling it again.

simpleflow/swf/futures.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from simpleflow import exceptions
2+
from simpleflow.futures import (
3+
FINISHED,
4+
CANCELLED,
5+
PENDING,
6+
RUNNING,
7+
Future as BaseFuture
8+
)
9+
10+
11+
class Future(BaseFuture):
12+
"""Future with special Simple Workflow semantics
13+
"""
14+
@classmethod
15+
def wait(cls):
16+
raise exceptions.ExecutionBlocked
17+
18+
def result(self, timeout=None):
19+
with self._condition:
20+
if self._state != FINISHED:
21+
return self.wait()
22+
# TODO what happen if cancelled ???
23+
return self._result
24+
25+
def exception(self, timeout=None):
26+
with self._condition:
27+
if self._state != FINISHED:
28+
return self.wait()
29+
return self._exception
30+
31+
def cancel(self):
32+
with self._condition:
33+
if self._state == FINISHED:
34+
return False
35+
self._state = CANCELLED
36+
return True

tests/test_dataflow.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ class TestDefinitionWithInput(TestWorkflow):
7272
"""
7373
def run(self, a):
7474
b = self.submit(increment, a)
75-
return b.result
75+
return b.result()
7676

7777

7878
def test_workflow_with_input():
@@ -119,7 +119,7 @@ def run(self):
119119

120120
b = self.submit(double, a)
121121

122-
return b.result
122+
return b.result()
123123

124124

125125
def test_workflow_with_two_tasks():
@@ -234,7 +234,7 @@ def run(self, *args, **kwargs):
234234
a = self.submit(increment, 1)
235235
b = self.submit(increment, a)
236236

237-
return b.result
237+
return b.result()
238238

239239

240240
def test_workflow_with_same_task_called_two_times():
@@ -300,7 +300,7 @@ def run(self, *args, **kwargs):
300300
a = self.submit(increment, 1)
301301
a = self.submit(double, a)
302302

303-
return a.result
303+
return a.result()
304304

305305

306306
def test_workflow_reuse_same_future():
@@ -361,7 +361,7 @@ def run(self, *args, **kwargs):
361361
b = self.submit(double, a)
362362
c = self.submit(increment, a)
363363

364-
return (b.result, c.result)
364+
return b.result(), c.result()
365365

366366

367367
def test_workflow_with_two_tasks_same_future():
@@ -487,7 +487,7 @@ class TestDefinitionRetryActivity(TestWorkflow):
487487
def run(self, *args, **kwargs):
488488
a = self.submit(increment_retry, 7)
489489

490-
return a.result
490+
return a.result()
491491

492492

493493
def test_workflow_retry_activity():
@@ -590,7 +590,7 @@ class TestDefinitionChildWorkflow(TestWorkflow):
590590
"""
591591
def run(self, x):
592592
y = self.submit(TestDefinition, x)
593-
return y.result
593+
return y.result()
594594

595595

596596
def test_workflow_with_child_workflow():
@@ -722,7 +722,7 @@ def run(self):
722722
if result.exception:
723723
self.fail('error')
724724

725-
return result.result
725+
return result.result()
726726

727727

728728
def test_workflow_failed_from_definition():
@@ -768,7 +768,7 @@ class TestDefinitionActivityRaisesOnFailure(OnFailureMixin, TestWorkflow):
768768
769769
"""
770770
def run(self):
771-
return self.submit(raise_on_failure).result
771+
return self.submit(raise_on_failure).result()
772772

773773

774774
def test_workflow_activity_raises_on_failure():
@@ -804,7 +804,7 @@ def test_workflow_activity_raises_on_failure():
804804

805805
class TestOnFailureDefinition(OnFailureMixin, TestWorkflow):
806806
def run(self):
807-
if self.submit(raise_error).exception:
807+
if self.submit(raise_error).exception():
808808
self.fail('FAIL')
809809

810810

@@ -843,7 +843,7 @@ def run(self):
843843
b = self.submit(increment, 2)
844844
c = self.submit(double, b)
845845

846-
return [a.result, b.result, c.result]
846+
return [a.result(), b.result(), c.result()]
847847

848848

849849
def test_multiple_scheduled_activities():

0 commit comments

Comments
 (0)