Skip to content

Commit bada5eb

Browse files
viniciusdsmello authored and gustavocidornelas committed
refactor(tracer): streamline code formatting and improve readability
1 parent 243a7f9 commit bada5eb

File tree

1 file changed

+51
-92
lines changed

1 file changed

+51
-92
lines changed

src/openlayer/lib/tracing/tracer.py

Lines changed: 51 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,16 @@
2222
TRUE_LIST = ["true", "on", "1"]
2323

2424
_publish = utils.get_env_variable("OPENLAYER_DISABLE_PUBLISH") not in TRUE_LIST
25-
_verify_ssl = (
26-
utils.get_env_variable("OPENLAYER_VERIFY_SSL") or "true"
27-
).lower() in TRUE_LIST
25+
_verify_ssl = (utils.get_env_variable("OPENLAYER_VERIFY_SSL") or "true").lower() in TRUE_LIST
2826
_client = None
2927

28+
3029
def _get_client() -> Optional[Openlayer]:
3130
"""Get or create the Openlayer client with lazy initialization."""
3231
global _client
3332
if not _publish:
3433
return None
35-
34+
3635
if _client is None:
3736
# Lazy initialization - create client when first needed
3837
if _verify_ssl:
@@ -45,12 +44,14 @@ def _get_client() -> Optional[Openlayer]:
4544
)
4645
return _client
4746

47+
4848
_current_step = contextvars.ContextVar("current_step")
4949
_current_trace = contextvars.ContextVar("current_trace")
5050
_rag_context = contextvars.ContextVar("rag_context")
5151

5252
# ----------------------------- Public API functions ----------------------------- #
5353

