-
Notifications
You must be signed in to change notification settings - Fork 31
feat: mcp client with 404 recovery #502
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| """MCP (Model Context Protocol) tools.""" | ||
|
|
||
| from .mcp_client import McpClient | ||
| from .mcp_tool import ( | ||
| create_mcp_tools, | ||
| create_mcp_tools_from_agent, | ||
| create_mcp_tools_from_metadata_for_mcp_server, | ||
| ) | ||
|
|
||
| __all__ = [ | ||
| "McpClient", | ||
| "create_mcp_tools", | ||
| "create_mcp_tools_from_agent", | ||
| "create_mcp_tools_from_metadata_for_mcp_server", | ||
| ] |
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,290 @@ | ||||||||||||||||||||||||||||||||
| """MCP Session management for tool invocations. | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| This module provides a session class that manages the lifecycle of MCP connections, | ||||||||||||||||||||||||||||||||
| including automatic reconnection on session disconnect errors. | ||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| import asyncio | ||||||||||||||||||||||||||||||||
| import logging | ||||||||||||||||||||||||||||||||
| from contextlib import AsyncExitStack | ||||||||||||||||||||||||||||||||
| from typing import Any | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| import httpx | ||||||||||||||||||||||||||||||||
| from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream | ||||||||||||||||||||||||||||||||
| from mcp import ClientSession | ||||||||||||||||||||||||||||||||
| from mcp.client.streamable_http import ( | ||||||||||||||||||||||||||||||||
| GetSessionIdCallback, | ||||||||||||||||||||||||||||||||
| streamable_http_client, | ||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||
| from mcp.shared.exceptions import McpError | ||||||||||||||||||||||||||||||||
| from mcp.shared.message import SessionMessage | ||||||||||||||||||||||||||||||||
| from mcp.types import CallToolResult | ||||||||||||||||||||||||||||||||
| from uipath._utils._ssl_context import get_httpx_client_kwargs | ||||||||||||||||||||||||||||||||
| from uipath.runtime.base import UiPathDisposableProtocol | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| logger = logging.getLogger(__name__) | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| class McpClient(UiPathDisposableProtocol): | ||||||||||||||||||||||||||||||||
| """Manages an MCP session for tool invocations. | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| This class handles the lifecycle of MCP connections with two distinct phases: | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| 1. **Client Initialization** (first call): | ||||||||||||||||||||||||||||||||
| - Creates HTTP client | ||||||||||||||||||||||||||||||||
| - Establishes streamable HTTP connection | ||||||||||||||||||||||||||||||||
| - Creates ClientSession | ||||||||||||||||||||||||||||||||
| - Calls session.initialize() to get session ID | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| 2. **Session Reinitialization** (on 404 error): | ||||||||||||||||||||||||||||||||
| - Reuses existing HTTP client and streamable connection | ||||||||||||||||||||||||||||||||
| - Calls session.initialize() again to get new session ID | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| Thread-safety is ensured via asyncio.Lock for both phases. | ||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| # Error codes that indicate session disconnect/termination | ||||||||||||||||||||||||||||||||
| SESSION_ERROR_CODES = [32600, -32000] | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| def __init__( | ||||||||||||||||||||||||||||||||
| self, | ||||||||||||||||||||||||||||||||
| url: str, | ||||||||||||||||||||||||||||||||
| headers: dict[str, str] | None = None, | ||||||||||||||||||||||||||||||||
| timeout: httpx.Timeout | None = None, | ||||||||||||||||||||||||||||||||
| max_retries: int = 1, | ||||||||||||||||||||||||||||||||
| ) -> None: | ||||||||||||||||||||||||||||||||
| """Initialize the MCP tool session. | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||||||
| url: The MCP server endpoint URL. | ||||||||||||||||||||||||||||||||
| headers: Optional headers to include in HTTP requests. | ||||||||||||||||||||||||||||||||
| timeout: Optional timeout configuration for HTTP requests. | ||||||||||||||||||||||||||||||||
| max_retries: Maximum number of retries on session disconnect errors. | ||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||
| self._url = url | ||||||||||||||||||||||||||||||||
| self._headers = headers or {} | ||||||||||||||||||||||||||||||||
| self._timeout = timeout or httpx.Timeout(600) | ||||||||||||||||||||||||||||||||
| self._max_retries = max_retries | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| # Lock for both client initialization and session reinitialization | ||||||||||||||||||||||||||||||||
| self._lock = asyncio.Lock() | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| # Client state (created once, reused across session reinitializations) | ||||||||||||||||||||||||||||||||
| self._http_client: httpx.AsyncClient | None = None | ||||||||||||||||||||||||||||||||
| self._read_stream: ( | ||||||||||||||||||||||||||||||||
| MemoryObjectReceiveStream[SessionMessage | Exception] | None | ||||||||||||||||||||||||||||||||
| ) = None | ||||||||||||||||||||||||||||||||
| self._write_stream: MemoryObjectSendStream[SessionMessage] | None = None | ||||||||||||||||||||||||||||||||
| self._get_session_id: GetSessionIdCallback | None = None | ||||||||||||||||||||||||||||||||
| self._stack: AsyncExitStack | None = None | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| # Session state (can be reinitialized without recreating client) | ||||||||||||||||||||||||||||||||
| self._session: ClientSession | None = None | ||||||||||||||||||||||||||||||||
| self._session_id: str | None = None | ||||||||||||||||||||||||||||||||
| self._client_initialized: bool = False | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| @property | ||||||||||||||||||||||||||||||||
| def session_id(self) -> str | None: | ||||||||||||||||||||||||||||||||
| """Get the current session ID.""" | ||||||||||||||||||||||||||||||||
| return self._session_id | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| @property | ||||||||||||||||||||||||||||||||
| def is_client_initialized(self) -> bool: | ||||||||||||||||||||||||||||||||
| """Check if the HTTP client and streamable connection are initialized.""" | ||||||||||||||||||||||||||||||||
| return self._client_initialized | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| async def _initialize_client(self) -> None: | ||||||||||||||||||||||||||||||||
| """Initialize the HTTP client and streamable connection. | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| This is called once on first use. Creates: | ||||||||||||||||||||||||||||||||
| - httpx.AsyncClient | ||||||||||||||||||||||||||||||||
| - Streamable HTTP connection (read/write streams) | ||||||||||||||||||||||||||||||||
| - ClientSession | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| Then calls _initialize_session() to complete the MCP handshake. | ||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||
| logger.debug("Initializing MCP client") | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| # Create exit stack for resource management | ||||||||||||||||||||||||||||||||
| self._stack = AsyncExitStack() | ||||||||||||||||||||||||||||||||
| await self._stack.__aenter__() | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| # Create HTTP client with SSL, proxy, and redirect settings | ||||||||||||||||||||||||||||||||
| default_client_kwargs = get_httpx_client_kwargs() | ||||||||||||||||||||||||||||||||
| client_kwargs = { | ||||||||||||||||||||||||||||||||
| **default_client_kwargs, | ||||||||||||||||||||||||||||||||
| "headers": self._headers, | ||||||||||||||||||||||||||||||||
| "timeout": self._timeout, | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| self._http_client = await self._stack.enter_async_context( | ||||||||||||||||||||||||||||||||
| httpx.AsyncClient(**client_kwargs) | ||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||
edis-uipath marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| # Create streamable HTTP connection | ||||||||||||||||||||||||||||||||
| ( | ||||||||||||||||||||||||||||||||
| self._read_stream, | ||||||||||||||||||||||||||||||||
| self._write_stream, | ||||||||||||||||||||||||||||||||
| self._get_session_id, | ||||||||||||||||||||||||||||||||
| ) = await self._stack.enter_async_context( | ||||||||||||||||||||||||||||||||
| streamable_http_client( | ||||||||||||||||||||||||||||||||
| url=self._url, | ||||||||||||||||||||||||||||||||
| http_client=self._http_client, | ||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| # Create ClientSession (but don't initialize yet) | ||||||||||||||||||||||||||||||||
| # These are guaranteed to be set by the context manager above | ||||||||||||||||||||||||||||||||
| assert self._read_stream is not None | ||||||||||||||||||||||||||||||||
| assert self._write_stream is not None | ||||||||||||||||||||||||||||||||
| self._session = await self._stack.enter_async_context( | ||||||||||||||||||||||||||||||||
| ClientSession(self._read_stream, self._write_stream) | ||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| self._client_initialized = True | ||||||||||||||||||||||||||||||||
| logger.info("MCP client initialized") | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| # Now initialize the MCP session | ||||||||||||||||||||||||||||||||
| await self._initialize_session() | ||||||||||||||||||||||||||||||||
|
Comment on lines
+143
to
+147
|
||||||||||||||||||||||||||||||||
| self._client_initialized = True | |
| logger.info("MCP client initialized") | |
| # Now initialize the MCP session | |
| await self._initialize_session() | |
| try: | |
| self._client_initialized = True | |
| logger.info("MCP client initialized") | |
| # Now initialize the MCP session | |
| await self._initialize_session() | |
| except Exception: | |
| # Clean up on failure to avoid leaving the client in an inconsistent state | |
| await self.close() | |
| raise |
edis-uipath marked this conversation as resolved.
Show resolved
Hide resolved
edis-uipath marked this conversation as resolved.
Show resolved
Hide resolved
Copilot
AI
Feb 4, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error checking logic using hasattr on line 207-208 is fragile and could silently fail. If the McpError structure changes or if error.error exists but is None, this will return False instead of raising an appropriate error. Additionally, accessing error.error.code at line 249 without the same checks could raise an AttributeError.
Consider:
- Using more explicit type checking or isinstance checks
- Handling the case where error.error is None
- Being consistent - if you use hasattr in the check, also guard the access at line 249
Copilot
AI
Feb 4, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a potential race condition when multiple concurrent tool calls encounter 404 errors. The scenario:
- Tool call A gets 404, enters retry logic
- Tool call B gets 404 while A is reinitializing, also enters retry logic
- Both calls try to reinitialize the session
While _reinitialize_session is protected by a lock, the check if self._is_session_error(e) happens outside the lock in call_tool. This means multiple concurrent calls could all increment their retry counters and potentially exhaust retries even though only one session reinitialization is needed.
A more robust approach would be to track the session ID at the start of each call attempt and only reinitialize if the session ID hasn't changed (meaning another concurrent call hasn't already reinitialized). However, given the lock protection in _reinitialize_session, this is a minor efficiency issue rather than a correctness issue - at worst, some calls might retry unnecessarily or fail when they could have succeeded.
Copilot
AI
Feb 4, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Non-McpError exceptions are not handled in the retry logic, which could lead to inconsistent state. If session.call_tool() at line 240 raises any exception other than McpError (e.g., httpx.HTTPError, asyncio.TimeoutError, or connection errors), it will propagate immediately without cleanup. This leaves the client in an initialized state but potentially with a broken connection.
Consider wrapping in a broader exception handler to either:
- Close and reset the client on non-McpError exceptions
- Document which exceptions are expected and should not be retried
- Add logging for unexpected exceptions
| raise | |
| raise | |
| except asyncio.CancelledError: | |
| # Preserve task cancellation semantics; do not attempt cleanup here. | |
| raise | |
| except Exception as e: | |
| # Handle unexpected non-McpError exceptions by resetting the client state | |
| logger.exception( | |
| "Unexpected error during MCP tool call; closing session to reset state" | |
| ) | |
| try: | |
| await self.close() | |
| except Exception as cleanup_error: | |
| logger.debug(f"Error during cleanup after unexpected exception: {cleanup_error}") | |
| raise |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we remove this?