|
1 | 1 | """Module with the logic to create and manage traces and steps.""" |
2 | 2 |
|
3 | | -import time |
4 | 3 | import asyncio |
| 4 | +import contextvars |
5 | 5 | import inspect |
6 | 6 | import logging |
7 | | -import contextvars |
8 | | -from typing import Any, Dict, List, Tuple, Optional, Awaitable, Generator |
9 | | -from functools import wraps |
| 7 | +import time |
| 8 | +import traceback |
10 | 9 | from contextlib import contextmanager |
| 10 | +from functools import wraps |
| 11 | +from typing import Any, Awaitable, Dict, Generator, List, Optional, Tuple |
11 | 12 |
|
12 | | -from . import enums, steps, traces |
13 | | -from .. import utils |
14 | | -from ..._client import Openlayer |
15 | 13 | from ..._base_client import DefaultHttpxClient |
| 14 | +from ..._client import Openlayer |
16 | 15 | from ...types.inference_pipelines.data_stream_params import ConfigLlmData |
| 16 | +from .. import utils |
| 17 | +from . import enums, steps, traces |
17 | 18 |
|
18 | 19 | logger = logging.getLogger(__name__) |
19 | 20 |
|
@@ -251,12 +252,14 @@ async def __anext__(self): |
251 | 252 | # Initialize tracing on first iteration only |
252 | 253 | if not self._trace_initialized: |
253 | 254 | self._original_gen = func(*func_args, **func_kwargs) |
254 | | - self._step, self._is_root_step, self._token = _create_and_initialize_step( |
255 | | - step_name=step_name, |
256 | | - step_type=enums.StepType.USER_CALL, |
257 | | - inputs=None, |
258 | | - output=None, |
259 | | - metadata=None, |
| 255 | + self._step, self._is_root_step, self._token = ( |
| 256 | + _create_and_initialize_step( |
| 257 | + step_name=step_name, |
| 258 | + step_type=enums.StepType.USER_CALL, |
| 259 | + inputs=None, |
| 260 | + output=None, |
| 261 | + metadata=None, |
| 262 | + ) |
260 | 263 | ) |
261 | 264 | self._inputs = _extract_function_inputs( |
262 | 265 | func_signature=func_signature, |
@@ -466,16 +469,25 @@ def _handle_trace_completion( |
466 | 469 | ) |
467 | 470 | if _publish: |
468 | 471 | try: |
| 472 | + inference_pipeline_id = inference_pipeline_id or utils.get_env_variable( |
| 473 | + "OPENLAYER_INFERENCE_PIPELINE_ID" |
| 474 | + ) |
469 | 475 | client = _get_client() |
470 | 476 | if client: |
471 | 477 | client.inference_pipelines.data.stream( |
472 | | - inference_pipeline_id=inference_pipeline_id |
473 | | - or utils.get_env_variable("OPENLAYER_INFERENCE_PIPELINE_ID"), |
| 478 | + inference_pipeline_id=inference_pipeline_id, |
474 | 479 | rows=[trace_data], |
475 | 480 | config=config, |
476 | 481 | ) |
477 | 482 | except Exception as err: # pylint: disable=broad-except |
478 | | - logger.error("Could not stream data to Openlayer %s", err) |
| 483 | + logger.error(traceback.format_exc()) |
| 484 | + logger.error( |
| 485 | + "Could not stream data to Openlayer (pipeline_id: %s, base_url: %s)" |
| 486 | + " Error: %s", |
| 487 | + inference_pipeline_id, |
| 488 | + client.base_url, |
| 489 | + err, |
| 490 | + ) |
479 | 491 | else: |
480 | 492 | logger.debug("Ending step %s", step_name) |
481 | 493 |
|
@@ -557,7 +569,6 @@ def _finalize_step_logging( |
557 | 569 | # ----------------------------- Async generator specific functions ----------------------------- # |
558 | 570 |
|
559 | 571 |
|
560 | | - |
561 | 572 | def _finalize_async_generator_step( |
562 | 573 | step: steps.Step, |
563 | 574 | token: Any, |
|
0 commit comments