54+
5455
def get_current_trace() -> Optional[traces.Trace]:
5556
"""Returns the current trace."""
5657
return _current_trace.get(None)
@@ -77,11 +78,7 @@ def create_step(
7778
) -> Generator[steps.Step, None, None]:
7879
"""Starts a trace and yields a Step object."""
7980
new_step, is_root_step, token = _create_and_initialize_step(
80-
step_name=name,
81-
step_type=step_type,
82-
inputs=inputs,
83-
output=output,
84-
metadata=metadata
81+
step_name=name, step_type=step_type, inputs=inputs, output=output, metadata=metadata
8582
)
8683
try:
8784
yield new_step
@@ -93,11 +90,7 @@ def create_step(
9390
new_step.latency = latency
9491

9592
_current_step.reset(token)
96-
_handle_trace_completion(
97-
is_root_step=is_root_step,
98-
step_name=name,
99-
inference_pipeline_id=inference_pipeline_id
100-
)
93+
_handle_trace_completion(is_root_step=is_root_step, step_name=name, inference_pipeline_id=inference_pipeline_id)
10194

10295

10396
def add_chat_completion_step_to_trace(**kwargs) -> None:
@@ -158,25 +151,23 @@ def decorator(func):
158151
def wrapper(*func_args, **func_kwargs):
159152
if step_kwargs.get("name") is None:
160153
step_kwargs["name"] = func.__name__
161-
162-
with create_step(
163-
*step_args, inference_pipeline_id=inference_pipeline_id, **step_kwargs
164-
) as step:
154+
155+
with create_step(*step_args, inference_pipeline_id=inference_pipeline_id, **step_kwargs) as step:
165156
output = exception = None
166157
try:
167158
output = func(*func_args, **func_kwargs)
168159
except Exception as exc:
169160
_log_step_exception(step, exc)
170161
exception = exc
171-
162+
172163
# Extract inputs and finalize logging using optimized helper
173164
_process_wrapper_inputs_and_outputs(
174165
step=step,
175166
func_signature=func_signature,
176167
func_args=func_args,
177168
func_kwargs=func_kwargs,
178169
context_kwarg=context_kwarg,
179-
output=output
170+
output=output,
180171
)
181172

182173
if exception is not None:
@@ -220,7 +211,7 @@ def trace_async(
220211

221212
def decorator(func):
222213
func_signature = inspect.signature(func)
223-
214+
224215
if step_kwargs.get("name") is None:
225216
step_kwargs["name"] = func.__name__
226217
step_name = step_kwargs["name"]
@@ -240,27 +231,25 @@ def __init__(self):
240231
self._token = None
241232
self._output_chunks = []
242233
self._trace_initialized = False
243-
234+
244235
def __aiter__(self):
245236
return self
246-
237+
247238
async def __anext__(self):
248239
# Initialize tracing on first iteration only
249240
if not self._trace_initialized:
250241
self._original_gen = func(*func_args, **func_kwargs)
251242
self._step, self._is_root_step, self._token = _create_step_for_async_generator(
252-
step_name=step_name,
253-
inference_pipeline_id=inference_pipeline_id,
254-
**step_kwargs
243+
step_name=step_name, inference_pipeline_id=inference_pipeline_id, **step_kwargs
255244
)
256245
self._inputs = _extract_function_inputs(
257246
func_signature=func_signature,
258247
func_args=func_args,
259248
func_kwargs=func_kwargs,
260-
context_kwarg=context_kwarg
249+
context_kwarg=context_kwarg,
261250
)
262251
self._trace_initialized = True
263-
252+
264253
try:
265254
chunk = await self._original_gen.__anext__()
266255
self._output_chunks.append(chunk)
@@ -275,7 +264,7 @@ async def __anext__(self):
275264
step_name=step_name,
276265
inputs=self._inputs,
277266
output=output,
278-
inference_pipeline_id=inference_pipeline_id
267+
inference_pipeline_id=inference_pipeline_id,
279268
)
280269
raise
281270
except Exception as exc:
@@ -290,64 +279,60 @@ async def __anext__(self):
290279
step_name=step_name,
291280
inputs=self._inputs,
292281
output=output,
293-
inference_pipeline_id=inference_pipeline_id
282+
inference_pipeline_id=inference_pipeline_id,
294283
)
295284
raise
296-
285+
297286
return TracedAsyncGenerator()
298-
287+
299288
return async_generator_wrapper
300289
else:
301290
# Create wrapper for regular async functions
302291
@wraps(func)
303292
async def async_function_wrapper(*func_args, **func_kwargs):
304-
with create_step(
305-
*step_args, inference_pipeline_id=inference_pipeline_id, **step_kwargs
306-
) as step:
293+
with create_step(*step_args, inference_pipeline_id=inference_pipeline_id, **step_kwargs) as step:
307294
output = exception = None
308-
295+
309296
try:
310297
output = await func(*func_args, **func_kwargs)
311298
except Exception as exc:
312299
_log_step_exception(step, exc)
313300
exception = exc
314301
raise
315-
302+
316303
# Extract inputs and finalize logging
317304
_process_wrapper_inputs_and_outputs(
318305
step=step,
319306
func_signature=func_signature,
320307
func_args=func_args,
321308
func_kwargs=func_kwargs,
322309
context_kwarg=context_kwarg,
323-
output=output
310+
output=output,
324311
)
325-
312+
326313
return output
327-
314+
328315
return async_function_wrapper
329316
else:
330317
# For sync functions, use the existing logic with optimizations
331318
@wraps(func)
332319
def sync_wrapper(*func_args, **func_kwargs):
333-
with create_step(
334-
*step_args, inference_pipeline_id=inference_pipeline_id, **step_kwargs
335-
) as step:
320+
with create_step(*step_args, inference_pipeline_id=inference_pipeline_id, **step_kwargs) as step:
336321
output = exception = None
337322
try:
338323
output = func(*func_args, **func_kwargs)
339324
except Exception as exc:
340325
_log_step_exception(step, exc)
341326
exception = exc
342-
327+
343328
# Extract inputs and finalize logging
344329
_process_wrapper_inputs_and_outputs(
345330
step=step,
346331
func_signature=func_signature,
347332
func_args=func_args,
348333
func_kwargs=func_kwargs,
349334
context_kwarg=context_kwarg,
350-
output=output
335+
output=output,
351336
)
352337

353338
if exception is not None:
@@ -381,8 +366,10 @@ def run_async_func(coroutine: Awaitable[Any]) -> Any:
381366
key.set(value)
382367
return result
383368

369+
384370
# ----------------------------- Helper functions for create_step ----------------------------- #
385371

