24 | 24 | from timeit import default_timer |
25 | 25 | from typing import Any, AsyncGenerator, Optional |
26 | 26 |
| 27 | +from neo4j_graphrag.utils.logging import prettify |
| 28 | + |
27 | 29 | try: |
28 | 30 | import pygraphviz as pgv |
29 | 31 | except ImportError: |
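The new `prettify` import is used below to keep DEBUG logs readable when task inputs and results are large. As a rough illustration only (the real implementation in `neo4j_graphrag.utils.logging` may differ), such a helper can simply truncate long reprs before they reach the log:

```python
# Illustrative sketch, NOT the library's actual implementation:
# assume prettify just shortens the repr of an arbitrary value.
from typing import Any

def prettify(value: Any, max_len: int = 80) -> str:
    """Return a repr of `value` truncated to `max_len` characters for logging."""
    text = repr(value)
    return text if len(text) <= max_len else text[: max_len - 3] + "..."
```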
@@ -90,21 +92,21 @@ async def execute(self, **kwargs: Any) -> RunResult | None: |
90 | 92 | if the task run successfully, None if the status update |
91 | 93 | was unsuccessful. |
92 | 94 | """ |
93 | | - logger.debug(f"Running component {self.name} with {kwargs}") |
94 | | - start_time = default_timer() |
95 | 95 | component_result = await self.component.run(**kwargs) |
96 | 96 | run_result = RunResult( |
97 | 97 | result=component_result, |
98 | 98 | ) |
99 | | - end_time = default_timer() |
100 | | - logger.debug(f"Component {self.name} finished in {end_time - start_time}s") |
101 | 99 | return run_result |
102 | 100 |
103 | 101 | async def run(self, inputs: dict[str, Any]) -> RunResult | None: |
104 | 102 | """Main method to execute the task.""" |
105 | | - logger.debug(f"TASK START {self.name=} {inputs=}") |
| 103 | + logger.debug(f"TASK START {self.name=} input={prettify(inputs)}") |
| 104 | + start_time = default_timer() |
106 | 105 | res = await self.execute(**inputs) |
107 | | - logger.debug(f"TASK RESULT {self.name=} {res=}") |
| 106 | + end_time = default_timer() |
| 107 | + logger.debug( |
| 108 | + f"TASK FINISHED {self.name} in {end_time - start_time} res={prettify(res)}" |
| 109 | + ) |
108 | 110 | return res |
109 | 111 |
110 | 112 |
@@ -141,7 +143,9 @@ async def run_task(self, task: TaskPipelineNode, data: dict[str, Any]) -> None: |
141 | 143 | try: |
142 | 144 | await self.set_task_status(task.name, RunStatus.RUNNING) |
143 | 145 | except PipelineStatusUpdateError: |
144 | | - logger.info(f"Component {task.name} already running or done") |
| 146 | + logger.debug( |
| 147 | + f"ORCHESTRATOR: TASK ABORTED: {task.name} is already running or done, aborting" |
| 148 | + ) |
145 | 149 | return None |
146 | 150 | res = await task.run(inputs) |
147 | 151 | await self.set_task_status(task.name, RunStatus.DONE) |
@@ -198,7 +202,8 @@ async def check_dependencies_complete(self, task: TaskPipelineNode) -> None: |
198 | 202 | d_status = await self.get_status_for_component(d.start) |
199 | 203 | if d_status != RunStatus.DONE: |
200 | 204 | logger.debug( |
201 | | - f"Missing dependency {d.start} for {task.name} (status: {d_status}). " |
| 205 | + f"ORCHESTRATOR {self.run_id}: TASK DELAYED: Missing dependency {d.start} for {task.name} " |
| 206 | + f"(status: {d_status}). " |
202 | 207 | "Will try again when dependency is complete." |
203 | 208 | ) |
204 | 209 | raise PipelineMissingDependencyError() |
@@ -227,6 +232,9 @@ async def next( |
227 | 232 | await self.check_dependencies_complete(next_node) |
228 | 233 | except PipelineMissingDependencyError: |
229 | 234 | continue |
| 235 | + logger.debug( |
| 236 | + f"ORCHESTRATOR {self.run_id}: enqueuing next task: {next_node.name}" |
| 237 | + ) |
230 | 238 | yield next_node |
231 | 239 | return |
232 | 240 |
@@ -315,7 +323,6 @@ async def run(self, data: dict[str, Any]) -> None: |
315 | 323 | (node without any parent). Then the callback on_task_complete |
316 | 324 | will handle the task dependencies. |
317 | 325 | """ |
318 | | - logger.debug(f"PIPELINE START {data=}") |
319 | 326 | tasks = [self.run_task(root, data) for root in self.pipeline.roots()] |
320 | 327 | await asyncio.gather(*tasks) |
321 | 328 |
@@ -624,15 +631,16 @@ def validate_parameter_mapping_for_task(self, task: TaskPipelineNode) -> bool: |
624 | 631 | return True |
625 | 632 |
626 | 633 | async def run(self, data: dict[str, Any]) -> PipelineResult: |
627 | | - logger.debug("Starting pipeline") |
| 634 | + logger.debug("PIPELINE START") |
628 | 635 | start_time = default_timer() |
629 | 636 | self.invalidate() |
630 | 637 | self.validate_input_data(data) |
631 | 638 | orchestrator = Orchestrator(self) |
| 639 | + logger.debug(f"PIPELINE ORCHESTRATOR: {orchestrator.run_id}") |
632 | 640 | await orchestrator.run(data) |
633 | 641 | end_time = default_timer() |
634 | 642 | logger.debug( |
635 | | - f"Pipeline {orchestrator.run_id} finished in {end_time - start_time}s" |
| 643 | + f"PIPELINE FINISHED {orchestrator.run_id} in {end_time - start_time}s" |
636 | 644 | ) |
637 | 645 | return PipelineResult( |
638 | 646 | run_id=orchestrator.run_id, |
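All of the new messages above are emitted at DEBUG level, so they stay invisible with default logging settings. A minimal sketch for surfacing them, assuming the library's modules create their loggers with `logging.getLogger(__name__)` under the `neo4j_graphrag` namespace:

```python
import logging

# Keep third-party noise at WARNING, but show the new TASK / ORCHESTRATOR /
# PIPELINE debug messages from the library itself.
logging.basicConfig(
    level=logging.WARNING,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
logging.getLogger("neo4j_graphrag").setLevel(logging.DEBUG)
```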