From 94dfd216093fd45e0b72c2f126d11c5520fee16f Mon Sep 17 00:00:00 2001 From: Han JU Date: Thu, 17 Jul 2014 15:23:09 +0200 Subject: [PATCH 1/5] add an adaptor between et simpleflow's future --- simpleflow/local_async/__init__.py | 1 + simpleflow/local_async/executor.py | 127 +++++++++++++++++++++++++++++ 2 files changed, 128 insertions(+) create mode 100644 simpleflow/local_async/__init__.py create mode 100644 simpleflow/local_async/executor.py diff --git a/simpleflow/local_async/__init__.py b/simpleflow/local_async/__init__.py new file mode 100644 index 000000000..55f9031da --- /dev/null +++ b/simpleflow/local_async/__init__.py @@ -0,0 +1 @@ +from .executor import Executor \ No newline at end of file diff --git a/simpleflow/local_async/executor.py b/simpleflow/local_async/executor.py new file mode 100644 index 000000000..2e20de704 --- /dev/null +++ b/simpleflow/local_async/executor.py @@ -0,0 +1,127 @@ +import logging + +import multiprocessing +from simpleflow import ( + executor, + futures +) +from concurrent import futures as py_futures + + +logger = logging.getLogger(__name__) + + +class AdaptorFuture(futures.Future): + """A wrapped future object that fills (some of) the semantic gap between + `simpleflow.futures.Future` and `concurrent.futures.Future` + """ + def __init__(self, py_future): + super(AdaptorFuture, self).__init__() + self.py_future = py_future + + # TODO make this method in base class call self.state() + # def __repr__(self): + # return '' % ( + # hex(id(self)), + # _STATE_TO_DESCRIPTION_MAP[self._state]) + + @property + def result(self): + if not self.py_future.running(): + return self.wait() + + return self._result + + def cancel(self): + raise NotImplementedError() + + @property + def state(self): + if self.py_future.running(): + return futures.RUNNING + if self.py_future.done(): + return futures.FINISHED + + return futures.PENDING + + @property + def exception(self): + """ + Returns `None` if no exception occurred, otherwise, returns the + exception object that what raised by the task. + + Raise a cls::`exceptions.ExecutionBlocked` when the result is not + available. + + """ + if not self.py_future.done(): + return self.wait() + + return self.py_future.exception() + + @property + def cancelled(self): + # not supported + return False + + @property + def running(self): + return self.py_future.running() + + @property + def finished(self): + # without cancellation `finish` has the same semantic as `done` + return self.done() + + @property + def done(self): + return self.py_future.done() + + +class Executor(executor.Executor): + def __init__(self, workflow): + super(Executor, self).__init__(workflow) + # the real executor that does all the stuff + self._executor = py_futures.ThreadPoolExecutor( + multiprocessing.cpu_count()) + + def submit(self, func, *args, **kwargs): + logger.info('executing task {}(args={}, kwargs={})'.format( + func, args, kwargs)) + args = [executor.get_actual_value(arg) for arg in args] + kwargs = {key: executor.get_actual_value(val) for + key, val in kwargs.iteritems()} + + py_future = self._executor.submit(func, args, kwargs) + + # use the adaptor to wrap `concurrent.futures.Future` + return AdaptorFuture(py_future) + + def run(self, input=None): + if input is None: + input = {} + args = input.get('args', ()) + kwargs = input.get('kwargs', {}) + + return self.run_workflow(*args, **kwargs) + + +if __name__ == '__main__': + from simpleflow import activity, Workflow + + @activity.with_attributes(task_list='quickstart') + def increment(x): + return x + 1 + + @activity.with_attributes(task_list='quickstart') + def double(x): + return x * 2 + + class SimpleComputation(Workflow): + def run(self, x): + y = self.submit(increment, x) + z = self.submit(double, y) + return z.result + + result = Executor(SimpleComputation).run({"args": [5], "kwargs": {}}) + print result \ No newline at end of file From 6a3327a773cfa4958159b0c0545c9912d6f320c5 Mon Sep 17 00:00:00 2001 From: Han JU Date: Thu, 17 Jul 2014 15:53:53 +0200 Subject: [PATCH 2/5] AdaptorFuture will block if the task is not completed yet --- simpleflow/local_async/executor.py | 32 +++++++++++++----------------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/simpleflow/local_async/executor.py b/simpleflow/local_async/executor.py index 2e20de704..eb801f29d 100644 --- a/simpleflow/local_async/executor.py +++ b/simpleflow/local_async/executor.py @@ -27,10 +27,8 @@ def __init__(self, py_future): @property def result(self): - if not self.py_future.running(): - return self.wait() - - return self._result + # will block if the task is not completed yet + return self.py_future.result() def cancel(self): raise NotImplementedError() @@ -46,17 +44,6 @@ def state(self): @property def exception(self): - """ - Returns `None` if no exception occurred, otherwise, returns the - exception object that what raised by the task. - - Raise a cls::`exceptions.ExecutionBlocked` when the result is not - available. - - """ - if not self.py_future.done(): - return self.wait() - return self.py_future.exception() @property @@ -78,21 +65,30 @@ def done(self): return self.py_future.done() +def _get_actual_value(value): + if isinstance(value, AdaptorFuture): + return value.result + return value + + class Executor(executor.Executor): def __init__(self, workflow): super(Executor, self).__init__(workflow) # the real executor that does all the stuff + # FIXME cannot use ProcessPoolExecutor, error like: + # PicklingError: Can't pickle : + # attribute lookup __builtin__.function failed self._executor = py_futures.ThreadPoolExecutor( multiprocessing.cpu_count()) def submit(self, func, *args, **kwargs): logger.info('executing task {}(args={}, kwargs={})'.format( func, args, kwargs)) - args = [executor.get_actual_value(arg) for arg in args] - kwargs = {key: executor.get_actual_value(val) for + args = [_get_actual_value(arg) for arg in args] + kwargs = {key: _get_actual_value(val) for key, val in kwargs.iteritems()} - py_future = self._executor.submit(func, args, kwargs) + py_future = self._executor.submit(func._callable, *args, **kwargs) # use the adaptor to wrap `concurrent.futures.Future` return AdaptorFuture(py_future) From f001573ff797ff209ac4964e086adcfac9808b89 Mon Sep 17 00:00:00 2001 From: Han JU Date: Thu, 17 Jul 2014 15:56:43 +0200 Subject: [PATCH 3/5] simple async execution example --- simpleflow/local_async/executor.py | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/simpleflow/local_async/executor.py b/simpleflow/local_async/executor.py index eb801f29d..4c71d3569 100644 --- a/simpleflow/local_async/executor.py +++ b/simpleflow/local_async/executor.py @@ -104,20 +104,41 @@ def run(self, input=None): if __name__ == '__main__': from simpleflow import activity, Workflow + import time + + @activity.with_attributes(task_list='quickstart') + def side_affect(): + time.sleep(10) + print 'hey!' @activity.with_attributes(task_list='quickstart') def increment(x): + time.sleep(5) return x + 1 @activity.with_attributes(task_list='quickstart') def double(x): + time.sleep(5) return x * 2 class SimpleComputation(Workflow): def run(self, x): + self.submit(side_affect) y = self.submit(increment, x) z = self.submit(double, y) return z.result + before = time.time() result = Executor(SimpleComputation).run({"args": [5], "kwargs": {}}) - print result \ No newline at end of file + after = time.time() + + # Output with: + # >>> 12 + # >>> used 10.0062558651 seconds ... + # >>> hey! + + # => async execution + + print result + print 'used {} seconds ...'.format(after - before) + From 0a17ba1975d7cbc1e26754317cb1e839f8777801 Mon Sep 17 00:00:00 2001 From: Han JU Date: Mon, 21 Jul 2014 17:34:13 +0200 Subject: [PATCH 4/5] remove raise in futures.wait --- simpleflow/futures.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/simpleflow/futures.py b/simpleflow/futures.py index 39d785a00..63238da79 100644 --- a/simpleflow/futures.py +++ b/simpleflow/futures.py @@ -48,9 +48,6 @@ def wait(*fs): Raises a ``exceptions.ExecutionBlocked`` otherwise. """ - if any(future.state == PENDING for future in fs): - raise exceptions.ExecutionBlocked() - return [future.result for future in fs] From 992b0c23f2b70eebe68caf68c8c45fb974ef66a0 Mon Sep 17 00:00:00 2001 From: Han JU Date: Mon, 21 Jul 2014 17:35:44 +0200 Subject: [PATCH 5/5] fix bug caused by property decorator --- simpleflow/local_async/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/simpleflow/local_async/executor.py b/simpleflow/local_async/executor.py index 4c71d3569..1c264d6f0 100644 --- a/simpleflow/local_async/executor.py +++ b/simpleflow/local_async/executor.py @@ -58,7 +58,7 @@ def running(self): @property def finished(self): # without cancellation `finish` has the same semantic as `done` - return self.done() + return self.done @property def done(self):