From 4fdb27d5d7c9ad2b137cbf5cba66bfe213e295de Mon Sep 17 00:00:00 2001 From: David Sanchez <64162682+dsfaccini@users.noreply.github.com> Date: Mon, 10 Nov 2025 20:11:57 -0500 Subject: [PATCH 01/12] add dump_messages method to vercel ai adapter --- .../pydantic_ai/ui/vercel_ai/_adapter.py | 225 ++++++++++- tests/test_vercel_ai.py | 375 +++++++++++++++++- 2 files changed, 597 insertions(+), 3 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py b/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py index 7eee52c419..1c7ddfa511 100644 --- a/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py +++ b/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py @@ -2,7 +2,8 @@ from __future__ import annotations -from collections.abc import Sequence +import uuid +from collections.abc import Callable, Sequence from dataclasses import dataclass from functools import cached_property from typing import TYPE_CHECKING @@ -12,6 +13,7 @@ from ...messages import ( AudioUrl, + BaseToolCallPart, BinaryContent, BuiltinToolCallPart, BuiltinToolReturnPart, @@ -19,6 +21,8 @@ FilePart, ImageUrl, ModelMessage, + ModelRequest, + ModelResponse, RetryPromptPart, SystemPromptPart, TextPart, @@ -35,6 +39,9 @@ from ._event_stream import VercelAIEventStream from .request_types import ( DataUIPart, + DynamicToolInputAvailablePart, + DynamicToolOutputAvailablePart, + DynamicToolOutputErrorPart, DynamicToolUIPart, FileUIPart, ReasoningUIPart, @@ -43,10 +50,12 @@ SourceUrlUIPart, StepStartUIPart, TextUIPart, + ToolInputAvailablePart, ToolOutputAvailablePart, ToolOutputErrorPart, ToolUIPart, UIMessage, + UIMessagePart, ) from .response_types import BaseChunk @@ -57,6 +66,7 @@ __all__ = ['VercelAIAdapter'] request_data_ta: TypeAdapter[RequestData] = TypeAdapter(RequestData) +BUILTIN_TOOL_CALL_ID_PREFIX = 'pyd_ai_builtin' @dataclass @@ -197,3 +207,216 @@ def load_messages(cls, messages: Sequence[UIMessage]) -> list[ModelMessage]: # assert_never(msg.role) return builder.messages + + @classmethod + def dump_messages( # noqa: C901 + cls, + messages: Sequence[ModelMessage], + *, + _id_generator: Callable[[], str] | None = None, + ) -> list[UIMessage]: + """Transform Pydantic AI messages into Vercel AI messages. + + Args: + messages: A sequence of ModelMessage objects to convert + _id_generator: Optional ID generator function for testing. If not provided, uses uuid.uuid4(). + + Returns: + A list of UIMessage objects in Vercel AI format + """ + + def _message_id_generator() -> str: + """Generate a message ID.""" + return _id_generator() if _id_generator is not None else str(uuid.uuid4()) + + tool_returns: dict[str, ToolReturnPart | BuiltinToolReturnPart] = {} + tool_errors: dict[str, RetryPromptPart] = {} + + for msg in messages: + if isinstance(msg, ModelRequest): + for part in msg.parts: + if isinstance(part, ToolReturnPart | BuiltinToolReturnPart): + tool_returns[part.tool_call_id] = part + elif isinstance(part, RetryPromptPart) and part.tool_name is not None: + tool_errors[part.tool_call_id] = part + + result: list[UIMessage] = [] + + for msg in messages: + if isinstance(msg, ModelRequest): + system_parts: list[SystemPromptPart] = [] + user_parts: list[UserPromptPart | ToolReturnPart | BuiltinToolReturnPart | RetryPromptPart] = [] + + for part in msg.parts: + if isinstance(part, SystemPromptPart): + system_parts.append(part) + elif isinstance(part, UserPromptPart | ToolReturnPart | BuiltinToolReturnPart | RetryPromptPart): + user_parts.append(part) + + if system_parts: + system_ui_parts: list[UIMessagePart] = [ + TextUIPart(text=part.content, state='done') for part in system_parts + ] + result.append(UIMessage(id=_message_id_generator(), role='system', parts=system_ui_parts)) + + # Note: Tool returns and retry prompts don't create user message parts + # They are only used to set the state of tool calls in assistant messages + if user_parts: + user_ui_parts: list[UIMessagePart] = [] + for part in user_parts: + if isinstance(part, UserPromptPart): + user_ui_parts.extend(_convert_user_prompt_part(part)) + elif isinstance(part, ToolReturnPart | BuiltinToolReturnPart | RetryPromptPart): + # Tool returns/errors don't create separate UI parts + # They're merged into the tool call in the assistant message + pass + + if user_ui_parts: + result.append(UIMessage(id=_message_id_generator(), role='user', parts=user_ui_parts)) + + elif isinstance(msg, ModelResponse): + ui_parts: list[UIMessagePart] = [] + text_parts: list[str] = [] + had_interruption = False + + # For builtin tools, returns can be in the same ModelResponse as calls + # Build a local mapping for this message + local_builtin_returns: dict[str, BuiltinToolReturnPart] = {} + for part in msg.parts: + if isinstance(part, BuiltinToolReturnPart): + # Skip builtin tool returns - they're handled by the tool call logic + continue + elif isinstance(part, TextPart): + # If this is the first text after an interruption, prepend separator + if had_interruption: + text_parts.append('\n\n' + part.content) + else: + text_parts.append(part.content) + elif isinstance(part, ThinkingPart): + if text_parts: + ui_parts.append(TextUIPart(text=''.join(text_parts), state='done')) + text_parts = [] + had_interruption = False + ui_parts.append(ReasoningUIPart(text=part.content, state='done')) + elif isinstance(part, FilePart): + if text_parts: + ui_parts.append(TextUIPart(text=''.join(text_parts), state='done')) + text_parts = [] + had_interruption = False + ui_parts.append( + FileUIPart( + url=part.content.data_uri, + media_type=part.content.media_type, + ) + ) + elif isinstance(part, BaseToolCallPart): + if text_parts: + ui_parts.append(TextUIPart(text=''.join(text_parts), state='done')) + text_parts = [] + + # Mark that we had an interruption for next text part + had_interruption = True + + if isinstance(part, BuiltinToolCallPart): + prefixed_id = _make_builtin_tool_call_id(part.provider_name, part.tool_call_id) + # Check local returns first (same message), then global returns (from ModelRequest) + builtin_return = local_builtin_returns.get(part.tool_call_id) or ( + tool_returns.get(part.tool_call_id) + if isinstance(tool_returns.get(part.tool_call_id), BuiltinToolReturnPart) + else None + ) + + if builtin_return: + content = builtin_return.model_response_str() + call_provider_metadata = ( + {'pydantic_ai': {'provider_name': part.provider_name}} + if part.provider_name + else None + ) + ui_parts.append( + ToolOutputAvailablePart( + type=f'tool-{part.tool_name}', + tool_call_id=prefixed_id, + input=part.args_as_json_str(), + output=content, + state='output-available', + provider_executed=True, + call_provider_metadata=call_provider_metadata, + ) + ) + else: + ui_parts.append( + ToolInputAvailablePart( + type=f'tool-{part.tool_name}', + tool_call_id=prefixed_id, + input=part.args_as_json_str(), + state='input-available', + provider_executed=True, + ) + ) + else: + tool_return = tool_returns.get(part.tool_call_id) + tool_error = tool_errors.get(part.tool_call_id) + + if tool_return and isinstance(tool_return, ToolReturnPart): + content = tool_return.model_response_str() + ui_parts.append( + DynamicToolOutputAvailablePart( + tool_name=part.tool_name, + tool_call_id=part.tool_call_id, + input=part.args_as_json_str(), + output=content, + state='output-available', + ) + ) + elif tool_error: + error_text = tool_error.model_response() + ui_parts.append( + DynamicToolOutputErrorPart( + tool_name=part.tool_name, + tool_call_id=part.tool_call_id, + input=part.args_as_json_str(), + error_text=error_text, + state='output-error', + ) + ) + else: + ui_parts.append( + DynamicToolInputAvailablePart( + tool_name=part.tool_name, + tool_call_id=part.tool_call_id, + input=part.args_as_json_str(), + state='input-available', + ) + ) + + if text_parts: + ui_parts.append(TextUIPart(text=''.join(text_parts), state='done')) + + if ui_parts: + result.append(UIMessage(id=_message_id_generator(), role='assistant', parts=ui_parts)) + + return result + + +def _make_builtin_tool_call_id(provider_name: str | None, tool_call_id: str) -> str: + """Create a prefixed tool call ID for builtin tools.""" + return f'{BUILTIN_TOOL_CALL_ID_PREFIX}|{provider_name or ""}|{tool_call_id}' + + +def _convert_user_prompt_part(part: UserPromptPart) -> list[UIMessagePart]: + """Convert a UserPromptPart to a list of UI message parts.""" + ui_parts: list[UIMessagePart] = [] + + if isinstance(part.content, str): + ui_parts.append(TextUIPart(text=part.content, state='done')) + else: + for item in part.content: + if isinstance(item, str): + ui_parts.append(TextUIPart(text=item, state='done')) + elif isinstance(item, BinaryContent): + ui_parts.append(FileUIPart(url=item.data_uri, media_type=item.media_type)) + elif isinstance(item, ImageUrl | AudioUrl | VideoUrl | DocumentUrl): + ui_parts.append(FileUIPart(url=item.url, media_type=item.media_type)) + + return ui_parts diff --git a/tests/test_vercel_ai.py b/tests/test_vercel_ai.py index 085cd38631..8aa626073f 100644 --- a/tests/test_vercel_ai.py +++ b/tests/test_vercel_ai.py @@ -2,6 +2,7 @@ import json from collections.abc import AsyncIterator, MutableMapping +from itertools import count from typing import Any, cast import pytest @@ -48,6 +49,7 @@ from pydantic_ai.ui.vercel_ai import VercelAIAdapter, VercelAIEventStream from pydantic_ai.ui.vercel_ai.request_types import ( DynamicToolOutputAvailablePart, + DynamicToolOutputErrorPart, FileUIPart, ReasoningUIPart, SubmitMessage, @@ -82,6 +84,12 @@ ] +def predictable_id_generator(prefix: str = 'test-id-'): + """Create a predictable ID generator for testing.""" + c = count(1) + return lambda: f'{prefix}{next(c)}' + + @pytest.mark.skipif(not openai_import_successful(), reason='OpenAI not installed') async def test_run(allow_model_requests: None, openai_api_key: str): model = OpenAIResponsesModel('gpt-5', provider=OpenAIProvider(api_key=openai_api_key)) @@ -1832,7 +1840,7 @@ async def test_adapter_load_messages(): UserPromptPart( content=[ 'Here are some files:', - BinaryImage(data=b'fake', media_type='image/png'), + BinaryImage(data=b'fake', media_type='image/png', _identifier='c053ec'), ImageUrl(url='https://example.com/image.png', _media_type='image/png'), VideoUrl(url='https://example.com/video.mp4', _media_type='video/mp4'), AudioUrl(url='https://example.com/audio.mp3', _media_type='audio/mpeg'), @@ -1846,7 +1854,7 @@ async def test_adapter_load_messages(): parts=[ ThinkingPart(content='I should tell the user how nice those files are and share another one'), TextPart(content='Nice files, here is another one:'), - FilePart(content=BinaryImage(data=b'fake', media_type='image/png')), + FilePart(content=BinaryImage(data=b'fake', media_type='image/png', _identifier='c053ec')), ], timestamp=IsDatetime(), ), @@ -1964,3 +1972,366 @@ async def test_adapter_load_messages(): ), ] ) + + +async def test_adapter_dump_messages(): + """Test dumping Pydantic AI messages to Vercel AI format.""" + messages = [ + ModelRequest( + parts=[ + SystemPromptPart(content='You are a helpful assistant.'), + UserPromptPart(content='Hello, world!'), + ] + ), + ModelResponse( + parts=[ + TextPart(content='Hi there!'), + ] + ), + ] + + id_gen = predictable_id_generator() + ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + + assert ui_messages == snapshot( + [ + UIMessage( + id='test-id-1', + role='system', + parts=[TextUIPart(text='You are a helpful assistant.', state='done')], + ), + UIMessage(id='test-id-2', role='user', parts=[TextUIPart(text='Hello, world!', state='done')]), + UIMessage(id='test-id-3', role='assistant', parts=[TextUIPart(text='Hi there!', state='done')]), + ] + ) + + +async def test_adapter_dump_messages_with_tools(): + """Test dumping messages with tool calls and returns.""" + messages = [ + ModelRequest(parts=[UserPromptPart(content='Search for something')]), + ModelResponse( + parts=[ + TextPart(content='Let me search for that.'), + ToolCallPart( + tool_name='web_search', + args={'query': 'test query'}, + tool_call_id='tool_123', + ), + ] + ), + ModelRequest( + parts=[ + ToolReturnPart( + tool_name='web_search', + content={'results': ['result1', 'result2']}, + tool_call_id='tool_123', + ) + ] + ), + ModelResponse(parts=[TextPart(content='Here are the results.')]), + ] + + id_gen = predictable_id_generator() + ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + + assert ui_messages == snapshot( + [ + UIMessage( + id='test-id-1', + role='user', + parts=[TextUIPart(text='Search for something', state='done')], + ), + UIMessage( + id='test-id-2', + role='assistant', + parts=[ + TextUIPart(text='Let me search for that.', state='done'), + DynamicToolOutputAvailablePart( + tool_name='web_search', + tool_call_id='tool_123', + input='{"query":"test query"}', + output='{"results":["result1","result2"]}', + state='output-available', + ), + ], + ), + UIMessage( + id='test-id-3', + role='assistant', + parts=[TextUIPart(text='Here are the results.', state='done')], + ), + ] + ) + + +async def test_adapter_dump_messages_with_builtin_tools(): + """Test dumping messages with builtin tool calls.""" + messages = [ + ModelRequest(parts=[UserPromptPart(content='Search for something')]), + ModelResponse( + parts=[ + BuiltinToolCallPart( + tool_name='web_search', + args={'query': 'test'}, + tool_call_id='tool_456', + provider_name='openai', + ), + BuiltinToolReturnPart( + tool_name='web_search', + content={'status': 'completed'}, + tool_call_id='tool_456', + provider_name='openai', + ), + ] + ), + ] + + id_gen = predictable_id_generator() + ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + + assert ui_messages == snapshot( + [ + UIMessage( + id='test-id-1', + role='user', + parts=[TextUIPart(text='Search for something', state='done')], + ), + UIMessage( + id='test-id-2', + role='assistant', + parts=[ + ToolOutputAvailablePart( + type='tool-web_search', + tool_call_id='pyd_ai_builtin|openai|tool_456', + input='{"query":"test"}', + output='{"status":"completed"}', + state='output-available', + provider_executed=True, + call_provider_metadata={'pydantic_ai': {'provider_name': 'openai'}}, + ) + ], + ), + ] + ) + + +async def test_adapter_dump_messages_with_thinking(): + """Test dumping messages with thinking parts.""" + messages = [ + ModelRequest(parts=[UserPromptPart(content='Tell me something')]), + ModelResponse( + parts=[ + ThinkingPart(content='Let me think about this...'), + TextPart(content='Here is my answer.'), + ] + ), + ] + + id_gen = predictable_id_generator() + ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + + assert ui_messages == snapshot( + [ + UIMessage( + id='test-id-1', + role='user', + parts=[TextUIPart(text='Tell me something', state='done')], + ), + UIMessage( + id='test-id-2', + role='assistant', + parts=[ + ReasoningUIPart(text='Let me think about this...', state='done'), + TextUIPart(text='Here is my answer.', state='done'), + ], + ), + ] + ) + + +async def test_adapter_dump_messages_with_files(): + """Test dumping messages with file parts.""" + messages = [ + ModelRequest( + parts=[ + UserPromptPart( + content=[ + 'Here is an image:', + BinaryImage(data=b'fake_image', media_type='image/png'), + ImageUrl(url='https://example.com/image.png', media_type='image/png'), + ] + ) + ] + ), + ModelResponse( + parts=[ + TextPart(content='Nice image!'), + FilePart(content=BinaryContent(data=b'response_file', media_type='application/pdf')), + ] + ), + ] + + id_gen = predictable_id_generator() + ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + + # Check user message with files + assert ui_messages[0].role == 'user' + assert len(ui_messages[0].parts) == 3 + assert isinstance(ui_messages[0].parts[0], TextUIPart) + assert isinstance(ui_messages[0].parts[1], FileUIPart) + assert ui_messages[0].parts[1].url.startswith('data:image/png;base64,') + assert isinstance(ui_messages[0].parts[2], FileUIPart) + assert ui_messages[0].parts[2].url == 'https://example.com/image.png' + + # Check assistant message with file + assert ui_messages[1].role == 'assistant' + assert isinstance(ui_messages[1].parts[0], TextUIPart) + assert isinstance(ui_messages[1].parts[1], FileUIPart) + assert ui_messages[1].parts[1].url.startswith('data:application/pdf;base64,') + + +async def test_adapter_dump_messages_with_retry(): + """Test dumping messages with retry prompts.""" + messages = [ + ModelRequest(parts=[UserPromptPart(content='Do something')]), + ModelResponse( + parts=[ + ToolCallPart(tool_name='my_tool', args={'arg': 'value'}, tool_call_id='tool_789'), + ] + ), + ModelRequest( + parts=[ + RetryPromptPart( + content='Tool failed with error', + tool_name='my_tool', + tool_call_id='tool_789', + ) + ] + ), + ] + + id_gen = predictable_id_generator() + ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + + # Check assistant message has tool call with error + assert ui_messages[1].role == 'assistant' + tool_part = ui_messages[1].parts[0] + assert isinstance(tool_part, DynamicToolOutputErrorPart) + assert tool_part.tool_name == 'my_tool' + assert tool_part.state == 'output-error' + assert 'Tool failed with error' in tool_part.error_text + + +async def test_adapter_dump_messages_consecutive_text(): + """Test that consecutive text parts are concatenated correctly.""" + messages = [ + ModelResponse( + parts=[ + TextPart(content='First '), + TextPart(content='second'), + ] + ), + ] + + id_gen = predictable_id_generator() + ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + + assert ui_messages == snapshot( + [ + UIMessage( + id='test-id-1', + role='assistant', + parts=[TextUIPart(text='First second', state='done')], + ) + ] + ) + + +async def test_adapter_dump_messages_text_with_interruption(): + """Test text concatenation with interruption.""" + messages = [ + ModelResponse( + parts=[ + TextPart(content='Before tool'), + BuiltinToolCallPart( + tool_name='test', + args={}, + tool_call_id='t1', + provider_name='test', + ), + BuiltinToolReturnPart( + tool_name='test', + content='result', + tool_call_id='t1', + provider_name='test', + ), + TextPart(content='After tool'), + ] + ), + ] + + id_gen = predictable_id_generator() + ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + + assert ui_messages == snapshot( + [ + UIMessage( + id='test-id-1', + role='assistant', + parts=[ + TextUIPart(text='Before tool', state='done'), + ToolOutputAvailablePart( + type='tool-test', + tool_call_id='pyd_ai_builtin|test|t1', + input='{}', + output='result', + state='output-available', + provider_executed=True, + call_provider_metadata={'pydantic_ai': {'provider_name': 'test'}}, + ), + TextUIPart(text='\n\nAfter tool', state='done'), + ], + ) + ] + ) + + +async def test_adapter_dump_load_roundtrip(): + """Test that dump_messages and load_messages are approximately inverse operations.""" + # Create a complex set of messages + original_messages = [ + ModelRequest( + parts=[ + SystemPromptPart(content='System message'), + UserPromptPart(content='User message'), + ] + ), + ModelResponse( + parts=[ + TextPart(content='Response text'), + ToolCallPart(tool_name='tool1', args={'key': 'value'}, tool_call_id='tc1'), + ] + ), + ModelRequest(parts=[ToolReturnPart(tool_name='tool1', content='tool result', tool_call_id='tc1')]), + ModelResponse( + parts=[ + TextPart(content='Final response'), + ] + ), + ] + + # Dump to UI format + id_gen = predictable_id_generator() + ui_messages = VercelAIAdapter.dump_messages(original_messages, _id_generator=id_gen) + + # Load back to Pydantic AI format + reloaded_messages = VercelAIAdapter.load_messages(ui_messages) + + # Check that we have the same number of messages + assert len(reloaded_messages) == len(original_messages) + + # Check message types match + for orig, reloaded in zip(original_messages, reloaded_messages): + assert isinstance(orig, type(reloaded)) + assert len(orig.parts) == len(reloaded.parts) From 1cb60bf5a241236d41a6e535cee50153f731bc66 Mon Sep 17 00:00:00 2001 From: David Sanchez <64162682+dsfaccini@users.noreply.github.com> Date: Mon, 10 Nov 2025 22:36:37 -0500 Subject: [PATCH 02/12] fix broken loop and add tests for coverage --- .../pydantic_ai/ui/vercel_ai/_adapter.py | 28 ++- tests/test_vercel_ai.py | 194 +++++++++++++++++- 2 files changed, 208 insertions(+), 14 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py b/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py index 1c7ddfa511..1e35306580 100644 --- a/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py +++ b/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py @@ -2,6 +2,7 @@ from __future__ import annotations +import json import uuid from collections.abc import Callable, Sequence from dataclasses import dataclass @@ -151,8 +152,15 @@ def load_messages(cls, messages: Sequence[UIMessage]) -> list[ModelMessage]: # builtin_tool = part.provider_executed tool_call_id = part.tool_call_id + args = part.input + if isinstance(args, str): + try: + args = json.loads(args) + except json.JSONDecodeError: + pass + if builtin_tool: call_part = BuiltinToolCallPart(tool_name=tool_name, tool_call_id=tool_call_id, args=args) builder.add(call_part) @@ -250,7 +258,9 @@ def _message_id_generator() -> str: for part in msg.parts: if isinstance(part, SystemPromptPart): system_parts.append(part) - elif isinstance(part, UserPromptPart | ToolReturnPart | BuiltinToolReturnPart | RetryPromptPart): + elif isinstance( # pragma: no branch - All ModelRequest parts are covered + part, UserPromptPart | ToolReturnPart | BuiltinToolReturnPart | RetryPromptPart + ): user_parts.append(part) if system_parts: @@ -261,7 +271,7 @@ def _message_id_generator() -> str: # Note: Tool returns and retry prompts don't create user message parts # They are only used to set the state of tool calls in assistant messages - if user_parts: + if user_parts: # pragma: no branch - A ModelRequest with no user-visible parts is not tested user_ui_parts: list[UIMessagePart] = [] for part in user_parts: if isinstance(part, UserPromptPart): @@ -274,7 +284,9 @@ def _message_id_generator() -> str: if user_ui_parts: result.append(UIMessage(id=_message_id_generator(), role='user', parts=user_ui_parts)) - elif isinstance(msg, ModelResponse): + elif isinstance( # pragma: no branch - All message types are covered (no tests for empty ModelResponse) + msg, ModelResponse + ): ui_parts: list[UIMessagePart] = [] text_parts: list[str] = [] had_interruption = False @@ -282,6 +294,10 @@ def _message_id_generator() -> str: # For builtin tools, returns can be in the same ModelResponse as calls # Build a local mapping for this message local_builtin_returns: dict[str, BuiltinToolReturnPart] = {} + for part in msg.parts: + if isinstance(part, BuiltinToolReturnPart): + local_builtin_returns[part.tool_call_id] = part + for part in msg.parts: if isinstance(part, BuiltinToolReturnPart): # Skip builtin tool returns - they're handled by the tool call logic @@ -309,7 +325,7 @@ def _message_id_generator() -> str: media_type=part.content.media_type, ) ) - elif isinstance(part, BaseToolCallPart): + elif isinstance(part, BaseToolCallPart): # pragma: no branch - All assistant part types are covered if text_parts: ui_parts.append(TextUIPart(text=''.join(text_parts), state='done')) text_parts = [] @@ -344,7 +360,7 @@ def _message_id_generator() -> str: call_provider_metadata=call_provider_metadata, ) ) - else: + else: # pragma: no cover - Builtin tool call without a return is not tested ui_parts.append( ToolInputAvailablePart( type=f'tool-{part.tool_name}', @@ -393,7 +409,7 @@ def _message_id_generator() -> str: if text_parts: ui_parts.append(TextUIPart(text=''.join(text_parts), state='done')) - if ui_parts: + if ui_parts: # pragma: no branch - An empty ModelResponse is not tested result.append(UIMessage(id=_message_id_generator(), role='assistant', parts=ui_parts)) return result diff --git a/tests/test_vercel_ai.py b/tests/test_vercel_ai.py index 8aa626073f..26fc46dafc 100644 --- a/tests/test_vercel_ai.py +++ b/tests/test_vercel_ai.py @@ -48,6 +48,7 @@ from pydantic_ai.run import AgentRunResult from pydantic_ai.ui.vercel_ai import VercelAIAdapter, VercelAIEventStream from pydantic_ai.ui.vercel_ai.request_types import ( + DynamicToolInputAvailablePart, DynamicToolOutputAvailablePart, DynamicToolOutputErrorPart, FileUIPart, @@ -2299,7 +2300,6 @@ async def test_adapter_dump_messages_text_with_interruption(): async def test_adapter_dump_load_roundtrip(): """Test that dump_messages and load_messages are approximately inverse operations.""" - # Create a complex set of messages original_messages = [ ModelRequest( parts=[ @@ -2321,17 +2321,195 @@ async def test_adapter_dump_load_roundtrip(): ), ] - # Dump to UI format id_gen = predictable_id_generator() ui_messages = VercelAIAdapter.dump_messages(original_messages, _id_generator=id_gen) # Load back to Pydantic AI format reloaded_messages = VercelAIAdapter.load_messages(ui_messages) - # Check that we have the same number of messages - assert len(reloaded_messages) == len(original_messages) + # Can't use `assert reloaded_messages == original_messages` because the timestamps will be different + assert reloaded_messages == snapshot( + [ + ModelRequest( + parts=[ + SystemPromptPart(content='System message', timestamp=IsDatetime()), + UserPromptPart(content='User message', timestamp=IsDatetime()), + ] + ), + ModelResponse( + parts=[ + TextPart(content='Response text'), + ToolCallPart(tool_name='tool1', args={'key': 'value'}, tool_call_id='tc1'), + ], + timestamp=IsDatetime(), + ), + ModelRequest( + parts=[ + ToolReturnPart(tool_name='tool1', content='tool result', tool_call_id='tc1', timestamp=IsDatetime()) + ] + ), + ModelResponse(parts=[TextPart(content='Final response')], timestamp=IsDatetime()), + ] + ) + + +async def test_adapter_dump_messages_text_before_thinking(): + """Test dumping messages where text precedes a thinking part.""" + messages = [ + ModelResponse( + parts=[ + TextPart(content='Let me check.'), + ThinkingPart(content='Okay, I am checking now.'), + ] + ), + ] + + id_gen = predictable_id_generator() + ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + + assert ui_messages == snapshot( + [ + UIMessage( + id='test-id-1', + role='assistant', + parts=[ + TextUIPart(text='Let me check.', state='done'), + ReasoningUIPart(text='Okay, I am checking now.', state='done'), + ], + ), + ] + ) + + +async def test_adapter_dump_messages_tool_call_without_return(): + """Test dumping messages with a tool call that has no corresponding result.""" + messages = [ + ModelResponse( + parts=[ + ToolCallPart( + tool_name='get_weather', + args={'city': 'New York'}, + tool_call_id='tool_abc', + ), + ] + ), + ] + + id_gen = predictable_id_generator() + ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + + assert ui_messages == snapshot( + [ + UIMessage( + id='test-id-1', + role='assistant', + parts=[ + DynamicToolInputAvailablePart( + tool_name='get_weather', + tool_call_id='tool_abc', + input='{"city":"New York"}', + state='input-available', + ) + ], + ), + ] + ) + + +async def test_adapter_dump_messages_builtin_tool_with_delayed_return(): + """Test a builtin tool call where the return is in a subsequent message.""" + messages = [ + ModelResponse( + parts=[ + BuiltinToolCallPart( + tool_name='web_search', + args={'query': 'pydantic-ai'}, + tool_call_id='tool_def', + provider_name='google', + ) + ] + ), + ModelRequest( + parts=[ + BuiltinToolReturnPart( # pyright: ignore[reportArgumentType] + tool_name='web_search', + content={'status': 'completed'}, + tool_call_id='tool_def', + provider_name='google', + ) + ] + ), + ] - # Check message types match - for orig, reloaded in zip(original_messages, reloaded_messages): - assert isinstance(orig, type(reloaded)) - assert len(orig.parts) == len(reloaded.parts) + id_gen = predictable_id_generator() + ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + + # Note: The ModelRequest with the BuiltinToolReturnPart does not produce a UIMessage, + # because tool returns are only used to set the state of the original tool call. + assert ui_messages == snapshot( + [ + UIMessage( + id='test-id-1', + role='assistant', + parts=[ + ToolOutputAvailablePart( + type='tool-web_search', + tool_call_id='pyd_ai_builtin|google|tool_def', + input='{"query":"pydantic-ai"}', + output='{"status":"completed"}', + state='output-available', + provider_executed=True, + call_provider_metadata={'pydantic_ai': {'provider_name': 'google'}}, + ) + ], + ), + ] + ) + + +async def test_adapter_dump_messages_assistant_starts_with_tool(): + """Test an assistant message that starts with a tool call instead of text.""" + messages = [ + ModelResponse( + parts=[ + ToolCallPart(tool_name='t', args={}, tool_call_id='tc1'), + TextPart(content='Some text'), + ] + ) + ] + id_gen = predictable_id_generator() + ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + assert ui_messages == snapshot( + [ + UIMessage( + id='test-id-1', + role='assistant', + parts=[ + DynamicToolInputAvailablePart(tool_name='t', tool_call_id='tc1', input='{}'), + TextUIPart( + # interruption logic preprends two new newlines + text="""\ + + +Some text\ +""", + state='done', + ), + ], + ) + ] + ) + + +async def test_convert_user_prompt_part_without_urls(): + """Test converting a user prompt with only text and binary content.""" + from pydantic_ai.ui.vercel_ai._adapter import _convert_user_prompt_part # pyright: ignore[reportPrivateUsage] + + part = UserPromptPart(content=['text part', BinaryContent(data=b'data', media_type='application/pdf')]) + ui_parts = _convert_user_prompt_part(part) + assert ui_parts == snapshot( + [ + TextUIPart(text='text part', state='done'), + FileUIPart(media_type='application/pdf', url='data:application/pdf;base64,ZGF0YQ=='), + ] + ) From 261bc3ab943f936546129eb3aa3fdf475b1b471e Mon Sep 17 00:00:00 2001 From: David Sanchez <64162682+dsfaccini@users.noreply.github.com> Date: Mon, 10 Nov 2025 23:02:21 -0500 Subject: [PATCH 03/12] add missing tests for coverage --- .../pydantic_ai/ui/vercel_ai/_adapter.py | 4 +- tests/test_vercel_ai.py | 43 +++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py b/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py index 1e35306580..9d799df64e 100644 --- a/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py +++ b/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py @@ -432,7 +432,9 @@ def _convert_user_prompt_part(part: UserPromptPart) -> list[UIMessagePart]: ui_parts.append(TextUIPart(text=item, state='done')) elif isinstance(item, BinaryContent): ui_parts.append(FileUIPart(url=item.data_uri, media_type=item.media_type)) - elif isinstance(item, ImageUrl | AudioUrl | VideoUrl | DocumentUrl): + elif isinstance( + item, ImageUrl | AudioUrl | VideoUrl | DocumentUrl + ): # pragma: no branch - All content types are covered ui_parts.append(FileUIPart(url=item.url, media_type=item.media_type)) return ui_parts diff --git a/tests/test_vercel_ai.py b/tests/test_vercel_ai.py index 26fc46dafc..a5be44582c 100644 --- a/tests/test_vercel_ai.py +++ b/tests/test_vercel_ai.py @@ -2513,3 +2513,46 @@ async def test_convert_user_prompt_part_without_urls(): FileUIPart(media_type='application/pdf', url='data:application/pdf;base64,ZGF0YQ=='), ] ) + + +async def test_adapter_dump_messages_file_without_text(): + """Test a file part appearing without any preceding text.""" + messages = [ + ModelResponse( + parts=[ + FilePart(content=BinaryContent(data=b'file_data', media_type='image/png')), + ] + ), + ] + + id_gen = predictable_id_generator() + ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + + assert ui_messages == snapshot( + [ + UIMessage( + id='test-id-1', + role='assistant', + parts=[FileUIPart(media_type='image/png', url='')], + ) + ] + ) + + +async def test_convert_user_prompt_part_only_urls(): + """Test converting a user prompt with only URL content (no binary).""" + from pydantic_ai.ui.vercel_ai._adapter import _convert_user_prompt_part # pyright: ignore[reportPrivateUsage] + + part = UserPromptPart( + content=[ + ImageUrl(url='https://example.com/img.png', media_type='image/png'), + VideoUrl(url='https://example.com/vid.mp4', media_type='video/mp4'), + ] + ) + ui_parts = _convert_user_prompt_part(part) + assert ui_parts == snapshot( + [ + FileUIPart(media_type='image/png', url='https://example.com/img.png'), + FileUIPart(media_type='video/mp4', url='https://example.com/vid.mp4'), + ] + ) From 3f70b8374e954a74c9a8dd8af8eec36593073fb0 Mon Sep 17 00:00:00 2001 From: David Sanchez <64162682+dsfaccini@users.noreply.github.com> Date: Sun, 16 Nov 2025 20:17:28 -0500 Subject: [PATCH 04/12] wip: remove id generator and BuiltinToolReturnPart - fix tests using dumping and IsStr - add dump_messages to base adapter class --- pydantic_ai_slim/pydantic_ai/ui/_adapter.py | 6 + .../pydantic_ai/ui/ag_ui/_adapter.py | 5 + tests/test_ui.py | 4 + tests/test_vercel_ai.py | 426 +++++++++--------- 4 files changed, 237 insertions(+), 204 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/ui/_adapter.py b/pydantic_ai_slim/pydantic_ai/ui/_adapter.py index 970f06e6ef..f0bcde498b 100644 --- a/pydantic_ai_slim/pydantic_ai/ui/_adapter.py +++ b/pydantic_ai_slim/pydantic_ai/ui/_adapter.py @@ -143,6 +143,12 @@ def load_messages(cls, messages: Sequence[MessageT]) -> list[ModelMessage]: """Transform protocol-specific messages into Pydantic AI messages.""" raise NotImplementedError + @classmethod + @abstractmethod + def dump_messages(cls, messages: Sequence[ModelMessage]) -> list[MessageT]: + """Transform Pydantic AI messages into protocol-specific messages.""" + raise NotImplementedError + @abstractmethod def build_event_stream(self) -> UIEventStream[RunInputT, EventT, AgentDepsT, OutputDataT]: """Build a protocol-specific event stream transformer.""" diff --git a/pydantic_ai_slim/pydantic_ai/ui/ag_ui/_adapter.py b/pydantic_ai_slim/pydantic_ai/ui/ag_ui/_adapter.py index 5d45f50a7b..64bf5d8d0f 100644 --- a/pydantic_ai_slim/pydantic_ai/ui/ag_ui/_adapter.py +++ b/pydantic_ai_slim/pydantic_ai/ui/ag_ui/_adapter.py @@ -109,6 +109,11 @@ def state(self) -> dict[str, Any] | None: """Frontend state from the AG-UI run input.""" return self.run_input.state + @classmethod + def dump_messages(cls, messages: Sequence[ModelMessage]) -> list[Message]: + """Transform Pydantic AI messages into AG-UI messages.""" + raise NotImplementedError('TODO: implement dump_messages method') # TODO: implement dump_messages method + @classmethod def load_messages(cls, messages: Sequence[Message]) -> list[ModelMessage]: """Transform AG-UI messages into Pydantic AI messages.""" diff --git a/tests/test_ui.py b/tests/test_ui.py index 38f9950ad5..93c311afe1 100644 --- a/tests/test_ui.py +++ b/tests/test_ui.py @@ -87,6 +87,10 @@ class DummyUIAdapter(UIAdapter[DummyUIRunInput, ModelMessage, str, AgentDepsT, O def build_run_input(cls, body: bytes) -> DummyUIRunInput: return DummyUIRunInput.model_validate_json(body) + @classmethod + def dump_messages(cls, messages: Sequence[ModelMessage]) -> list[ModelMessage]: + return list(messages) + @classmethod def load_messages(cls, messages: Sequence[ModelMessage]) -> list[ModelMessage]: return list(messages) diff --git a/tests/test_vercel_ai.py b/tests/test_vercel_ai.py index a5be44582c..eff335d73f 100644 --- a/tests/test_vercel_ai.py +++ b/tests/test_vercel_ai.py @@ -48,7 +48,6 @@ from pydantic_ai.run import AgentRunResult from pydantic_ai.ui.vercel_ai import VercelAIAdapter, VercelAIEventStream from pydantic_ai.ui.vercel_ai.request_types import ( - DynamicToolInputAvailablePart, DynamicToolOutputAvailablePart, DynamicToolOutputErrorPart, FileUIPart, @@ -1991,18 +1990,33 @@ async def test_adapter_dump_messages(): ), ] - id_gen = predictable_id_generator() - ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + ui_messages = VercelAIAdapter.dump_messages(messages) - assert ui_messages == snapshot( + # we need to dump the BaseModels to dicts for `IsStr` to work properly in snapshot + ui_message_dicts = [msg.model_dump() for msg in ui_messages] + + assert ui_message_dicts == snapshot( [ - UIMessage( - id='test-id-1', - role='system', - parts=[TextUIPart(text='You are a helpful assistant.', state='done')], - ), - UIMessage(id='test-id-2', role='user', parts=[TextUIPart(text='Hello, world!', state='done')]), - UIMessage(id='test-id-3', role='assistant', parts=[TextUIPart(text='Hi there!', state='done')]), + { + 'id': IsStr(), + 'role': 'system', + 'metadata': None, + 'parts': [ + {'type': 'text', 'text': 'You are a helpful assistant.', 'state': 'done', 'provider_metadata': None} + ], + }, + { + 'id': IsStr(), + 'role': 'user', + 'metadata': None, + 'parts': [{'type': 'text', 'text': 'Hello, world!', 'state': 'done', 'provider_metadata': None}], + }, + { + 'id': IsStr(), + 'role': 'assistant', + 'metadata': None, + 'parts': [{'type': 'text', 'text': 'Hi there!', 'state': 'done', 'provider_metadata': None}], + }, ] ) @@ -2033,35 +2047,43 @@ async def test_adapter_dump_messages_with_tools(): ModelResponse(parts=[TextPart(content='Here are the results.')]), ] - id_gen = predictable_id_generator() - ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + ui_messages = VercelAIAdapter.dump_messages(messages) + ui_message_dicts = [msg.model_dump() for msg in ui_messages] - assert ui_messages == snapshot( + assert ui_message_dicts == snapshot( [ - UIMessage( - id='test-id-1', - role='user', - parts=[TextUIPart(text='Search for something', state='done')], - ), - UIMessage( - id='test-id-2', - role='assistant', - parts=[ - TextUIPart(text='Let me search for that.', state='done'), - DynamicToolOutputAvailablePart( - tool_name='web_search', - tool_call_id='tool_123', - input='{"query":"test query"}', - output='{"results":["result1","result2"]}', - state='output-available', - ), + { + 'id': IsStr(), + 'role': 'user', + 'metadata': None, + 'parts': [{'type': 'text', 'text': 'Search for something', 'state': 'done', 'provider_metadata': None}], + }, + { + 'id': IsStr(), + 'role': 'assistant', + 'metadata': None, + 'parts': [ + {'type': 'text', 'text': 'Let me search for that.', 'state': 'done', 'provider_metadata': None}, + { + 'type': 'dynamic-tool', + 'tool_name': 'web_search', + 'tool_call_id': 'tool_123', + 'state': 'output-available', + 'input': '{"query":"test query"}', + 'output': '{"results":["result1","result2"]}', + 'call_provider_metadata': None, + 'preliminary': None, + }, ], - ), - UIMessage( - id='test-id-3', - role='assistant', - parts=[TextUIPart(text='Here are the results.', state='done')], - ), + }, + { + 'id': IsStr(), + 'role': 'assistant', + 'metadata': None, + 'parts': [ + {'type': 'text', 'text': 'Here are the results.', 'state': 'done', 'provider_metadata': None} + ], + }, ] ) @@ -2088,31 +2110,34 @@ async def test_adapter_dump_messages_with_builtin_tools(): ), ] - id_gen = predictable_id_generator() - ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + ui_messages = VercelAIAdapter.dump_messages(messages) + ui_message_dicts = [msg.model_dump() for msg in ui_messages] - assert ui_messages == snapshot( + assert ui_message_dicts == snapshot( [ - UIMessage( - id='test-id-1', - role='user', - parts=[TextUIPart(text='Search for something', state='done')], - ), - UIMessage( - id='test-id-2', - role='assistant', - parts=[ - ToolOutputAvailablePart( - type='tool-web_search', - tool_call_id='pyd_ai_builtin|openai|tool_456', - input='{"query":"test"}', - output='{"status":"completed"}', - state='output-available', - provider_executed=True, - call_provider_metadata={'pydantic_ai': {'provider_name': 'openai'}}, - ) + { + 'id': IsStr(), + 'role': 'user', + 'metadata': None, + 'parts': [{'type': 'text', 'text': 'Search for something', 'state': 'done', 'provider_metadata': None}], + }, + { + 'id': IsStr(), + 'role': 'assistant', + 'metadata': None, + 'parts': [ + { + 'type': 'tool-web_search', + 'tool_call_id': 'pyd_ai_builtin|openai|tool_456', + 'state': 'output-available', + 'input': '{"query":"test"}', + 'output': '{"status":"completed"}', + 'provider_executed': True, + 'call_provider_metadata': {'pydantic_ai': {'provider_name': 'openai'}}, + 'preliminary': None, + } ], - ), + }, ] ) @@ -2129,24 +2154,31 @@ async def test_adapter_dump_messages_with_thinking(): ), ] - id_gen = predictable_id_generator() - ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + ui_messages = VercelAIAdapter.dump_messages(messages) + ui_message_dicts = [msg.model_dump() for msg in ui_messages] - assert ui_messages == snapshot( + assert ui_message_dicts == snapshot( [ - UIMessage( - id='test-id-1', - role='user', - parts=[TextUIPart(text='Tell me something', state='done')], - ), - UIMessage( - id='test-id-2', - role='assistant', - parts=[ - ReasoningUIPart(text='Let me think about this...', state='done'), - TextUIPart(text='Here is my answer.', state='done'), + { + 'id': IsStr(), + 'role': 'user', + 'metadata': None, + 'parts': [{'type': 'text', 'text': 'Tell me something', 'state': 'done', 'provider_metadata': None}], + }, + { + 'id': IsStr(), + 'role': 'assistant', + 'metadata': None, + 'parts': [ + { + 'type': 'reasoning', + 'text': 'Let me think about this...', + 'state': 'done', + 'provider_metadata': None, + }, + {'type': 'text', 'text': 'Here is my answer.', 'state': 'done', 'provider_metadata': None}, ], - ), + }, ] ) @@ -2173,8 +2205,7 @@ async def test_adapter_dump_messages_with_files(): ), ] - id_gen = predictable_id_generator() - ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + ui_messages = VercelAIAdapter.dump_messages(messages) # Check user message with files assert ui_messages[0].role == 'user' @@ -2212,8 +2243,7 @@ async def test_adapter_dump_messages_with_retry(): ), ] - id_gen = predictable_id_generator() - ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + ui_messages = VercelAIAdapter.dump_messages(messages) # Check assistant message has tool call with error assert ui_messages[1].role == 'assistant' @@ -2235,16 +2265,17 @@ async def test_adapter_dump_messages_consecutive_text(): ), ] - id_gen = predictable_id_generator() - ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + ui_messages = VercelAIAdapter.dump_messages(messages) + ui_message_dicts = [msg.model_dump() for msg in ui_messages] - assert ui_messages == snapshot( + assert ui_message_dicts == snapshot( [ - UIMessage( - id='test-id-1', - role='assistant', - parts=[TextUIPart(text='First second', state='done')], - ) + { + 'id': IsStr(), + 'role': 'assistant', + 'metadata': None, + 'parts': [{'type': 'text', 'text': 'First second', 'state': 'done', 'provider_metadata': None}], + } ] ) @@ -2272,28 +2303,39 @@ async def test_adapter_dump_messages_text_with_interruption(): ), ] - id_gen = predictable_id_generator() - ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + ui_messages = VercelAIAdapter.dump_messages(messages) + ui_message_dicts = [msg.model_dump() for msg in ui_messages] - assert ui_messages == snapshot( + assert ui_message_dicts == snapshot( [ - UIMessage( - id='test-id-1', - role='assistant', - parts=[ - TextUIPart(text='Before tool', state='done'), - ToolOutputAvailablePart( - type='tool-test', - tool_call_id='pyd_ai_builtin|test|t1', - input='{}', - output='result', - state='output-available', - provider_executed=True, - call_provider_metadata={'pydantic_ai': {'provider_name': 'test'}}, - ), - TextUIPart(text='\n\nAfter tool', state='done'), + { + 'id': IsStr(), + 'role': 'assistant', + 'metadata': None, + 'parts': [ + {'type': 'text', 'text': 'Before tool', 'state': 'done', 'provider_metadata': None}, + { + 'type': 'tool-test', + 'tool_call_id': 'pyd_ai_builtin|test|t1', + 'state': 'output-available', + 'input': '{}', + 'output': 'result', + 'provider_executed': True, + 'call_provider_metadata': {'pydantic_ai': {'provider_name': 'test'}}, + 'preliminary': None, + }, + { + 'type': 'text', + 'text': """\ + + +After tool\ +""", + 'state': 'done', + 'provider_metadata': None, + }, ], - ) + } ] ) @@ -2321,8 +2363,7 @@ async def test_adapter_dump_load_roundtrip(): ), ] - id_gen = predictable_id_generator() - ui_messages = VercelAIAdapter.dump_messages(original_messages, _id_generator=id_gen) + ui_messages = VercelAIAdapter.dump_messages(original_messages) # Load back to Pydantic AI format reloaded_messages = VercelAIAdapter.load_messages(ui_messages) @@ -2364,19 +2405,25 @@ async def test_adapter_dump_messages_text_before_thinking(): ), ] - id_gen = predictable_id_generator() - ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + ui_messages = VercelAIAdapter.dump_messages(messages) + ui_message_dicts = [msg.model_dump() for msg in ui_messages] - assert ui_messages == snapshot( + assert ui_message_dicts == snapshot( [ - UIMessage( - id='test-id-1', - role='assistant', - parts=[ - TextUIPart(text='Let me check.', state='done'), - ReasoningUIPart(text='Okay, I am checking now.', state='done'), + { + 'id': IsStr(), + 'role': 'assistant', + 'metadata': None, + 'parts': [ + {'type': 'text', 'text': 'Let me check.', 'state': 'done', 'provider_metadata': None}, + { + 'type': 'reasoning', + 'text': 'Okay, I am checking now.', + 'state': 'done', + 'provider_metadata': None, + }, ], - ), + } ] ) @@ -2395,74 +2442,26 @@ async def test_adapter_dump_messages_tool_call_without_return(): ), ] - id_gen = predictable_id_generator() - ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) - - assert ui_messages == snapshot( - [ - UIMessage( - id='test-id-1', - role='assistant', - parts=[ - DynamicToolInputAvailablePart( - tool_name='get_weather', - tool_call_id='tool_abc', - input='{"city":"New York"}', - state='input-available', - ) - ], - ), - ] - ) - - -async def test_adapter_dump_messages_builtin_tool_with_delayed_return(): - """Test a builtin tool call where the return is in a subsequent message.""" - messages = [ - ModelResponse( - parts=[ - BuiltinToolCallPart( - tool_name='web_search', - args={'query': 'pydantic-ai'}, - tool_call_id='tool_def', - provider_name='google', - ) - ] - ), - ModelRequest( - parts=[ - BuiltinToolReturnPart( # pyright: ignore[reportArgumentType] - tool_name='web_search', - content={'status': 'completed'}, - tool_call_id='tool_def', - provider_name='google', - ) - ] - ), - ] - - id_gen = predictable_id_generator() - ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + ui_messages = VercelAIAdapter.dump_messages(messages) + ui_message_dicts = [msg.model_dump() for msg in ui_messages] - # Note: The ModelRequest with the BuiltinToolReturnPart does not produce a UIMessage, - # because tool returns are only used to set the state of the original tool call. - assert ui_messages == snapshot( + assert ui_message_dicts == snapshot( [ - UIMessage( - id='test-id-1', - role='assistant', - parts=[ - ToolOutputAvailablePart( - type='tool-web_search', - tool_call_id='pyd_ai_builtin|google|tool_def', - input='{"query":"pydantic-ai"}', - output='{"status":"completed"}', - state='output-available', - provider_executed=True, - call_provider_metadata={'pydantic_ai': {'provider_name': 'google'}}, - ) + { + 'id': IsStr(), + 'role': 'assistant', + 'metadata': None, + 'parts': [ + { + 'type': 'dynamic-tool', + 'tool_name': 'get_weather', + 'tool_call_id': 'tool_abc', + 'state': 'input-available', + 'input': '{"city":"New York"}', + 'call_provider_metadata': None, + } ], - ), + } ] ) @@ -2477,26 +2476,36 @@ async def test_adapter_dump_messages_assistant_starts_with_tool(): ] ) ] - id_gen = predictable_id_generator() - ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) - assert ui_messages == snapshot( + ui_messages = VercelAIAdapter.dump_messages(messages) + + ui_message_dicts = [msg.model_dump() for msg in ui_messages] + assert ui_message_dicts == snapshot( [ - UIMessage( - id='test-id-1', - role='assistant', - parts=[ - DynamicToolInputAvailablePart(tool_name='t', tool_call_id='tc1', input='{}'), - TextUIPart( - # interruption logic preprends two new newlines - text="""\ + { + 'id': IsStr(), + 'role': 'assistant', + 'metadata': None, + 'parts': [ + { + 'type': 'dynamic-tool', + 'tool_name': 't', + 'tool_call_id': 'tc1', + 'state': 'input-available', + 'input': '{}', + 'call_provider_metadata': None, + }, + { + 'type': 'text', + 'text': """\ Some text\ """, - state='done', - ), + 'state': 'done', + 'provider_metadata': None, + }, ], - ) + } ] ) @@ -2525,16 +2534,25 @@ async def test_adapter_dump_messages_file_without_text(): ), ] - id_gen = predictable_id_generator() - ui_messages = VercelAIAdapter.dump_messages(messages, _id_generator=id_gen) + ui_messages = VercelAIAdapter.dump_messages(messages) + ui_message_dicts = [msg.model_dump() for msg in ui_messages] - assert ui_messages == snapshot( + assert ui_message_dicts == snapshot( [ - UIMessage( - id='test-id-1', - role='assistant', - parts=[FileUIPart(media_type='image/png', url='')], - ) + { + 'id': IsStr(), + 'role': 'assistant', + 'metadata': None, + 'parts': [ + { + 'type': 'file', + 'media_type': 'image/png', + 'filename': None, + 'url': '', + 'provider_metadata': None, + } + ], + } ] ) From 4870dd46562dca6a1e5ea5ba708c16aac56430fe Mon Sep 17 00:00:00 2001 From: David Sanchez <64162682+dsfaccini@users.noreply.github.com> Date: Sun, 23 Nov 2025 10:48:42 -0500 Subject: [PATCH 05/12] refactor: simplify dump_messages method and remove unused id generator --- .../pydantic_ai/ui/vercel_ai/_adapter.py | 109 +++++--------- tests/test_vercel_ai.py | 141 +++++++++++------- 2 files changed, 124 insertions(+), 126 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py b/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py index 9d799df64e..b2be306031 100644 --- a/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py +++ b/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py @@ -4,7 +4,7 @@ import json import uuid -from collections.abc import Callable, Sequence +from collections.abc import Sequence from dataclasses import dataclass from functools import cached_property from typing import TYPE_CHECKING @@ -220,8 +220,6 @@ def load_messages(cls, messages: Sequence[UIMessage]) -> list[ModelMessage]: # def dump_messages( # noqa: C901 cls, messages: Sequence[ModelMessage], - *, - _id_generator: Callable[[], str] | None = None, ) -> list[UIMessage]: """Transform Pydantic AI messages into Vercel AI messages. @@ -235,61 +233,48 @@ def dump_messages( # noqa: C901 def _message_id_generator() -> str: """Generate a message ID.""" - return _id_generator() if _id_generator is not None else str(uuid.uuid4()) + return uuid.uuid4().hex - tool_returns: dict[str, ToolReturnPart | BuiltinToolReturnPart] = {} + tool_returns: dict[str, ToolReturnPart] = {} tool_errors: dict[str, RetryPromptPart] = {} for msg in messages: if isinstance(msg, ModelRequest): for part in msg.parts: - if isinstance(part, ToolReturnPart | BuiltinToolReturnPart): + if isinstance(part, ToolReturnPart): tool_returns[part.tool_call_id] = part - elif isinstance(part, RetryPromptPart) and part.tool_name is not None: + elif isinstance(part, RetryPromptPart) and part.tool_call_id: tool_errors[part.tool_call_id] = part result: list[UIMessage] = [] for msg in messages: if isinstance(msg, ModelRequest): - system_parts: list[SystemPromptPart] = [] - user_parts: list[UserPromptPart | ToolReturnPart | BuiltinToolReturnPart | RetryPromptPart] = [] + system_ui_parts: list[UIMessagePart] = [] + user_ui_parts: list[UIMessagePart] = [] for part in msg.parts: if isinstance(part, SystemPromptPart): - system_parts.append(part) - elif isinstance( # pragma: no branch - All ModelRequest parts are covered - part, UserPromptPart | ToolReturnPart | BuiltinToolReturnPart | RetryPromptPart - ): - user_parts.append(part) - - if system_parts: - system_ui_parts: list[UIMessagePart] = [ - TextUIPart(text=part.content, state='done') for part in system_parts - ] + system_ui_parts.append(TextUIPart(text=part.content, state='done')) + elif isinstance(part, UserPromptPart): + user_ui_parts.extend(_convert_user_prompt_part(part)) + elif isinstance(part, ToolReturnPart | RetryPromptPart): + # Tool returns/errors don't create separate UI parts + # They're merged into the tool call in the assistant message + pass + else: + assert_never(part) + + if system_ui_parts: result.append(UIMessage(id=_message_id_generator(), role='system', parts=system_ui_parts)) - # Note: Tool returns and retry prompts don't create user message parts - # They are only used to set the state of tool calls in assistant messages - if user_parts: # pragma: no branch - A ModelRequest with no user-visible parts is not tested - user_ui_parts: list[UIMessagePart] = [] - for part in user_parts: - if isinstance(part, UserPromptPart): - user_ui_parts.extend(_convert_user_prompt_part(part)) - elif isinstance(part, ToolReturnPart | BuiltinToolReturnPart | RetryPromptPart): - # Tool returns/errors don't create separate UI parts - # They're merged into the tool call in the assistant message - pass - - if user_ui_parts: - result.append(UIMessage(id=_message_id_generator(), role='user', parts=user_ui_parts)) - - elif isinstance( # pragma: no branch - All message types are covered (no tests for empty ModelResponse) + if user_ui_parts: + result.append(UIMessage(id=_message_id_generator(), role='user', parts=user_ui_parts)) + + elif isinstance( # pragma: no branch msg, ModelResponse ): ui_parts: list[UIMessagePart] = [] - text_parts: list[str] = [] - had_interruption = False # For builtin tools, returns can be in the same ModelResponse as calls # Build a local mapping for this message @@ -303,44 +288,25 @@ def _message_id_generator() -> str: # Skip builtin tool returns - they're handled by the tool call logic continue elif isinstance(part, TextPart): - # If this is the first text after an interruption, prepend separator - if had_interruption: - text_parts.append('\n\n' + part.content) + # Combine consecutive text parts by checking the last UI part + if ui_parts and isinstance(ui_parts[-1], TextUIPart): + last_text = ui_parts[-1] + ui_parts[-1] = last_text.model_copy(update={'text': last_text.text + part.content}) else: - text_parts.append(part.content) + ui_parts.append(TextUIPart(text=part.content, state='done')) elif isinstance(part, ThinkingPart): - if text_parts: - ui_parts.append(TextUIPart(text=''.join(text_parts), state='done')) - text_parts = [] - had_interruption = False ui_parts.append(ReasoningUIPart(text=part.content, state='done')) elif isinstance(part, FilePart): - if text_parts: - ui_parts.append(TextUIPart(text=''.join(text_parts), state='done')) - text_parts = [] - had_interruption = False ui_parts.append( FileUIPart( url=part.content.data_uri, media_type=part.content.media_type, ) ) - elif isinstance(part, BaseToolCallPart): # pragma: no branch - All assistant part types are covered - if text_parts: - ui_parts.append(TextUIPart(text=''.join(text_parts), state='done')) - text_parts = [] - - # Mark that we had an interruption for next text part - had_interruption = True - + elif isinstance(part, BaseToolCallPart): if isinstance(part, BuiltinToolCallPart): prefixed_id = _make_builtin_tool_call_id(part.provider_name, part.tool_call_id) - # Check local returns first (same message), then global returns (from ModelRequest) - builtin_return = local_builtin_returns.get(part.tool_call_id) or ( - tool_returns.get(part.tool_call_id) - if isinstance(tool_returns.get(part.tool_call_id), BuiltinToolReturnPart) - else None - ) + builtin_return = local_builtin_returns.get(part.tool_call_id) if builtin_return: content = builtin_return.model_response_str() @@ -360,7 +326,7 @@ def _message_id_generator() -> str: call_provider_metadata=call_provider_metadata, ) ) - else: # pragma: no cover - Builtin tool call without a return is not tested + else: # pragma: no cover ui_parts.append( ToolInputAvailablePart( type=f'tool-{part.tool_name}', @@ -405,12 +371,13 @@ def _message_id_generator() -> str: state='input-available', ) ) + else: + assert_never(part) - if text_parts: - ui_parts.append(TextUIPart(text=''.join(text_parts), state='done')) - - if ui_parts: # pragma: no branch - An empty ModelResponse is not tested + if ui_parts: # pragma: no branch result.append(UIMessage(id=_message_id_generator(), role='assistant', parts=ui_parts)) + else: + assert_never(msg) return result @@ -432,9 +399,9 @@ def _convert_user_prompt_part(part: UserPromptPart) -> list[UIMessagePart]: ui_parts.append(TextUIPart(text=item, state='done')) elif isinstance(item, BinaryContent): ui_parts.append(FileUIPart(url=item.data_uri, media_type=item.media_type)) - elif isinstance( - item, ImageUrl | AudioUrl | VideoUrl | DocumentUrl - ): # pragma: no branch - All content types are covered + elif isinstance(item, ImageUrl | AudioUrl | VideoUrl | DocumentUrl): ui_parts.append(FileUIPart(url=item.url, media_type=item.media_type)) + else: + assert_never(item) return ui_parts diff --git a/tests/test_vercel_ai.py b/tests/test_vercel_ai.py index edfd778953..777b5fa41e 100644 --- a/tests/test_vercel_ai.py +++ b/tests/test_vercel_ai.py @@ -49,7 +49,6 @@ from pydantic_ai.ui.vercel_ai import VercelAIAdapter, VercelAIEventStream from pydantic_ai.ui.vercel_ai.request_types import ( DynamicToolOutputAvailablePart, - DynamicToolOutputErrorPart, FileUIPart, ReasoningUIPart, SubmitMessage, @@ -2199,20 +2198,49 @@ async def test_adapter_dump_messages_with_files(): ui_messages = VercelAIAdapter.dump_messages(messages) - # Check user message with files - assert ui_messages[0].role == 'user' - assert len(ui_messages[0].parts) == 3 - assert isinstance(ui_messages[0].parts[0], TextUIPart) - assert isinstance(ui_messages[0].parts[1], FileUIPart) - assert ui_messages[0].parts[1].url.startswith('data:image/png;base64,') - assert isinstance(ui_messages[0].parts[2], FileUIPart) - assert ui_messages[0].parts[2].url == 'https://example.com/image.png' + ui_message_dicts = [msg.model_dump() for msg in ui_messages] - # Check assistant message with file - assert ui_messages[1].role == 'assistant' - assert isinstance(ui_messages[1].parts[0], TextUIPart) - assert isinstance(ui_messages[1].parts[1], FileUIPart) - assert ui_messages[1].parts[1].url.startswith('data:application/pdf;base64,') + assert ui_message_dicts == snapshot( + [ + { + 'id': IsStr(), + 'role': 'user', + 'metadata': None, + 'parts': [ + {'type': 'text', 'text': 'Here is an image:', 'state': 'done', 'provider_metadata': None}, + { + 'type': 'file', + 'media_type': 'image/png', + 'filename': None, + 'url': '', + 'provider_metadata': None, + }, + { + 'type': 'file', + 'media_type': 'image/png', + 'filename': None, + 'url': 'https://example.com/image.png', + 'provider_metadata': None, + }, + ], + }, + { + 'id': IsStr(), + 'role': 'assistant', + 'metadata': None, + 'parts': [ + {'type': 'text', 'text': 'Nice image!', 'state': 'done', 'provider_metadata': None}, + { + 'type': 'file', + 'media_type': 'application/pdf', + 'filename': None, + 'url': 'data:application/pdf;base64,cmVzcG9uc2VfZmlsZQ==', + 'provider_metadata': None, + }, + ], + }, + ] + ) async def test_adapter_dump_messages_with_retry(): @@ -2237,13 +2265,38 @@ async def test_adapter_dump_messages_with_retry(): ui_messages = VercelAIAdapter.dump_messages(messages) - # Check assistant message has tool call with error - assert ui_messages[1].role == 'assistant' - tool_part = ui_messages[1].parts[0] - assert isinstance(tool_part, DynamicToolOutputErrorPart) - assert tool_part.tool_name == 'my_tool' - assert tool_part.state == 'output-error' - assert 'Tool failed with error' in tool_part.error_text + ui_message_dicts = [msg.model_dump() for msg in ui_messages] + + assert ui_message_dicts == snapshot( + [ + { + 'id': IsStr(), + 'role': 'user', + 'metadata': None, + 'parts': [{'type': 'text', 'text': 'Do something', 'state': 'done', 'provider_metadata': None}], + }, + { + 'id': IsStr(), + 'role': 'assistant', + 'metadata': None, + 'parts': [ + { + 'type': 'dynamic-tool', + 'tool_name': 'my_tool', + 'tool_call_id': 'tool_789', + 'state': 'output-error', + 'input': '{"arg":"value"}', + 'error_text': """\ +Tool failed with error + +Fix the errors and try again.\ +""", + 'call_provider_metadata': None, + } + ], + }, + ] + ) async def test_adapter_dump_messages_consecutive_text(): @@ -2318,11 +2371,7 @@ async def test_adapter_dump_messages_text_with_interruption(): }, { 'type': 'text', - 'text': """\ - - -After tool\ -""", + 'text': 'After tool', 'state': 'done', 'provider_metadata': None, }, @@ -2357,33 +2406,19 @@ async def test_adapter_dump_load_roundtrip(): ui_messages = VercelAIAdapter.dump_messages(original_messages) + def sync_timestamps(original: list[ModelRequest | ModelResponse], new: list[ModelRequest | ModelResponse]) -> None: + for orig_msg, new_msg in zip(original, new): + for orig_part, new_part in zip(orig_msg.parts, new_msg.parts): + if hasattr(orig_part, 'timestamp') and hasattr(new_part, 'timestamp'): + new_part.timestamp = orig_part.timestamp # pyright: ignore[reportAttributeAccessIssue, reportUnknownMemberType] + if hasattr(orig_msg, 'timestamp') and hasattr(new_msg, 'timestamp'): + new_msg.timestamp = orig_msg.timestamp # pyright: ignore[reportAttributeAccessIssue, reportUnknownMemberType] + # Load back to Pydantic AI format reloaded_messages = VercelAIAdapter.load_messages(ui_messages) + sync_timestamps(original_messages, reloaded_messages) - # Can't use `assert reloaded_messages == original_messages` because the timestamps will be different - assert reloaded_messages == snapshot( - [ - ModelRequest( - parts=[ - SystemPromptPart(content='System message', timestamp=IsDatetime()), - UserPromptPart(content='User message', timestamp=IsDatetime()), - ] - ), - ModelResponse( - parts=[ - TextPart(content='Response text'), - ToolCallPart(tool_name='tool1', args={'key': 'value'}, tool_call_id='tc1'), - ], - timestamp=IsDatetime(), - ), - ModelRequest( - parts=[ - ToolReturnPart(tool_name='tool1', content='tool result', tool_call_id='tc1', timestamp=IsDatetime()) - ] - ), - ModelResponse(parts=[TextPart(content='Final response')], timestamp=IsDatetime()), - ] - ) + assert reloaded_messages == original_messages async def test_adapter_dump_messages_text_before_thinking(): @@ -2488,11 +2523,7 @@ async def test_adapter_dump_messages_assistant_starts_with_tool(): }, { 'type': 'text', - 'text': """\ - - -Some text\ -""", + 'text': 'Some text', 'state': 'done', 'provider_metadata': None, }, From b1272b7f897305e47c2c92a37fd34180f05fdf91 Mon Sep 17 00:00:00 2001 From: David Sanchez <64162682+dsfaccini@users.noreply.github.com> Date: Thu, 27 Nov 2025 13:50:15 -0500 Subject: [PATCH 06/12] test: add unit test for dumping and loading ThinkingPart with metadata --- .../pydantic_ai/ui/vercel_ai/_adapter.py | 49 ++++++++++++---- tests/test_vercel_ai.py | 57 +++++++++++++++++++ 2 files changed, 95 insertions(+), 11 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py b/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py index b2be306031..aca9c418f9 100644 --- a/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py +++ b/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py @@ -7,7 +7,7 @@ from collections.abc import Sequence from dataclasses import dataclass from functools import cached_property -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any, cast from pydantic import TypeAdapter from typing_extensions import assert_never @@ -18,6 +18,7 @@ BinaryContent, BuiltinToolCallPart, BuiltinToolReturnPart, + CachePoint, DocumentUrl, FilePart, ImageUrl, @@ -133,7 +134,16 @@ def load_messages(cls, messages: Sequence[UIMessage]) -> list[ModelMessage]: # if isinstance(part, TextUIPart): builder.add(TextPart(content=part.text)) elif isinstance(part, ReasoningUIPart): - builder.add(ThinkingPart(content=part.text)) + pydantic_ai_meta = (part.provider_metadata or {}).get('pydantic_ai', {}) + builder.add( + ThinkingPart( + content=part.text, + id=pydantic_ai_meta.get('id'), + signature=pydantic_ai_meta.get('signature'), + provider_name=pydantic_ai_meta.get('provider_name'), + provider_details=pydantic_ai_meta.get('provider_details'), + ) + ) elif isinstance(part, FileUIPart): try: file = BinaryContent.from_data_uri(part.url) @@ -153,13 +163,17 @@ def load_messages(cls, messages: Sequence[UIMessage]) -> list[ModelMessage]: # tool_call_id = part.tool_call_id - args = part.input + args: str | dict[str, Any] | None = part.input if isinstance(args, str): try: - args = json.loads(args) + parsed = json.loads(args) + if isinstance(parsed, dict): + args = cast(dict[str, Any], parsed) except json.JSONDecodeError: pass + elif args is not None and not isinstance(args, dict): + raise ValueError(f'Unsupported tool call args type: {type(args)}') if builtin_tool: call_part = BuiltinToolCallPart(tool_name=tool_name, tool_call_id=tool_call_id, args=args) @@ -295,7 +309,20 @@ def _message_id_generator() -> str: else: ui_parts.append(TextUIPart(text=part.content, state='done')) elif isinstance(part, ThinkingPart): - ui_parts.append(ReasoningUIPart(text=part.content, state='done')) + thinking_metadata: dict[str, Any] = {} + if part.id is not None: + thinking_metadata['id'] = part.id + if part.signature is not None: + thinking_metadata['signature'] = part.signature + if part.provider_name is not None: + thinking_metadata['provider_name'] = part.provider_name + if part.provider_details is not None: + thinking_metadata['provider_details'] = part.provider_details + + provider_metadata = {'pydantic_ai': thinking_metadata} if thinking_metadata else None + ui_parts.append( + ReasoningUIPart(text=part.content, state='done', provider_metadata=provider_metadata) + ) elif isinstance(part, FilePart): ui_parts.append( FileUIPart( @@ -305,7 +332,9 @@ def _message_id_generator() -> str: ) elif isinstance(part, BaseToolCallPart): if isinstance(part, BuiltinToolCallPart): - prefixed_id = _make_builtin_tool_call_id(part.provider_name, part.tool_call_id) + prefixed_id = ( + f'{BUILTIN_TOOL_CALL_ID_PREFIX}|{part.provider_name or ""}|{part.tool_call_id}' + ) builtin_return = local_builtin_returns.get(part.tool_call_id) if builtin_return: @@ -382,11 +411,6 @@ def _message_id_generator() -> str: return result -def _make_builtin_tool_call_id(provider_name: str | None, tool_call_id: str) -> str: - """Create a prefixed tool call ID for builtin tools.""" - return f'{BUILTIN_TOOL_CALL_ID_PREFIX}|{provider_name or ""}|{tool_call_id}' - - def _convert_user_prompt_part(part: UserPromptPart) -> list[UIMessagePart]: """Convert a UserPromptPart to a list of UI message parts.""" ui_parts: list[UIMessagePart] = [] @@ -401,6 +425,9 @@ def _convert_user_prompt_part(part: UserPromptPart) -> list[UIMessagePart]: ui_parts.append(FileUIPart(url=item.data_uri, media_type=item.media_type)) elif isinstance(item, ImageUrl | AudioUrl | VideoUrl | DocumentUrl): ui_parts.append(FileUIPart(url=item.url, media_type=item.media_type)) + elif isinstance(item, CachePoint): + # CachePoint is metadata for prompt caching, skip for UI conversion + pass else: assert_never(item) diff --git a/tests/test_vercel_ai.py b/tests/test_vercel_ai.py index 777b5fa41e..f2120c3665 100644 --- a/tests/test_vercel_ai.py +++ b/tests/test_vercel_ai.py @@ -2597,3 +2597,60 @@ async def test_convert_user_prompt_part_only_urls(): FileUIPart(media_type='video/mp4', url='https://example.com/vid.mp4'), ] ) + + +async def test_adapter_dump_messages_thinking_with_metadata(): + """Test dumping and loading messages with ThinkingPart metadata preservation.""" + original_messages = [ + ModelResponse( + parts=[ + ThinkingPart( + content='Let me think about this...', + id='thinking_123', + signature='sig_abc', + provider_name='anthropic', + provider_details={'model': 'claude-3'}, + ), + TextPart(content='Here is my answer.'), + ] + ), + ] + + ui_messages = VercelAIAdapter.dump_messages(original_messages) + ui_message_dicts = [msg.model_dump() for msg in ui_messages] + + assert ui_message_dicts == snapshot( + [ + { + 'id': IsStr(), + 'role': 'assistant', + 'metadata': None, + 'parts': [ + { + 'type': 'reasoning', + 'text': 'Let me think about this...', + 'state': 'done', + 'provider_metadata': { + 'pydantic_ai': { + 'id': 'thinking_123', + 'signature': 'sig_abc', + 'provider_name': 'anthropic', + 'provider_details': {'model': 'claude-3'}, + } + }, + }, + {'type': 'text', 'text': 'Here is my answer.', 'state': 'done', 'provider_metadata': None}, + ], + } + ] + ) + + # Test roundtrip - verify metadata is preserved when loading back + reloaded_messages = VercelAIAdapter.load_messages(ui_messages) + + # Sync timestamps for comparison + for orig_msg, new_msg in zip(original_messages, reloaded_messages): + if hasattr(orig_msg, 'timestamp') and hasattr(new_msg, 'timestamp'): + new_msg.timestamp = orig_msg.timestamp # pyright: ignore[reportAttributeAccessIssue] + + assert reloaded_messages == original_messages From e300c157a5fd7cc4d43307cf138ddf9f9054e354 Mon Sep 17 00:00:00 2001 From: David Sanchez <64162682+dsfaccini@users.noreply.github.com> Date: Thu, 27 Nov 2025 14:34:01 -0500 Subject: [PATCH 07/12] coverage --- .../pydantic_ai/ui/vercel_ai/_adapter.py | 6 +- tests/test_ui.py | 9 +++ tests/test_vercel_ai.py | 79 ++++++++++++++++--- 3 files changed, 82 insertions(+), 12 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py b/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py index aca9c418f9..9975565308 100644 --- a/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py +++ b/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py @@ -172,8 +172,10 @@ def load_messages(cls, messages: Sequence[UIMessage]) -> list[ModelMessage]: # args = cast(dict[str, Any], parsed) except json.JSONDecodeError: pass - elif args is not None and not isinstance(args, dict): - raise ValueError(f'Unsupported tool call args type: {type(args)}') + elif isinstance(args, dict) or args is None: + pass + else: + assert_never(args) if builtin_tool: call_part = BuiltinToolCallPart(tool_name=tool_name, tool_call_id=tool_call_id, args=args) diff --git a/tests/test_ui.py b/tests/test_ui.py index d50e08343e..37d22c6ecb 100644 --- a/tests/test_ui.py +++ b/tests/test_ui.py @@ -680,3 +680,12 @@ async def send(data: MutableMapping[str, Any]) -> None: {'type': 'http.response.body', 'body': b'', 'more_body': False}, ] ) + + +def test_dummy_adapter_dump_messages(): + """Test that DummyUIAdapter.dump_messages returns messages as-is.""" + from pydantic_ai.messages import UserPromptPart + + messages = [ModelRequest(parts=[UserPromptPart(content='Hello')])] + result = DummyUIAdapter.dump_messages(messages) + assert result == messages diff --git a/tests/test_vercel_ai.py b/tests/test_vercel_ai.py index f2120c3665..0e2ec5c2fe 100644 --- a/tests/test_vercel_ai.py +++ b/tests/test_vercel_ai.py @@ -2,7 +2,6 @@ import json from collections.abc import AsyncIterator, MutableMapping -from itertools import count from typing import Any, cast import pytest @@ -83,12 +82,6 @@ ] -def predictable_id_generator(prefix: str = 'test-id-'): - """Create a predictable ID generator for testing.""" - c = count(1) - return lambda: f'{prefix}{next(c)}' - - @pytest.mark.skipif(not openai_import_successful(), reason='OpenAI not installed') async def test_run(allow_model_requests: None, openai_api_key: str): model = OpenAIResponsesModel('gpt-5', provider=OpenAIProvider(api_key=openai_api_key)) @@ -2648,9 +2641,75 @@ async def test_adapter_dump_messages_thinking_with_metadata(): # Test roundtrip - verify metadata is preserved when loading back reloaded_messages = VercelAIAdapter.load_messages(ui_messages) - # Sync timestamps for comparison + # Sync timestamps for comparison (ModelResponse always has timestamp) for orig_msg, new_msg in zip(original_messages, reloaded_messages): - if hasattr(orig_msg, 'timestamp') and hasattr(new_msg, 'timestamp'): - new_msg.timestamp = orig_msg.timestamp # pyright: ignore[reportAttributeAccessIssue] + new_msg.timestamp = orig_msg.timestamp # pyright: ignore[reportAttributeAccessIssue] assert reloaded_messages == original_messages + + +async def test_adapter_load_messages_json_list_args(): + """Test that JSON list args are kept as strings (not parsed).""" + ui_messages = [ + UIMessage( + id='msg1', + role='assistant', + parts=[ + DynamicToolOutputAvailablePart( + tool_name='my_tool', + tool_call_id='tc1', + input='[1, 2, 3]', # JSON list - should stay as string + output='result', + state='output-available', + ) + ], + ) + ] + + messages = VercelAIAdapter.load_messages(ui_messages) + + assert len(messages) == 2 # ToolCall in response + ToolReturn in request + response = messages[0] + assert isinstance(response, ModelResponse) + assert len(response.parts) == 1 + tool_call = response.parts[0] + assert isinstance(tool_call, ToolCallPart) + # Args should remain as string since it parses to a list, not a dict + assert tool_call.args == '[1, 2, 3]' + + +async def test_adapter_dump_messages_with_cache_point(): + """Test that CachePoint in user content is skipped during conversion.""" + from pydantic_ai.messages import CachePoint + + messages = [ + ModelRequest( + parts=[ + UserPromptPart( + content=[ + 'Hello', + CachePoint(), # Should be skipped + 'World', + ] + ) + ] + ), + ] + + ui_messages = VercelAIAdapter.dump_messages(messages) + ui_message_dicts = [msg.model_dump() for msg in ui_messages] + + # CachePoint should be omitted, only text parts remain + assert ui_message_dicts == snapshot( + [ + { + 'id': IsStr(), + 'role': 'user', + 'metadata': None, + 'parts': [ + {'type': 'text', 'text': 'Hello', 'state': 'done', 'provider_metadata': None}, + {'type': 'text', 'text': 'World', 'state': 'done', 'provider_metadata': None}, + ], + } + ] + ) From 57157d695f23d94140fd107cca95f51901983f90 Mon Sep 17 00:00:00 2001 From: David Sanchez <64162682+dsfaccini@users.noreply.github.com> Date: Thu, 27 Nov 2025 21:10:40 -0500 Subject: [PATCH 08/12] syntax imprevement --- pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py b/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py index 9975565308..8b648a7cb8 100644 --- a/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py +++ b/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py @@ -301,7 +301,6 @@ def _message_id_generator() -> str: for part in msg.parts: if isinstance(part, BuiltinToolReturnPart): - # Skip builtin tool returns - they're handled by the tool call logic continue elif isinstance(part, TextPart): # Combine consecutive text parts by checking the last UI part @@ -337,9 +336,8 @@ def _message_id_generator() -> str: prefixed_id = ( f'{BUILTIN_TOOL_CALL_ID_PREFIX}|{part.provider_name or ""}|{part.tool_call_id}' ) - builtin_return = local_builtin_returns.get(part.tool_call_id) - if builtin_return: + if builtin_return := local_builtin_returns.get(part.tool_call_id): content = builtin_return.model_response_str() call_provider_metadata = ( {'pydantic_ai': {'provider_name': part.provider_name}} @@ -371,7 +369,7 @@ def _message_id_generator() -> str: tool_return = tool_returns.get(part.tool_call_id) tool_error = tool_errors.get(part.tool_call_id) - if tool_return and isinstance(tool_return, ToolReturnPart): + if isinstance(tool_return, ToolReturnPart): content = tool_return.model_response_str() ui_parts.append( DynamicToolOutputAvailablePart( From 58a71a28657b7253a25e5f653fe7f4f2f37178dc Mon Sep 17 00:00:00 2001 From: David Sanchez <64162682+dsfaccini@users.noreply.github.com> Date: Fri, 28 Nov 2025 17:09:56 -0500 Subject: [PATCH 09/12] address review points --- pydantic_ai_slim/pydantic_ai/ui/_adapter.py | 6 --- .../pydantic_ai/ui/ag_ui/_adapter.py | 5 -- .../pydantic_ai/ui/vercel_ai/_adapter.py | 52 ++++++++----------- tests/test_vercel_ai.py | 4 +- 4 files changed, 24 insertions(+), 43 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/ui/_adapter.py b/pydantic_ai_slim/pydantic_ai/ui/_adapter.py index b396ee634d..a1ca12cd6e 100644 --- a/pydantic_ai_slim/pydantic_ai/ui/_adapter.py +++ b/pydantic_ai_slim/pydantic_ai/ui/_adapter.py @@ -143,12 +143,6 @@ def load_messages(cls, messages: Sequence[MessageT]) -> list[ModelMessage]: """Transform protocol-specific messages into Pydantic AI messages.""" raise NotImplementedError - @classmethod - @abstractmethod - def dump_messages(cls, messages: Sequence[ModelMessage]) -> list[MessageT]: - """Transform Pydantic AI messages into protocol-specific messages.""" - raise NotImplementedError - @abstractmethod def build_event_stream(self) -> UIEventStream[RunInputT, EventT, AgentDepsT, OutputDataT]: """Build a protocol-specific event stream transformer.""" diff --git a/pydantic_ai_slim/pydantic_ai/ui/ag_ui/_adapter.py b/pydantic_ai_slim/pydantic_ai/ui/ag_ui/_adapter.py index 638cdeb138..fe3513ae58 100644 --- a/pydantic_ai_slim/pydantic_ai/ui/ag_ui/_adapter.py +++ b/pydantic_ai_slim/pydantic_ai/ui/ag_ui/_adapter.py @@ -117,11 +117,6 @@ def state(self) -> dict[str, Any] | None: return cast('dict[str, Any]', state) - @classmethod - def dump_messages(cls, messages: Sequence[ModelMessage]) -> list[Message]: - """Transform Pydantic AI messages into AG-UI messages.""" - raise NotImplementedError('TODO: implement dump_messages method') # TODO: implement dump_messages method - @classmethod def load_messages(cls, messages: Sequence[Message]) -> list[ModelMessage]: """Transform AG-UI messages into Pydantic AI messages.""" diff --git a/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py b/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py index 8b648a7cb8..97c5365b93 100644 --- a/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py +++ b/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py @@ -68,7 +68,6 @@ __all__ = ['VercelAIAdapter'] request_data_ta: TypeAdapter[RequestData] = TypeAdapter(RequestData) -BUILTIN_TOOL_CALL_ID_PREFIX = 'pyd_ai_builtin' @dataclass @@ -241,16 +240,10 @@ def dump_messages( # noqa: C901 Args: messages: A sequence of ModelMessage objects to convert - _id_generator: Optional ID generator function for testing. If not provided, uses uuid.uuid4(). Returns: A list of UIMessage objects in Vercel AI format """ - - def _message_id_generator() -> str: - """Generate a message ID.""" - return uuid.uuid4().hex - tool_returns: dict[str, ToolReturnPart] = {} tool_errors: dict[str, RetryPromptPart] = {} @@ -274,18 +267,24 @@ def _message_id_generator() -> str: system_ui_parts.append(TextUIPart(text=part.content, state='done')) elif isinstance(part, UserPromptPart): user_ui_parts.extend(_convert_user_prompt_part(part)) - elif isinstance(part, ToolReturnPart | RetryPromptPart): - # Tool returns/errors don't create separate UI parts - # They're merged into the tool call in the assistant message + elif isinstance(part, ToolReturnPart): + # Tool returns are merged into the tool call in the assistant message pass + elif isinstance(part, RetryPromptPart): + if part.tool_call_id: + # Tool errors with IDs are merged into the tool call in the assistant message + pass + else: + # RetryPromptPart without tool_call_id becomes a user text message + user_ui_parts.append(TextUIPart(text=part.model_response(), state='done')) else: assert_never(part) if system_ui_parts: - result.append(UIMessage(id=_message_id_generator(), role='system', parts=system_ui_parts)) + result.append(UIMessage(id=str(uuid.uuid4()), role='system', parts=system_ui_parts)) if user_ui_parts: - result.append(UIMessage(id=_message_id_generator(), role='user', parts=user_ui_parts)) + result.append(UIMessage(id=str(uuid.uuid4()), role='user', parts=user_ui_parts)) elif isinstance( # pragma: no branch msg, ModelResponse @@ -293,20 +292,17 @@ def _message_id_generator() -> str: ui_parts: list[UIMessagePart] = [] # For builtin tools, returns can be in the same ModelResponse as calls - # Build a local mapping for this message - local_builtin_returns: dict[str, BuiltinToolReturnPart] = {} - for part in msg.parts: - if isinstance(part, BuiltinToolReturnPart): - local_builtin_returns[part.tool_call_id] = part + local_builtin_returns: dict[str, BuiltinToolReturnPart] = { + part.tool_call_id: part for part in msg.parts if isinstance(part, BuiltinToolReturnPart) + } for part in msg.parts: if isinstance(part, BuiltinToolReturnPart): continue elif isinstance(part, TextPart): - # Combine consecutive text parts by checking the last UI part + # Combine consecutive text parts if ui_parts and isinstance(ui_parts[-1], TextUIPart): - last_text = ui_parts[-1] - ui_parts[-1] = last_text.model_copy(update={'text': last_text.text + part.content}) + ui_parts[-1].text += part.content else: ui_parts.append(TextUIPart(text=part.content, state='done')) elif isinstance(part, ThinkingPart): @@ -333,21 +329,16 @@ def _message_id_generator() -> str: ) elif isinstance(part, BaseToolCallPart): if isinstance(part, BuiltinToolCallPart): - prefixed_id = ( - f'{BUILTIN_TOOL_CALL_ID_PREFIX}|{part.provider_name or ""}|{part.tool_call_id}' + call_provider_metadata = ( + {'pydantic_ai': {'provider_name': part.provider_name}} if part.provider_name else None ) if builtin_return := local_builtin_returns.get(part.tool_call_id): content = builtin_return.model_response_str() - call_provider_metadata = ( - {'pydantic_ai': {'provider_name': part.provider_name}} - if part.provider_name - else None - ) ui_parts.append( ToolOutputAvailablePart( type=f'tool-{part.tool_name}', - tool_call_id=prefixed_id, + tool_call_id=part.tool_call_id, input=part.args_as_json_str(), output=content, state='output-available', @@ -359,10 +350,11 @@ def _message_id_generator() -> str: ui_parts.append( ToolInputAvailablePart( type=f'tool-{part.tool_name}', - tool_call_id=prefixed_id, + tool_call_id=part.tool_call_id, input=part.args_as_json_str(), state='input-available', provider_executed=True, + call_provider_metadata=call_provider_metadata, ) ) else: @@ -404,7 +396,7 @@ def _message_id_generator() -> str: assert_never(part) if ui_parts: # pragma: no branch - result.append(UIMessage(id=_message_id_generator(), role='assistant', parts=ui_parts)) + result.append(UIMessage(id=str(uuid.uuid4()), role='assistant', parts=ui_parts)) else: assert_never(msg) diff --git a/tests/test_vercel_ai.py b/tests/test_vercel_ai.py index 0e2ec5c2fe..c148da0291 100644 --- a/tests/test_vercel_ai.py +++ b/tests/test_vercel_ai.py @@ -2112,7 +2112,7 @@ async def test_adapter_dump_messages_with_builtin_tools(): 'parts': [ { 'type': 'tool-web_search', - 'tool_call_id': 'pyd_ai_builtin|openai|tool_456', + 'tool_call_id': 'tool_456', 'state': 'output-available', 'input': '{"query":"test"}', 'output': '{"status":"completed"}', @@ -2354,7 +2354,7 @@ async def test_adapter_dump_messages_text_with_interruption(): {'type': 'text', 'text': 'Before tool', 'state': 'done', 'provider_metadata': None}, { 'type': 'tool-test', - 'tool_call_id': 'pyd_ai_builtin|test|t1', + 'tool_call_id': 't1', 'state': 'output-available', 'input': '{}', 'output': 'result', From a06142190a814dac51c222d39dfc9502a60b897a Mon Sep 17 00:00:00 2001 From: David Sanchez <64162682+dsfaccini@users.noreply.github.com> Date: Fri, 28 Nov 2025 20:44:45 -0500 Subject: [PATCH 10/12] fix coverage --- pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py b/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py index 97c5365b93..bcf5a77f58 100644 --- a/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py +++ b/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py @@ -271,12 +271,10 @@ def dump_messages( # noqa: C901 # Tool returns are merged into the tool call in the assistant message pass elif isinstance(part, RetryPromptPart): - if part.tool_call_id: - # Tool errors with IDs are merged into the tool call in the assistant message - pass - else: - # RetryPromptPart without tool_call_id becomes a user text message - user_ui_parts.append(TextUIPart(text=part.model_response(), state='done')) + # RetryPromptPart always has a tool_call_id (generated if not provided). + # These are handled when processing ToolCallPart in ModelResponse, + # where they become DynamicToolOutputErrorPart via the tool_errors dict. + pass else: assert_never(part) From 73fb21d25f8d1132dd4ba0a6b5f6b353e2f17bae Mon Sep 17 00:00:00 2001 From: David Sanchez <64162682+dsfaccini@users.noreply.github.com> Date: Sat, 29 Nov 2025 23:16:04 -0500 Subject: [PATCH 11/12] refaator (for comfort) --- .../pydantic_ai/ui/vercel_ai/_adapter.py | 268 ++++++++++-------- 1 file changed, 142 insertions(+), 126 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py b/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py index bcf5a77f58..af757ffc32 100644 --- a/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py +++ b/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py @@ -231,8 +231,145 @@ def load_messages(cls, messages: Sequence[UIMessage]) -> list[ModelMessage]: # return builder.messages + @staticmethod + def _dump_request_message(msg: ModelRequest) -> tuple[list[UIMessagePart], list[UIMessagePart]]: + """Convert a ModelRequest into a UIMessage.""" + system_ui_parts: list[UIMessagePart] = [] + user_ui_parts: list[UIMessagePart] = [] + + for part in msg.parts: + if isinstance(part, SystemPromptPart): + system_ui_parts.append(TextUIPart(text=part.content, state='done')) + elif isinstance(part, UserPromptPart): + user_ui_parts.extend(_convert_user_prompt_part(part)) + elif isinstance(part, ToolReturnPart): + # Tool returns are merged into the tool call in the assistant message + pass + elif isinstance(part, RetryPromptPart): + # RetryPromptPart always has a tool_call_id (generated if not provided). + # These are handled when processing ToolCallPart in ModelResponse, + # where they become DynamicToolOutputErrorPart via the tool_errors dict. + pass + else: + assert_never(part) + + return system_ui_parts, user_ui_parts + + @staticmethod + def _dump_response_message( # noqa: C901 + msg: ModelResponse, + tool_returns: dict[str, ToolReturnPart], + tool_errors: dict[str, RetryPromptPart], + ) -> list[UIMessagePart]: + """Convert a ModelResponse into a UIMessage.""" + ui_parts: list[UIMessagePart] = [] + + # For builtin tools, returns can be in the same ModelResponse as calls + local_builtin_returns: dict[str, BuiltinToolReturnPart] = { + part.tool_call_id: part for part in msg.parts if isinstance(part, BuiltinToolReturnPart) + } + + for part in msg.parts: + if isinstance(part, BuiltinToolReturnPart): + continue + elif isinstance(part, TextPart): + # Combine consecutive text parts + if ui_parts and isinstance(ui_parts[-1], TextUIPart): + ui_parts[-1].text += part.content + else: + ui_parts.append(TextUIPart(text=part.content, state='done')) + elif isinstance(part, ThinkingPart): + thinking_metadata: dict[str, Any] = {} + if part.id is not None: + thinking_metadata['id'] = part.id + if part.signature is not None: + thinking_metadata['signature'] = part.signature + if part.provider_name is not None: + thinking_metadata['provider_name'] = part.provider_name + if part.provider_details is not None: + thinking_metadata['provider_details'] = part.provider_details + + provider_metadata = {'pydantic_ai': thinking_metadata} if thinking_metadata else None + ui_parts.append(ReasoningUIPart(text=part.content, state='done', provider_metadata=provider_metadata)) + elif isinstance(part, FilePart): + ui_parts.append( + FileUIPart( + url=part.content.data_uri, + media_type=part.content.media_type, + ) + ) + elif isinstance(part, BaseToolCallPart): + if isinstance(part, BuiltinToolCallPart): + call_provider_metadata = ( + {'pydantic_ai': {'provider_name': part.provider_name}} if part.provider_name else None + ) + + if builtin_return := local_builtin_returns.get(part.tool_call_id): + content = builtin_return.model_response_str() + ui_parts.append( + ToolOutputAvailablePart( + type=f'tool-{part.tool_name}', + tool_call_id=part.tool_call_id, + input=part.args_as_json_str(), + output=content, + state='output-available', + provider_executed=True, + call_provider_metadata=call_provider_metadata, + ) + ) + else: # pragma: no cover + ui_parts.append( + ToolInputAvailablePart( + type=f'tool-{part.tool_name}', + tool_call_id=part.tool_call_id, + input=part.args_as_json_str(), + state='input-available', + provider_executed=True, + call_provider_metadata=call_provider_metadata, + ) + ) + else: + tool_return = tool_returns.get(part.tool_call_id) + tool_error = tool_errors.get(part.tool_call_id) + + if isinstance(tool_return, ToolReturnPart): + content = tool_return.model_response_str() + ui_parts.append( + DynamicToolOutputAvailablePart( + tool_name=part.tool_name, + tool_call_id=part.tool_call_id, + input=part.args_as_json_str(), + output=content, + state='output-available', + ) + ) + elif tool_error: + error_text = tool_error.model_response() + ui_parts.append( + DynamicToolOutputErrorPart( + tool_name=part.tool_name, + tool_call_id=part.tool_call_id, + input=part.args_as_json_str(), + error_text=error_text, + state='output-error', + ) + ) + else: + ui_parts.append( + DynamicToolInputAvailablePart( + tool_name=part.tool_name, + tool_call_id=part.tool_call_id, + input=part.args_as_json_str(), + state='input-available', + ) + ) + else: + assert_never(part) + + return ui_parts + @classmethod - def dump_messages( # noqa: C901 + def dump_messages( cls, messages: Sequence[ModelMessage], ) -> list[UIMessage]: @@ -259,25 +396,7 @@ def dump_messages( # noqa: C901 for msg in messages: if isinstance(msg, ModelRequest): - system_ui_parts: list[UIMessagePart] = [] - user_ui_parts: list[UIMessagePart] = [] - - for part in msg.parts: - if isinstance(part, SystemPromptPart): - system_ui_parts.append(TextUIPart(text=part.content, state='done')) - elif isinstance(part, UserPromptPart): - user_ui_parts.extend(_convert_user_prompt_part(part)) - elif isinstance(part, ToolReturnPart): - # Tool returns are merged into the tool call in the assistant message - pass - elif isinstance(part, RetryPromptPart): - # RetryPromptPart always has a tool_call_id (generated if not provided). - # These are handled when processing ToolCallPart in ModelResponse, - # where they become DynamicToolOutputErrorPart via the tool_errors dict. - pass - else: - assert_never(part) - + system_ui_parts, user_ui_parts = cls._dump_request_message(msg) if system_ui_parts: result.append(UIMessage(id=str(uuid.uuid4()), role='system', parts=system_ui_parts)) @@ -287,112 +406,9 @@ def dump_messages( # noqa: C901 elif isinstance( # pragma: no branch msg, ModelResponse ): - ui_parts: list[UIMessagePart] = [] - - # For builtin tools, returns can be in the same ModelResponse as calls - local_builtin_returns: dict[str, BuiltinToolReturnPart] = { - part.tool_call_id: part for part in msg.parts if isinstance(part, BuiltinToolReturnPart) - } - - for part in msg.parts: - if isinstance(part, BuiltinToolReturnPart): - continue - elif isinstance(part, TextPart): - # Combine consecutive text parts - if ui_parts and isinstance(ui_parts[-1], TextUIPart): - ui_parts[-1].text += part.content - else: - ui_parts.append(TextUIPart(text=part.content, state='done')) - elif isinstance(part, ThinkingPart): - thinking_metadata: dict[str, Any] = {} - if part.id is not None: - thinking_metadata['id'] = part.id - if part.signature is not None: - thinking_metadata['signature'] = part.signature - if part.provider_name is not None: - thinking_metadata['provider_name'] = part.provider_name - if part.provider_details is not None: - thinking_metadata['provider_details'] = part.provider_details - - provider_metadata = {'pydantic_ai': thinking_metadata} if thinking_metadata else None - ui_parts.append( - ReasoningUIPart(text=part.content, state='done', provider_metadata=provider_metadata) - ) - elif isinstance(part, FilePart): - ui_parts.append( - FileUIPart( - url=part.content.data_uri, - media_type=part.content.media_type, - ) - ) - elif isinstance(part, BaseToolCallPart): - if isinstance(part, BuiltinToolCallPart): - call_provider_metadata = ( - {'pydantic_ai': {'provider_name': part.provider_name}} if part.provider_name else None - ) - - if builtin_return := local_builtin_returns.get(part.tool_call_id): - content = builtin_return.model_response_str() - ui_parts.append( - ToolOutputAvailablePart( - type=f'tool-{part.tool_name}', - tool_call_id=part.tool_call_id, - input=part.args_as_json_str(), - output=content, - state='output-available', - provider_executed=True, - call_provider_metadata=call_provider_metadata, - ) - ) - else: # pragma: no cover - ui_parts.append( - ToolInputAvailablePart( - type=f'tool-{part.tool_name}', - tool_call_id=part.tool_call_id, - input=part.args_as_json_str(), - state='input-available', - provider_executed=True, - call_provider_metadata=call_provider_metadata, - ) - ) - else: - tool_return = tool_returns.get(part.tool_call_id) - tool_error = tool_errors.get(part.tool_call_id) - - if isinstance(tool_return, ToolReturnPart): - content = tool_return.model_response_str() - ui_parts.append( - DynamicToolOutputAvailablePart( - tool_name=part.tool_name, - tool_call_id=part.tool_call_id, - input=part.args_as_json_str(), - output=content, - state='output-available', - ) - ) - elif tool_error: - error_text = tool_error.model_response() - ui_parts.append( - DynamicToolOutputErrorPart( - tool_name=part.tool_name, - tool_call_id=part.tool_call_id, - input=part.args_as_json_str(), - error_text=error_text, - state='output-error', - ) - ) - else: - ui_parts.append( - DynamicToolInputAvailablePart( - tool_name=part.tool_name, - tool_call_id=part.tool_call_id, - input=part.args_as_json_str(), - state='input-available', - ) - ) - else: - assert_never(part) - + ui_parts: list[UIMessagePart] = cls._dump_response_message( + msg, tool_returns=tool_returns, tool_errors=tool_errors + ) if ui_parts: # pragma: no branch result.append(UIMessage(id=str(uuid.uuid4()), role='assistant', parts=ui_parts)) else: From e49f6564f4fdd0892da5914f59fd628f18aeba88 Mon Sep 17 00:00:00 2001 From: David Sanchez <64162682+dsfaccini@users.noreply.github.com> Date: Mon, 1 Dec 2025 17:11:41 -0500 Subject: [PATCH 12/12] Refactor dump_messages per review: merge tool dicts, handle RetryPromptPart without tool_name, split BaseToolCallPart branches --- pydantic_ai_slim/pydantic_ai/ui/_adapter.py | 5 + .../pydantic_ai/ui/vercel_ai/_adapter.py | 143 +++++++++--------- tests/test_vercel_ai.py | 55 +++++++ 3 files changed, 129 insertions(+), 74 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/ui/_adapter.py b/pydantic_ai_slim/pydantic_ai/ui/_adapter.py index a1ca12cd6e..8cad4aeffb 100644 --- a/pydantic_ai_slim/pydantic_ai/ui/_adapter.py +++ b/pydantic_ai_slim/pydantic_ai/ui/_adapter.py @@ -143,6 +143,11 @@ def load_messages(cls, messages: Sequence[MessageT]) -> list[ModelMessage]: """Transform protocol-specific messages into Pydantic AI messages.""" raise NotImplementedError + @classmethod + def dump_messages(cls, messages: Sequence[ModelMessage]) -> list[MessageT]: + """Transform Pydantic AI messages into protocol-specific messages.""" + raise NotImplementedError + @abstractmethod def build_event_stream(self) -> UIEventStream[RunInputT, EventT, AgentDepsT, OutputDataT]: """Build a protocol-specific event stream transformer.""" diff --git a/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py b/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py index af757ffc32..fc6c97d224 100644 --- a/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py +++ b/pydantic_ai_slim/pydantic_ai/ui/vercel_ai/_adapter.py @@ -14,7 +14,6 @@ from ...messages import ( AudioUrl, - BaseToolCallPart, BinaryContent, BuiltinToolCallPart, BuiltinToolReturnPart, @@ -246,10 +245,12 @@ def _dump_request_message(msg: ModelRequest) -> tuple[list[UIMessagePart], list[ # Tool returns are merged into the tool call in the assistant message pass elif isinstance(part, RetryPromptPart): - # RetryPromptPart always has a tool_call_id (generated if not provided). - # These are handled when processing ToolCallPart in ModelResponse, - # where they become DynamicToolOutputErrorPart via the tool_errors dict. - pass + if part.tool_name: + # Tool-related retries are handled when processing ToolCallPart in ModelResponse + pass + else: + # Non-tool retries (e.g., output validation errors) become user text + user_ui_parts.append(TextUIPart(text=part.model_response(), state='done')) else: assert_never(part) @@ -258,8 +259,7 @@ def _dump_request_message(msg: ModelRequest) -> tuple[list[UIMessagePart], list[ @staticmethod def _dump_response_message( # noqa: C901 msg: ModelResponse, - tool_returns: dict[str, ToolReturnPart], - tool_errors: dict[str, RetryPromptPart], + tool_results: dict[str, ToolReturnPart | RetryPromptPart], ) -> list[UIMessagePart]: """Convert a ModelResponse into a UIMessage.""" ui_parts: list[UIMessagePart] = [] @@ -298,71 +298,69 @@ def _dump_response_message( # noqa: C901 media_type=part.content.media_type, ) ) - elif isinstance(part, BaseToolCallPart): - if isinstance(part, BuiltinToolCallPart): - call_provider_metadata = ( - {'pydantic_ai': {'provider_name': part.provider_name}} if part.provider_name else None - ) + elif isinstance(part, BuiltinToolCallPart): + call_provider_metadata = ( + {'pydantic_ai': {'provider_name': part.provider_name}} if part.provider_name else None + ) - if builtin_return := local_builtin_returns.get(part.tool_call_id): - content = builtin_return.model_response_str() - ui_parts.append( - ToolOutputAvailablePart( - type=f'tool-{part.tool_name}', - tool_call_id=part.tool_call_id, - input=part.args_as_json_str(), - output=content, - state='output-available', - provider_executed=True, - call_provider_metadata=call_provider_metadata, - ) + if builtin_return := local_builtin_returns.get(part.tool_call_id): + content = builtin_return.model_response_str() + ui_parts.append( + ToolOutputAvailablePart( + type=f'tool-{part.tool_name}', + tool_call_id=part.tool_call_id, + input=part.args_as_json_str(), + output=content, + state='output-available', + provider_executed=True, + call_provider_metadata=call_provider_metadata, ) - else: # pragma: no cover - ui_parts.append( - ToolInputAvailablePart( - type=f'tool-{part.tool_name}', - tool_call_id=part.tool_call_id, - input=part.args_as_json_str(), - state='input-available', - provider_executed=True, - call_provider_metadata=call_provider_metadata, - ) + ) + else: # pragma: no cover + ui_parts.append( + ToolInputAvailablePart( + type=f'tool-{part.tool_name}', + tool_call_id=part.tool_call_id, + input=part.args_as_json_str(), + state='input-available', + provider_executed=True, + call_provider_metadata=call_provider_metadata, ) - else: - tool_return = tool_returns.get(part.tool_call_id) - tool_error = tool_errors.get(part.tool_call_id) - - if isinstance(tool_return, ToolReturnPart): - content = tool_return.model_response_str() - ui_parts.append( - DynamicToolOutputAvailablePart( - tool_name=part.tool_name, - tool_call_id=part.tool_call_id, - input=part.args_as_json_str(), - output=content, - state='output-available', - ) + ) + elif isinstance(part, ToolCallPart): + tool_result = tool_results.get(part.tool_call_id) + + if isinstance(tool_result, ToolReturnPart): + content = tool_result.model_response_str() + ui_parts.append( + DynamicToolOutputAvailablePart( + tool_name=part.tool_name, + tool_call_id=part.tool_call_id, + input=part.args_as_json_str(), + output=content, + state='output-available', ) - elif tool_error: - error_text = tool_error.model_response() - ui_parts.append( - DynamicToolOutputErrorPart( - tool_name=part.tool_name, - tool_call_id=part.tool_call_id, - input=part.args_as_json_str(), - error_text=error_text, - state='output-error', - ) + ) + elif isinstance(tool_result, RetryPromptPart): + error_text = tool_result.model_response() + ui_parts.append( + DynamicToolOutputErrorPart( + tool_name=part.tool_name, + tool_call_id=part.tool_call_id, + input=part.args_as_json_str(), + error_text=error_text, + state='output-error', ) - else: - ui_parts.append( - DynamicToolInputAvailablePart( - tool_name=part.tool_name, - tool_call_id=part.tool_call_id, - input=part.args_as_json_str(), - state='input-available', - ) + ) + else: + ui_parts.append( + DynamicToolInputAvailablePart( + tool_name=part.tool_name, + tool_call_id=part.tool_call_id, + input=part.args_as_json_str(), + state='input-available', ) + ) else: assert_never(part) @@ -381,16 +379,15 @@ def dump_messages( Returns: A list of UIMessage objects in Vercel AI format """ - tool_returns: dict[str, ToolReturnPart] = {} - tool_errors: dict[str, RetryPromptPart] = {} + tool_results: dict[str, ToolReturnPart | RetryPromptPart] = {} for msg in messages: if isinstance(msg, ModelRequest): for part in msg.parts: if isinstance(part, ToolReturnPart): - tool_returns[part.tool_call_id] = part - elif isinstance(part, RetryPromptPart) and part.tool_call_id: - tool_errors[part.tool_call_id] = part + tool_results[part.tool_call_id] = part + elif isinstance(part, RetryPromptPart) and part.tool_name: + tool_results[part.tool_call_id] = part result: list[UIMessage] = [] @@ -406,9 +403,7 @@ def dump_messages( elif isinstance( # pragma: no branch msg, ModelResponse ): - ui_parts: list[UIMessagePart] = cls._dump_response_message( - msg, tool_returns=tool_returns, tool_errors=tool_errors - ) + ui_parts: list[UIMessagePart] = cls._dump_response_message(msg, tool_results) if ui_parts: # pragma: no branch result.append(UIMessage(id=str(uuid.uuid4()), role='assistant', parts=ui_parts)) else: diff --git a/tests/test_vercel_ai.py b/tests/test_vercel_ai.py index c148da0291..99765386ad 100644 --- a/tests/test_vercel_ai.py +++ b/tests/test_vercel_ai.py @@ -2292,6 +2292,61 @@ async def test_adapter_dump_messages_with_retry(): ) +async def test_adapter_dump_messages_with_retry_no_tool_name(): + """Test dumping messages with retry prompts without tool_name (e.g., output validation errors).""" + messages = [ + ModelRequest(parts=[UserPromptPart(content='Give me a number')]), + ModelResponse(parts=[TextPart(content='Not a valid number')]), + ModelRequest( + parts=[ + RetryPromptPart( + content='Output validation failed: expected integer', + # No tool_name - this is an output validation error, not a tool error + ) + ] + ), + ] + + ui_messages = VercelAIAdapter.dump_messages(messages) + + ui_message_dicts = [msg.model_dump() for msg in ui_messages] + + assert ui_message_dicts == snapshot( + [ + { + 'id': IsStr(), + 'role': 'user', + 'metadata': None, + 'parts': [{'type': 'text', 'text': 'Give me a number', 'state': 'done', 'provider_metadata': None}], + }, + { + 'id': IsStr(), + 'role': 'assistant', + 'metadata': None, + 'parts': [{'type': 'text', 'text': 'Not a valid number', 'state': 'done', 'provider_metadata': None}], + }, + { + 'id': IsStr(), + 'role': 'user', + 'metadata': None, + 'parts': [ + { + 'type': 'text', + 'text': """\ +Validation feedback: +Output validation failed: expected integer + +Fix the errors and try again.\ +""", + 'state': 'done', + 'provider_metadata': None, + } + ], + }, + ] + ) + + async def test_adapter_dump_messages_consecutive_text(): """Test that consecutive text parts are concatenated correctly.""" messages = [