372+
386373
def _create_and_initialize_step(
387374
step_name: str,
388375
step_type: enums.StepType = enums.StepType.USER_CALL,
@@ -391,17 +378,11 @@ def _create_and_initialize_step(
391378
metadata: Optional[Dict[str, Any]] = None,
392379
) -> Tuple[steps.Step, bool, Any]:
393380
"""Create a new step and initialize trace/parent relationships.
394-
381+
395382
Returns:
396383
Tuple of (step, is_root_step, token)
397384
"""
398-
new_step = steps.step_factory(
399-
step_type=step_type,
400-
name=step_name,
401-
inputs=inputs,
402-
output=output,
403-
metadata=metadata
404-
)
385+
new_step = steps.step_factory(step_type=step_type, name=step_name, inputs=inputs, output=output, metadata=metadata)
405386
new_step.start_time = time.time()
406387

407388
parent_step = get_current_step()
@@ -422,11 +403,7 @@ def _create_and_initialize_step(
422403
return new_step, is_root_step, token
423404

424405

425-
def _handle_trace_completion(
426-
is_root_step: bool,
427-
step_name: str,
428-
inference_pipeline_id: Optional[str] = None
429-
) -> None:
406+
def _handle_trace_completion(is_root_step: bool, step_name: str, inference_pipeline_id: Optional[str] = None) -> None:
430407
"""Handle trace completion and data streaming."""
431408
if is_root_step:
432409
logger.debug("Ending the trace...")
@@ -470,8 +447,10 @@ def _handle_trace_completion(
470447
else:
471448
logger.debug("Ending step %s", step_name)
472449

450+
473451
# ----------------------------- Helper functions for trace decorators ----------------------------- #
474452

453+
475454
def _log_step_exception(step: steps.Step, exception: Exception) -> None:
476455
"""Log exception metadata to a step."""
477456
step.log(metadata={"Exceptions": str(exception)})
@@ -487,32 +466,21 @@ def _process_wrapper_inputs_and_outputs(
487466
) -> None:
488467
"""Extract function inputs and finalize step logging - common pattern across wrappers."""
489468
inputs = _extract_function_inputs(
490-
func_signature=func_signature,
491-
func_args=func_args,
492-
func_kwargs=func_kwargs,
493-
context_kwarg=context_kwarg
494-
)
495-
_finalize_step_logging(
496-
step=step,
497-
inputs=inputs,
498-
output=output,
499-
start_time=step.start_time
469+
func_signature=func_signature, func_args=func_args, func_kwargs=func_kwargs, context_kwarg=context_kwarg
500470
)
471+
_finalize_step_logging(step=step, inputs=inputs, output=output, start_time=step.start_time)
501472

502473

503474
def _extract_function_inputs(
504-
func_signature: inspect.Signature,
505-
func_args: tuple,
506-
func_kwargs: dict,
507-
context_kwarg: Optional[str] = None
475+
func_signature: inspect.Signature, func_args: tuple, func_kwargs: dict, context_kwarg: Optional[str] = None
508476
) -> dict:
509477
"""Extract and clean function inputs for logging."""
510478
bound = func_signature.bind(*func_args, **func_kwargs)
511479
bound.apply_defaults()
512480
inputs = dict(bound.arguments)
513481
inputs.pop("self", None)
514482
inputs.pop("cls", None)
515-
483+
516484
# Handle context kwarg if specified
517485
if context_kwarg:
518486
if context_kwarg in inputs:
@@ -522,7 +490,7 @@ def _extract_function_inputs(
522490
"Context kwarg `%s` not found in inputs of the current function.",
523491
context_kwarg,
524492
)
525-
493+
526494
return inputs
527495

528496

@@ -537,28 +505,24 @@ def _finalize_step_logging(
537505
step.end_time = time.time()
538506
if step.latency is None:
539507
step.latency = (step.end_time - start_time) * 1000 # in ms
540-
508+
541509
step.log(
542510
inputs=inputs,
543511
output=output,
544512
end_time=step.end_time,
545513
latency=step.latency,
546514
)
547515

516+
548517
# ----------------------------- Async generator specific functions ----------------------------- #
549518

519+
550520
def _create_step_for_async_generator(
551-
step_name: str,
552-
inference_pipeline_id: Optional[str] = None,
553-
**step_kwargs
521+
step_name: str, inference_pipeline_id: Optional[str] = None, **step_kwargs
554522
) -> Tuple[steps.Step, bool, Any]:
555523
"""Create and initialize step for async generators - no context manager."""
556524
return _create_and_initialize_step(
557-
step_name=step_name,
558-
step_type=enums.StepType.USER_CALL,
559-
inputs=None,
560-
output=None,
561-
metadata=None
525+
step_name=step_name, step_type=enums.StepType.USER_CALL, inputs=None, output=None, metadata=None
562526
)
563527

564528

@@ -573,25 +537,20 @@ def _finalize_async_generator_step(
573537
) -> None:
574538
"""Finalize async generator step - called when generator is consumed."""
575539
_current_step.reset(token)
576-
_finalize_step_logging(
577-
step=step,
578-
inputs=inputs,
579-
output=output,
580-
start_time=step.start_time
581-
)
540+
_finalize_step_logging(step=step, inputs=inputs, output=output, start_time=step.start_time)
582541
_handle_trace_completion(
583-
is_root_step=is_root_step,
584-
step_name=step_name,
585-
inference_pipeline_id=inference_pipeline_id
542+
is_root_step=is_root_step, step_name=step_name, inference_pipeline_id=inference_pipeline_id
586543
)
587544

588545

589546
def _join_output_chunks(output_chunks: List[Any]) -> str:
590547
"""Join output chunks into a single string, filtering out None values."""
591548
return "".join(str(chunk) for chunk in output_chunks if chunk is not None)
592549

550+
593551
# ----------------------------- Utility functions ----------------------------- #
594552

553+
595554
async def _invoke_with_context(
596555
coroutine: Awaitable[Any],
597556
) -> Tuple[contextvars.Context, Any]:

0 commit comments

Comments (0)