diff --git a/docs/ja/streaming.md b/docs/ja/streaming.md index 72cb5482c..b4e5659f5 100644 --- a/docs/ja/streaming.md +++ b/docs/ja/streaming.md @@ -88,4 +88,64 @@ async def main(): if __name__ == "__main__": asyncio.run(main()) -``` \ No newline at end of file +``` + +## ツール出力ストリーミングイベント + +[`ToolOutputStreamEvent`][agents.stream_events.ToolOutputStreamEvent] を使用すると、ツールの実行中に増分出力を受け取ることができます。これは、長時間実行されるツールでユーザーにリアルタイムで進捗を表示したい場合に有用です。 + +ストリーミングツールを作成するには、文字列チャンクを yield する非同期ジェネレータ関数を定義します: + +```python +import asyncio +from collections.abc import AsyncIterator +from agents import Agent, Runner, ToolOutputStreamEvent, function_tool + +@function_tool +async def search_documents(query: str) -> AsyncIterator[str]: + """ドキュメントを検索し、見つかった結果をストリーミングします。""" + documents = [ + f"ドキュメント 1 には {query} に関する情報が含まれています...\n", + f"ドキュメント 2 には {query} に関する追加の詳細があります...\n", + f"ドキュメント 3 は {query} の分析を提供します...\n", + ] + + for doc in documents: + # 処理時間をシミュレート + await asyncio.sleep(0.5) + # 増分結果を yield + yield doc + + +async def main(): + agent = Agent( + name="Research Assistant", + instructions="あなたはユーザーの情報検索を支援します。", + tools=[search_documents], + ) + + result = Runner.run_streamed( + agent, + input="AI に関する情報を検索してください", + ) + + async for event in result.stream_events(): + # ツールストリーミングイベントを処理 + if event.type == "tool_output_stream_event": + print(f"[{event.tool_name}] {event.delta}", end="", flush=True) + # 最終ツール出力を処理 + elif event.type == "run_item_stream_event" and event.name == "tool_output": + print(f"\n✓ ツール完了\n") + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +ストリーミングツールに関する重要なポイント: + +- ストリーミングツールは `AsyncIterator[str]`(文字列を yield する非同期ジェネレータ)を返す必要があります +- yield された各チャンクは `ToolOutputStreamEvent` として発行されます +- すべてのチャンクは自動的に蓄積され、最終的なツール出力として LLM に送信されます +- 非ストリーミングツールはストリーミングツールと一緒に正常に動作します +- 非ストリーミングモード(`Runner.run()`)では、ストリーミングツールは返す前にすべてのチャンクを自動的に収集します \ No newline at end of file diff --git a/docs/ko/streaming.md b/docs/ko/streaming.md index e9ab2dd70..f8fc4d7b4 100644 --- a/docs/ko/streaming.md +++ b/docs/ko/streaming.md @@ -88,4 +88,64 @@ async def main(): if __name__ == "__main__": asyncio.run(main()) -``` \ No newline at end of file +``` + +## 도구 출력 스트리밍 이벤트 + +[`ToolOutputStreamEvent`][agents.stream_events.ToolOutputStreamEvent]를 사용하면 도구가 실행되는 동안 증분 출력을 받을 수 있습니다. 이는 장시간 실행되는 도구에서 사용자에게 실시간으로 진행 상황을 표시하려는 경우에 유용합니다. + +스트리밍 도구를 만들려면 문자열 청크를 yield하는 비동기 제너레이터 함수를 정의하세요: + +```python +import asyncio +from collections.abc import AsyncIterator +from agents import Agent, Runner, ToolOutputStreamEvent, function_tool + +@function_tool +async def search_documents(query: str) -> AsyncIterator[str]: + """문서를 검색하고 발견된 결과를 스트리밍합니다.""" + documents = [ + f"문서 1에는 {query}에 대한 정보가 포함되어 있습니다...\n", + f"문서 2에는 {query}에 대한 추가 세부정보가 있습니다...\n", + f"문서 3은 {query}에 대한 분석을 제공합니다...\n", + ] + + for doc in documents: + # 처리 시간 시뮬레이션 + await asyncio.sleep(0.5) + # 증분 결과 yield + yield doc + + +async def main(): + agent = Agent( + name="Research Assistant", + instructions="당신은 사용자가 정보를 검색하도록 돕습니다.", + tools=[search_documents], + ) + + result = Runner.run_streamed( + agent, + input="AI에 관한 정보를 검색하세요", + ) + + async for event in result.stream_events(): + # 도구 스트리밍 이벤트 처리 + if event.type == "tool_output_stream_event": + print(f"[{event.tool_name}] {event.delta}", end="", flush=True) + # 최종 도구 출력 처리 + elif event.type == "run_item_stream_event" and event.name == "tool_output": + print(f"\n✓ 도구 완료\n") + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +스트리밍 도구에 대한 주요 사항: + +- 스트리밍 도구는 `AsyncIterator[str]`(문자열을 yield하는 비동기 제너레이터)을 반환해야 합니다 +- yield된 각 청크는 `ToolOutputStreamEvent`로 발행됩니다 +- 모든 청크는 자동으로 누적되어 최종 도구 출력으로 LLM에 전송됩니다 +- 비스트리밍 도구는 스트리밍 도구와 함께 정상적으로 작동합니다 +- 비스트리밍 모드(`Runner.run()`)에서 스트리밍 도구는 반환하기 전에 모든 청크를 자동으로 수집합니다 \ No newline at end of file diff --git a/docs/streaming.md b/docs/streaming.md index b2c7c095d..9680db9e2 100644 --- a/docs/streaming.md +++ b/docs/streaming.md @@ -85,3 +85,63 @@ async def main(): if __name__ == "__main__": asyncio.run(main()) ``` + +## Tool output streaming events + +[`ToolOutputStreamEvent`][agents.stream_events.ToolOutputStreamEvent] allows you to receive incremental output from tools as they execute. This is useful for long-running tools where you want to show progress to the user in real-time. + +To create a streaming tool, define an async generator function that yields string chunks: + +```python +import asyncio +from collections.abc import AsyncIterator +from agents import Agent, Runner, ToolOutputStreamEvent, function_tool + +@function_tool +async def search_documents(query: str) -> AsyncIterator[str]: + """Search through documents and stream results as they are found.""" + documents = [ + f"Document 1 contains information about {query}...\n", + f"Document 2 has additional details on {query}...\n", + f"Document 3 provides analysis of {query}...\n", + ] + + for doc in documents: + # Simulate processing time + await asyncio.sleep(0.5) + # Yield incremental results + yield doc + + +async def main(): + agent = Agent( + name="Research Assistant", + instructions="You help users search for information.", + tools=[search_documents], + ) + + result = Runner.run_streamed( + agent, + input="Search for information about AI", + ) + + async for event in result.stream_events(): + # Handle tool streaming events + if event.type == "tool_output_stream_event": + print(f"[{event.tool_name}] {event.delta}", end="", flush=True) + # Handle final tool output + elif event.type == "run_item_stream_event" and event.name == "tool_output": + print(f"\n✓ Tool completed\n") + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +Key points about streaming tools: + +- Streaming tools must return `AsyncIterator[str]` (an async generator that yields strings) +- Each yielded chunk is emitted as a `ToolOutputStreamEvent` +- All chunks are automatically accumulated and sent to the LLM as the final tool output +- Non-streaming tools continue to work normally alongside streaming tools +- In non-streaming mode (`Runner.run()`), streaming tools automatically collect all chunks before returning diff --git a/docs/zh/streaming.md b/docs/zh/streaming.md index 95526de31..d632f872e 100644 --- a/docs/zh/streaming.md +++ b/docs/zh/streaming.md @@ -88,4 +88,64 @@ async def main(): if __name__ == "__main__": asyncio.run(main()) -``` \ No newline at end of file +``` + +## 工具输出流式事件 + +[`ToolOutputStreamEvent`][agents.stream_events.ToolOutputStreamEvent] 允许你在工具执行时接收增量输出。这对于长时间运行的工具非常有用,可以实时向用户显示进度。 + +要创建流式工具,请定义一个异步生成器函数,逐块 yield 字符串: + +```python +import asyncio +from collections.abc import AsyncIterator +from agents import Agent, Runner, ToolOutputStreamEvent, function_tool + +@function_tool +async def search_documents(query: str) -> AsyncIterator[str]: + """搜索文档并流式返回找到的结果。""" + documents = [ + f"文档 1 包含关于 {query} 的信息...\n", + f"文档 2 提供关于 {query} 的更多细节...\n", + f"文档 3 分析了 {query}...\n", + ] + + for doc in documents: + # 模拟处理时间 + await asyncio.sleep(0.5) + # yield 增量结果 + yield doc + + +async def main(): + agent = Agent( + name="Research Assistant", + instructions="你帮助用户搜索信息。", + tools=[search_documents], + ) + + result = Runner.run_streamed( + agent, + input="搜索关于人工智能的信息", + ) + + async for event in result.stream_events(): + # 处理工具流式事件 + if event.type == "tool_output_stream_event": + print(f"[{event.tool_name}] {event.delta}", end="", flush=True) + # 处理最终工具输出 + elif event.type == "run_item_stream_event" and event.name == "tool_output": + print(f"\n✓ 工具完成\n") + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +关于流式工具的要点: + +- 流式工具必须返回 `AsyncIterator[str]`(一个 yield 字符串的异步生成器) +- 每个 yield 的块都会作为 `ToolOutputStreamEvent` 发出 +- 所有块会自动累积并作为最终工具输出发送给 LLM +- 非流式工具可以与流式工具正常共存 +- 在非流式模式(`Runner.run()`)中,流式工具会在返回前自动收集所有块 \ No newline at end of file diff --git a/examples/tools/streaming_tool_example.py b/examples/tools/streaming_tool_example.py new file mode 100644 index 000000000..9ebd9f3ca --- /dev/null +++ b/examples/tools/streaming_tool_example.py @@ -0,0 +1,97 @@ +""" +Example of using streaming tools with the Agents SDK. + +This example demonstrates how to create a tool that yields incremental output, +allowing you to stream tool execution results to the user in real-time. +""" + +import asyncio +from collections.abc import AsyncIterator + +from agents import Agent, Runner, ToolOutputStreamEvent, function_tool + + +@function_tool +async def search_documents(query: str) -> AsyncIterator[str]: + """Search through documents and stream results as they are found. + + Args: + query: The search query. + + Yields: + Incremental search results. + """ + # Simulate searching through multiple documents + documents = [ + f"Document 1 contains information about {query}...\n", + f"Document 2 has additional details on {query}...\n", + f"Document 3 provides analysis of {query}...\n", + ] + + for doc in documents: + # Simulate processing time + await asyncio.sleep(0.5) + # Yield incremental results + yield doc + + +@function_tool +async def generate_report(topic: str) -> AsyncIterator[str]: + """Generate a report on a topic, streaming the output as it's generated. + + Args: + topic: The topic to generate a report on. + + Yields: + Incremental report content. + """ + sections = [ + f"# Report on {topic}\n\n", + f"## Introduction\n\nThis report covers {topic} in detail.\n\n", + f"## Analysis\n\nOur analysis of {topic} shows several key points...\n\n", + f"## Conclusion\n\nIn summary, {topic} is an important topic.\n\n", + ] + + for section in sections: + await asyncio.sleep(0.3) + yield section + + +async def main(): + # Create an agent with streaming tools + agent = Agent( + name="Research Assistant", + instructions="You are a helpful research assistant that can search documents and generate reports.", + tools=[search_documents, generate_report], + ) + + # Run the agent in streaming mode + result = Runner.run_streamed( + agent, + input="Search for information about artificial intelligence and generate a brief report.", + ) + + print("Streaming agent output:\n") + + # Stream events and display tool outputs in real-time + async for event in result.stream_events(): + # Handle tool streaming events + if event.type == "tool_output_stream_event": + assert isinstance(event, ToolOutputStreamEvent) + print(f"[{event.tool_name}] {event.delta}", end="", flush=True) + + # Handle run item events (final outputs) + elif event.type == "run_item_stream_event": + if event.name == "tool_output": + print(f"\n✓ Tool '{event.item.agent.name}' completed\n") + elif event.name == "message_output_created": + print(f"\n[Agent Response]: {event.item}\n") + + # Get final result + print("\n" + "=" * 60) + print("Final output:", result.final_output) + print("=" * 60) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/agents/__init__.py b/src/agents/__init__.py index b285d6f8c..d48ff9e46 100644 --- a/src/agents/__init__.py +++ b/src/agents/__init__.py @@ -65,6 +65,7 @@ RawResponsesStreamEvent, RunItemStreamEvent, StreamEvent, + ToolOutputStreamEvent, ) from .tool import ( CodeInterpreterTool, @@ -263,6 +264,7 @@ def enable_verbose_stdout_logging(): "RawResponsesStreamEvent", "RunItemStreamEvent", "AgentUpdatedStreamEvent", + "ToolOutputStreamEvent", "StreamEvent", "FunctionTool", "FunctionToolResult", diff --git a/src/agents/_run_impl.py b/src/agents/_run_impl.py index 88a770a56..609e22d80 100644 --- a/src/agents/_run_impl.py +++ b/src/agents/_run_impl.py @@ -260,6 +260,7 @@ async def execute_tools_and_side_effects( hooks: RunHooks[TContext], context_wrapper: RunContextWrapper[TContext], run_config: RunConfig, + event_queue: asyncio.Queue[Any] | None = None, ) -> SingleStepResult: # Make a copy of the generated items pre_step_items = list(pre_step_items) @@ -279,6 +280,7 @@ async def execute_tools_and_side_effects( hooks=hooks, context_wrapper=context_wrapper, config=run_config, + event_queue=event_queue, ), cls.execute_computer_actions( agent=agent, @@ -721,7 +723,7 @@ async def _execute_tool_with_hooks( tool_call: The tool call details. Returns: - The result from the tool execution. + The result from the tool execution. May be an async generator for streaming tools. """ await asyncio.gather( hooks.on_tool_start(tool_context, agent, func_tool), @@ -732,7 +734,10 @@ async def _execute_tool_with_hooks( ), ) - return await func_tool.on_invoke_tool(tool_context, tool_call.arguments) + # on_invoke_tool always returns an Awaitable, so we must await it. + # The awaited result may be a regular value or an AsyncIterator[str]. + result = await func_tool.on_invoke_tool(tool_context, tool_call.arguments) + return result @classmethod async def execute_function_tool_calls( @@ -743,6 +748,7 @@ async def execute_function_tool_calls( hooks: RunHooks[TContext], context_wrapper: RunContextWrapper[TContext], config: RunConfig, + event_queue: asyncio.Queue[Any] | None = None, ) -> tuple[ list[FunctionToolResult], list[ToolInputGuardrailResult], list[ToolOutputGuardrailResult] ]: @@ -783,6 +789,27 @@ async def run_single_tool( tool_call=tool_call, ) + # Check if the result is an async generator (streaming output) + if inspect.isasyncgen(real_result): + # Stream the tool output + from .stream_events import ToolOutputStreamEvent + + accumulated_output = [] + async for chunk in real_result: + accumulated_output.append(str(chunk)) + # Emit streaming event if event_queue is available + if event_queue is not None: + event_queue.put_nowait( + ToolOutputStreamEvent( + tool_name=func_tool.name, + tool_call_id=tool_call.call_id, + delta=str(chunk), + agent=agent, + ) + ) + # Use accumulated output as the final result + real_result = "".join(accumulated_output) + # 3) Run output tool guardrails, if any final_result = await cls._execute_output_guardrails( func_tool=func_tool, diff --git a/src/agents/realtime/session.py b/src/agents/realtime/session.py index 42dcf531a..9016cbea3 100644 --- a/src/agents/realtime/session.py +++ b/src/agents/realtime/session.py @@ -410,7 +410,18 @@ async def _handle_tool_call(self, event: RealtimeModelToolCallEvent) -> None: tool_call_id=event.call_id, tool_arguments=event.arguments, ) - result = await func_tool.on_invoke_tool(tool_context, event.arguments) + # on_invoke_tool always returns an Awaitable + result_or_generator = await func_tool.on_invoke_tool(tool_context, event.arguments) + # Check if the result is an async generator (streaming tool) + result: Any + if inspect.isasyncgen(result_or_generator): + # For streaming tools in realtime, collect all chunks + chunks = [] + async for chunk in result_or_generator: + chunks.append(str(chunk)) + result = "".join(chunks) + else: + result = result_or_generator await self._model.send_event( RealtimeModelSendToolOutput( diff --git a/src/agents/run.py b/src/agents/run.py index 85607e7dd..88e41ae5f 100644 --- a/src/agents/run.py +++ b/src/agents/run.py @@ -1479,6 +1479,7 @@ async def _get_single_step_result_from_response( hooks=hooks, context_wrapper=context_wrapper, run_config=run_config, + event_queue=event_queue, ) @classmethod diff --git a/src/agents/stream_events.py b/src/agents/stream_events.py index c0e9807a1..2adeb4618 100644 --- a/src/agents/stream_events.py +++ b/src/agents/stream_events.py @@ -58,5 +58,29 @@ class AgentUpdatedStreamEvent: type: Literal["agent_updated_stream_event"] = "agent_updated_stream_event" -StreamEvent: TypeAlias = Union[RawResponsesStreamEvent, RunItemStreamEvent, AgentUpdatedStreamEvent] +@dataclass +class ToolOutputStreamEvent: + """Streaming event for tool execution output. This event is emitted during tool execution + to provide incremental output chunks as the tool runs. + """ + + tool_name: str + """The name of the tool being executed.""" + + tool_call_id: str + """The ID of the tool call.""" + + delta: str + """The incremental output from the tool.""" + + agent: Agent[Any] + """The agent executing the tool.""" + + type: Literal["tool_output_stream_event"] = "tool_output_stream_event" + """The type of the event.""" + + +StreamEvent: TypeAlias = Union[ + RawResponsesStreamEvent, RunItemStreamEvent, AgentUpdatedStreamEvent, ToolOutputStreamEvent +] """A streaming event from an agent.""" diff --git a/src/agents/tool.py b/src/agents/tool.py index 66c70c29d..3fbd2565b 100644 --- a/src/agents/tool.py +++ b/src/agents/tool.py @@ -2,7 +2,7 @@ import inspect import json -from collections.abc import Awaitable +from collections.abc import AsyncIterator, Awaitable from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Callable, Literal, Union, overload @@ -142,15 +142,19 @@ class FunctionTool: params_json_schema: dict[str, Any] """The JSON schema for the tool's parameters.""" - on_invoke_tool: Callable[[ToolContext[Any], str], Awaitable[Any]] + on_invoke_tool: Callable[[ToolContext[Any], str], Awaitable[Any | AsyncIterator[str]]] """A function that invokes the tool with the given context and parameters. The params passed are: 1. The tool run context. 2. The arguments from the LLM, as a JSON string. - You must return a one of the structured tool output types (e.g. ToolOutputText, ToolOutputImage, - ToolOutputFileContent) or a string representation of the tool output, or a list of them, - or something we can call `str()` on. + The function returns an Awaitable that, when awaited, yields one of: + - A structured tool output type (e.g. ToolOutputText, ToolOutputImage, ToolOutputFileContent) + - A string representation of the tool output + - A list of the above + - Something we can call `str()` on + - An AsyncIterator[str] for streaming output (caller should iterate with async for) + In case of errors, you can either raise an Exception (which will cause the run to fail) or return a string error message (which will be sent back to the LLM). """ @@ -464,7 +468,9 @@ def _create_function_tool(the_func: ToolFunction[...]) -> FunctionTool: strict_json_schema=strict_mode, ) - async def _on_invoke_tool_impl(ctx: ToolContext[Any], input: str) -> Any: + async def _on_invoke_tool_impl( + ctx: ToolContext[Any], input: str + ) -> Any | AsyncIterator[str]: try: json_data: dict[str, Any] = json.loads(input) if input else {} except Exception as e: @@ -495,7 +501,14 @@ async def _on_invoke_tool_impl(ctx: ToolContext[Any], input: str) -> Any: if not _debug.DONT_LOG_TOOL_DATA: logger.debug(f"Tool call args: {args}, kwargs: {kwargs_dict}") - if inspect.iscoroutinefunction(the_func): + # Check if the function is an async generator + if inspect.isasyncgenfunction(the_func): + # Return the async generator for streaming output + if schema.takes_context: + return the_func(ctx, *args, **kwargs_dict) + else: + return the_func(*args, **kwargs_dict) + elif inspect.iscoroutinefunction(the_func): if schema.takes_context: result = await the_func(ctx, *args, **kwargs_dict) else: @@ -506,16 +519,21 @@ async def _on_invoke_tool_impl(ctx: ToolContext[Any], input: str) -> Any: else: result = the_func(*args, **kwargs_dict) - if _debug.DONT_LOG_TOOL_DATA: - logger.debug(f"Tool {schema.name} completed.") - else: - logger.debug(f"Tool {schema.name} returned {result}") + if not inspect.isasyncgenfunction(the_func): + if _debug.DONT_LOG_TOOL_DATA: + logger.debug(f"Tool {schema.name} completed.") + else: + logger.debug(f"Tool {schema.name} returned {result}") return result - async def _on_invoke_tool(ctx: ToolContext[Any], input: str) -> Any: + async def _on_invoke_tool(ctx: ToolContext[Any], input: str) -> Any | AsyncIterator[str]: try: - return await _on_invoke_tool_impl(ctx, input) + result = await _on_invoke_tool_impl(ctx, input) + # If the result is an async generator, return it directly for streaming + if inspect.isasyncgen(result): + return result + return result except Exception as e: if failure_error_function is None: raise diff --git a/tests/test_function_tool_decorator.py b/tests/test_function_tool_decorator.py index 2f5a38223..95e239edf 100644 --- a/tests/test_function_tool_decorator.py +++ b/tests/test_function_tool_decorator.py @@ -43,7 +43,7 @@ async def test_sync_no_context_with_args_invocation(): tool = sync_no_context_with_args input_data = {"a": 5, "b": 7} output = await tool.on_invoke_tool(ctx_wrapper(), json.dumps(input_data)) - assert int(output) == 12 + assert int(output) == 12 # type: ignore[arg-type] @function_tool @@ -70,7 +70,7 @@ async def test_async_no_context_invocation(): tool = async_no_context input_data = {"a": 3, "b": 4} output = await tool.on_invoke_tool(ctx_wrapper(), json.dumps(input_data)) - assert int(output) == 12 + assert int(output) == 12 # type: ignore[arg-type] @function_tool diff --git a/tests/test_tool_streaming.py b/tests/test_tool_streaming.py new file mode 100644 index 000000000..555e36a38 --- /dev/null +++ b/tests/test_tool_streaming.py @@ -0,0 +1,262 @@ +"""Tests for streaming tool output functionality.""" + +import asyncio +from collections.abc import AsyncIterator + +import pytest + +from agents import Agent, Runner, ToolOutputStreamEvent, function_tool +from agents.items import ToolCallOutputItem + +from .fake_model import FakeModel +from .test_responses import get_function_tool_call, get_text_message + + +@function_tool +async def streaming_counter() -> AsyncIterator[str]: + """A simple streaming tool that counts from 1 to 5.""" + for i in range(1, 6): + await asyncio.sleep(0.01) # Small delay to simulate processing + yield f"{i}... " + + +@function_tool +async def streaming_search(query: str) -> AsyncIterator[str]: + """A streaming search tool that returns results incrementally.""" + results = [ + f"Searching for '{query}'...\n", + "Found result 1\n", + "Found result 2\n", + "Found result 3\n", + "Search complete!\n", + ] + for result in results: + await asyncio.sleep(0.01) + yield result + + +@function_tool +async def non_streaming_tool() -> str: + """A traditional non-streaming tool for comparison.""" + await asyncio.sleep(0.01) + return "Non-streaming result" + + +@pytest.mark.asyncio +async def test_basic_streaming_tool(): + """Test that a streaming tool emits ToolOutputStreamEvent events.""" + model = FakeModel() + agent = Agent( + name="StreamingAgent", + model=model, + tools=[streaming_counter], + ) + + model.add_multiple_turn_outputs( + [ + # First turn: call the streaming tool + [get_function_tool_call("streaming_counter", "{}")], + # Second turn: text message + [get_text_message("done")], + ] + ) + + result = Runner.run_streamed(agent, input="Count to 5") + + # Collect all events + events = [] + async for event in result.stream_events(): + events.append(event) + + # Verify we received ToolOutputStreamEvent events + tool_stream_events = [e for e in events if e.type == "tool_output_stream_event"] + assert len(tool_stream_events) > 0, "Should have received streaming events" + + # Verify all streaming events are ToolOutputStreamEvent instances + for event in tool_stream_events: + assert isinstance(event, ToolOutputStreamEvent) + assert event.tool_name == "streaming_counter" + assert event.tool_call_id is not None + assert event.delta is not None + + # Verify we also received the final tool_output event + tool_output_events = [ + e for e in events if e.type == "run_item_stream_event" and e.name == "tool_output" + ] + assert len(tool_output_events) == 1, "Should have received final tool output event" + + # Verify the final output contains all chunks combined + assert result.final_output is not None + + +@pytest.mark.asyncio +async def test_streaming_tool_with_arguments(): + """Test a streaming tool that accepts arguments.""" + model = FakeModel() + agent = Agent( + name="SearchAgent", + model=model, + tools=[streaming_search], + ) + + model.add_multiple_turn_outputs( + [ + # First turn: call the streaming search tool + [get_function_tool_call("streaming_search", '{"query": "AI"}')], + # Second turn: text message + [get_text_message("Search completed")], + ] + ) + + result = Runner.run_streamed(agent, input="Search for AI") + + # Collect streaming events + stream_deltas = [] + async for event in result.stream_events(): + if event.type == "tool_output_stream_event": + assert isinstance(event, ToolOutputStreamEvent) + stream_deltas.append(event.delta) + + # Verify we received multiple chunks + assert len(stream_deltas) > 0, "Should have received streaming output" + + # Verify the accumulated output makes sense + full_output = "".join(stream_deltas) + assert "Searching for 'AI'" in full_output + assert "Found result" in full_output + assert "Search complete!" in full_output + + +@pytest.mark.asyncio +async def test_mixed_streaming_and_non_streaming_tools(): + """Test that both streaming and non-streaming tools work together.""" + model = FakeModel() + agent = Agent( + name="MixedAgent", + model=model, + tools=[streaming_counter, non_streaming_tool], + ) + + model.add_multiple_turn_outputs( + [ + # First turn: call both tools + [ + get_function_tool_call("streaming_counter", "{}"), + get_function_tool_call("non_streaming_tool", "{}"), + ], + # Second turn: text message + [get_text_message("Both tools completed")], + ] + ) + + result = Runner.run_streamed(agent, input="Run both tools") + + # Collect events + streaming_events = [] + tool_output_events = [] + + async for event in result.stream_events(): + if event.type == "tool_output_stream_event": + streaming_events.append(event) + elif event.type == "run_item_stream_event" and event.name == "tool_output": + tool_output_events.append(event) + + # Verify only the streaming tool emitted stream events + assert len(streaming_events) > 0, "Should have streaming events from streaming_counter" + assert all(e.tool_name == "streaming_counter" for e in streaming_events), ( + "Streaming events should only be from streaming tool" + ) + + # Verify both tools produced final outputs + assert len(tool_output_events) == 2, "Should have 2 tool output events" + + +@pytest.mark.asyncio +async def test_streaming_tool_accumulation(): + """Test that streaming tool output is properly accumulated.""" + model = FakeModel() + agent = Agent( + name="AccumulationAgent", + model=model, + tools=[streaming_counter], + ) + + model.add_multiple_turn_outputs( + [ + [get_function_tool_call("streaming_counter", "{}")], + [get_text_message("done")], + ] + ) + + result = Runner.run_streamed(agent, input="Count") + + # Collect all deltas + accumulated = [] + final_output = None + + async for event in result.stream_events(): + if event.type == "tool_output_stream_event": + assert isinstance(event, ToolOutputStreamEvent) + accumulated.append(event.delta) + elif event.type == "run_item_stream_event" and event.name == "tool_output": + assert isinstance(event.item, ToolCallOutputItem) + final_output = str(event.item.output) + + # Verify accumulated output matches final output + accumulated_str = "".join(accumulated) + assert accumulated_str == final_output, "Accumulated output should match final output" + assert accumulated_str == "1... 2... 3... 4... 5... ", "Output should be correct" + + +@pytest.mark.asyncio +async def test_streaming_tool_in_non_streaming_mode(): + """Test that streaming tools work correctly in non-streaming mode.""" + model = FakeModel() + agent = Agent( + name="NonStreamingAgent", + model=model, + tools=[streaming_counter], + ) + + model.add_multiple_turn_outputs( + [ + [get_function_tool_call("streaming_counter", "{}")], + [get_text_message("done")], + ] + ) + + # Use regular run instead of run_streamed + result = await Runner.run(agent, input="Count") + + # The result should still work, just without streaming events + assert result.final_output is not None + # The tool should have been executed successfully + tool_outputs = [item for item in result.new_items if isinstance(item, ToolCallOutputItem)] + assert len(tool_outputs) == 1 + assert str(tool_outputs[0].output) == "1... 2... 3... 4... 5... " + + +@pytest.mark.asyncio +async def test_streaming_tool_agent_association(): + """Test that streaming events contain correct agent information.""" + model = FakeModel() + agent = Agent( + name="TestAgent", + model=model, + tools=[streaming_counter], + ) + + model.add_multiple_turn_outputs( + [ + [get_function_tool_call("streaming_counter", "{}")], + [get_text_message("done")], + ] + ) + + result = Runner.run_streamed(agent, input="Count") + + async for event in result.stream_events(): + if event.type == "tool_output_stream_event": + assert isinstance(event, ToolOutputStreamEvent) + assert event.agent.name == "TestAgent" + assert event.agent == agent