Skip to content

Commit d0cd7e9

Browse files
authored
fix(openai): add streaming support for responses.create() api (#3437)
1 parent 3468985 commit d0cd7e9

File tree

8 files changed

+1893
-45
lines changed

8 files changed

+1893
-45
lines changed

packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ def _set_response_attributes(span, response):
232232
prompt_tokens_details = dict(usage.get("prompt_tokens_details", {}))
233233
_set_span_attribute(
234234
span,
235-
GenAIAttributes.GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS,
235+
SpanAttributes.LLM_USAGE_CACHE_READ_INPUT_TOKENS,
236236
prompt_tokens_details.get("cached_tokens", 0),
237237
)
238238
return

packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/responses_wrappers.py

Lines changed: 254 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import json
22
import pydantic
33
import re
4+
import threading
45
import time
56

67
from openai import AsyncStream, Stream
8+
from wrapt import ObjectProxy
79

810
# Conditional imports for backward compatibility
911
try:
@@ -192,13 +194,11 @@ def set_data_attributes(traced_response: TracedData, span: Span):
192194
if usage.input_tokens_details:
193195
_set_span_attribute(
194196
span,
195-
GenAIAttributes.GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS,
197+
SpanAttributes.LLM_USAGE_CACHE_READ_INPUT_TOKENS,
196198
usage.input_tokens_details.cached_tokens,
197199
)
198200

199-
# Usage - count of reasoning tokens
200201
reasoning_tokens = None
201-
# Support both dict-style and object-style `usage`
202202
tokens_details = (
203203
usage.get("output_tokens_details") if isinstance(usage, dict)
204204
else getattr(usage, "output_tokens_details", None)
@@ -216,20 +216,16 @@ def set_data_attributes(traced_response: TracedData, span: Span):
216216
reasoning_tokens or 0,
217217
)
218218

219-
# Reasoning attributes
220-
# Request - reasoning summary
221219
_set_span_attribute(
222220
span,
223221
f"{SpanAttributes.LLM_REQUEST_REASONING_SUMMARY}",
224222
traced_response.request_reasoning_summary or (),
225223
)
226-
# Request - reasoning effort
227224
_set_span_attribute(
228225
span,
229226
f"{SpanAttributes.LLM_REQUEST_REASONING_EFFORT}",
230227
traced_response.request_reasoning_effort or (),
231228
)
232-
# Response - reasoning effort
233229
_set_span_attribute(
234230
span,
235231
f"{SpanAttributes.LLM_RESPONSE_REASONING_EFFORT}",
@@ -433,7 +429,19 @@ def responses_get_or_create_wrapper(tracer: Tracer, wrapped, instance, args, kwa
433429
try:
434430
response = wrapped(*args, **kwargs)
435431
if isinstance(response, Stream):
436-
return response
432+
span = tracer.start_span(
433+
SPAN_NAME,
434+
kind=SpanKind.CLIENT,
435+
start_time=start_time,
436+
)
437+
438+
return ResponseStream(
439+
span=span,
440+
response=response,
441+
start_time=start_time,
442+
request_kwargs=kwargs,
443+
tracer=tracer,
444+
)
437445
except Exception as e:
438446
response_id = kwargs.get("response_id")
439447
existing_data = {}
@@ -563,7 +571,19 @@ async def async_responses_get_or_create_wrapper(
563571
try:
564572
response = await wrapped(*args, **kwargs)
565573
if isinstance(response, (Stream, AsyncStream)):
566-
return response
574+
span = tracer.start_span(
575+
SPAN_NAME,
576+
kind=SpanKind.CLIENT,
577+
start_time=start_time,
578+
)
579+
580+
return ResponseStream(
581+
span=span,
582+
response=response,
583+
start_time=start_time,
584+
request_kwargs=kwargs,
585+
tracer=tracer,
586+
)
567587
except Exception as e:
568588
response_id = kwargs.get("response_id")
569589
existing_data = {}
@@ -728,4 +748,228 @@ async def async_responses_cancel_wrapper(
728748
return response
729749

730750

731-
# TODO: build streaming responses
751+
class ResponseStream(ObjectProxy):
    """Transparent proxy around an OpenAI ``responses.create(stream=True)``
    stream that captures telemetry and ends the given span exactly once.

    Wraps either a sync ``Stream`` or an async ``AsyncStream``; iteration,
    context-manager use, and ``close()``/``aclose()`` are all forwarded to the
    wrapped stream while chunks are inspected to accumulate output text and
    the final response object. The span is ended on normal completion, on
    error, on explicit close, or (best-effort) at garbage collection.
    """

    # Class-level defaults so attribute assignment in __init__ goes through
    # ObjectProxy's __setattr__ onto the proxy itself rather than the
    # wrapped stream (standard wrapt pattern).
    _span = None
    _start_time = None
    _request_kwargs = None
    _tracer = None
    _traced_data = None

    def __init__(
        self,
        span,
        response,
        start_time=None,
        request_kwargs=None,
        tracer=None,
        traced_data=None,
    ):
        """Wrap *response* and prepare TracedData seeded from the request.

        Args:
            span: already-started client span to finish when the stream ends.
            response: the underlying (Async)Stream being proxied.
            start_time: span start timestamp, recorded in TracedData.
            request_kwargs: kwargs passed to responses.create(); used to seed
                request-side attributes (model, input, tools, reasoning, ...).
            tracer: tracer instance (kept for parity with callers).
            traced_data: pre-existing TracedData to continue, if any.
        """
        super().__init__(response)
        self._span = span
        self._start_time = start_time
        self._request_kwargs = request_kwargs or {}
        self._tracer = tracer
        # `or {}` (not a .get default) so an explicit `reasoning=None` kwarg
        # does not raise AttributeError on the chained .get().
        reasoning = self._request_kwargs.get("reasoning") or {}
        self._traced_data = traced_data or TracedData(
            start_time=start_time,
            response_id="",
            input=process_input(self._request_kwargs.get("input", [])),
            instructions=self._request_kwargs.get("instructions"),
            tools=get_tools_from_kwargs(self._request_kwargs),
            output_blocks={},
            usage=None,
            output_text="",
            request_model=self._request_kwargs.get("model", ""),
            response_model="",
            request_reasoning_summary=reasoning.get("summary"),
            request_reasoning_effort=reasoning.get("effort"),
            response_reasoning_effort=None,
        )

        # Accumulated streaming state.
        self._complete_response_data = None
        self._output_text = ""

        # Guards so the span is ended exactly once across the several
        # possible termination paths (StopIteration, error, close, __del__).
        self._cleanup_completed = False
        self._cleanup_lock = threading.Lock()

    def __del__(self):
        """Best-effort cleanup if the stream is garbage-collected unconsumed."""
        # hasattr guard: __init__ may not have completed if it raised early.
        if hasattr(self, "_cleanup_completed") and not self._cleanup_completed:
            self._ensure_cleanup()

    def __enter__(self):
        """Context manager entry; enters the wrapped stream if it supports it."""
        if hasattr(self.__wrapped__, "__enter__"):
            self.__wrapped__.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: record outcome, then delegate to the stream."""
        suppress = False
        try:
            if exc_type is not None:
                self._handle_exception(exc_val)
            else:
                self._process_complete_response()
        finally:
            if hasattr(self.__wrapped__, "__exit__"):
                # Honor the wrapped stream's suppression decision.
                suppress = bool(self.__wrapped__.__exit__(exc_type, exc_val, exc_tb))
        return suppress

    async def __aenter__(self):
        """Async context manager entry; enters the wrapped stream if possible."""
        if hasattr(self.__wrapped__, "__aenter__"):
            await self.__wrapped__.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: record outcome, then delegate."""
        suppress = False
        try:
            if exc_type is not None:
                self._handle_exception(exc_val)
            else:
                self._process_complete_response()
        finally:
            if hasattr(self.__wrapped__, "__aexit__"):
                suppress = bool(await self.__wrapped__.__aexit__(exc_type, exc_val, exc_tb))
        return suppress

    def close(self):
        """Finish telemetry, then close the wrapped stream (if closable)."""
        try:
            self._ensure_cleanup()
        finally:
            if hasattr(self.__wrapped__, "close"):
                return self.__wrapped__.close()

    async def aclose(self):
        """Finish telemetry, then async-close the wrapped stream (if closable)."""
        try:
            self._ensure_cleanup()
        finally:
            if hasattr(self.__wrapped__, "aclose"):
                return await self.__wrapped__.aclose()

    def __iter__(self):
        """Synchronous iterator."""
        return self

    def __next__(self):
        """Forward one chunk; finalize the span on exhaustion or error."""
        try:
            chunk = self.__wrapped__.__next__()
        except StopIteration:
            self._process_complete_response()
            raise
        except Exception as e:
            self._handle_exception(e)
            raise
        else:
            self._process_chunk(chunk)
            return chunk

    def __aiter__(self):
        """Async iterator."""
        return self

    async def __anext__(self):
        """Async: forward one chunk; finalize the span on exhaustion or error."""
        try:
            chunk = await self.__wrapped__.__anext__()
        except StopAsyncIteration:
            self._process_complete_response()
            raise
        except Exception as e:
            self._handle_exception(e)
            raise
        else:
            self._process_chunk(chunk)
            return chunk

    def _process_chunk(self, chunk):
        """Accumulate output text and capture the final response from a chunk.

        Handles the typed Responses-API events (``response.output_text.delta``,
        ``response.completed``) plus duck-typed fallbacks for chunks exposing
        ``delta.text`` or a ``response`` attribute.
        """
        if hasattr(chunk, "type"):
            if chunk.type == "response.output_text.delta":
                if hasattr(chunk, "delta") and chunk.delta:
                    self._output_text += chunk.delta
            elif chunk.type == "response.completed" and hasattr(chunk, "response"):
                self._complete_response_data = chunk.response

        # Fallback for chunk shapes without a recognized `type`. For
        # output_text.delta events `delta` is a plain str (no `.text`), so
        # this does not double-count them.
        if hasattr(chunk, "delta"):
            if hasattr(chunk.delta, "text") and chunk.delta.text:
                self._output_text += chunk.delta.text

        if hasattr(chunk, "response") and chunk.response:
            self._complete_response_data = chunk.response

    @dont_throw
    def _process_complete_response(self):
        """Populate TracedData from the final response and end the span (once)."""
        with self._cleanup_lock:
            if self._cleanup_completed:
                return

            try:
                if self._complete_response_data:
                    parsed_response = parse_response(self._complete_response_data)

                    self._traced_data.response_id = parsed_response.id
                    self._traced_data.response_model = parsed_response.model
                    self._traced_data.output_text = self._output_text

                    if parsed_response.usage:
                        self._traced_data.usage = parsed_response.usage

                    if parsed_response.output:
                        self._traced_data.output_blocks = {
                            block.id: block for block in parsed_response.output
                        }

                    # Register so follow-up get/create calls can extend it.
                    responses[parsed_response.id] = self._traced_data

                set_data_attributes(self._traced_data, self._span)
                self._span.set_status(StatusCode.OK)
                self._span.end()
                self._cleanup_completed = True

            except Exception as e:
                # Still end the span so it is not leaked, but mark it failed.
                if self._span and self._span.is_recording():
                    self._span.set_attribute(ERROR_TYPE, e.__class__.__name__)
                    self._span.set_status(StatusCode.ERROR, str(e))
                    self._span.end()
                self._cleanup_completed = True

    @dont_throw
    def _handle_exception(self, exception):
        """Record a streaming exception on the span and end it (once)."""
        with self._cleanup_lock:
            if self._cleanup_completed:
                return

            if self._span and self._span.is_recording():
                self._span.set_attribute(ERROR_TYPE, exception.__class__.__name__)
                self._span.record_exception(exception)
                self._span.set_status(StatusCode.ERROR, str(exception))
                self._span.end()

            self._cleanup_completed = True

    @dont_throw
    def _ensure_cleanup(self):
        """End the span with whatever data was gathered, even if the stream
        was abandoned before being fully consumed."""
        with self._cleanup_lock:
            if self._cleanup_completed:
                return

            try:
                if self._span and self._span.is_recording():
                    set_data_attributes(self._traced_data, self._span)
                    self._span.set_status(StatusCode.OK)
                    self._span.end()

                self._cleanup_completed = True

            except Exception:
                # Never propagate from cleanup; just stop retrying.
                self._cleanup_completed = True

0 commit comments

Comments (0)