Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-langchain"
version = "0.5.27"
version = "0.5.28"
description = "Python SDK that enables developers to build and deploy LangGraph agents to the UiPath Cloud Platform"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand Down
2 changes: 0 additions & 2 deletions src/uipath_langchain/agent/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from .extraction_tool import create_ixp_extraction_tool
from .integration_tool import create_integration_tool
from .ixp_escalation_tool import create_ixp_escalation_tool
from .mcp_tool import create_mcp_tools
from .process_tool import create_process_tool
from .tool_factory import (
create_tools_from_resources,
Expand All @@ -19,7 +18,6 @@
"create_process_tool",
"create_integration_tool",
"create_escalation_tool",
"create_mcp_tools",
"create_ixp_extraction_tool",
"create_ixp_escalation_tool",
"UiPathToolNode",
Expand Down
15 changes: 15 additions & 0 deletions src/uipath_langchain/agent/tools/mcp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""MCP (Model Context Protocol) tools."""

from .mcp_client import McpClient
from .mcp_tool import (
create_mcp_tools,
create_mcp_tools_from_agent,
create_mcp_tools_from_metadata_for_mcp_server,
)

__all__ = [
"McpClient",
"create_mcp_tools",
"create_mcp_tools_from_agent",
"create_mcp_tools_from_metadata_for_mcp_server",
]
433 changes: 433 additions & 0 deletions src/uipath_langchain/agent/tools/mcp/claude.md
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we remove this?

Large diffs are not rendered by default.

290 changes: 290 additions & 0 deletions src/uipath_langchain/agent/tools/mcp/mcp_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
"""MCP Session management for tool invocations.

This module provides a session class that manages the lifecycle of MCP connections,
including automatic reconnection on session disconnect errors.
"""

import asyncio
import logging
from contextlib import AsyncExitStack
from typing import Any

import httpx
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from mcp import ClientSession
from mcp.client.streamable_http import (
GetSessionIdCallback,
streamable_http_client,
)
from mcp.shared.exceptions import McpError
from mcp.shared.message import SessionMessage
from mcp.types import CallToolResult
from uipath._utils._ssl_context import get_httpx_client_kwargs
from uipath.runtime.base import UiPathDisposableProtocol

logger = logging.getLogger(__name__)


class McpClient(UiPathDisposableProtocol):
"""Manages an MCP session for tool invocations.

This class handles the lifecycle of MCP connections with two distinct phases:

1. **Client Initialization** (first call):
- Creates HTTP client
- Establishes streamable HTTP connection
- Creates ClientSession
- Calls session.initialize() to get session ID

2. **Session Reinitialization** (on 404 error):
- Reuses existing HTTP client and streamable connection
- Calls session.initialize() again to get new session ID

Thread-safety is ensured via asyncio.Lock for both phases.
"""

# Error codes that indicate session disconnect/termination
SESSION_ERROR_CODES = [32600, -32000]

def __init__(
self,
url: str,
headers: dict[str, str] | None = None,
timeout: httpx.Timeout | None = None,
max_retries: int = 1,
) -> None:
"""Initialize the MCP tool session.

Args:
url: The MCP server endpoint URL.
headers: Optional headers to include in HTTP requests.
timeout: Optional timeout configuration for HTTP requests.
max_retries: Maximum number of retries on session disconnect errors.
"""
self._url = url
self._headers = headers or {}
self._timeout = timeout or httpx.Timeout(600)
self._max_retries = max_retries

# Lock for both client initialization and session reinitialization
self._lock = asyncio.Lock()

# Client state (created once, reused across session reinitializations)
self._http_client: httpx.AsyncClient | None = None
self._read_stream: (
MemoryObjectReceiveStream[SessionMessage | Exception] | None
) = None
self._write_stream: MemoryObjectSendStream[SessionMessage] | None = None
self._get_session_id: GetSessionIdCallback | None = None
self._stack: AsyncExitStack | None = None

# Session state (can be reinitialized without recreating client)
self._session: ClientSession | None = None
self._session_id: str | None = None
self._client_initialized: bool = False

@property
def session_id(self) -> str | None:
"""Get the current session ID."""
return self._session_id

@property
def is_client_initialized(self) -> bool:
"""Check if the HTTP client and streamable connection are initialized."""
return self._client_initialized

async def _initialize_client(self) -> None:
"""Initialize the HTTP client and streamable connection.

This is called once on first use. Creates:
- httpx.AsyncClient
- Streamable HTTP connection (read/write streams)
- ClientSession

Then calls _initialize_session() to complete the MCP handshake.
"""
logger.debug("Initializing MCP client")

# Create exit stack for resource management
self._stack = AsyncExitStack()
await self._stack.__aenter__()

# Create HTTP client with SSL, proxy, and redirect settings
default_client_kwargs = get_httpx_client_kwargs()
client_kwargs = {
**default_client_kwargs,
"headers": self._headers,
"timeout": self._timeout,
}
self._http_client = await self._stack.enter_async_context(
httpx.AsyncClient(**client_kwargs)
)

# Create streamable HTTP connection
(
self._read_stream,
self._write_stream,
self._get_session_id,
) = await self._stack.enter_async_context(
streamable_http_client(
url=self._url,
http_client=self._http_client,
)
)

# Create ClientSession (but don't initialize yet)
# These are guaranteed to be set by the context manager above
assert self._read_stream is not None
assert self._write_stream is not None
self._session = await self._stack.enter_async_context(
ClientSession(self._read_stream, self._write_stream)
)

self._client_initialized = True
logger.info("MCP client initialized")

# Now initialize the MCP session
await self._initialize_session()
Comment on lines +143 to +147
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If _initialize_session() at line 146 raises an exception, the client will be left in an inconsistent state. The _client_initialized flag is set to True at line 142, but if session initialization fails, future calls to _ensure_session() will return immediately thinking the client is ready (line 176-181), but _session_id will be None and the session may not be properly initialized.

Add exception handling to ensure cleanup on failure:

try:
    # ... client initialization ...
    self._client_initialized = True
    await self._initialize_session()
except Exception:
    # Clean up on failure
    await self.close()
    raise
Suggested change
self._client_initialized = True
logger.info("MCP client initialized")
# Now initialize the MCP session
await self._initialize_session()
try:
self._client_initialized = True
logger.info("MCP client initialized")
# Now initialize the MCP session
await self._initialize_session()
except Exception:
# Clean up on failure to avoid leaving the client in an inconsistent state
await self.close()
raise

Copilot uses AI. Check for mistakes.

async def _initialize_session(self) -> None:
"""Initialize or reinitialize the MCP session.

Calls session.initialize() to perform the MCP handshake and obtain
a session ID from the server. Can be called multiple times on the
same ClientSession to recover from session disconnects.

Requires: Client must be initialized first (_initialize_client).
"""
if self._session is None:
raise RuntimeError("Cannot initialize session: client not initialized")

logger.debug(f"Initializing MCP session (previous: {self._session_id})")

await self._session.initialize()
self._session_id = self._get_session_id() # type: ignore[misc]

logger.info(f"MCP session initialized: {self._session_id}")

async def _ensure_session(self) -> ClientSession:
"""Ensure client and session are initialized, return the session.

Thread-safe via lock. Only initializes once; subsequent calls
return the existing session immediately.

Returns:
The initialized ClientSession.
"""
if not self._client_initialized:
async with self._lock:
if not self._client_initialized:
await self._initialize_client()

return self._session # type: ignore[return-value]

async def _reinitialize_session(self) -> None:
"""Reinitialize only the MCP session after a disconnect error.

Thread-safe via lock. Reuses existing HTTP client and streamable
connection; only performs a new MCP handshake.
"""
async with self._lock:
if not self._client_initialized:
# Client not initialized, do full initialization
await self._initialize_client()
else:
# Client exists, just reinitialize session
await self._initialize_session()

def _is_session_error(self, error: McpError) -> bool:
"""Check if an McpError indicates a session disconnect.

Args:
error: The McpError to check.

Returns:
True if the error indicates a session disconnect.
"""
return (
hasattr(error, "error")
and hasattr(error.error, "code")
and error.error.code in self.SESSION_ERROR_CODES
)
Comment on lines +198 to +211
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error checking logic using hasattr on line 207-208 is fragile and could silently fail. If the McpError structure changes or if error.error exists but is None, this will return False instead of raising an appropriate error. Additionally, accessing error.error.code at line 249 without the same checks could raise an AttributeError.

Consider:

  1. Using more explicit type checking or isinstance checks
  2. Handling the case where error.error is None
  3. Being consistent - if you use hasattr in the check, also guard the access at line 249

Copilot uses AI. Check for mistakes.

async def call_tool(
self,
name: str,
arguments: dict[str, Any] | None = None,
) -> CallToolResult:
"""Call an MCP tool with automatic retry on session disconnect.

On first call, initializes the full client stack. On session
disconnect (404/32600), reinitializes only the session and retries.

Args:
name: The name of the tool to call.
arguments: Optional arguments to pass to the tool.

Returns:
The tool call result.

Raises:
McpError: If the tool call fails after all retries.
"""
retry_count = 0

while retry_count <= self._max_retries:
try:
session = await self._ensure_session()
logger.debug(
f"Calling tool {name} (attempt {retry_count + 1}/{self._max_retries + 1})"
)
result = await session.call_tool(name, arguments=arguments)
logger.info(f"Tool call successful: {name}")
return result

except McpError as e:
logger.info(f"McpError during tool call: {e}")

if self._is_session_error(e) and retry_count < self._max_retries:
logger.warning(
f"Session disconnected (error code: {e.error.code}), "
f"reinitializing session"
)
await self._reinitialize_session()
retry_count += 1
continue
Comment on lines +245 to +255
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a potential race condition when multiple concurrent tool calls encounter 404 errors. The scenario:

  1. Tool call A gets 404, enters retry logic
  2. Tool call B gets 404 while A is reinitializing, also enters retry logic
  3. Both calls try to reinitialize the session

While _reinitialize_session is protected by a lock, the check if self._is_session_error(e) happens outside the lock in call_tool. This means multiple concurrent calls could all increment their retry counters and potentially exhaust retries even though only one session reinitialization is needed.

A more robust approach would be to track the session ID at the start of each call attempt and only reinitialize if the session ID hasn't changed (meaning another concurrent call hasn't already reinitialized). However, given the lock protection in _reinitialize_session, this is a minor efficiency issue rather than a correctness issue - at worst, some calls might retry unnecessarily or fail when they could have succeeded.

Copilot uses AI. Check for mistakes.
else:
if retry_count >= self._max_retries:
logger.error(f"Max retries reached after session error: {e}")
else:
logger.error(f"Non-retryable MCP error: {e}")
raise
Copy link

Copilot AI Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-McpError exceptions are not handled in the retry logic, which could lead to inconsistent state. If session.call_tool() at line 240 raises any exception other than McpError (e.g., httpx.HTTPError, asyncio.TimeoutError, or connection errors), it will propagate immediately without cleanup. This leaves the client in an initialized state but potentially with a broken connection.

Consider wrapping in a broader exception handler to either:

  1. Close and reset the client on non-McpError exceptions
  2. Document which exceptions are expected and should not be retried
  3. Add logging for unexpected exceptions
Suggested change
raise
raise
except asyncio.CancelledError:
# Preserve task cancellation semantics; do not attempt cleanup here.
raise
except Exception as e:
# Handle unexpected non-McpError exceptions by resetting the client state
logger.exception(
"Unexpected error during MCP tool call; closing session to reset state"
)
try:
await self.close()
except Exception as cleanup_error:
logger.debug(f"Error during cleanup after unexpected exception: {cleanup_error}")
raise

Copilot uses AI. Check for mistakes.

# Should not reach here, but just in case
raise RuntimeError("Exited retry loop unexpectedly")

async def dispose(self) -> None:
"""Dispose of the client and release all resources.

Implements UiPathDisposableProtocol.
Releases the HTTP client, streamable connection, and ClientSession.
After calling dispose(), the client can be reused - a new call_tool()
will reinitialize everything.
"""
async with self._lock:
if self._stack is not None:
try:
await self._stack.__aexit__(None, None, None)
except Exception as e:
logger.debug(f"Error during cleanup: {e}")
finally:
self._stack = None
self._session = None
self._session_id = None
self._http_client = None
self._read_stream = None
self._write_stream = None
self._get_session_id = None
self._client_initialized = False

logger.info("MCP client disposed")
Loading