Skip to content

Commit 384e88d

Browse files
committed
Refactor Future abstraction: define an abstract future and make executor specific future impls in there package
1 parent 6e45244 commit 384e88d

File tree

4 files changed

+141
-36
lines changed

4 files changed

+141
-36
lines changed

simpleflow/executor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ def get_actual_value(value):
1414
"""Unwrap the result of a Future or return the value.
1515
1616
"""
17-
if isinstance(value, futures.Future):
17+
if isinstance(value, futures.AbstractFuture):
1818
return value.result()
1919
return value
2020

simpleflow/futures.py

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# -*- coding: utf-8 -*-
22

3-
from concurrent import futures as py_futures
3+
import abc
44
from concurrent.futures._base import (
55
PENDING,
66
RUNNING,
@@ -10,7 +10,7 @@
1010
)
1111

1212

13-
__all__ = ['Future', 'wait']
13+
__all__ = ['AbstractFuture', 'wait']
1414

1515

1616
FIRST_COMPLETED = 'FIRST_COMPLETED'
@@ -27,26 +27,50 @@
2727
FINISHED
2828
]
2929

30-
_STATE_TO_DESCRIPTION_MAP = {
31-
PENDING: "pending",
32-
RUNNING: "running",
33-
CANCELLED: "cancelled",
34-
CANCELLED_AND_NOTIFIED: "cancelled",
35-
FINISHED: "finished"
36-
}
37-
3830

3931
def wait(*fs):
4032
"""Returns a list of the results of futures if there are available.
4133
"""
4234
return [future.result() for future in fs]
4335

4436

45-
class Future(py_futures.Future):
46-
"""Patched version of ``concurrent.futures.Future``
37+
class AbstractFuture(object):
38+
"""Base future class that defines an interface for concrete impls
4739
"""
40+
41+
__metaclass__ = abc.ABCMeta
42+
43+
@abc.abstractmethod
44+
def result(self):
45+
"""Return the result of the underlying computation
46+
47+
The actual behavior (blocking etc.) depends on impl
48+
"""
49+
raise NotImplementedError
50+
51+
@abc.abstractmethod
52+
def exception(self):
53+
"""Return the exception raised (if any) by the underlying computation
54+
55+
The actual behavior (blocking etc.) depends on impl
56+
"""
57+
raise NotImplementedError
58+
59+
@abc.abstractmethod
60+
def running(self):
61+
"""Return True if the underlying computation is currently executing
62+
"""
63+
raise NotImplementedError
64+
65+
@abc.abstractmethod
4866
def finished(self):
49-
with self._condition:
50-
if self._state == FINISHED:
51-
return True
52-
return False
67+
"""Return True if the underlying computation has finished
68+
"""
69+
raise NotImplementedError
70+
71+
@abc.abstractmethod
72+
def done(self):
73+
"""Return True if the underlying compuation is cancelled or
74+
has finished
75+
"""
76+
raise NotImplementedError

simpleflow/local/futures.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from simpleflow.futures import AbstractFuture
2+
from concurrent.futures import Future as PythonFuture
3+
4+
5+
class Future(PythonFuture, AbstractFuture):
6+
"""Future impl for local execution
7+
8+
The `concurrent.futures.Future` from python is itself a concrete impl
9+
rather than an abstract class or an interface. So a multiple inheritance
10+
is used to work with our interface `simpleflow.futures.AbstractFuture`.
11+
12+
This class inherits both the abstracts methods from `AbstractFuture` and
13+
concrete methods from `concurrent.futures.Future`. The `AbstractFuture`
14+
interface is designed so that python's Future 'implements' its methods.
15+
Since python's Future is placed left to the `AbstractFuture` in inheritance
16+
declaration, runtime method resolution will pick the concrete methods.
17+
18+
This Future class is interoperable with python's builtin executors.
19+
"""
20+
def finished(self):
21+
return self.done()

simpleflow/swf/futures.py

Lines changed: 79 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,33 +4,93 @@
44
CANCELLED,
55
PENDING,
66
RUNNING,
7-
Future as BaseFuture
7+
CANCELLED_AND_NOTIFIED,
8+
AbstractFuture
89
)
910

1011

11-
class Future(BaseFuture):
12-
"""Future with special Simple Workflow semantics
12+
_STATE_TO_DESCRIPTION_MAP = {
13+
PENDING: "pending",
14+
RUNNING: "running",
15+
CANCELLED: "cancelled",
16+
CANCELLED_AND_NOTIFIED: "cancelled",
17+
FINISHED: "finished"
18+
}
19+
20+
21+
class Future(AbstractFuture):
22+
"""Future impl that contains Simple Workflow specific logic
1323
"""
24+
25+
def __init__(self):
26+
"""Represents the state of a computation.
27+
28+
It tries to mimics mod::`concurrent.futures` but involved some
29+
adaptations to fit the Amazon SWF model.
30+
31+
"""
32+
self._state = PENDING
33+
self._result = None
34+
self._exception = None
35+
36+
def __repr__(self):
37+
return '<Future at %s state=%s>' % (
38+
hex(id(self)),
39+
_STATE_TO_DESCRIPTION_MAP[self._state])
40+
1441
@classmethod
1542
def wait(cls):
1643
raise exceptions.ExecutionBlocked
1744

18-
def result(self, timeout=None):
19-
with self._condition:
20-
if self._state != FINISHED:
21-
return self.wait()
22-
# TODO what happen if cancelled ???
23-
return self._result
45+
def result(self):
46+
"""Raise a cls::`exceptions.ExecutionBlocked` when the result is not
47+
available."""
48+
if self._state != FINISHED:
49+
return self.wait()
2450

25-
def exception(self, timeout=None):
26-
with self._condition:
27-
if self._state != FINISHED:
28-
return self.wait()
29-
return self._exception
51+
return self._result
3052

3153
def cancel(self):
32-
with self._condition:
33-
if self._state == FINISHED:
34-
return False
35-
self._state = CANCELLED
36-
return True
54+
"""Cancel a future.
55+
56+
Note: cannot cancel a future that is already finished.
57+
It will not raise an exception but return ``False`` to notify it.
58+
59+
"""
60+
if self._state == FINISHED:
61+
return False
62+
63+
self._state = CANCELLED
64+
return True
65+
66+
def state(self):
67+
return self._state
68+
69+
def exception(self):
70+
"""
71+
Returns `None` if no exception occurred, otherwise, returns the
72+
exception object that what raised by the task.
73+
74+
Raise a cls::`exceptions.ExecutionBlocked` when the result is not
75+
available.
76+
77+
"""
78+
if self._state != FINISHED:
79+
return self.wait()
80+
81+
return self._exception
82+
83+
def cancelled(self):
84+
return self._state == CANCELLED
85+
86+
def running(self):
87+
return self._state == RUNNING
88+
89+
def finished(self):
90+
return self._state == FINISHED
91+
92+
def done(self):
93+
return self._state in [
94+
CANCELLED,
95+
FINISHED
96+
]

0 commit comments

Comments
 (0)