21 | 21 | create_trace_args, |
22 | 22 | add_to_trace, |
23 | 23 | parse_non_streaming_output_data, |
| 24 | + parse_structured_output_data, |
24 | 25 | # Import Responses API helper functions |
25 | 26 | extract_responses_chunk_data, |
26 | 27 | extract_responses_inputs, |
@@ -98,6 +99,33 @@ async def traced_chat_create_func(*args, **kwargs): |
98 | 99 |
99 | 100 | client.chat.completions.create = traced_chat_create_func |
100 | 101 |
| 102 | + # Patch parse method if it exists |
| 103 | + if hasattr(client.chat.completions, 'parse'): |
| 104 | + parse_func = client.chat.completions.parse |
| 105 | + |
| 106 | + @wraps(parse_func) |
| 107 | + async def traced_parse_func(*args, **kwargs): |
| 108 | + inference_id = kwargs.pop("inference_id", None) |
| 109 | + stream = kwargs.get("stream", False) |
| 110 | + |
| 111 | + if stream: |
| 112 | + return handle_async_streaming_parse( |
| 113 | + *args, |
| 114 | + **kwargs, |
| 115 | + parse_func=parse_func, |
| 116 | + inference_id=inference_id, |
| 117 | + is_azure_openai=is_azure_openai, |
| 118 | + ) |
| 119 | + return await handle_async_non_streaming_parse( |
| 120 | + *args, |
| 121 | + **kwargs, |
| 122 | + parse_func=parse_func, |
| 123 | + inference_id=inference_id, |
| 124 | + is_azure_openai=is_azure_openai, |
| 125 | + ) |
| 126 | + |
| 127 | + client.chat.completions.parse = traced_parse_func |
| 128 | + |
101 | 129 | # Patch Responses API (if available) |
102 | 130 | if hasattr(client, "responses"): |
103 | 131 | responses_create_func = client.responses.create |
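
For context, here is a minimal usage sketch of the newly patched method. It assumes the tracer is applied through a `trace_async_openai`-style entry point and that the installed `openai` version exposes `chat.completions.parse`; the model name and the `Weather` schema below are illustrative only.

```python
import asyncio

from openai import AsyncOpenAI
from pydantic import BaseModel

from openlayer.lib import trace_async_openai  # assumed entry point


class Weather(BaseModel):  # hypothetical structured-output schema
    city: str
    temperature_c: float


async def main() -> None:
    client = trace_async_openai(AsyncOpenAI())
    # The traced parse() accepts the Openlayer-specific `inference_id`
    # kwarg, which is popped before the call reaches the OpenAI SDK.
    completion = await client.chat.completions.parse(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "What's the weather in Paris?"}],
        response_format=Weather,
        inference_id="my-custom-id",  # optional user-supplied trace id
    )
    print(completion.choices[0].message.parsed)


asyncio.run(main())
```
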
@@ -466,3 +494,186 @@ async def handle_async_responses_non_streaming_create( |
466 | 494 | logger.error("Failed to trace the Responses API request with Openlayer. %s", e) |
467 | 495 |
468 | 496 | return response |
| 497 | + |
| 498 | + |
| 499 | +async def handle_async_streaming_parse( |
| 500 | + parse_func: callable, |
| 501 | + *args, |
| 502 | + is_azure_openai: bool = False, |
| 503 | + inference_id: Optional[str] = None, |
| 504 | + **kwargs, |
| 505 | +) -> AsyncIterator[Any]: |
| 506 | + """Handles the parse method when streaming is enabled. |
| 507 | +
| 508 | + Parameters |
| 509 | + ---------- |
| 510 | + parse_func : callable |
| 511 | + The parse method to handle. |
| 512 | + is_azure_openai : bool, optional |
| 513 | + Whether the client is an Azure OpenAI client, by default False |
| 514 | + inference_id : Optional[str], optional |
| 515 | + A user-generated inference id, by default None |
| 516 | +
| 517 | + Returns |
| 518 | + ------- |
| 519 | + AsyncIterator[Any] |
| 520 | + An async iterator that yields the chunks of the completion. |
| 521 | + """ |
| 522 | + chunks = await parse_func(*args, **kwargs) |
| 523 | + |
| 524 | + # Collect streamed output while re-yielding each chunk to the caller |
| 525 | + collected_output_data = [] |
| 526 | + collected_function_call = { |
| 527 | + "name": "", |
| 528 | + "arguments": "", |
| 529 | + } |
| 530 | + raw_outputs = [] |
| 531 | + start_time = time.time() |
| 532 | + end_time = None |
| 533 | + first_token_time = None |
| 534 | + num_of_completion_tokens = None |
| 535 | + latency = None |
| 536 | + try: |
| 537 | + i = 0 |
| 538 | + async for chunk in chunks: |
| 539 | + raw_outputs.append(chunk.model_dump()) |
| 540 | + if i == 0: |
| 541 | + first_token_time = time.time() |
| 542 | + if i > 0: |
| 543 | + num_of_completion_tokens = i + 1 |
| 544 | + i += 1 |
| 545 | + |
| 546 | + delta = chunk.choices[0].delta |
| 547 | + |
| 548 | + if delta.content: |
| 549 | + collected_output_data.append(delta.content) |
| 550 | + elif delta.function_call: |
| 551 | + if delta.function_call.name: |
| 552 | + collected_function_call["name"] += delta.function_call.name |
| 553 | + if delta.function_call.arguments: |
| 554 | + collected_function_call[ |
| 555 | + "arguments" |
| 556 | + ] += delta.function_call.arguments |
| 557 | + elif delta.tool_calls: |
| 558 | + if delta.tool_calls[0].function.name: |
| 559 | + collected_function_call["name"] += delta.tool_calls[0].function.name |
| 560 | + if delta.tool_calls[0].function.arguments: |
| 561 | + collected_function_call["arguments"] += delta.tool_calls[ |
| 562 | + 0 |
| 563 | + ].function.arguments |
| 564 | + |
| 565 | + yield chunk |
| 566 | + |
| 567 | + end_time = time.time() |
| 568 | + latency = (end_time - start_time) * 1000 |
| 569 | + # pylint: disable=broad-except |
| 570 | + except Exception as e: |
| 571 | + logger.error("Failed to yield chunk. %s", e) |
| 572 | + finally: |
| 573 | + # Try to add step to the trace |
| 574 | + try: |
| 575 | + collected_output_data = [ |
| 576 | + message for message in collected_output_data if message is not None |
| 577 | + ] |
| 578 | + if collected_output_data: |
| 579 | + output_data = "".join(collected_output_data) |
| 580 | + else: |
| 581 | + collected_function_call["arguments"] = json.loads( |
| 582 | + collected_function_call["arguments"] |
| 583 | + ) |
| 584 | + output_data = collected_function_call |
| 585 | + |
| 586 | + trace_args = create_trace_args( |
| 587 | + end_time=end_time, |
| 588 | + inputs={"prompt": kwargs["messages"]}, |
| 589 | + output=output_data, |
| 590 | + latency=latency, |
| 591 | + tokens=num_of_completion_tokens, |
| 592 | + prompt_tokens=0, |
| 593 | + completion_tokens=num_of_completion_tokens, |
| 594 | + model=kwargs.get("model"), |
| 595 | + model_parameters=get_model_parameters(kwargs), |
| 596 | + raw_output=raw_outputs, |
| 597 | + id=inference_id, |
| 598 | + metadata={ |
| 599 | + "timeToFirstToken": ( |
| 600 | + (first_token_time - start_time) * 1000 |
| 601 | + if first_token_time |
| 602 | + else None |
| 603 | + ), |
| 604 | + "method": "parse", |
| 605 | + "response_format": kwargs.get("response_format"), |
| 606 | + }, |
| 607 | + ) |
| 608 | + add_to_trace( |
| 609 | + **trace_args, |
| 610 | + is_azure_openai=is_azure_openai, |
| 611 | + ) |
| 612 | + |
| 613 | + # pylint: disable=broad-except |
| 614 | + except Exception as e: |
| 615 | + logger.error( |
| 616 | + "Failed to trace the parse chat completion request with Openlayer. %s", |
| 617 | + e, |
| 618 | + ) |
| 619 | + |
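
Because `handle_async_streaming_parse` is itself an async generator, the traced `parse()` returns it directly when `stream=True`, and the caller iterates it as usual while the chunks are transparently collected for the trace. A hedged consumption sketch, assuming the installed SDK's `parse` accepts `stream=True` at all (older releases only stream structured outputs via `.stream()`):

```python
# Hypothetical: `client` is a traced AsyncOpenAI client as in the sketch above.
stream = await client.chat.completions.parse(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Stream a short poem."}],
    stream=True,
)
async for chunk in stream:  # each chunk is re-yielded by the tracer
    delta = chunk.choices[0].delta
    if delta.content:
        print(delta.content, end="", flush=True)
# When iteration ends, the `finally` block above assembles and sends the trace.
```
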
| 620 | + |
| 621 | +async def handle_async_non_streaming_parse( |
| 622 | + parse_func: callable, |
| 623 | + *args, |
| 624 | + is_azure_openai: bool = False, |
| 625 | + inference_id: Optional[str] = None, |
| 626 | + **kwargs, |
| 627 | +) -> Any: |
| 628 | + """Handles the parse method when streaming is disabled. |
| 629 | +
| 630 | + Parameters |
| 631 | + ---------- |
| 632 | + parse_func : callable |
| 633 | + The parse method to handle. |
| 634 | + is_azure_openai : bool, optional |
| 635 | + Whether the client is an Azure OpenAI client, by default False |
| 636 | + inference_id : Optional[str], optional |
| 637 | + A user-generated inference id, by default None |
| 638 | +
| 639 | + Returns |
| 640 | + ------- |
| 641 | + Any |
| 642 | + The parsed completion response. |
| 643 | + """ |
| 644 | + start_time = time.time() |
| 645 | + response = await parse_func(*args, **kwargs) |
| 646 | + end_time = time.time() |
| 647 | + |
| 648 | + # Try to add step to the trace |
| 649 | + try: |
| 650 | + output_data = parse_structured_output_data(response) |
| 651 | + trace_args = create_trace_args( |
| 652 | + end_time=end_time, |
| 653 | + inputs={"prompt": kwargs["messages"]}, |
| 654 | + output=output_data, |
| 655 | + latency=(end_time - start_time) * 1000, |
| 656 | + tokens=response.usage.total_tokens, |
| 657 | + prompt_tokens=response.usage.prompt_tokens, |
| 658 | + completion_tokens=response.usage.completion_tokens, |
| 659 | + model=response.model, |
| 660 | + model_parameters=get_model_parameters(kwargs), |
| 661 | + raw_output=response.model_dump(), |
| 662 | + id=inference_id, |
| 663 | + metadata={ |
| 664 | + "method": "parse", |
| 665 | + "response_format": kwargs.get("response_format"), |
| 666 | + }, |
| 667 | + ) |
| 668 | + |
| 669 | + add_to_trace( |
| 670 | + is_azure_openai=is_azure_openai, |
| 671 | + **trace_args, |
| 672 | + ) |
| 673 | + # pylint: disable=broad-except |
| 674 | + except Exception as e: |
| 675 | + logger.error( |
| 676 | + "Failed to trace the parse chat completion request with Openlayer. %s", e |
| 677 | + ) |
| 678 | + |
| 679 | + return response |
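
The `parse_structured_output_data` helper imported at the top of this diff is defined elsewhere in the module. A minimal sketch of the behavior the non-streaming handler relies on, assuming the OpenAI `ParsedChatCompletion` shape where the Pydantic instance lives on `message.parsed` (illustrative reimplementation, not the actual helper):

```python
from typing import Any


def parse_structured_output_data(response) -> Any:
    """Extract a JSON-serializable payload from a parsed completion.

    Hypothetical sketch for illustration; the real helper is imported
    alongside the other tracing utilities.
    """
    message = response.choices[0].message
    if getattr(message, "parsed", None) is not None:
        # Structured outputs arrive as a Pydantic model instance.
        return message.parsed.model_dump()
    # Fall back to raw text content (e.g., refusals or plain replies).
    return message.content
```
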