From d6c587eb3a17dbf89febc93d2f12518e9482d3ff Mon Sep 17 00:00:00 2001 From: Greg Leclercq Date: Thu, 7 Aug 2014 16:10:37 +0200 Subject: [PATCH 1/9] Bump version to 0.3.1 --- simpleflow/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/simpleflow/__init__.py b/simpleflow/__init__.py index 0e13e5afc..a63dc9752 100755 --- a/simpleflow/__init__.py +++ b/simpleflow/__init__.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -__version__ = '0.3.0' +__version__ = '0.3.1' __author__ = 'Greg Leclercq' __license__ = "MIT" From 63aec6ce4c8da1cad05d4258d1d71423eb2643ce Mon Sep 17 00:00:00 2001 From: Greg Leclercq Date: Fri, 16 Jan 2015 15:22:27 +0100 Subject: [PATCH 2/9] Fix swf.executor: wrap reason and details with swf.format functions --- simpleflow/swf/executor.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/simpleflow/swf/executor.py b/simpleflow/swf/executor.py index cf7d20e06..98f951f1d 100644 --- a/simpleflow/swf/executor.py +++ b/simpleflow/swf/executor.py @@ -3,6 +3,7 @@ import json import logging +import swf.format import swf.models import swf.models.decision import swf.exceptions @@ -306,8 +307,9 @@ def replay(self, history): decision = swf.models.decision.WorkflowExecutionDecision() decision.fail( - reason=reason, - details=details) + reason=swf.format.reason(reason), + details=swf.format.details(details), + ) return [decision], {} except Exception, err: @@ -319,7 +321,7 @@ def replay(self, history): self.on_failure(reason) decision = swf.models.decision.WorkflowExecutionDecision() - decision.fail(reason=reason) + decision.fail(reason=swf.format.reason(reason)) return [decision], {} @@ -339,8 +341,10 @@ def fail(self, reason, details=None): decision = swf.models.decision.WorkflowExecutionDecision() decision.fail( - reason='Workflow execution failed: {}'.format(reason), - details=details) + reason=swf.format.reason( + 'Workflow execution failed: {}'.format(reason)), + details=swf.format.details(details), + ) self._decisions.append(decision) raise exceptions.ExecutionBlocked('workflow execution failed') From e0c04fbdda0a88371ddad4cc52d1843ac34786f6 Mon Sep 17 00:00:00 2001 From: Greg Leclercq Date: Fri, 16 Jan 2015 15:29:15 +0100 Subject: [PATCH 3/9] Fix swf.executor: wrap result with swf.format.result() --- simpleflow/swf/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/simpleflow/swf/executor.py b/simpleflow/swf/executor.py index 98f951f1d..422a1be13 100644 --- a/simpleflow/swf/executor.py +++ b/simpleflow/swf/executor.py @@ -326,7 +326,7 @@ def replay(self, history): return [decision], {} decision = swf.models.decision.WorkflowExecutionDecision() - decision.complete(result=json.dumps(result)) + decision.complete(result=swf.format.result(json.dumps(result))) return [decision], {} From b5809f931cd8c49034075a0967a800431791857c Mon Sep 17 00:00:00 2001 From: Greg Leclercq Date: Fri, 16 Jan 2015 15:55:00 +0100 Subject: [PATCH 4/9] Bump version to 0.3.2 --- simpleflow/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/simpleflow/__init__.py b/simpleflow/__init__.py index a63dc9752..4246dfef0 100755 --- a/simpleflow/__init__.py +++ b/simpleflow/__init__.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -__version__ = '0.3.1' +__version__ = '0.3.2' __author__ = 'Greg Leclercq' __license__ = "MIT" From 1c5869c52023dea5386765cc759ef46de5712d9c Mon Sep 17 00:00:00 2001 From: Han JU Date: Fri, 25 Jul 2014 15:36:25 +0200 Subject: [PATCH 5/9] Introduce concurrent.futures.Future as base future impl --- simpleflow/futures.py | 117 +++++----------------------------- tests/futures/test_futures.py | 12 ++-- 2 files changed, 23 insertions(+), 106 deletions(-) diff --git a/simpleflow/futures.py b/simpleflow/futures.py index 39d785a00..3a2d55905 100644 --- a/simpleflow/futures.py +++ b/simpleflow/futures.py @@ -1,9 +1,16 @@ # -*- coding: utf-8 -*- -from . import exceptions +from concurrent import futures as py_futures +from concurrent.futures._base import ( + PENDING, + RUNNING, + CANCELLED, + CANCELLED_AND_NOTIFIED, + FINISHED +) -__all__ = ['Future', 'get_result_or_raise', 'wait'] +__all__ = ['Future', 'wait'] FIRST_COMPLETED = 'FIRST_COMPLETED' @@ -11,11 +18,6 @@ ALL_COMPLETED = 'ALL_COMPLETED' _AS_COMPLETED = '_AS_COMPLETED' -PENDING = 'PENDING' -RUNNING = 'RUNNING' -CANCELLED = 'CANCELLED' -CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' -FINISHED = 'FINISHED' _FUTURE_STATES = [ PENDING, @@ -34,102 +36,17 @@ } -def get_result_or_raise(future): - """Returns the ``result`` of *future* if it is available, otherwise - raise.""" - if future.state == PENDING: - raise exceptions.ExecutionBlocked() - return future.result - - def wait(*fs): """Returns a list of the results of futures if there are available. - - Raises a ``exceptions.ExecutionBlocked`` otherwise. - """ - if any(future.state == PENDING for future in fs): - raise exceptions.ExecutionBlocked() - - return [future.result for future in fs] - - -class Future(object): - def __init__(self): - """Represents the state of a computation. - - It tries to mimics mod::`concurrent.futures` but involved some - adaptations to fit the Amazon SWF model. - - """ - self._state = PENDING - self._result = None - self._exception = None - - def __repr__(self): - return '' % ( - hex(id(self)), - _STATE_TO_DESCRIPTION_MAP[self._state]) + return [future.result() for future in fs] - def wait(self): - raise exceptions.ExecutionBlocked - @property - def result(self): - """Raise a cls::`exceptions.ExecutionBlocked` when the result is not - available.""" - if self._state != FINISHED: - return self.wait() - - return self._result - - def cancel(self): - """Cancel a future. - - Note: cannot cancel a future that is already finished. - It will not raise an exception but return ``False`` to notify it. - - """ - if self._state == FINISHED: - return False - - self._state = CANCELLED - return True - - @property - def state(self): - return self._state - - @property - def exception(self): - """ - Returns `None` if no exception occurred, otherwise, returns the - exception object that what raised by the task. - - Raise a cls::`exceptions.ExecutionBlocked` when the result is not - available. - - """ - if self._state != FINISHED: - return self.wait() - - return self._exception - - @property - def cancelled(self): - return self._state == CANCELLED - - @property - def running(self): - return self._state == RUNNING - - @property +class Future(py_futures.Future): + """Patched version of ``concurrent.futures.Future`` + """ def finished(self): - return self._state == FINISHED - - @property - def done(self): - return self._state in [ - CANCELLED, - FINISHED - ] + with self._condition: + if self._state == FINISHED: + return True + return False \ No newline at end of file diff --git a/tests/futures/test_futures.py b/tests/futures/test_futures.py index 9ea6df488..d1a7b7a42 100644 --- a/tests/futures/test_futures.py +++ b/tests/futures/test_futures.py @@ -21,21 +21,21 @@ def test_future_init_repr(): def test_future_init_cancelled(): - assert Future().cancelled is False + assert Future().cancelled() is False def test_future_init_running(): - assert Future().running is False + assert Future().running() is False def test_future_init_done(): - assert Future().done is False + assert Future().done() is False def test_future_cancel(): future = Future() assert future.cancel() assert future._state == futures.CANCELLED - assert future.running is False - assert future.cancelled - assert future.done + assert future.running() is False + assert future.cancelled() + assert future.done() From 246a01fe95d72785e8cab2fdd6dafafb9e681afd Mon Sep 17 00:00:00 2001 From: Han JU Date: Fri, 25 Jul 2014 15:57:21 +0200 Subject: [PATCH 6/9] Move Swf specific future semantic in swf.futures --- simpleflow/executor.py | 2 +- simpleflow/swf/executor.py | 8 ++++---- simpleflow/swf/futures.py | 36 ++++++++++++++++++++++++++++++++++++ tests/test_dataflow.py | 22 +++++++++++----------- 4 files changed, 52 insertions(+), 16 deletions(-) create mode 100644 simpleflow/swf/futures.py diff --git a/simpleflow/executor.py b/simpleflow/executor.py index 9ececc54e..27c71381e 100644 --- a/simpleflow/executor.py +++ b/simpleflow/executor.py @@ -15,7 +15,7 @@ def get_actual_value(value): """ if isinstance(value, futures.Future): - return futures.get_result_or_raise(value) + return value.result() return value diff --git a/simpleflow/swf/executor.py b/simpleflow/swf/executor.py index 422a1be13..3be8a2cde 100644 --- a/simpleflow/swf/executor.py +++ b/simpleflow/swf/executor.py @@ -10,7 +10,6 @@ from simpleflow import ( executor, - futures, exceptions, constants, ) @@ -18,6 +17,7 @@ from simpleflow.workflow import Workflow from simpleflow.history import History from simpleflow.swf.task import ActivityTask, WorkflowTask +from . import futures logger = logging.getLogger(__name__) @@ -181,15 +181,15 @@ def resume_activity(self, task, event): if not future: # Task in history does not count. return None - if not future.finished: # Still pending or running... + if not future.finished(): # Still pending or running... return future - if future.exception is None: # Result available! + if future.exception() is None: # Result available! return future if event.get('retry', 0) == task.activity.retry: # No more retry! if task.activity.raises_on_failure: - raise exceptions.TaskException(task, future.exception) + raise exceptions.TaskException(task, future.exception()) return future # with future.exception set. # Otherwise retry the task by scheduling it again. diff --git a/simpleflow/swf/futures.py b/simpleflow/swf/futures.py new file mode 100644 index 000000000..16f2f33ed --- /dev/null +++ b/simpleflow/swf/futures.py @@ -0,0 +1,36 @@ +from simpleflow import exceptions +from simpleflow.futures import ( + FINISHED, + CANCELLED, + PENDING, + RUNNING, + Future as BaseFuture +) + + +class Future(BaseFuture): + """Future with special Simple Workflow semantics + """ + @classmethod + def wait(cls): + raise exceptions.ExecutionBlocked + + def result(self, timeout=None): + with self._condition: + if self._state != FINISHED: + return self.wait() + # TODO what happen if cancelled ??? + return self._result + + def exception(self, timeout=None): + with self._condition: + if self._state != FINISHED: + return self.wait() + return self._exception + + def cancel(self): + with self._condition: + if self._state == FINISHED: + return False + self._state = CANCELLED + return True \ No newline at end of file diff --git a/tests/test_dataflow.py b/tests/test_dataflow.py index 7ebc23f0a..ee62b7a50 100644 --- a/tests/test_dataflow.py +++ b/tests/test_dataflow.py @@ -75,7 +75,7 @@ class TestDefinitionWithInput(TestWorkflow): """ def run(self, a): b = self.submit(increment, a) - return b.result + return b.result() def test_workflow_with_input(): @@ -122,7 +122,7 @@ def run(self): b = self.submit(double, a) - return b.result + return b.result() def test_workflow_with_two_tasks(): @@ -237,7 +237,7 @@ def run(self, *args, **kwargs): a = self.submit(increment, 1) b = self.submit(increment, a) - return b.result + return b.result() def test_workflow_with_same_task_called_two_times(): @@ -303,7 +303,7 @@ def run(self, *args, **kwargs): a = self.submit(increment, 1) a = self.submit(double, a) - return a.result + return a.result() def test_workflow_reuse_same_future(): @@ -364,7 +364,7 @@ def run(self, *args, **kwargs): b = self.submit(double, a) c = self.submit(increment, a) - return (b.result, c.result) + return b.result(), c.result() def test_workflow_with_two_tasks_same_future(): @@ -490,7 +490,7 @@ class TestDefinitionRetryActivity(TestWorkflow): def run(self, *args, **kwargs): a = self.submit(increment_retry, 7) - return a.result + return a.result() def test_workflow_retry_activity(): @@ -593,7 +593,7 @@ class TestDefinitionChildWorkflow(TestWorkflow): """ def run(self, x): y = self.submit(TestDefinition, x) - return y.result + return y.result() def test_workflow_with_child_workflow(): @@ -725,7 +725,7 @@ def run(self): if result.exception: self.fail('error') - return result.result + return result.result() def test_workflow_failed_from_definition(): @@ -771,7 +771,7 @@ class TestDefinitionActivityRaisesOnFailure(OnFailureMixin, TestWorkflow): """ def run(self): - return self.submit(raise_on_failure).result + return self.submit(raise_on_failure).result() def test_workflow_activity_raises_on_failure(): @@ -807,7 +807,7 @@ def test_workflow_activity_raises_on_failure(): class TestOnFailureDefinition(OnFailureMixin, TestWorkflow): def run(self): - if self.submit(raise_error).exception: + if self.submit(raise_error).exception(): self.fail('FAIL') @@ -846,7 +846,7 @@ def run(self): b = self.submit(increment, 2) c = self.submit(double, b) - return [a.result, b.result, c.result] + return [a.result(), b.result(), c.result()] def test_multiple_scheduled_activities(): From 8fcb0e4eb9684dc14db48ec1dd586342fb3d5731 Mon Sep 17 00:00:00 2001 From: Han JU Date: Mon, 28 Jul 2014 18:59:38 +0200 Subject: [PATCH 7/9] Update setup.py: add python's futures in install_requires --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index 28429ab58..605640c18 100755 --- a/setup.py +++ b/setup.py @@ -82,6 +82,7 @@ def read(fname): include_package_data=True, install_requires=[ 'simple-workflow>=0.1.42', + 'futures' ], license=read("LICENSE"), zip_safe=False, From 5c5569c536be52eb4b0de6f3151f33a76fcbbedc Mon Sep 17 00:00:00 2001 From: Han JU Date: Mon, 4 Aug 2014 12:00:57 +0200 Subject: [PATCH 8/9] Refactor Future abstraction: define an abstract future and make executor specific future impls in there package --- simpleflow/executor.py | 2 +- simpleflow/futures.py | 56 +++++++++++++++------ simpleflow/local/futures.py | 21 ++++++++ simpleflow/swf/futures.py | 98 ++++++++++++++++++++++++++++++------- 4 files changed, 141 insertions(+), 36 deletions(-) create mode 100644 simpleflow/local/futures.py diff --git a/simpleflow/executor.py b/simpleflow/executor.py index 27c71381e..7a4908d19 100644 --- a/simpleflow/executor.py +++ b/simpleflow/executor.py @@ -14,7 +14,7 @@ def get_actual_value(value): """Unwrap the result of a Future or return the value. """ - if isinstance(value, futures.Future): + if isinstance(value, futures.AbstractFuture): return value.result() return value diff --git a/simpleflow/futures.py b/simpleflow/futures.py index 3a2d55905..3bb109625 100644 --- a/simpleflow/futures.py +++ b/simpleflow/futures.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -from concurrent import futures as py_futures +import abc from concurrent.futures._base import ( PENDING, RUNNING, @@ -10,7 +10,7 @@ ) -__all__ = ['Future', 'wait'] +__all__ = ['AbstractFuture', 'wait'] FIRST_COMPLETED = 'FIRST_COMPLETED' @@ -27,14 +27,6 @@ FINISHED ] -_STATE_TO_DESCRIPTION_MAP = { - PENDING: "pending", - RUNNING: "running", - CANCELLED: "cancelled", - CANCELLED_AND_NOTIFIED: "cancelled", - FINISHED: "finished" -} - def wait(*fs): """Returns a list of the results of futures if there are available. @@ -42,11 +34,43 @@ def wait(*fs): return [future.result() for future in fs] -class Future(py_futures.Future): - """Patched version of ``concurrent.futures.Future`` +class AbstractFuture(object): + """Base future class that defines an interface for concrete impls """ + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def result(self): + """Return the result of the underlying computation + + The actual behavior (blocking etc.) depends on impl + """ + raise NotImplementedError + + @abc.abstractmethod + def exception(self): + """Return the exception raised (if any) by the underlying computation + + The actual behavior (blocking etc.) depends on impl + """ + raise NotImplementedError + + @abc.abstractmethod + def running(self): + """Return True if the underlying computation is currently executing + """ + raise NotImplementedError + + @abc.abstractmethod def finished(self): - with self._condition: - if self._state == FINISHED: - return True - return False \ No newline at end of file + """Return True if the underlying computation has finished + """ + raise NotImplementedError + + @abc.abstractmethod + def done(self): + """Return True if the underlying compuation is cancelled or + has finished + """ + raise NotImplementedError \ No newline at end of file diff --git a/simpleflow/local/futures.py b/simpleflow/local/futures.py new file mode 100644 index 000000000..4c44be80b --- /dev/null +++ b/simpleflow/local/futures.py @@ -0,0 +1,21 @@ +from simpleflow.futures import AbstractFuture +from concurrent.futures import Future as PythonFuture + + +class Future(PythonFuture, AbstractFuture): + """Future impl for local execution + + The `concurrent.futures.Future` from python is itself a concrete impl + rather than an abstract class or an interface. So a multiple inheritance + is used to work with our interface `simpleflow.futures.AbstractFuture`. + + This class inherits both the abstracts methods from `AbstractFuture` and + concrete methods from `concurrent.futures.Future`. The `AbstractFuture` + interface is designed so that python's Future 'implements' its methods. + Since python's Future is placed left to the `AbstractFuture` in inheritance + declaration, runtime method resolution will pick the concrete methods. + + This Future class is interoperable with python's builtin executors. + """ + def finished(self): + return self.done() \ No newline at end of file diff --git a/simpleflow/swf/futures.py b/simpleflow/swf/futures.py index 16f2f33ed..0969119d4 100644 --- a/simpleflow/swf/futures.py +++ b/simpleflow/swf/futures.py @@ -4,33 +4,93 @@ CANCELLED, PENDING, RUNNING, - Future as BaseFuture + CANCELLED_AND_NOTIFIED, + AbstractFuture ) -class Future(BaseFuture): - """Future with special Simple Workflow semantics +_STATE_TO_DESCRIPTION_MAP = { + PENDING: "pending", + RUNNING: "running", + CANCELLED: "cancelled", + CANCELLED_AND_NOTIFIED: "cancelled", + FINISHED: "finished" +} + + +class Future(AbstractFuture): + """Future impl that contains Simple Workflow specific logic """ + + def __init__(self): + """Represents the state of a computation. + + It tries to mimics mod::`concurrent.futures` but involved some + adaptations to fit the Amazon SWF model. + + """ + self._state = PENDING + self._result = None + self._exception = None + + def __repr__(self): + return '' % ( + hex(id(self)), + _STATE_TO_DESCRIPTION_MAP[self._state]) + @classmethod def wait(cls): raise exceptions.ExecutionBlocked - def result(self, timeout=None): - with self._condition: - if self._state != FINISHED: - return self.wait() - # TODO what happen if cancelled ??? - return self._result + def result(self): + """Raise a cls::`exceptions.ExecutionBlocked` when the result is not + available.""" + if self._state != FINISHED: + return self.wait() - def exception(self, timeout=None): - with self._condition: - if self._state != FINISHED: - return self.wait() - return self._exception + return self._result def cancel(self): - with self._condition: - if self._state == FINISHED: - return False - self._state = CANCELLED - return True \ No newline at end of file + """Cancel a future. + + Note: cannot cancel a future that is already finished. + It will not raise an exception but return ``False`` to notify it. + + """ + if self._state == FINISHED: + return False + + self._state = CANCELLED + return True + + def state(self): + return self._state + + def exception(self): + """ + Returns `None` if no exception occurred, otherwise, returns the + exception object that what raised by the task. + + Raise a cls::`exceptions.ExecutionBlocked` when the result is not + available. + + """ + if self._state != FINISHED: + return self.wait() + + return self._exception + + def cancelled(self): + return self._state == CANCELLED + + def running(self): + return self._state == RUNNING + + def finished(self): + return self._state == FINISHED + + def done(self): + return self._state in [ + CANCELLED, + FINISHED + ] \ No newline at end of file From 9a1e16275a169a520e6215e1d4a0cf9323053b46 Mon Sep 17 00:00:00 2001 From: Han JU Date: Mon, 4 Aug 2014 12:52:17 +0200 Subject: [PATCH 9/9] Update tests: fix tests to cope with Future impl changes --- tests/futures/test_futures.py | 2 +- tests/test_dataflow.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/futures/test_futures.py b/tests/futures/test_futures.py index d1a7b7a42..b202744a2 100644 --- a/tests/futures/test_futures.py +++ b/tests/futures/test_futures.py @@ -3,7 +3,7 @@ from simpleflow import futures -from simpleflow.futures import Future +from simpleflow.swf.futures import Future def test_future_init_state(): diff --git a/tests/test_dataflow.py b/tests/test_dataflow.py index ee62b7a50..2e6e8a5da 100644 --- a/tests/test_dataflow.py +++ b/tests/test_dataflow.py @@ -118,7 +118,7 @@ class TestDefinition(TestWorkflow): """ def run(self): a = self.submit(increment, 1) - assert isinstance(a, futures.Future) + assert isinstance(a, futures.AbstractFuture) b = self.submit(double, a)