diff --git a/setup.py b/setup.py index 28429ab58..605640c18 100755 --- a/setup.py +++ b/setup.py @@ -82,6 +82,7 @@ def read(fname): include_package_data=True, install_requires=[ 'simple-workflow>=0.1.42', + 'futures' ], license=read("LICENSE"), zip_safe=False, diff --git a/simpleflow/__init__.py b/simpleflow/__init__.py index 0e13e5afc..4246dfef0 100755 --- a/simpleflow/__init__.py +++ b/simpleflow/__init__.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -__version__ = '0.3.0' +__version__ = '0.3.2' __author__ = 'Greg Leclercq' __license__ = "MIT" diff --git a/simpleflow/executor.py b/simpleflow/executor.py index 9ececc54e..7a4908d19 100644 --- a/simpleflow/executor.py +++ b/simpleflow/executor.py @@ -14,8 +14,8 @@ def get_actual_value(value): """Unwrap the result of a Future or return the value. """ - if isinstance(value, futures.Future): - return futures.get_result_or_raise(value) + if isinstance(value, futures.AbstractFuture): + return value.result() return value diff --git a/simpleflow/futures.py b/simpleflow/futures.py index 39d785a00..3bb109625 100644 --- a/simpleflow/futures.py +++ b/simpleflow/futures.py @@ -1,9 +1,16 @@ # -*- coding: utf-8 -*- -from . import exceptions +import abc +from concurrent.futures._base import ( + PENDING, + RUNNING, + CANCELLED, + CANCELLED_AND_NOTIFIED, + FINISHED +) -__all__ = ['Future', 'get_result_or_raise', 'wait'] +__all__ = ['AbstractFuture', 'wait'] FIRST_COMPLETED = 'FIRST_COMPLETED' @@ -11,11 +18,6 @@ ALL_COMPLETED = 'ALL_COMPLETED' _AS_COMPLETED = '_AS_COMPLETED' -PENDING = 'PENDING' -RUNNING = 'RUNNING' -CANCELLED = 'CANCELLED' -CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' -FINISHED = 'FINISHED' _FUTURE_STATES = [ PENDING, @@ -25,111 +27,50 @@ FINISHED ] -_STATE_TO_DESCRIPTION_MAP = { - PENDING: "pending", - RUNNING: "running", - CANCELLED: "cancelled", - CANCELLED_AND_NOTIFIED: "cancelled", - FINISHED: "finished" -} - - -def get_result_or_raise(future): - """Returns the ``result`` of *future* if it is available, otherwise - raise.""" - if future.state == PENDING: - raise exceptions.ExecutionBlocked() - return future.result - def wait(*fs): """Returns a list of the results of futures if there are available. - - Raises a ``exceptions.ExecutionBlocked`` otherwise. - """ - if any(future.state == PENDING for future in fs): - raise exceptions.ExecutionBlocked() - - return [future.result for future in fs] - - -class Future(object): - def __init__(self): - """Represents the state of a computation. - - It tries to mimics mod::`concurrent.futures` but involved some - adaptations to fit the Amazon SWF model. + return [future.result() for future in fs] - """ - self._state = PENDING - self._result = None - self._exception = None - def __repr__(self): - return '' % ( - hex(id(self)), - _STATE_TO_DESCRIPTION_MAP[self._state]) +class AbstractFuture(object): + """Base future class that defines an interface for concrete impls + """ - def wait(self): - raise exceptions.ExecutionBlocked + __metaclass__ = abc.ABCMeta - @property + @abc.abstractmethod def result(self): - """Raise a cls::`exceptions.ExecutionBlocked` when the result is not - available.""" - if self._state != FINISHED: - return self.wait() - - return self._result - - def cancel(self): - """Cancel a future. - - Note: cannot cancel a future that is already finished. - It will not raise an exception but return ``False`` to notify it. + """Return the result of the underlying computation + The actual behavior (blocking etc.) depends on impl """ - if self._state == FINISHED: - return False - - self._state = CANCELLED - return True - - @property - def state(self): - return self._state + raise NotImplementedError - @property + @abc.abstractmethod def exception(self): - """ - Returns `None` if no exception occurred, otherwise, returns the - exception object that what raised by the task. - - Raise a cls::`exceptions.ExecutionBlocked` when the result is not - available. + """Return the exception raised (if any) by the underlying computation + The actual behavior (blocking etc.) depends on impl """ - if self._state != FINISHED: - return self.wait() - - return self._exception - - @property - def cancelled(self): - return self._state == CANCELLED + raise NotImplementedError - @property + @abc.abstractmethod def running(self): - return self._state == RUNNING + """Return True if the underlying computation is currently executing + """ + raise NotImplementedError - @property + @abc.abstractmethod def finished(self): - return self._state == FINISHED + """Return True if the underlying computation has finished + """ + raise NotImplementedError - @property + @abc.abstractmethod def done(self): - return self._state in [ - CANCELLED, - FINISHED - ] + """Return True if the underlying compuation is cancelled or + has finished + """ + raise NotImplementedError \ No newline at end of file diff --git a/simpleflow/local/futures.py b/simpleflow/local/futures.py new file mode 100644 index 000000000..4c44be80b --- /dev/null +++ b/simpleflow/local/futures.py @@ -0,0 +1,21 @@ +from simpleflow.futures import AbstractFuture +from concurrent.futures import Future as PythonFuture + + +class Future(PythonFuture, AbstractFuture): + """Future impl for local execution + + The `concurrent.futures.Future` from python is itself a concrete impl + rather than an abstract class or an interface. So a multiple inheritance + is used to work with our interface `simpleflow.futures.AbstractFuture`. + + This class inherits both the abstracts methods from `AbstractFuture` and + concrete methods from `concurrent.futures.Future`. The `AbstractFuture` + interface is designed so that python's Future 'implements' its methods. + Since python's Future is placed left to the `AbstractFuture` in inheritance + declaration, runtime method resolution will pick the concrete methods. + + This Future class is interoperable with python's builtin executors. + """ + def finished(self): + return self.done() \ No newline at end of file diff --git a/simpleflow/swf/executor.py b/simpleflow/swf/executor.py index cf7d20e06..3be8a2cde 100644 --- a/simpleflow/swf/executor.py +++ b/simpleflow/swf/executor.py @@ -3,13 +3,13 @@ import json import logging +import swf.format import swf.models import swf.models.decision import swf.exceptions from simpleflow import ( executor, - futures, exceptions, constants, ) @@ -17,6 +17,7 @@ from simpleflow.workflow import Workflow from simpleflow.history import History from simpleflow.swf.task import ActivityTask, WorkflowTask +from . import futures logger = logging.getLogger(__name__) @@ -180,15 +181,15 @@ def resume_activity(self, task, event): if not future: # Task in history does not count. return None - if not future.finished: # Still pending or running... + if not future.finished(): # Still pending or running... return future - if future.exception is None: # Result available! + if future.exception() is None: # Result available! return future if event.get('retry', 0) == task.activity.retry: # No more retry! if task.activity.raises_on_failure: - raise exceptions.TaskException(task, future.exception) + raise exceptions.TaskException(task, future.exception()) return future # with future.exception set. # Otherwise retry the task by scheduling it again. @@ -306,8 +307,9 @@ def replay(self, history): decision = swf.models.decision.WorkflowExecutionDecision() decision.fail( - reason=reason, - details=details) + reason=swf.format.reason(reason), + details=swf.format.details(details), + ) return [decision], {} except Exception, err: @@ -319,12 +321,12 @@ def replay(self, history): self.on_failure(reason) decision = swf.models.decision.WorkflowExecutionDecision() - decision.fail(reason=reason) + decision.fail(reason=swf.format.reason(reason)) return [decision], {} decision = swf.models.decision.WorkflowExecutionDecision() - decision.complete(result=json.dumps(result)) + decision.complete(result=swf.format.result(json.dumps(result))) return [decision], {} @@ -339,8 +341,10 @@ def fail(self, reason, details=None): decision = swf.models.decision.WorkflowExecutionDecision() decision.fail( - reason='Workflow execution failed: {}'.format(reason), - details=details) + reason=swf.format.reason( + 'Workflow execution failed: {}'.format(reason)), + details=swf.format.details(details), + ) self._decisions.append(decision) raise exceptions.ExecutionBlocked('workflow execution failed') diff --git a/simpleflow/swf/futures.py b/simpleflow/swf/futures.py new file mode 100644 index 000000000..0969119d4 --- /dev/null +++ b/simpleflow/swf/futures.py @@ -0,0 +1,96 @@ +from simpleflow import exceptions +from simpleflow.futures import ( + FINISHED, + CANCELLED, + PENDING, + RUNNING, + CANCELLED_AND_NOTIFIED, + AbstractFuture +) + + +_STATE_TO_DESCRIPTION_MAP = { + PENDING: "pending", + RUNNING: "running", + CANCELLED: "cancelled", + CANCELLED_AND_NOTIFIED: "cancelled", + FINISHED: "finished" +} + + +class Future(AbstractFuture): + """Future impl that contains Simple Workflow specific logic + """ + + def __init__(self): + """Represents the state of a computation. + + It tries to mimics mod::`concurrent.futures` but involved some + adaptations to fit the Amazon SWF model. + + """ + self._state = PENDING + self._result = None + self._exception = None + + def __repr__(self): + return '' % ( + hex(id(self)), + _STATE_TO_DESCRIPTION_MAP[self._state]) + + @classmethod + def wait(cls): + raise exceptions.ExecutionBlocked + + def result(self): + """Raise a cls::`exceptions.ExecutionBlocked` when the result is not + available.""" + if self._state != FINISHED: + return self.wait() + + return self._result + + def cancel(self): + """Cancel a future. + + Note: cannot cancel a future that is already finished. + It will not raise an exception but return ``False`` to notify it. + + """ + if self._state == FINISHED: + return False + + self._state = CANCELLED + return True + + def state(self): + return self._state + + def exception(self): + """ + Returns `None` if no exception occurred, otherwise, returns the + exception object that what raised by the task. + + Raise a cls::`exceptions.ExecutionBlocked` when the result is not + available. + + """ + if self._state != FINISHED: + return self.wait() + + return self._exception + + def cancelled(self): + return self._state == CANCELLED + + def running(self): + return self._state == RUNNING + + def finished(self): + return self._state == FINISHED + + def done(self): + return self._state in [ + CANCELLED, + FINISHED + ] \ No newline at end of file diff --git a/tests/futures/test_futures.py b/tests/futures/test_futures.py index 9ea6df488..b202744a2 100644 --- a/tests/futures/test_futures.py +++ b/tests/futures/test_futures.py @@ -3,7 +3,7 @@ from simpleflow import futures -from simpleflow.futures import Future +from simpleflow.swf.futures import Future def test_future_init_state(): @@ -21,21 +21,21 @@ def test_future_init_repr(): def test_future_init_cancelled(): - assert Future().cancelled is False + assert Future().cancelled() is False def test_future_init_running(): - assert Future().running is False + assert Future().running() is False def test_future_init_done(): - assert Future().done is False + assert Future().done() is False def test_future_cancel(): future = Future() assert future.cancel() assert future._state == futures.CANCELLED - assert future.running is False - assert future.cancelled - assert future.done + assert future.running() is False + assert future.cancelled() + assert future.done() diff --git a/tests/test_dataflow.py b/tests/test_dataflow.py index 7ebc23f0a..2e6e8a5da 100644 --- a/tests/test_dataflow.py +++ b/tests/test_dataflow.py @@ -75,7 +75,7 @@ class TestDefinitionWithInput(TestWorkflow): """ def run(self, a): b = self.submit(increment, a) - return b.result + return b.result() def test_workflow_with_input(): @@ -118,11 +118,11 @@ class TestDefinition(TestWorkflow): """ def run(self): a = self.submit(increment, 1) - assert isinstance(a, futures.Future) + assert isinstance(a, futures.AbstractFuture) b = self.submit(double, a) - return b.result + return b.result() def test_workflow_with_two_tasks(): @@ -237,7 +237,7 @@ def run(self, *args, **kwargs): a = self.submit(increment, 1) b = self.submit(increment, a) - return b.result + return b.result() def test_workflow_with_same_task_called_two_times(): @@ -303,7 +303,7 @@ def run(self, *args, **kwargs): a = self.submit(increment, 1) a = self.submit(double, a) - return a.result + return a.result() def test_workflow_reuse_same_future(): @@ -364,7 +364,7 @@ def run(self, *args, **kwargs): b = self.submit(double, a) c = self.submit(increment, a) - return (b.result, c.result) + return b.result(), c.result() def test_workflow_with_two_tasks_same_future(): @@ -490,7 +490,7 @@ class TestDefinitionRetryActivity(TestWorkflow): def run(self, *args, **kwargs): a = self.submit(increment_retry, 7) - return a.result + return a.result() def test_workflow_retry_activity(): @@ -593,7 +593,7 @@ class TestDefinitionChildWorkflow(TestWorkflow): """ def run(self, x): y = self.submit(TestDefinition, x) - return y.result + return y.result() def test_workflow_with_child_workflow(): @@ -725,7 +725,7 @@ def run(self): if result.exception: self.fail('error') - return result.result + return result.result() def test_workflow_failed_from_definition(): @@ -771,7 +771,7 @@ class TestDefinitionActivityRaisesOnFailure(OnFailureMixin, TestWorkflow): """ def run(self): - return self.submit(raise_on_failure).result + return self.submit(raise_on_failure).result() def test_workflow_activity_raises_on_failure(): @@ -807,7 +807,7 @@ def test_workflow_activity_raises_on_failure(): class TestOnFailureDefinition(OnFailureMixin, TestWorkflow): def run(self): - if self.submit(raise_error).exception: + if self.submit(raise_error).exception(): self.fail('FAIL') @@ -846,7 +846,7 @@ def run(self): b = self.submit(increment, 2) c = self.submit(double, b) - return [a.result, b.result, c.result] + return [a.result(), b.result(), c.result()] def test_multiple_scheduled_activities():