33import asyncio
44import inspect
55from dataclasses import dataclass , field
6- from typing import Any , Callable , Generic , cast
6+ from typing import Any , Callable , Generic , cast , get_args
77
8- from openai .types .responses import ResponseCompletedEvent
8+ from openai .types .responses import (
9+ ResponseCompletedEvent ,
10+ ResponseOutputItemAddedEvent ,
11+ )
912from openai .types .responses .response_prompt_param import (
1013 ResponsePromptParam ,
1114)
4043 OutputGuardrailResult ,
4144)
4245from .handoffs import Handoff , HandoffInputFilter , handoff
43- from .items import ItemHelpers , ModelResponse , RunItem , TResponseInputItem
46+ from .items import (
47+ ItemHelpers ,
48+ ModelResponse ,
49+ RunItem ,
50+ ToolCallItem ,
51+ ToolCallItemTypes ,
52+ TResponseInputItem ,
53+ )
4454from .lifecycle import RunHooks
4555from .logger import logger
4656from .memory import Session
4959from .models .multi_provider import MultiProvider
5060from .result import RunResult , RunResultStreaming
5161from .run_context import RunContextWrapper , TContext
52- from .stream_events import AgentUpdatedStreamEvent , RawResponsesStreamEvent
62+ from .stream_events import AgentUpdatedStreamEvent , RawResponsesStreamEvent , RunItemStreamEvent
5363from .tool import Tool
5464from .tracing import Span , SpanError , agent_span , get_current_trace , trace
5565from .tracing .span_data import AgentSpanData
@@ -905,6 +915,8 @@ async def _run_single_turn_streamed(
905915 all_tools : list [Tool ],
906916 previous_response_id : str | None ,
907917 ) -> SingleStepResult :
918+ emitted_tool_call_ids : set [str ] = set ()
919+
908920 if should_run_agent_start_hooks :
909921 await asyncio .gather (
910922 hooks .on_agent_start (context_wrapper , agent ),
@@ -984,6 +996,25 @@ async def _run_single_turn_streamed(
984996 )
985997 context_wrapper .usage .add (usage )
986998
999+ if isinstance (event , ResponseOutputItemAddedEvent ):
1000+ output_item = event .item
1001+
1002+ if isinstance (output_item , _TOOL_CALL_TYPES ):
1003+ call_id : str | None = getattr (
1004+ output_item , "call_id" , getattr (output_item , "id" , None )
1005+ )
1006+
1007+ if call_id and call_id not in emitted_tool_call_ids :
1008+ emitted_tool_call_ids .add (call_id )
1009+
1010+ tool_item = ToolCallItem (
1011+ raw_item = cast (ToolCallItemTypes , output_item ),
1012+ agent = agent ,
1013+ )
1014+ streamed_result ._event_queue .put_nowait (
1015+ RunItemStreamEvent (item = tool_item , name = "tool_called" )
1016+ )
1017+
9871018 streamed_result ._event_queue .put_nowait (RawResponsesStreamEvent (data = event ))
9881019
9891020 # Call hook just after the model response is finalized.
@@ -995,9 +1026,10 @@ async def _run_single_turn_streamed(
9951026 raise ModelBehaviorError ("Model did not produce a final response!" )
9961027
9971028 # 3. Now, we can process the turn as we do in the non-streaming case
998- return await cls ._get_single_step_result_from_streamed_response (
1029+ single_step_result = await cls ._get_single_step_result_from_response (
9991030 agent = agent ,
1000- streamed_result = streamed_result ,
1031+ original_input = streamed_result .input ,
1032+ pre_step_items = streamed_result .new_items ,
10011033 new_response = final_response ,
10021034 output_schema = output_schema ,
10031035 all_tools = all_tools ,
@@ -1008,6 +1040,34 @@ async def _run_single_turn_streamed(
10081040 tool_use_tracker = tool_use_tracker ,
10091041 )
10101042
1043+ if emitted_tool_call_ids :
1044+ import dataclasses as _dc
1045+
1046+ filtered_items = [
1047+ item
1048+ for item in single_step_result .new_step_items
1049+ if not (
1050+ isinstance (item , ToolCallItem )
1051+ and (
1052+ call_id := getattr (
1053+ item .raw_item , "call_id" , getattr (item .raw_item , "id" , None )
1054+ )
1055+ )
1056+ and call_id in emitted_tool_call_ids
1057+ )
1058+ ]
1059+
1060+ single_step_result_filtered = _dc .replace (
1061+ single_step_result , new_step_items = filtered_items
1062+ )
1063+
1064+ RunImpl .stream_step_result_to_queue (
1065+ single_step_result_filtered , streamed_result ._event_queue
1066+ )
1067+ else :
1068+ RunImpl .stream_step_result_to_queue (single_step_result , streamed_result ._event_queue )
1069+ return single_step_result
1070+
10111071 @classmethod
10121072 async def _run_single_turn (
10131073 cls ,
@@ -1397,9 +1457,11 @@ async def _save_result_to_session(
13971457
13981458
13991459DEFAULT_AGENT_RUNNER = AgentRunner ()
1460+ _TOOL_CALL_TYPES : tuple [type , ...] = get_args (ToolCallItemTypes )
14001461
14011462
14021463def _copy_str_or_list (input : str | list [TResponseInputItem ]) -> str | list [TResponseInputItem ]:
14031464 if isinstance (input , str ):
14041465 return input
14051466 return input .copy ()
1467+
0 commit comments