@@ -141,8 +141,8 @@ def _rec_fields(rec): # type: (Dict[Text, Any]) -> Dict[Text, Any]
141141 return False
142142 return True
143143
144- def object_from_state (state , parms , frag_only , supportsMultipleInput , sourceField ):
145- # type: (Dict[Text, WorkflowStateItem], List[Dict[Text, Any]], bool, bool, Text) -> Dict[Text, Any]
144+ def object_from_state (state , parms , frag_only , supportsMultipleInput , sourceField , incomplete = False ):
145+ # type: (Dict[Text, WorkflowStateItem], List[Dict[Text, Any]], bool, bool, Text, bool ) -> Dict[Text, Any]
146146 inputobj = {} # type: Dict[Text, Any]
147147 for inp in parms :
148148 iid = inp ["id" ]
@@ -172,7 +172,7 @@ def object_from_state(state, parms, frag_only, supportsMultipleInput, sourceFiel
172172 raise WorkflowException (
173173 u"Connect source '%s' on parameter '%s' does not "
174174 "exist" % (src , inp ["id" ]))
175- else :
175+ elif not incomplete :
176176 return None
177177 elif "default" in inp :
178178 inputobj [iid ] = inp ["default" ]
@@ -225,12 +225,13 @@ def __init__(self, workflow, **kwargs):
225225
226226 def receive_output (self , step , outputparms , jobout , processStatus ):
227227 # type: (WorkflowJobStep, List[Dict[Text,Text]], Dict[Text,Text], Text) -> None
228+
228229 for i in outputparms :
229230 if "id" in i :
230231 if i ["id" ] in jobout :
231232 self .state [i ["id" ]] = WorkflowStateItem (i , jobout [i ["id" ]])
232233 else :
233- _logger .error (u"Output is missing expected field %s" % i ["id" ])
234+ _logger .error (u"[%s] Output is missing expected field %s" , step . name , i ["id" ])
234235 processStatus = "permanentFail"
235236
236237 if _logger .isEnabledFor (logging .DEBUG ):
@@ -240,9 +241,9 @@ def receive_output(self, step, outputparms, jobout, processStatus):
240241 if self .processStatus != "permanentFail" :
241242 self .processStatus = processStatus
242243
243- _logger .warn (u"[%s] completion status is %s" , step .name , processStatus )
244+ _logger .warn (u"[%s] completed %s" , step .name , processStatus )
244245 else :
245- _logger .info (u"[%s] completion status is %s" , step .name , processStatus )
246+ _logger .info (u"[%s] completed %s" , step .name , processStatus )
246247
247248 step .completed = True
248249
@@ -363,37 +364,52 @@ def job(self, joborder, output_callback, **kwargs):
363364 self .state [out ["id" ]] = None
364365
365366 completed = 0
366- while completed < len (self .steps ) and self . processStatus == "success" :
367+ while completed < len (self .steps ):
367368 made_progress = False
368369
369370 for step in self .steps :
370371 if kwargs .get ("on_error" , "stop" ) == "stop" and self .processStatus != "success" :
371372 break
372373
373374 if not step .submitted :
374- step .iterable = self .try_make_job (step , ** kwargs )
375+ try :
376+ step .iterable = self .try_make_job (step , ** kwargs )
377+ except WorkflowException as e :
378+ _logger .error (u"[%s] Cannot make job: %s" , step .name , e )
379+ _logger .debug ("" , exc_info = True )
380+ self .processStatus = "permanentFail"
375381
376382 if step .iterable :
377- for newjob in step .iterable :
378- if kwargs .get ("on_error" , "stop" ) == "stop" and self .processStatus != "success" :
379- break
380- if newjob :
381- made_progress = True
382- yield newjob
383- else :
384- break
383+ try :
384+ for newjob in step .iterable :
385+ if kwargs .get ("on_error" , "stop" ) == "stop" and self .processStatus != "success" :
386+ break
387+ if newjob :
388+ made_progress = True
389+ yield newjob
390+ else :
391+ break
392+ except WorkflowException as e :
393+ _logger .error (u"[%s] Cannot make job: %s" , step .name , e )
394+ _logger .debug ("" , exc_info = True )
395+ self .processStatus = "permanentFail"
385396
386397 completed = sum (1 for s in self .steps if s .completed )
387398
388399 if not made_progress and completed < len (self .steps ):
389- yield None
400+ if self .processStatus != "success" :
401+ break
402+ else :
403+ yield None
390404
391405 supportsMultipleInput = bool (self .workflow .get_requirement ("MultipleInputFeatureRequirement" )[0 ])
392406
393- wo = object_from_state (self .state , self .tool ["outputs" ], True , supportsMultipleInput , "outputSource" )
394-
395- if wo is None :
396- raise WorkflowException ("Output for workflow not available" )
407+ try :
408+ wo = object_from_state (self .state , self .tool ["outputs" ], True , supportsMultipleInput , "outputSource" , incomplete = True )
409+ except WorkflowException as e :
410+ _logger .error (u"[%s] Cannot collect workflow output: %s" , self .name , e )
411+ wo = {}
412+ self .processStatus = "permanentFail"
397413
398414 _logger .info (u"[%s] outdir is %s" , self .name , self .outdir )
399415
@@ -591,17 +607,23 @@ def setTotal(self, total): # type: (int) -> None
591607def parallel_steps (steps , rc , kwargs ): # type: (List[Generator], ReceiveScatterOutput, Dict[str, Any]) -> Generator
592608 while rc .completed < rc .total :
593609 made_progress = False
594- for step in steps :
610+ for index in xrange (len (steps )):
611+ step = steps [index ]
595612 if kwargs .get ("on_error" , "stop" ) == "stop" and rc .processStatus != "success" :
596613 break
597- for j in step :
598- if kwargs .get ("on_error" , "stop" ) == "stop" and rc .processStatus != "success" :
599- break
600- if j :
601- made_progress = True
602- yield j
603- else :
604- break
614+ try :
615+ for j in step :
616+ if kwargs .get ("on_error" , "stop" ) == "stop" and rc .processStatus != "success" :
617+ break
618+ if j :
619+ made_progress = True
620+ yield j
621+ else :
622+ break
623+ except WorkflowException as e :
624+ _logger .error (u"Cannot make scatter job: %s" , e )
625+ _logger .debug ("" , exc_info = True )
626+ rc .receive_scatter_output (index , {}, "permanentFail" )
605627 if not made_progress and rc .completed < rc .total :
606628 yield None
607629
0 commit comments