66import logging
77import re
88import urlparse
9- from schema_salad .ref_resolver import Loader
9+
10+ from schema_salad .ref_resolver import Loader , Fetcher , DefaultFetcher
1011import schema_salad .validate as validate
1112from schema_salad .validate import ValidationException
1213import schema_salad .schema as schema
14+ import requests
15+
16+ from typing import Any , AnyStr , Callable , cast , Dict , Text , Tuple , Union
17+
1318from avro .schema import Names
19+
1420from . import update
1521from . import process
1622from .process import Process , shortname
1723from .errors import WorkflowException
18- from typing import Any , AnyStr , Callable , cast , Dict , Text , Tuple , Union
1924
2025_logger = logging .getLogger ("cwltool" )
2126
22- def fetch_document (argsworkflow , resolver = None ):
23- # type: (Union[Text, dict[Text, Any]], Any) -> Tuple[Loader, Dict[Text, Any], Text]
27+ def fetch_document (argsworkflow , # type: Union[Text, dict[Text, Any]]
28+ resolver = None , # type: Callable[[Loader, Union[Text, dict[Text, Any]]], Text]
29+ fetcher_constructor = DefaultFetcher # type: Callable[[Dict[unicode, unicode], requests.sessions.Session], Fetcher]
30+ ):
31+ # type: (...) -> Tuple[Loader, Dict[Text, Any], Text]
2432 """Retrieve a CWL document."""
25- document_loader = Loader ({"cwl" : "https://w3id.org/cwl/cwl#" , "id" : "@id" })
33+
34+ document_loader = Loader ({"cwl" : "https://w3id.org/cwl/cwl#" , "id" : "@id" },
35+ fetcher_constructor = fetcher_constructor )
2636
2737 uri = None # type: Text
2838 workflowobj = None # type: Dict[Text, Any]
@@ -95,16 +105,23 @@ def _convert_stdstreams_to_files(workflowobj):
95105 for entry in workflowobj :
96106 _convert_stdstreams_to_files (entry )
97107
98- def validate_document (document_loader , workflowobj , uri ,
99- enable_dev = False , strict = True , preprocess_only = False ):
100- # type: (Loader, Dict[Text, Any], Text, bool, bool, bool) -> Tuple[Loader, Names, Union[Dict[Text, Any], List[Dict[Text, Any]]], Dict[Text, Any], Text]
108+ def validate_document (document_loader , # type: Loader
109+ workflowobj , # type: Dict[Text, Any]
110+ uri , # type: Text
111+ enable_dev = False , # type: bool
112+ strict = True , # type: bool
113+ preprocess_only = False , # type: bool
114+ fetcher_constructor = DefaultFetcher # type: Callable[[Dict[unicode, unicode], requests.sessions.Session], Fetcher]
115+ ):
116+ # type: (...) -> Tuple[Loader, Names, Union[Dict[Text, Any], List[Dict[Text, Any]]], Dict[Text, Any], Text]
101117 """Validate a CWL document."""
118+
102119 jobobj = None
103120 if "cwl:tool" in workflowobj :
104121 jobobj , _ = document_loader .resolve_all (workflowobj , uri )
105122 uri = urlparse .urljoin (uri , workflowobj ["https://w3id.org/cwl/cwl#tool" ])
106123 del cast (dict , jobobj )["https://w3id.org/cwl/cwl#tool" ]
107- workflowobj = fetch_document (uri )[1 ]
124+ workflowobj = fetch_document (uri , fetcher_constructor = fetcher_constructor )[1 ]
108125
109126 if isinstance (workflowobj , list ):
110127 workflowobj = {
@@ -130,12 +147,16 @@ def validate_document(document_loader, workflowobj, uri,
130147 workflowobj ["$graph" ] = workflowobj ["@graph" ]
131148 del workflowobj ["@graph" ]
132149
133- (document_loader , avsc_names ) = \
150+ (sch_document_loader , avsc_names ) = \
134151 process .get_schema (workflowobj ["cwlVersion" ])[:2 ]
135152
136153 if isinstance (avsc_names , Exception ):
137154 raise avsc_names
138155
156+ document_loader = Loader (sch_document_loader .ctx , schemagraph = sch_document_loader .graph ,
157+ idx = document_loader .idx , cache = sch_document_loader .cache ,
158+ fetcher_constructor = fetcher_constructor )
159+
139160 workflowobj ["id" ] = fileuri
140161 processobj , metadata = document_loader .resolve_all (workflowobj , fileuri )
141162 if not isinstance (processobj , (dict , list )):
@@ -165,8 +186,14 @@ def validate_document(document_loader, workflowobj, uri,
165186 return document_loader , avsc_names , processobj , metadata , uri
166187
167188
168- def make_tool (document_loader , avsc_names , metadata , uri , makeTool , kwargs ):
169- # type: (Loader, Names, Dict[Text, Any], Text, Callable[..., Process], Dict[AnyStr, Any]) -> Process
189+ def make_tool (document_loader , # type: Loader
190+ avsc_names , # type: Names
191+ metadata , # type: Dict[Text, Any]
192+ uri , # type: Text
193+ makeTool , # type: Callable[..., Process]
194+ kwargs # type: dict
195+ ):
196+ # type: (...) -> Process
170197 """Make a Python CWL object."""
171198 resolveduri = document_loader .resolve_ref (uri )[0 ]
172199
@@ -179,8 +206,10 @@ def make_tool(document_loader, avsc_names, metadata, uri, makeTool, kwargs):
179206 "one of #%s" % ", #" .join (
180207 urlparse .urldefrag (i ["id" ])[1 ] for i in resolveduri
181208 if "id" in i ))
182- else :
209+ elif isinstance ( resolveduri , dict ) :
183210 processobj = resolveduri
211+ else :
212+ raise Exception ("Must resolve to list or dict" )
184213
185214 kwargs = kwargs .copy ()
186215 kwargs .update ({
@@ -200,14 +229,19 @@ def make_tool(document_loader, avsc_names, metadata, uri, makeTool, kwargs):
200229 return tool
201230
202231
203- def load_tool (argsworkflow , makeTool , kwargs = None ,
204- enable_dev = False ,
205- strict = True ,
206- resolver = None ):
207- # type: (Union[Text, dict[Text, Any]], Callable[...,Process], Dict[AnyStr, Any], bool, bool, Any) -> Any
208- document_loader , workflowobj , uri = fetch_document (argsworkflow , resolver = resolver )
232+ def load_tool (argsworkflow , # type: Union[Text, Dict[Text, Any]]
233+ makeTool , # type: Callable[..., Process]
234+ kwargs = None , # type: dict
235+ enable_dev = False , # type: bool
236+ strict = True , # type: bool
237+ resolver = None , # type: Callable[[Loader, Union[Text, dict[Text, Any]]], Text]
238+ fetcher_constructor = DefaultFetcher # type: Callable[[Dict[unicode, unicode], requests.sessions.Session], Fetcher]
239+ ):
240+ # type: (...) -> Process
241+
242+ document_loader , workflowobj , uri = fetch_document (argsworkflow , resolver = resolver , fetcher_constructor = fetcher_constructor )
209243 document_loader , avsc_names , processobj , metadata , uri = validate_document (
210244 document_loader , workflowobj , uri , enable_dev = enable_dev ,
211- strict = strict )
245+ strict = strict , fetcher_constructor = fetcher_constructor )
212246 return make_tool (document_loader , avsc_names , metadata , uri ,
213247 makeTool , kwargs if kwargs else {})
0 commit comments