|
51 | 51 | ) |
52 | 52 | from .writablebagfile import create_job, write_bag_file # change this later |
53 | 53 |
|
| 54 | +# from schema_salad.utils import convert_to_dict |
| 55 | + |
| 56 | + |
54 | 57 | if TYPE_CHECKING: |
55 | 58 | from .ro import ResearchObject |
56 | 59 |
|
| 60 | +_attributes_type = Dict[str | Identifier, Any] |
| 61 | + |
57 | 62 |
|
58 | 63 | def copy_job_order(job: Union[Process, JobsType], job_order_object: CWLObjectType) -> CWLObjectType: |
59 | 64 | """Create copy of job object for provenance.""" |
@@ -235,13 +240,13 @@ def evaluate( |
235 | 240 | """Evaluate the nature of job.""" |
236 | 241 | if not hasattr(process, "steps"): |
237 | 242 | # record provenance of independent commandline tool executions |
238 | | - self.prospective_prov(job) |
| 243 | + self.prospective_prov(job, process) |
239 | 244 | customised_job = copy_job_order(job, job_order_object) |
240 | 245 | self.used_artefacts(customised_job, self.workflow_run_uri) |
241 | 246 | create_job(research_obj, customised_job) |
242 | 247 | elif hasattr(job, "workflow"): |
243 | 248 | # record provenance of workflow executions |
244 | | - self.prospective_prov(job) |
| 249 | + self.prospective_prov(job, process) |
245 | 250 | customised_job = copy_job_order(job, job_order_object) |
246 | 251 | self.used_artefacts(customised_job, self.workflow_run_uri) |
247 | 252 | # if CWLPROV['prov'].uri in job_order_object: # maybe move this to another place |
@@ -734,35 +739,38 @@ def generate_output_prov( |
734 | 739 | entity, process_run_id, timestamp, None, {"prov:role": role} |
735 | 740 | ) |
736 | 741 |
|
737 | | - def prospective_prov(self, job: JobsType) -> None: |
| 742 | + def prospective_prov(self, job: JobsType, process: Process) -> None: |
738 | 743 | """Create prospective prov recording as wfdesc prov:Plan.""" |
| 744 | + prov_items: _attributes_type = { |
| 745 | + PROV_TYPE: WFDESC["Workflow"] if isinstance(job, WorkflowJob) else WFDESC["Process"], |
| 746 | + "prov:type": PROV["Plan"], |
| 747 | + "prov:label": "Prospective provenance", |
| 748 | + } |
| 749 | + if "doc" in process.tool: |
| 750 | + prov_items["schema:description"] = process.tool["doc"] |
| 751 | + if "label" in process.tool: |
| 752 | + prov_items["schema:name"] = process.tool["label"] |
| 753 | + # # TypeError: unhashable type: 'list' |
| 754 | + # if "intent" in process.tool: |
| 755 | + # prov_items["schema:featureList"] = convert_to_dict(process.tool["intent"]) |
| 756 | + self.document.entity("wf:main", prov_items) |
739 | 757 | if not isinstance(job, WorkflowJob): |
740 | | - # direct command line tool execution |
741 | | - self.document.entity( |
742 | | - "wf:main", |
743 | | - { |
744 | | - PROV_TYPE: WFDESC["Process"], |
745 | | - "prov:type": PROV["Plan"], |
746 | | - "prov:label": "Prospective provenance", |
747 | | - }, |
748 | | - ) |
749 | 758 | return |
750 | 759 |
|
751 | | - self.document.entity( |
752 | | - "wf:main", |
753 | | - { |
754 | | - PROV_TYPE: WFDESC["Workflow"], |
755 | | - "prov:type": PROV["Plan"], |
756 | | - "prov:label": "Prospective provenance", |
757 | | - }, |
758 | | - ) |
759 | | - |
760 | 760 | for step in job.steps: |
761 | 761 | stepnametemp = "wf:main/" + str(step.name)[5:] |
762 | 762 | stepname = urllib.parse.quote(stepnametemp, safe=":/,#") |
| 763 | + provstep_items: _attributes_type = { |
| 764 | + PROV_TYPE: WFDESC["Process"], |
| 765 | + "prov:type": PROV["Plan"], |
| 766 | + } |
| 767 | + if "doc" in step.tool: |
| 768 | + provstep_items["schema:description"] = step.tool["doc"] |
| 769 | + if "label" in step.tool: |
| 770 | + provstep_items["schema:name"] = step.tool["label"] |
763 | 771 | provstep = self.document.entity( |
764 | 772 | stepname, |
765 | | - {PROV_TYPE: WFDESC["Process"], "prov:type": PROV["Plan"]}, |
| 773 | + provstep_items, |
766 | 774 | ) |
767 | 775 | self.document.entity( |
768 | 776 | "wf:main", |
|
0 commit comments