-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Add VercelAIAdapter.dump_messages to convert Pydantic AI messages to Vercel AI messages
#3392
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 13 commits
4fdb27d
1cb60bf
261bc3a
3f70b83
97feec2
dfcb30c
f99bc0c
4870dd4
b1272b7
6cdec4d
e300c15
42596b9
57157d6
58a71a2
2f3b2a2
a061421
73fb21d
e49f656
54f77a7
dbdca1d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,23 +2,29 @@ | |
|
|
||
| from __future__ import annotations | ||
|
|
||
| import json | ||
| import uuid | ||
| from collections.abc import Sequence | ||
| from dataclasses import dataclass | ||
| from functools import cached_property | ||
| from typing import TYPE_CHECKING | ||
| from typing import TYPE_CHECKING, Any, cast | ||
|
|
||
| from pydantic import TypeAdapter | ||
| from typing_extensions import assert_never | ||
|
|
||
| from ...messages import ( | ||
| AudioUrl, | ||
| BaseToolCallPart, | ||
| BinaryContent, | ||
| BuiltinToolCallPart, | ||
| BuiltinToolReturnPart, | ||
| CachePoint, | ||
| DocumentUrl, | ||
| FilePart, | ||
| ImageUrl, | ||
| ModelMessage, | ||
| ModelRequest, | ||
| ModelResponse, | ||
| RetryPromptPart, | ||
| SystemPromptPart, | ||
| TextPart, | ||
|
|
@@ -35,6 +41,9 @@ | |
| from ._event_stream import VercelAIEventStream | ||
| from .request_types import ( | ||
| DataUIPart, | ||
| DynamicToolInputAvailablePart, | ||
| DynamicToolOutputAvailablePart, | ||
| DynamicToolOutputErrorPart, | ||
| DynamicToolUIPart, | ||
| FileUIPart, | ||
| ReasoningUIPart, | ||
|
|
@@ -43,10 +52,12 @@ | |
| SourceUrlUIPart, | ||
| StepStartUIPart, | ||
| TextUIPart, | ||
| ToolInputAvailablePart, | ||
| ToolOutputAvailablePart, | ||
| ToolOutputErrorPart, | ||
| ToolUIPart, | ||
| UIMessage, | ||
| UIMessagePart, | ||
| ) | ||
| from .response_types import BaseChunk | ||
|
|
||
|
|
@@ -57,6 +68,7 @@ | |
| __all__ = ['VercelAIAdapter'] | ||
|
|
||
| request_data_ta: TypeAdapter[RequestData] = TypeAdapter(RequestData) | ||
| BUILTIN_TOOL_CALL_ID_PREFIX = 'pyd_ai_builtin' | ||
|
|
||
|
|
||
| @dataclass | ||
|
|
@@ -122,7 +134,16 @@ def load_messages(cls, messages: Sequence[UIMessage]) -> list[ModelMessage]: # | |
| if isinstance(part, TextUIPart): | ||
| builder.add(TextPart(content=part.text)) | ||
| elif isinstance(part, ReasoningUIPart): | ||
| builder.add(ThinkingPart(content=part.text)) | ||
| pydantic_ai_meta = (part.provider_metadata or {}).get('pydantic_ai', {}) | ||
| builder.add( | ||
| ThinkingPart( | ||
| content=part.text, | ||
| id=pydantic_ai_meta.get('id'), | ||
| signature=pydantic_ai_meta.get('signature'), | ||
| provider_name=pydantic_ai_meta.get('provider_name'), | ||
| provider_details=pydantic_ai_meta.get('provider_details'), | ||
| ) | ||
| ) | ||
| elif isinstance(part, FileUIPart): | ||
| try: | ||
| file = BinaryContent.from_data_uri(part.url) | ||
|
|
@@ -141,7 +162,20 @@ def load_messages(cls, messages: Sequence[UIMessage]) -> list[ModelMessage]: # | |
| builtin_tool = part.provider_executed | ||
|
|
||
| tool_call_id = part.tool_call_id | ||
| args = part.input | ||
|
|
||
| args: str | dict[str, Any] | None = part.input | ||
|
|
||
| if isinstance(args, str): | ||
| try: | ||
| parsed = json.loads(args) | ||
| if isinstance(parsed, dict): | ||
| args = cast(dict[str, Any], parsed) | ||
| except json.JSONDecodeError: | ||
| pass | ||
| elif isinstance(args, dict) or args is None: | ||
| pass | ||
| else: | ||
| assert_never(args) | ||
|
|
||
| if builtin_tool: | ||
| call_part = BuiltinToolCallPart(tool_name=tool_name, tool_call_id=tool_call_id, args=args) | ||
|
|
@@ -197,3 +231,204 @@ def load_messages(cls, messages: Sequence[UIMessage]) -> list[ModelMessage]: # | |
| assert_never(msg.role) | ||
|
|
||
| return builder.messages | ||
|
|
||
| @classmethod | ||
| def dump_messages( # noqa: C901 | ||
dsfaccini marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| cls, | ||
| messages: Sequence[ModelMessage], | ||
| ) -> list[UIMessage]: | ||
| """Transform Pydantic AI messages into Vercel AI messages. | ||
|
|
||
| Args: | ||
| messages: A sequence of ModelMessage objects to convert | ||
| _id_generator: Optional ID generator function for testing. If not provided, uses uuid.uuid4(). | ||
dsfaccini marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| Returns: | ||
| A list of UIMessage objects in Vercel AI format | ||
| """ | ||
|
|
||
| def _message_id_generator() -> str: | ||
dsfaccini marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| """Generate a message ID.""" | ||
| return uuid.uuid4().hex | ||
|
|
||
| tool_returns: dict[str, ToolReturnPart] = {} | ||
| tool_errors: dict[str, RetryPromptPart] = {} | ||
|
|
||
| for msg in messages: | ||
| if isinstance(msg, ModelRequest): | ||
| for part in msg.parts: | ||
| if isinstance(part, ToolReturnPart): | ||
| tool_returns[part.tool_call_id] = part | ||
| elif isinstance(part, RetryPromptPart) and part.tool_call_id: | ||
| tool_errors[part.tool_call_id] = part | ||
|
|
||
| result: list[UIMessage] = [] | ||
|
|
||
| for msg in messages: | ||
| if isinstance(msg, ModelRequest): | ||
| system_ui_parts: list[UIMessagePart] = [] | ||
| user_ui_parts: list[UIMessagePart] = [] | ||
|
|
||
| for part in msg.parts: | ||
| if isinstance(part, SystemPromptPart): | ||
| system_ui_parts.append(TextUIPart(text=part.content, state='done')) | ||
| elif isinstance(part, UserPromptPart): | ||
| user_ui_parts.extend(_convert_user_prompt_part(part)) | ||
| elif isinstance(part, ToolReturnPart | RetryPromptPart): | ||
| # Tool returns/errors don't create separate UI parts | ||
| # They're merged into the tool call in the assistant message | ||
|
||
| pass | ||
| else: | ||
| assert_never(part) | ||
|
|
||
| if system_ui_parts: | ||
| result.append(UIMessage(id=_message_id_generator(), role='system', parts=system_ui_parts)) | ||
|
|
||
| if user_ui_parts: | ||
| result.append(UIMessage(id=_message_id_generator(), role='user', parts=user_ui_parts)) | ||
|
|
||
| elif isinstance( # pragma: no branch | ||
| msg, ModelResponse | ||
| ): | ||
| ui_parts: list[UIMessagePart] = [] | ||
|
|
||
| # For builtin tools, returns can be in the same ModelResponse as calls | ||
| # Build a local mapping for this message | ||
| local_builtin_returns: dict[str, BuiltinToolReturnPart] = {} | ||
| for part in msg.parts: | ||
| if isinstance(part, BuiltinToolReturnPart): | ||
| local_builtin_returns[part.tool_call_id] = part | ||
|
||
|
|
||
| for part in msg.parts: | ||
| if isinstance(part, BuiltinToolReturnPart): | ||
| continue | ||
| elif isinstance(part, TextPart): | ||
| # Combine consecutive text parts by checking the last UI part | ||
| if ui_parts and isinstance(ui_parts[-1], TextUIPart): | ||
| last_text = ui_parts[-1] | ||
| ui_parts[-1] = last_text.model_copy(update={'text': last_text.text + part.content}) | ||
|
||
| else: | ||
| ui_parts.append(TextUIPart(text=part.content, state='done')) | ||
| elif isinstance(part, ThinkingPart): | ||
| thinking_metadata: dict[str, Any] = {} | ||
| if part.id is not None: | ||
| thinking_metadata['id'] = part.id | ||
| if part.signature is not None: | ||
| thinking_metadata['signature'] = part.signature | ||
| if part.provider_name is not None: | ||
| thinking_metadata['provider_name'] = part.provider_name | ||
| if part.provider_details is not None: | ||
| thinking_metadata['provider_details'] = part.provider_details | ||
|
|
||
| provider_metadata = {'pydantic_ai': thinking_metadata} if thinking_metadata else None | ||
| ui_parts.append( | ||
| ReasoningUIPart(text=part.content, state='done', provider_metadata=provider_metadata) | ||
| ) | ||
| elif isinstance(part, FilePart): | ||
| ui_parts.append( | ||
| FileUIPart( | ||
| url=part.content.data_uri, | ||
| media_type=part.content.media_type, | ||
| ) | ||
| ) | ||
| elif isinstance(part, BaseToolCallPart): | ||
| if isinstance(part, BuiltinToolCallPart): | ||
| prefixed_id = ( | ||
| f'{BUILTIN_TOOL_CALL_ID_PREFIX}|{part.provider_name or ""}|{part.tool_call_id}' | ||
|
||
| ) | ||
|
|
||
| if builtin_return := local_builtin_returns.get(part.tool_call_id): | ||
| content = builtin_return.model_response_str() | ||
| call_provider_metadata = ( | ||
| {'pydantic_ai': {'provider_name': part.provider_name}} | ||
DouweM marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if part.provider_name | ||
| else None | ||
| ) | ||
| ui_parts.append( | ||
| ToolOutputAvailablePart( | ||
| type=f'tool-{part.tool_name}', | ||
| tool_call_id=prefixed_id, | ||
| input=part.args_as_json_str(), | ||
| output=content, | ||
| state='output-available', | ||
| provider_executed=True, | ||
| call_provider_metadata=call_provider_metadata, | ||
| ) | ||
| ) | ||
| else: # pragma: no cover | ||
| ui_parts.append( | ||
| ToolInputAvailablePart( | ||
| type=f'tool-{part.tool_name}', | ||
| tool_call_id=prefixed_id, | ||
| input=part.args_as_json_str(), | ||
| state='input-available', | ||
| provider_executed=True, | ||
|
||
| ) | ||
| ) | ||
| else: | ||
| tool_return = tool_returns.get(part.tool_call_id) | ||
| tool_error = tool_errors.get(part.tool_call_id) | ||
|
||
|
|
||
| if isinstance(tool_return, ToolReturnPart): | ||
| content = tool_return.model_response_str() | ||
| ui_parts.append( | ||
| DynamicToolOutputAvailablePart( | ||
| tool_name=part.tool_name, | ||
| tool_call_id=part.tool_call_id, | ||
| input=part.args_as_json_str(), | ||
| output=content, | ||
| state='output-available', | ||
| ) | ||
| ) | ||
| elif tool_error: | ||
| error_text = tool_error.model_response() | ||
| ui_parts.append( | ||
| DynamicToolOutputErrorPart( | ||
| tool_name=part.tool_name, | ||
| tool_call_id=part.tool_call_id, | ||
| input=part.args_as_json_str(), | ||
| error_text=error_text, | ||
| state='output-error', | ||
| ) | ||
| ) | ||
| else: | ||
| ui_parts.append( | ||
| DynamicToolInputAvailablePart( | ||
| tool_name=part.tool_name, | ||
| tool_call_id=part.tool_call_id, | ||
| input=part.args_as_json_str(), | ||
| state='input-available', | ||
| ) | ||
| ) | ||
| else: | ||
| assert_never(part) | ||
|
|
||
| if ui_parts: # pragma: no branch | ||
| result.append(UIMessage(id=_message_id_generator(), role='assistant', parts=ui_parts)) | ||
| else: | ||
| assert_never(msg) | ||
|
|
||
| return result | ||
|
|
||
|
|
||
| def _convert_user_prompt_part(part: UserPromptPart) -> list[UIMessagePart]: | ||
| """Convert a UserPromptPart to a list of UI message parts.""" | ||
| ui_parts: list[UIMessagePart] = [] | ||
|
|
||
| if isinstance(part.content, str): | ||
| ui_parts.append(TextUIPart(text=part.content, state='done')) | ||
| else: | ||
| for item in part.content: | ||
| if isinstance(item, str): | ||
| ui_parts.append(TextUIPart(text=item, state='done')) | ||
| elif isinstance(item, BinaryContent): | ||
| ui_parts.append(FileUIPart(url=item.data_uri, media_type=item.media_type)) | ||
| elif isinstance(item, ImageUrl | AudioUrl | VideoUrl | DocumentUrl): | ||
| ui_parts.append(FileUIPart(url=item.url, media_type=item.media_type)) | ||
| elif isinstance(item, CachePoint): | ||
| # CachePoint is metadata for prompt caching, skip for UI conversion | ||
| pass | ||
| else: | ||
| assert_never(item) | ||
|
|
||
| return ui_parts | ||
Uh oh!
There was an error while loading. Please reload this page.