Skip to content
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def read(fname):
include_package_data=True,
install_requires=[
'simple-workflow>=0.1.42',
'futures'
],
license=read("LICENSE"),
zip_safe=False,
Expand Down
2 changes: 1 addition & 1 deletion simpleflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-

__version__ = '0.3.0'
__version__ = '0.3.2'
__author__ = 'Greg Leclercq'
__license__ = "MIT"

Expand Down
4 changes: 2 additions & 2 deletions simpleflow/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ def get_actual_value(value):
"""Unwrap the result of a Future or return the value.

"""
if isinstance(value, futures.Future):
return futures.get_result_or_raise(value)
if isinstance(value, futures.AbstractFuture):
return value.result()
return value


Expand Down
129 changes: 35 additions & 94 deletions simpleflow/futures.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
# -*- coding: utf-8 -*-

from . import exceptions
import abc
from concurrent.futures._base import (
PENDING,
RUNNING,
CANCELLED,
CANCELLED_AND_NOTIFIED,
FINISHED
)


__all__ = ['Future', 'get_result_or_raise', 'wait']
__all__ = ['AbstractFuture', 'wait']


FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
_AS_COMPLETED = '_AS_COMPLETED'

PENDING = 'PENDING'
RUNNING = 'RUNNING'
CANCELLED = 'CANCELLED'
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'

_FUTURE_STATES = [
PENDING,
Expand All @@ -25,111 +27,50 @@
FINISHED
]

_STATE_TO_DESCRIPTION_MAP = {
PENDING: "pending",
RUNNING: "running",
CANCELLED: "cancelled",
CANCELLED_AND_NOTIFIED: "cancelled",
FINISHED: "finished"
}


def get_result_or_raise(future):
"""Returns the ``result`` of *future* if it is available, otherwise
raise."""
if future.state == PENDING:
raise exceptions.ExecutionBlocked()
return future.result


def wait(*fs):
"""Returns a list of the results of futures if there are available.

Raises a ``exceptions.ExecutionBlocked`` otherwise.

"""
if any(future.state == PENDING for future in fs):
raise exceptions.ExecutionBlocked()

return [future.result for future in fs]


class Future(object):
def __init__(self):
"""Represents the state of a computation.

It tries to mimics mod::`concurrent.futures` but involved some
adaptations to fit the Amazon SWF model.
return [future.result() for future in fs]

"""
self._state = PENDING
self._result = None
self._exception = None

def __repr__(self):
return '<Future at %s state=%s>' % (
hex(id(self)),
_STATE_TO_DESCRIPTION_MAP[self._state])
class AbstractFuture(object):
"""Base future class that defines an interface for concrete impls
"""

def wait(self):
raise exceptions.ExecutionBlocked
__metaclass__ = abc.ABCMeta

@property
@abc.abstractmethod
def result(self):
"""Raise a cls::`exceptions.ExecutionBlocked` when the result is not
available."""
if self._state != FINISHED:
return self.wait()

return self._result

def cancel(self):
"""Cancel a future.

Note: cannot cancel a future that is already finished.
It will not raise an exception but return ``False`` to notify it.
"""Return the result of the underlying computation

The actual behavior (blocking etc.) depends on impl
"""
if self._state == FINISHED:
return False

self._state = CANCELLED
return True

@property
def state(self):
return self._state
raise NotImplementedError

@property
@abc.abstractmethod
def exception(self):
"""
Returns `None` if no exception occurred, otherwise, returns the
exception object that what raised by the task.

Raise a cls::`exceptions.ExecutionBlocked` when the result is not
available.
"""Return the exception raised (if any) by the underlying computation

The actual behavior (blocking etc.) depends on impl
"""
if self._state != FINISHED:
return self.wait()

return self._exception

@property
def cancelled(self):
return self._state == CANCELLED
raise NotImplementedError

@property
@abc.abstractmethod
def running(self):
return self._state == RUNNING
"""Return True if the underlying computation is currently executing
"""
raise NotImplementedError

@property
@abc.abstractmethod
def finished(self):
return self._state == FINISHED
"""Return True if the underlying computation has finished
"""
raise NotImplementedError

@property
@abc.abstractmethod
def done(self):
return self._state in [
CANCELLED,
FINISHED
]
"""Return True if the underlying compuation is cancelled or
has finished
"""
raise NotImplementedError
21 changes: 21 additions & 0 deletions simpleflow/local/futures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from simpleflow.futures import AbstractFuture
from concurrent.futures import Future as PythonFuture


