1212import tempfile
1313import threading
1414from functools import cmp_to_key , partial
15- from typing import (Any , Callable , Dict , Generator , List , Mapping , MutableMapping ,
16- MutableSequence , Optional , Set , Union , cast )
15+ from typing import (Any , Callable , Dict , Generator , IO , List , Mapping ,
16+ MutableMapping , MutableSequence , Optional , Set , Union , cast )
1717
1818from typing_extensions import Text , Type , TYPE_CHECKING # pylint: disable=unused-import
1919# move to a regular typing import when Python 3.3-3.6 is no longer supported
@@ -83,8 +83,8 @@ def __init__(self,
8383 builder , # type: Builder
8484 script , # type: Dict[Text, Text]
8585 output_callback , # type: Callable[[Any, Any], Any]
86- requirements , # type: Dict[Text, Text]
87- hints , # type: Dict[Text, Text]
86+ requirements , # type: List[ Dict[Text, Text] ]
87+ hints , # type: List[ Dict[Text, Text] ]
8888 outdir = None , # type: Optional[Text]
8989 tmpdir = None , # type: Optional[Text]
9090 ): # type: (...) -> None
@@ -101,7 +101,7 @@ def __init__(self,
101101
102102 def run (self ,
103103 runtimeContext , # type: RuntimeContext
104- tmpdir_lock = None # type: threading.Lock
104+ tmpdir_lock = None # type: Optional[ threading.Lock]
105105 ): # type: (...) -> None
106106 try :
107107 normalizeFilesDirs (self .builder .job )
@@ -194,8 +194,10 @@ def __init__(self, job, output_callback, cachebuilder, jobcache):
194194 self .outdir = jobcache
195195 self .prov_obj = None # type: Optional[ProvenanceProfile]
196196
197- def run (self , runtimeContext ):
198- # type: (RuntimeContext) -> None
197+ def run (self ,
198+ runtimeContext , # type: RuntimeContext
199+ tmpdir_lock = None # type: Optional[threading.Lock]
200+ ): # type: (...) -> None
199201 self .output_callback (self .job .collect_output_ports (
200202 self .job .tool ["outputs" ],
201203 self .cachebuilder ,
@@ -232,7 +234,7 @@ def check_adjust(builder, file_o):
232234 file_o ["basename" ]))
233235 return file_o
234236
235- def check_valid_locations (fs_access , ob ):
237+ def check_valid_locations (fs_access , ob ): # type: (StdFsAccess, Dict[Text, Any]) -> None
236238 if ob ["location" ].startswith ("_:" ):
237239 pass
238240 if ob ["class" ] == "File" and not fs_access .isfile (ob ["location" ]):
@@ -285,7 +287,7 @@ def make_path_mapper(self, reffiles, stagedir, runtimeContext, separateDirs):
285287 return PathMapper (reffiles , runtimeContext .basedir , stagedir , separateDirs )
286288
287289 def updatePathmap (self , outdir , pathmap , fn ):
288- # type: (Text, PathMapper, Dict) -> None
290+ # type: (Text, PathMapper, Dict[Text, Any] ) -> None
289291 if "location" in fn and fn ["location" ] in pathmap :
290292 pathmap .update (fn ["location" ], pathmap .mapper (fn ["location" ]).resolved ,
291293 os .path .join (outdir , fn ["basename" ]),
@@ -298,7 +300,7 @@ def updatePathmap(self, outdir, pathmap, fn):
298300 def job (self ,
299301 job_order , # type: Mapping[Text, Text]
300302 output_callbacks , # type: Callable[[Any, Any], Any]
301- runtimeContext # RuntimeContext
303+ runtimeContext # type: RuntimeContext
302304 ):
303305 # type: (...) -> Generator[Union[JobBase, CallbackJob], None, None]
304306
@@ -332,9 +334,9 @@ def job(self,
332334 if dockerimg is not None :
333335 cmdline = ["docker" , "run" , dockerimg ] + cmdline
334336 # not really run using docker, just for hashing purposes
335- keydict = {u"cmdline" : cmdline }
337+ keydict = {u"cmdline" : cmdline } # type: Dict[Text, Union[Dict[Text, Any], List[Any]]]
336338
337- for shortcut in ["stdout " , "stderr" ]: # later, add "stdin"
339+ for shortcut in ["stdin " , "stdout" , "stderr" ]:
338340 if shortcut in self .tool :
339341 keydict [shortcut ] = self .tool [shortcut ]
340342
@@ -409,8 +411,12 @@ def job(self,
409411 runtimeContext = runtimeContext .copy ()
410412 runtimeContext .outdir = jobcache
411413
412- def update_status_output_callback (output_callbacks , jobcachelock ,
413- outputs , processStatus ):
414+ def update_status_output_callback (
415+ output_callbacks , # type: Callable[[List[Dict[Text, Any]], Text], None]
416+ jobcachelock , # type: IO[Any]
417+ outputs , # type: List[Dict[Text, Any]]
418+ processStatus # type: Text
419+ ): # type: (...) -> None
414420 # save status to the lockfile then release the lock
415421 jobcachelock .seek (0 )
416422 jobcachelock .truncate ()
@@ -556,13 +562,15 @@ def update_status_output_callback(output_callbacks, jobcachelock,
556562 muts = set () # type: Set[Text]
557563
558564 if builder .mutation_manager is not None :
559- def register_mut (f ):
565+ def register_mut (f ): # type: (Dict[Text, Any]) -> None
566+ mm = cast (MutationManager , builder .mutation_manager )
560567 muts .add (f ["location" ])
561- builder . mutation_manager .register_mutation (j .name , f )
568+ mm .register_mutation (j .name , f )
562569
563- def register_reader (f ):
570+ def register_reader (f ): # type: (Dict[Text, Any]) -> None
571+ mm = cast (MutationManager , builder .mutation_manager )
564572 if f ["location" ] not in muts :
565- builder . mutation_manager .register_reader (j .name , f )
573+ mm .register_reader (j .name , f )
566574 readers [f ["location" ]] = copy .deepcopy (f )
567575
568576 for li in j .generatefiles ["listing" ]:
@@ -628,7 +636,7 @@ def collect_output_ports(self,
628636 rcode , # type: int
629637 compute_checksum = True , # type: bool
630638 jobname = "" , # type: Text
631- readers = None # type: Dict[Text, Any]
639+ readers = None # type: Optional[ Dict[Text, Any] ]
632640 ): # type: (...) -> OutputPorts
633641 ret = {} # type: OutputPorts
634642 debug = _logger .isEnabledFor (logging .DEBUG )
@@ -647,11 +655,12 @@ def collect_output_ports(self,
647655 json_dumps (ret , indent = 4 ))
648656 else :
649657 for i , port in enumerate (ports ):
650- def makeWorkflowException (msg ):
651- return WorkflowException (
652- u"Error collecting output for parameter '%s':\n %s"
653- % (shortname (port ["id" ]), msg ))
654- with SourceLine (ports , i , makeWorkflowException , debug ):
658+ class ParameterOutputWorkflowException (WorkflowException ):
659+ def __init__ (self , msg , ** kwargs ): # type: (Text, **Any) -> None
660+ super (ParameterOutputWorkflowException , self ).__init__ (
661+ u"Error collecting output for parameter '%s':\n %s"
662+ % (shortname (port ["id" ]), msg ), kwargs )
663+ with SourceLine (ports , i , ParameterOutputWorkflowException , debug ):
655664 fragment = shortname (port ["id" ])
656665 ret [fragment ] = self .collect_output (port , builder , outdir , fs_access ,
657666 compute_checksum = compute_checksum )
0 commit comments