@@ -73,6 +73,7 @@ def __init__(
7373 fname = 'unknown' ,
7474 targets = None ,
7575 owner = False ,
76+ return_exceptions = False ,
7677 ):
7778 super (AsyncResult , self ).__init__ ()
7879 if not isinstance (children , list ):
@@ -81,6 +82,8 @@ def __init__(
8182 else :
8283 self ._single_result = False
8384
85+ self ._return_exceptions = return_exceptions
86+
8487 if isinstance (children [0 ], string_types ):
8588 self .msg_ids = children
8689 self ._children = []
@@ -204,22 +207,35 @@ def _reconstruct_result(self, res):
204207 else :
205208 return res
206209
207- def get (self , timeout = - 1 ):
210+ def get (self , timeout = None , return_exceptions = None ):
208211 """Return the result when it arrives.
209212
210- If `timeout` is not ``None`` and the result does not arrive within
211- `timeout` seconds then ``TimeoutError`` is raised. If the
212- remote call raised an exception then that exception will be reraised
213- by get() inside a `RemoteError`.
213+ Arguments:
214+
215+ timeout : int [default None]
216+ If `timeout` is not ``None`` and the result does not arrive within
217+ `timeout` seconds then ``TimeoutError`` is raised. If the
218+ remote call raised an exception then that exception will be reraised
219+ by get() inside a `RemoteError`.
220+ return_exceptions : bool [default False]
221+ If True, return Exceptions instead of raising them.
214222 """
215223 if not self .ready ():
216224 self .wait (timeout )
217225
226+ if return_exceptions is None :
227+ # default to attribute, if AsyncResult was created with return_exceptions=True
228+ return_exceptions = self ._return_exceptions
229+
218230 if self ._ready :
219231 if self ._success :
220232 return self .result ()
221233 else :
222- raise self .exception ()
234+ e = self .exception ()
235+ if return_exceptions :
236+ return self ._reconstruct_result (self ._raw_results )
237+ else :
238+ raise e
223239 else :
224240 raise error .TimeoutError ("Result not ready." )
225241
@@ -270,22 +286,37 @@ def wait(self, timeout=-1):
270286 def _resolve_result (self , f = None ):
271287 if self .done ():
272288 return
289+ if f :
290+ results = f .result ()
291+ else :
292+ results = list (map (self ._client .results .get , self .msg_ids ))
293+
294+ # store raw results
295+ self ._raw_results = results
296+
273297 try :
274- if f :
275- results = f .result ()
276- else :
277- results = list (map (self ._client .results .get , self .msg_ids ))
278298 if self ._single_result :
279299 r = results [0 ]
280300 if isinstance (r , Exception ):
281301 raise r
282302 else :
283- results = error .collect_exceptions (results , self ._fname )
284- self ._success = True
285- self .set_result (self ._reconstruct_result (results ))
303+ results = self ._collect_exceptions (results )
286304 except Exception as e :
287305 self ._success = False
288306 self .set_exception (e )
307+ else :
308+ self ._success = True
309+ self .set_result (self ._reconstruct_result (results ))
310+
311+ def _collect_exceptions (self , results ):
312+ """Wrap Exceptions in a CompositeError
313+
314+ if self._return_exceptions is True, this is a no-op
315+ """
316+ if self ._return_exceptions :
317+ return results
318+ else :
319+ return error .collect_exceptions (results , self ._fname )
289320
290321 def _finalize_result (self , f ):
291322 if self .owner :
@@ -424,10 +455,10 @@ def __getitem__(self, key):
424455 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str."""
425456 if isinstance (key , int ):
426457 self ._check_ready ()
427- return error . collect_exceptions ([self .result ()[key ]], self . _fname )[0 ]
458+ return self . _collect_exceptions ([self .result ()[key ]])[0 ]
428459 elif isinstance (key , slice ):
429460 self ._check_ready ()
430- return error . collect_exceptions (self .result ()[key ], self . _fname )
461+ return self . _collect_exceptions (self .result ()[key ])
431462 elif isinstance (key , string_types ):
432463 # metadata proxy *does not* require that results are done
433464 self .wait (0 )
@@ -473,7 +504,7 @@ def __iter__(self):
473504 for child in self ._children :
474505 self ._wait_for_child (child , evt = evt )
475506 result = child .result ()
476- error . collect_exceptions ([result ], self . _fname )
507+ self . _collect_exceptions ([result ])
477508 yield result
478509 else :
479510 # already done
@@ -583,15 +614,15 @@ def wait_interactive(self, interval=0.1, timeout=-1, widget=None):
583614 Override default context-detection behavior for whether a widget-based progress bar
584615 should be used.
585616 """
586- if timeout is None :
587- timeout = - 1
617+ if timeout and timeout < 0 :
618+ timeout = None
588619 N = len (self )
589620 tic = time .perf_counter ()
590621 progress_bar = progress (widget = widget , total = N , unit = 'tasks' , desc = self ._fname )
591622
592623 n_prev = 0
593624 while not self .ready () and (
594- timeout < 0 or time .perf_counter () - tic <= timeout
625+ timeout is None or time .perf_counter () - tic <= timeout
595626 ):
596627 self .wait (interval )
597628 progress_bar .update (self .progress - n_prev )
@@ -751,25 +782,50 @@ def display_outputs(self, groupby="type", result_only=False):
751782
752783
753784class AsyncMapResult (AsyncResult ):
754- """Class for representing results of non-blocking gathers .
785+ """Class for representing results of non-blocking maps .
755786
756- This will properly reconstruct the gather .
787+ AsyncMapResult.get() will properly reconstruct gathers into single object .
757788
758- This class is iterable at any time, and will wait on results as they come.
789+ AsyncMapResult is iterable at any time, and will wait on results as they come.
759790
760791 If ordered=False, then the first results to arrive will come first, otherwise
761792 results will be yielded in the order they were submitted.
762-
763793 """
764794
765- def __init__ (self , client , children , mapObject , fname = '' , ordered = True ):
795+ def __init__ (
796+ self ,
797+ client ,
798+ children ,
799+ mapObject ,
800+ fname = '' ,
801+ ordered = True ,
802+ return_exceptions = False ,
803+ ):
766804 self ._mapObject = mapObject
767805 self .ordered = ordered
768- AsyncResult .__init__ (self , client , children , fname = fname )
806+ AsyncResult .__init__ (
807+ self ,
808+ client ,
809+ children ,
810+ fname = fname ,
811+ return_exceptions = return_exceptions ,
812+ )
769813 self ._single_result = False
770814
771815 def _reconstruct_result (self , res ):
772816 """Perform the gather on the actual results."""
817+ if self ._return_exceptions :
818+ if any (isinstance (r , Exception ) for r in res ):
819+ # running with _return_exceptions,
820+ # cannot reconstruct original
821+ # use simple chain iterable
822+ flattened = []
823+ for r in res :
824+ if isinstance (r , Exception ):
825+ flattened .append (r )
826+ else :
827+ flattened .extend (r )
828+ return flattened
773829 return self ._mapObject .joinPartitions (res )
774830
775831 # asynchronous iterator:
@@ -786,7 +842,7 @@ def _yield_child_results(self, child):
786842 rlist = child .result ()
787843 if not isinstance (rlist , list ):
788844 rlist = [rlist ]
789- error . collect_exceptions (rlist , self . _fname )
845+ self . _collect_exceptions (rlist )
790846 for r in rlist :
791847 yield r
792848
@@ -841,6 +897,8 @@ def _init_futures(self):
841897 def wait (self , timeout = - 1 ):
842898 """wait for result to complete."""
843899 start = time .time ()
900+ if timeout and timeout < 0 :
901+ timeout = None
844902 if self ._ready :
845903 return
846904 local_ids = [m for m in self .msg_ids if m in self ._client .outstanding ]
@@ -852,7 +910,7 @@ def wait(self, timeout=-1):
852910 else :
853911 rdict = self ._client .result_status (remote_ids , status_only = False )
854912 pending = rdict ['pending' ]
855- while pending and (timeout < 0 or time .time () < start + timeout ):
913+ while pending and (timeout is None or time .time () < start + timeout ):
856914 rdict = self ._client .result_status (remote_ids , status_only = False )
857915 pending = rdict ['pending' ]
858916 if pending :
@@ -865,11 +923,10 @@ def wait(self, timeout=-1):
865923 results = list (map (self ._client .results .get , self .msg_ids ))
866924 if self ._single_result :
867925 r = results [0 ]
868- if isinstance (r , Exception ):
926+ if isinstance (r , Exception ) and not self . _return_exceptions :
869927 raise r
870- self .set_result (r )
871928 else :
872- results = error . collect_exceptions (results , self . _fname )
929+ results = self . _collect_exceptions (results )
873930 self ._success = True
874931 self .set_result (self ._reconstruct_result (results ))
875932 except Exception as e :
0 commit comments