Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 100 additions & 5 deletions src/bot/handlers/message.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Message handlers for non-command inputs."""

import asyncio
import os
from typing import Optional

import structlog
Expand All @@ -19,6 +20,11 @@
from ...security.audit import AuditLogger
from ...security.rate_limiter import RateLimiter
from ...security.validators import SecurityValidator
from ..utils.file_extractor import (
REJECTION_SURFACE_TO_USER,
FileAttachment,
validate_file_path,
)
from ..utils.html_format import escape_html
from ..utils.image_extractor import (
ImageAttachment,
Expand Down Expand Up @@ -355,27 +361,39 @@ async def handle_text_message(
# Flag is only cleared after a successful run so retries keep the intent.
force_new = bool(context.user_data.get("force_new_session"))

# MCP image collection via stream intercept
# MCP image / file collection via stream intercept
mcp_images: list[ImageAttachment] = []
mcp_files: list[FileAttachment] = []
mcp_rejected_files: list[str] = []

# Enhanced stream updates handler with progress tracking
async def stream_handler(update_obj):
# Intercept send_image_to_user MCP tool calls.
# Intercept send_image_to_user / send_file_to_user MCP tool calls.
# The SDK namespaces MCP tools as "mcp__<server>__<tool>".
if update_obj.tool_calls:
for tc in update_obj.tool_calls:
tc_name = tc.get("name", "")
tc_input = tc.get("input", {})
file_path = tc_input.get("file_path", "")
caption = tc_input.get("caption", "")
if tc_name == "send_image_to_user" or tc_name.endswith(
"__send_image_to_user"
):
tc_input = tc.get("input", {})
file_path = tc_input.get("file_path", "")
caption = tc_input.get("caption", "")
img = validate_image_path(
file_path, settings.approved_directory, caption
)
if img:
mcp_images.append(img)
elif tc_name == "send_file_to_user" or tc_name.endswith(
"__send_file_to_user"
):
attachment, reason = validate_file_path(
file_path, settings.approved_directory, caption
)
if attachment:
mcp_files.append(attachment)
elif file_path and reason in REJECTION_SURFACE_TO_USER:
mcp_rejected_files.append(file_path)

try:
progress_text = await _format_progress_update(update_obj)
Expand Down Expand Up @@ -578,6 +596,83 @@ async def stream_handler(update_obj):
error=str(doc_err),
)

# Send MCP-collected files (from send_file_to_user tool calls) and
# surface any paths rejected by bot-side validation so Claude's
# "file sent" reply doesn't silently mislead the user.
if mcp_files or mcp_rejected_files:
from pathlib import Path as _P

failed_files: list[str] = []
for attachment in mcp_files:
# TOCTOU-safe: refuse to send if path was swapped (symlink,
# replacement) between validation and delivery.
try:
fd = os.open(str(attachment.path), os.O_RDONLY | os.O_NOFOLLOW)
except OSError as file_err:
logger.warning(
"TOCTOU-safe open failed for MCP document",
path=str(attachment.path),
error=str(file_err),
)
failed_files.append(attachment.path.name)
continue

try:
file_stat = os.fstat(fd)
if (
file_stat.st_ino != attachment.inode
or file_stat.st_dev != attachment.device
):
logger.warning(
"File identity changed since validation — " "refusing send",
path=str(attachment.path),
)
os.close(fd)
failed_files.append(attachment.path.name)
continue

with os.fdopen(fd, "rb") as f:
await update.message.reply_document(
document=f,
filename=attachment.path.name,
caption=attachment.caption or None,
reply_to_message_id=update.message.message_id,
)
await asyncio.sleep(0.5)
except Exception as file_err:
logger.warning(
"Failed to send MCP document",
path=str(attachment.path),
error=str(file_err),
)
failed_files.append(attachment.path.name)
try:
os.close(fd)
except OSError:
pass

summary_lines: list[str] = []
if failed_files:
summary_lines.append(f"⚠️ Failed to send: {', '.join(failed_files)}")
if mcp_rejected_files:
rejected_names = ", ".join(_P(p).name or p for p in mcp_rejected_files)
summary_lines.append(
"🚫 Rejected by security policy "
"(outside APPROVED_DIRECTORY or blocked secret file): "
f"{rejected_names}"
)
if summary_lines:
try:
await update.message.reply_text(
"\n".join(summary_lines),
reply_to_message_id=update.message.message_id,
)
except Exception as summary_err:
logger.debug(
"Failed to send document error summary",
error=str(summary_err),
)

# Update session info
context.user_data["last_message"] = update.message.text

Expand Down
Loading