class Future(PythonFuture, AbstractFuture):
"""Future impl for local execution

The `concurrent.futures.Future` from python is itself a concrete impl
rather than an abstract class or an interface. So a multiple inheritance
is used to work with our interface `simpleflow.futures.AbstractFuture`.

This class inherits both the abstracts methods from `AbstractFuture` and
concrete methods from `concurrent.futures.Future`. The `AbstractFuture`
interface is designed so that python's Future 'implements' its methods.
Since python's Future is placed left to the `AbstractFuture` in inheritance
declaration, runtime method resolution will pick the concrete methods.

This Future class is interoperable with python's builtin executors.
"""
def finished(self):
return self.done()
24 changes: 14 additions & 10 deletions simpleflow/swf/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,21 @@
import json
import logging

import swf.format
import swf.models
import swf.models.decision
import swf.exceptions

from simpleflow import (
executor,
futures,
exceptions,
constants,
)
from simpleflow.activity import Activity
from simpleflow.workflow import Workflow
from simpleflow.history import History
from simpleflow.swf.task import ActivityTask, WorkflowTask
from . import futures

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -180,15 +181,15 @@ def resume_activity(self, task, event):
if not future: # Task in history does not count.
return None

if not future.finished: # Still pending or running...
if not future.finished(): # Still pending or running...
return future

if future.exception is None: # Result available!
if future.exception() is None: # Result available!
return future

if event.get('retry', 0) == task.activity.retry: # No more retry!
if task.activity.raises_on_failure:
raise exceptions.TaskException(task, future.exception)
raise exceptions.TaskException(task, future.exception())
return future # with future.exception set.

# Otherwise retry the task by scheduling it again.
Expand Down Expand Up @@ -306,8 +307,9 @@ def replay(self, history):

decision = swf.models.decision.WorkflowExecutionDecision()
decision.fail(
reason=reason,
details=details)
reason=swf.format.reason(reason),
details=swf.format.details(details),
)
return [decision], {}

except Exception, err:
Expand All @@ -319,12 +321,12 @@ def replay(self, history):
self.on_failure(reason)

decision = swf.models.decision.WorkflowExecutionDecision()
decision.fail(reason=reason)
decision.fail(reason=swf.format.reason(reason))

return [decision], {}

decision = swf.models.decision.WorkflowExecutionDecision()
decision.complete(result=json.dumps(result))
decision.complete(result=swf.format.result(json.dumps(result)))

return [decision], {}

Expand All @@ -339,8 +341,10 @@ def fail(self, reason, details=None):

decision = swf.models.decision.WorkflowExecutionDecision()
decision.fail(
reason='Workflow execution failed: {}'.format(reason),
details=details)
reason=swf.format.reason(
'Workflow execution failed: {}'.format(reason)),
details=swf.format.details(details),
)

self._decisions.append(decision)
raise exceptions.ExecutionBlocked('workflow execution failed')
Expand Down
96 changes: 96 additions & 0 deletions simpleflow/swf/futures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from simpleflow import exceptions
from simpleflow.futures import (
FINISHED,
CANCELLED,
PENDING,
RUNNING,
CANCELLED_AND_NOTIFIED,
AbstractFuture
)


_STATE_TO_DESCRIPTION_MAP = {
PENDING: "pending",
RUNNING: "running",
CANCELLED: "cancelled",
CANCELLED_AND_NOTIFIED: "cancelled",
FINISHED: "finished"
}


class Future(AbstractFuture):
"""Future impl that contains Simple Workflow specific logic
"""

def __init__(self):
"""Represents the state of a computation.

It tries to mimics mod::`concurrent.futures` but involved some
adaptations to fit the Amazon SWF model.

"""
self._state = PENDING
self._result = None
self._exception = None

def __repr__(self):
return '<Future at %s state=%s>' % (
hex(id(self)),
_STATE_TO_DESCRIPTION_MAP[self._state])

@classmethod
def wait(cls):
raise exceptions.ExecutionBlocked

def result(self):
"""Raise a cls::`exceptions.ExecutionBlocked` when the result is not
available."""
if self._state != FINISHED:
return self.wait()

return self._result

def cancel(self):
"""Cancel a future.

Note: cannot cancel a future that is already finished.
It will not raise an exception but return ``False`` to notify it.

"""
if self._state == FINISHED:
return False

self._state = CANCELLED
return True

def state(self):
return self._state

def exception(self):
"""
Returns `None` if no exception occurred, otherwise, returns the
exception object that what raised by the task.

Raise a cls::`exceptions.ExecutionBlocked` when the result is not
available.

"""
if self._state != FINISHED:
return self.wait()

return self._exception

def cancelled(self):
return self._state == CANCELLED

def running(self):
return self._state == RUNNING

def finished(self):
return self._state == FINISHED

def done(self):
return self._state in [
CANCELLED,
FINISHED
]
Loading