Skip to content

Commit 1c5869c

Browse files
committed
Introduce concurrent.futures.Future as base future impl
1 parent b5809f9 commit 1c5869c

File tree

2 files changed

+23
-106
lines changed

2 files changed

+23
-106
lines changed

simpleflow/futures.py

Lines changed: 17 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,23 @@
11
# -*- coding: utf-8 -*-
22

3-
from . import exceptions
3+
from concurrent import futures as py_futures
4+
from concurrent.futures._base import (
5+
PENDING,
6+
RUNNING,
7+
CANCELLED,
8+
CANCELLED_AND_NOTIFIED,
9+
FINISHED
10+
)
411

512

6-
__all__ = ['Future', 'get_result_or_raise', 'wait']
13+
__all__ = ['Future', 'wait']
714

815

916
FIRST_COMPLETED = 'FIRST_COMPLETED'
1017
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
1118
ALL_COMPLETED = 'ALL_COMPLETED'
1219
_AS_COMPLETED = '_AS_COMPLETED'
1320

14-
PENDING = 'PENDING'
15-
RUNNING = 'RUNNING'
16-
CANCELLED = 'CANCELLED'
17-
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
18-
FINISHED = 'FINISHED'
1921

2022
_FUTURE_STATES = [
2123
PENDING,
@@ -34,102 +36,17 @@
3436
}
3537

3638

37-
def get_result_or_raise(future):
38-
"""Returns the ``result`` of *future* if it is available, otherwise
39-
raise."""
40-
if future.state == PENDING:
41-
raise exceptions.ExecutionBlocked()
42-
return future.result
43-
44-
4539
def wait(*fs):
4640
"""Returns a list of the results of futures if there are available.
47-
48-
Raises a ``exceptions.ExecutionBlocked`` otherwise.
49-
5041
"""
51-
if any(future.state == PENDING for future in fs):
52-
raise exceptions.ExecutionBlocked()
53-
54-
return [future.result for future in fs]
55-
56-
57-
class Future(object):
58-
def __init__(self):
59-
"""Represents the state of a computation.
60-
61-
It tries to mimics mod::`concurrent.futures` but involved some
62-
adaptations to fit the Amazon SWF model.
63-
64-
"""
65-
self._state = PENDING
66-
self._result = None
67-
self._exception = None
68-
69-
def __repr__(self):
70-
return '<Future at %s state=%s>' % (
71-
hex(id(self)),
72-
_STATE_TO_DESCRIPTION_MAP[self._state])
42+
return [future.result() for future in fs]
7343

74-
def wait(self):
75-
raise exceptions.ExecutionBlocked
7644

77-
@property
78-
def result(self):
79-
"""Raise a cls::`exceptions.ExecutionBlocked` when the result is not
80-
available."""
81-
if self._state != FINISHED:
82-
return self.wait()
83-
84-
return self._result
85-
86-
def cancel(self):
87-
"""Cancel a future.
88-
89-
Note: cannot cancel a future that is already finished.
90-
It will not raise an exception but return ``False`` to notify it.
91-
92-
"""
93-
if self._state == FINISHED:
94-
return False
95-
96-
self._state = CANCELLED
97-
return True
98-
99-
@property
100-
def state(self):
101-
return self._state
102-
103-
@property
104-
def exception(self):
105-
"""
106-
Returns `None` if no exception occurred, otherwise, returns the
107-
exception object that what raised by the task.
108-
109-
Raise a cls::`exceptions.ExecutionBlocked` when the result is not
110-
available.
111-
112-
"""
113-
if self._state != FINISHED:
114-
return self.wait()
115-
116-
return self._exception
117-
118-
@property
119-
def cancelled(self):
120-
return self._state == CANCELLED
121-
122-
@property
123-
def running(self):
124-
return self._state == RUNNING
125-
126-
@property
45+
class Future(py_futures.Future):
46+
"""Patched version of ``concurrent.futures.Future``
47+
"""
12748
def finished(self):
128-
return self._state == FINISHED
129-
130-
@property
131-
def done(self):
132-
return self._state in [
133-
CANCELLED,
134-
FINISHED
135-
]
49+
with self._condition:
50+
if self._state == FINISHED:
51+
return True
52+
return False

tests/futures/test_futures.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,21 @@ def test_future_init_repr():
2121

2222

2323
def test_future_init_cancelled():
24-
assert Future().cancelled is False
24+
assert Future().cancelled() is False
2525

2626

2727
def test_future_init_running():
28-
assert Future().running is False
28+
assert Future().running() is False
2929

3030

3131
def test_future_init_done():
32-
assert Future().done is False
32+
assert Future().done() is False
3333

3434

3535
def test_future_cancel():
3636
future = Future()
3737
assert future.cancel()
3838
assert future._state == futures.CANCELLED
39-
assert future.running is False
40-
assert future.cancelled
41-
assert future.done
39+
assert future.running() is False
40+
assert future.cancelled()
41+
assert future.done()

0 commit comments

Comments
 (0)