diff --git a/src/bot/handlers/message.py b/src/bot/handlers/message.py index bbd24084..b601da3f 100644 --- a/src/bot/handlers/message.py +++ b/src/bot/handlers/message.py @@ -1,6 +1,7 @@ """Message handlers for non-command inputs.""" import asyncio +import os from typing import Optional import structlog @@ -19,6 +20,11 @@ from ...security.audit import AuditLogger from ...security.rate_limiter import RateLimiter from ...security.validators import SecurityValidator +from ..utils.file_extractor import ( + REJECTION_SURFACE_TO_USER, + FileAttachment, + validate_file_path, +) from ..utils.html_format import escape_html from ..utils.image_extractor import ( ImageAttachment, @@ -355,27 +361,39 @@ async def handle_text_message( # Flag is only cleared after a successful run so retries keep the intent. force_new = bool(context.user_data.get("force_new_session")) - # MCP image collection via stream intercept + # MCP image / file collection via stream intercept mcp_images: list[ImageAttachment] = [] + mcp_files: list[FileAttachment] = [] + mcp_rejected_files: list[str] = [] # Enhanced stream updates handler with progress tracking async def stream_handler(update_obj): - # Intercept send_image_to_user MCP tool calls. + # Intercept send_image_to_user / send_file_to_user MCP tool calls. # The SDK namespaces MCP tools as "mcp____". if update_obj.tool_calls: for tc in update_obj.tool_calls: tc_name = tc.get("name", "") + tc_input = tc.get("input", {}) + file_path = tc_input.get("file_path", "") + caption = tc_input.get("caption", "") if tc_name == "send_image_to_user" or tc_name.endswith( "__send_image_to_user" ): - tc_input = tc.get("input", {}) - file_path = tc_input.get("file_path", "") - caption = tc_input.get("caption", "") img = validate_image_path( file_path, settings.approved_directory, caption ) if img: mcp_images.append(img) + elif tc_name == "send_file_to_user" or tc_name.endswith( + "__send_file_to_user" + ): + attachment, reason = validate_file_path( + file_path, settings.approved_directory, caption + ) + if attachment: + mcp_files.append(attachment) + elif file_path and reason in REJECTION_SURFACE_TO_USER: + mcp_rejected_files.append(file_path) try: progress_text = await _format_progress_update(update_obj) @@ -578,6 +596,83 @@ async def stream_handler(update_obj): error=str(doc_err), ) + # Send MCP-collected files (from send_file_to_user tool calls) and + # surface any paths rejected by bot-side validation so Claude's + # "file sent" reply doesn't silently mislead the user. + if mcp_files or mcp_rejected_files: + from pathlib import Path as _P + + failed_files: list[str] = [] + for attachment in mcp_files: + # TOCTOU-safe: refuse to send if path was swapped (symlink, + # replacement) between validation and delivery. + try: + fd = os.open(str(attachment.path), os.O_RDONLY | os.O_NOFOLLOW) + except OSError as file_err: + logger.warning( + "TOCTOU-safe open failed for MCP document", + path=str(attachment.path), + error=str(file_err), + ) + failed_files.append(attachment.path.name) + continue + + try: + file_stat = os.fstat(fd) + if ( + file_stat.st_ino != attachment.inode + or file_stat.st_dev != attachment.device + ): + logger.warning( + "File identity changed since validation — " "refusing send", + path=str(attachment.path), + ) + os.close(fd) + failed_files.append(attachment.path.name) + continue + + with os.fdopen(fd, "rb") as f: + await update.message.reply_document( + document=f, + filename=attachment.path.name, + caption=attachment.caption or None, + reply_to_message_id=update.message.message_id, + ) + await asyncio.sleep(0.5) + except Exception as file_err: + logger.warning( + "Failed to send MCP document", + path=str(attachment.path), + error=str(file_err), + ) + failed_files.append(attachment.path.name) + try: + os.close(fd) + except OSError: + pass + + summary_lines: list[str] = [] + if failed_files: + summary_lines.append(f"⚠️ Failed to send: {', '.join(failed_files)}") + if mcp_rejected_files: + rejected_names = ", ".join(_P(p).name or p for p in mcp_rejected_files) + summary_lines.append( + "🚫 Rejected by security policy " + "(outside APPROVED_DIRECTORY or blocked secret file): " + f"{rejected_names}" + ) + if summary_lines: + try: + await update.message.reply_text( + "\n".join(summary_lines), + reply_to_message_id=update.message.message_id, + ) + except Exception as summary_err: + logger.debug( + "Failed to send document error summary", + error=str(summary_err), + ) + # Update session info context.user_data["last_message"] = update.message.text diff --git a/src/bot/orchestrator.py b/src/bot/orchestrator.py index 6d9719f0..0be4448c 100644 --- a/src/bot/orchestrator.py +++ b/src/bot/orchestrator.py @@ -6,6 +6,7 @@ """ import asyncio +import os import re import time from dataclasses import dataclass, field @@ -33,6 +34,11 @@ from ..config.settings import Settings from ..projects import PrivateTopicsUnavailableError from .utils.draft_streamer import DraftStreamer, generate_draft_id +from .utils.file_extractor import ( + REJECTION_SURFACE_TO_USER, + FileAttachment, + validate_file_path, +) from .utils.html_format import escape_html from .utils.image_extractor import ( ImageAttachment, @@ -721,6 +727,8 @@ def _make_stream_callback( start_time: float, reply_markup: Optional[InlineKeyboardMarkup] = None, mcp_images: Optional[List[ImageAttachment]] = None, + mcp_files: Optional[List[FileAttachment]] = None, + mcp_rejected_files: Optional[List[str]] = None, approved_directory: Optional[Path] = None, draft_streamer: Optional[DraftStreamer] = None, interrupt_event: Optional[asyncio.Event] = None, @@ -731,15 +739,21 @@ def _make_stream_callback( ``send_image_to_user`` tool calls and collects validated :class:`ImageAttachment` objects for later Telegram delivery. + When *mcp_files* is provided, the callback intercepts + ``send_file_to_user`` tool calls and collects validated + :class:`FileAttachment` objects the same way. + When *draft_streamer* is provided, tool activity and assistant text are streamed to the user in real time via ``sendMessageDraft``. - Returns None when verbose_level is 0 **and** no MCP image + Returns None when verbose_level is 0 **and** no MCP image/file collection or draft streaming is requested. Typing indicators are handled by a separate heartbeat task. """ - need_mcp_intercept = mcp_images is not None and approved_directory is not None + need_mcp_intercept = ( + mcp_images is not None or mcp_files is not None + ) and approved_directory is not None if verbose_level == 0 and not need_mcp_intercept and draft_streamer is None: return None @@ -751,23 +765,44 @@ async def _on_stream(update_obj: StreamUpdate) -> None: if interrupt_event is not None and interrupt_event.is_set(): return - # Intercept send_image_to_user MCP tool calls. + # Intercept send_image_to_user / send_file_to_user MCP tool calls. # The SDK namespaces MCP tools as "mcp____", # so match both the bare name and the namespaced variant. if update_obj.tool_calls and need_mcp_intercept: for tc in update_obj.tool_calls: tc_name = tc.get("name", "") - if tc_name == "send_image_to_user" or tc_name.endswith( - "__send_image_to_user" + tc_input = tc.get("input", {}) + file_path = tc_input.get("file_path", "") + caption = tc_input.get("caption", "") + if mcp_images is not None and ( + tc_name == "send_image_to_user" + or tc_name.endswith("__send_image_to_user") ): - tc_input = tc.get("input", {}) - file_path = tc_input.get("file_path", "") - caption = tc_input.get("caption", "") img = validate_image_path( file_path, approved_directory, caption ) if img: mcp_images.append(img) + elif mcp_files is not None and ( + tc_name == "send_file_to_user" + or tc_name.endswith("__send_file_to_user") + ): + attachment, reason = validate_file_path( + file_path, approved_directory, caption + ) + if attachment: + mcp_files.append(attachment) + elif ( + mcp_rejected_files is not None + and file_path + and reason in REJECTION_SURFACE_TO_USER + ): + # Only surface bot-side rejections (outside + # approved dir, secrets blocklist) — the MCP tool + # already returned an error for size/empty/etc., + # so Claude will describe those accurately on its + # own; our summary would just duplicate/mislead. + mcp_rejected_files.append(file_path) # Capture tool calls if update_obj.tool_calls: @@ -912,6 +947,101 @@ async def _send_images( return caption_sent + async def _send_documents( + self, + update: Update, + files: List[FileAttachment], + rejected: Optional[List[str]] = None, + reply_to_message_id: Optional[int] = None, + ) -> None: + """Send files collected from ``send_file_to_user`` as Telegram documents. + + Each file is sent independently with its own caption; Telegram does not + support grouping documents into an album. On per-file failure we log a + warning and continue with the rest. + + *rejected* carries paths that were refused by bot-side security + validation (``outside_approved`` or ``blocked_secret``). These are + listed in the same summary message so the user isn't misled by + Claude's "file sent" reply. Tool-side rejections + (``too_large``/``empty``/``not_a_file``) are not surfaced here — + Claude already sees the error from the MCP tool and describes it + accurately on its own. + + Each file is re-opened with ``O_NOFOLLOW`` and its inode/device is + compared against what was captured at validation time, to prevent a + TOCTOU where the path is swapped for a symlink (e.g. to a secret) + between the stream callback and this call. + """ + if not files and not rejected: + return + + failed: List[str] = [] + for attachment in files: + try: + fd = os.open(str(attachment.path), os.O_RDONLY | os.O_NOFOLLOW) + except OSError as e: + logger.warning( + "TOCTOU-safe open failed for MCP document", + path=str(attachment.path), + error=str(e), + ) + failed.append(attachment.path.name) + continue + + try: + file_stat = os.fstat(fd) + if ( + file_stat.st_ino != attachment.inode + or file_stat.st_dev != attachment.device + ): + logger.warning( + "File identity changed since validation — refusing send", + path=str(attachment.path), + ) + os.close(fd) + failed.append(attachment.path.name) + continue + + with os.fdopen(fd, "rb") as f: + await update.message.reply_document( + document=f, + filename=attachment.path.name, + caption=attachment.caption or None, + reply_to_message_id=reply_to_message_id, + ) + await asyncio.sleep(0.5) + except Exception as e: + logger.warning( + "Failed to send MCP document", + path=str(attachment.path), + error=str(e), + ) + failed.append(attachment.path.name) + try: + os.close(fd) + except OSError: + pass + + lines: List[str] = [] + if failed: + lines.append(f"⚠️ Failed to send: {', '.join(failed)}") + if rejected: + rejected_names = ", ".join(Path(p).name or p for p in rejected) + lines.append( + "🚫 Rejected by security policy " + "(outside APPROVED_DIRECTORY or blocked secret file): " + f"{rejected_names}" + ) + if lines: + try: + await update.message.reply_text( + "\n".join(lines), + reply_to_message_id=reply_to_message_id, + ) + except Exception as e: + logger.debug("Failed to send document error summary", error=str(e)) + async def agentic_text( self, update: Update, context: ContextTypes.DEFAULT_TYPE ) -> None: @@ -977,6 +1107,8 @@ async def agentic_text( tool_log: List[Dict[str, Any]] = [] start_time = time.time() mcp_images: List[ImageAttachment] = [] + mcp_files: List[FileAttachment] = [] + mcp_rejected_files: List[str] = [] # Stream drafts (private chats only) draft_streamer: Optional[DraftStreamer] = None @@ -996,6 +1128,8 @@ async def agentic_text( start_time, reply_markup=stop_kb, mcp_images=mcp_images, + mcp_files=mcp_files, + mcp_rejected_files=mcp_rejected_files, approved_directory=self.settings.approved_directory, draft_streamer=draft_streamer, interrupt_event=interrupt_event, @@ -1149,6 +1283,19 @@ async def agentic_text( except Exception as img_err: logger.warning("Image send failed", error=str(img_err)) + # Send MCP-collected files (from send_file_to_user tool calls) and + # notify the user about any paths rejected by bot-side validation. + if mcp_files or mcp_rejected_files: + try: + await self._send_documents( + update, + mcp_files, + rejected=mcp_rejected_files, + reply_to_message_id=update.message.message_id, + ) + except Exception as file_err: + logger.warning("Document send failed", error=str(file_err)) + # Audit log audit_logger = context.bot_data.get("audit_logger") if audit_logger: @@ -1246,12 +1393,16 @@ async def agentic_document( verbose_level = self._get_verbose_level(context) tool_log: List[Dict[str, Any]] = [] mcp_images_doc: List[ImageAttachment] = [] + mcp_files_doc: List[FileAttachment] = [] + mcp_rejected_files_doc: List[str] = [] on_stream = self._make_stream_callback( verbose_level, progress_msg, tool_log, time.time(), mcp_images=mcp_images_doc, + mcp_files=mcp_files_doc, + mcp_rejected_files=mcp_rejected_files_doc, approved_directory=self.settings.approved_directory, ) @@ -1330,6 +1481,17 @@ async def agentic_document( except Exception as img_err: logger.warning("Image send failed", error=str(img_err)) + if mcp_files_doc or mcp_rejected_files_doc: + try: + await self._send_documents( + update, + mcp_files_doc, + rejected=mcp_rejected_files_doc, + reply_to_message_id=update.message.message_id, + ) + except Exception as file_err: + logger.warning("Document send failed", error=str(file_err)) + except Exception as e: from .handlers.message import _format_error_message @@ -1455,12 +1617,16 @@ async def _handle_agentic_media_message( verbose_level = self._get_verbose_level(context) tool_log: List[Dict[str, Any]] = [] mcp_images_media: List[ImageAttachment] = [] + mcp_files_media: List[FileAttachment] = [] + mcp_rejected_files_media: List[str] = [] on_stream = self._make_stream_callback( verbose_level, progress_msg, tool_log, time.time(), mcp_images=mcp_images_media, + mcp_files=mcp_files_media, + mcp_rejected_files=mcp_rejected_files_media, approved_directory=self.settings.approved_directory, ) @@ -1540,6 +1706,17 @@ async def _handle_agentic_media_message( except Exception as img_err: logger.warning("Image send failed", error=str(img_err)) + if mcp_files_media or mcp_rejected_files_media: + try: + await self._send_documents( + update, + mcp_files_media, + rejected=mcp_rejected_files_media, + reply_to_message_id=update.message.message_id, + ) + except Exception as file_err: + logger.warning("Document send failed", error=str(file_err)) + async def _handle_unknown_command( self, update: Update, context: ContextTypes.DEFAULT_TYPE ) -> None: diff --git a/src/bot/utils/file_extractor.py b/src/bot/utils/file_extractor.py new file mode 100644 index 00000000..6e32bf29 --- /dev/null +++ b/src/bot/utils/file_extractor.py @@ -0,0 +1,145 @@ +"""Validate arbitrary file paths for Telegram document delivery. + +Used by the MCP ``send_file_to_user`` tool intercept — the stream callback +validates each path via :func:`validate_file_path` and collects +:class:`FileAttachment` objects for later Telegram delivery. + +Full security validation lives here (not in the MCP tool itself) so the tool +can run without access to the bot's runtime configuration. A path that passes +the tool-side syntactic check but fails here is logged as a warning and the +user sees a short error summary; Claude has already received "queued" but the +file simply never arrives. +""" + +import re +from dataclasses import dataclass +from pathlib import Path +from typing import Optional, Tuple + +import structlog + +from src.security.validators import SecurityValidator + +logger = structlog.get_logger() + +# Telegram Bot API document upload limit. +MAX_FILE_SIZE_BYTES = 50 * 1024 * 1024 + +# Reasons returned by :func:`validate_file_path`. Only ``outside_approved`` and +# ``blocked_secret`` are surfaced to the user as bot-side rejections — the +# others (``too_large``, ``empty``, ``not_absolute``, ``not_a_file``) are +# already caught by the MCP tool itself, so Claude receives the error and +# reports the real reason in natural language. +REJECTION_SURFACE_TO_USER = frozenset({"outside_approved", "blocked_secret"}) + + +# Telegram caption limit for photos and documents. +MAX_CAPTION_CHARS = 1024 + + +@dataclass +class FileAttachment: + """A file to attach to a Telegram response as a document. + + ``inode`` and ``device`` are captured at validation time so the caller + can detect TOCTOU replacement (symlink swap, file renamed over, etc.) + before re-opening the path for delivery. + """ + + path: Path + original_reference: str + size_bytes: int + caption: str = "" + inode: int = 0 + device: int = 0 + + +def _is_forbidden_filename(name: str) -> bool: + """Check whether *name* matches the SecurityValidator secrets blocklist. + + We reuse the validator's FORBIDDEN_FILENAMES and DANGEROUS_FILE_PATTERNS + (keys/certs/credentials) but NOT the extension whitelist — arbitrary file + types are intentionally allowed here. + """ + lower = name.lower() + if lower in {n.lower() for n in SecurityValidator.FORBIDDEN_FILENAMES}: + return True + for pattern in SecurityValidator.DANGEROUS_FILE_PATTERNS: + if re.match(pattern, name, re.IGNORECASE): + return True + return False + + +def validate_file_path( + file_path: str, + approved_directory: Path, + caption: str = "", +) -> Tuple[Optional[FileAttachment], Optional[str]]: + """Validate a file path from an MCP ``send_file_to_user`` call. + + Returns a tuple ``(attachment, reason)``: + + - on success: ``(FileAttachment, None)``; + - on failure: ``(None, reason)`` where *reason* is one of + ``not_absolute``, ``outside_approved``, ``not_a_file``, + ``blocked_secret``, ``empty``, ``too_large``. + + Callers should only surface ``reason ∈ REJECTION_SURFACE_TO_USER`` to the + user — other reasons duplicate errors the MCP tool itself already returns. + """ + try: + path = Path(file_path) + if not path.is_absolute(): + return None, "not_absolute" + + resolved = path.resolve() + + try: + resolved.relative_to(approved_directory.resolve()) + except ValueError: + logger.warning( + "MCP file path outside approved directory", + path=str(resolved), + approved=str(approved_directory), + ) + return None, "outside_approved" + + if not resolved.is_file(): + logger.debug("MCP file path is not a file", path=str(resolved)) + return None, "not_a_file" + + if _is_forbidden_filename(resolved.name): + logger.warning( + "MCP file path rejected by secrets blocklist", + path=str(resolved), + ) + return None, "blocked_secret" + + stat_result = resolved.stat() + size_bytes = stat_result.st_size + if size_bytes == 0: + logger.debug("MCP file is empty", path=str(resolved)) + return None, "empty" + if size_bytes > MAX_FILE_SIZE_BYTES: + logger.warning( + "MCP file too large", + path=str(resolved), + size=size_bytes, + limit=MAX_FILE_SIZE_BYTES, + ) + return None, "too_large" + + return ( + FileAttachment( + path=resolved, + original_reference=file_path, + size_bytes=size_bytes, + caption=caption[:MAX_CAPTION_CHARS], + inode=stat_result.st_ino, + device=stat_result.st_dev, + ), + None, + ) + except (OSError, ValueError) as e: + logger.debug("MCP file path validation failed", path=file_path, error=str(e)) + return None, "not_a_file" diff --git a/src/mcp/telegram_server.py b/src/mcp/telegram_server.py index cc320386..2813fab5 100644 --- a/src/mcp/telegram_server.py +++ b/src/mcp/telegram_server.py @@ -1,9 +1,10 @@ """MCP server exposing Telegram-specific tools to Claude. -Runs as a stdio transport server. The ``send_image_to_user`` tool validates -file existence and extension, then returns a success string. Actual Telegram -delivery is handled by the bot's stream callback which intercepts the tool -call. +Runs as a stdio transport server. The ``send_file_to_user`` tool validates +file existence and size, then returns a success string; ``send_image_to_user`` +is kept as a deprecated image-only alias. Actual Telegram delivery is handled +by the bot's stream callback which intercepts the tool call and applies full +security checks (approved directory, secrets blocklist). """ from pathlib import Path @@ -12,12 +13,55 @@ IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp", ".svg"} +# Telegram Bot API document upload limit. +MAX_FILE_SIZE_BYTES = 50 * 1024 * 1024 + mcp = FastMCP("telegram") +@mcp.tool() +async def send_file_to_user(file_path: str, caption: str = "") -> str: + """Send a file of any type to the Telegram user as a document. + + Preferred tool for delivering files (PDF, zip, csv, logs, images, etc.) + back to the user. Full security validation (approved directory, secrets + blocklist) happens on the bot side; this tool only performs basic syntax + checks so it can run without access to the bot's runtime configuration. + + Args: + file_path: Absolute path to the file. + caption: Optional caption to display with the file. + + Returns: + Confirmation string when the file is queued for delivery. + """ + path = Path(file_path) + + if not path.is_absolute(): + return f"Error: path must be absolute, got '{file_path}'" + + if not path.is_file(): + return f"Error: file not found: {file_path}" + + size = path.stat().st_size + if size == 0: + return f"Error: file is empty: {file_path}" + if size > MAX_FILE_SIZE_BYTES: + return ( + f"Error: file too large ({size} bytes). " + f"Telegram Bot API limit is {MAX_FILE_SIZE_BYTES} bytes (50 MB)." + ) + + return f"File queued for delivery: {path.name}" + + @mcp.tool() async def send_image_to_user(file_path: str, caption: str = "") -> str: - """Send an image file to the Telegram user. + """DEPRECATED: use ``send_file_to_user`` instead. + + Kept for backward compatibility with existing prompts and MCP configs. + Accepts only image extensions; ``send_file_to_user`` accepts any file type + and is the preferred tool. Args: file_path: Absolute path to the image file. diff --git a/tests/unit/test_bot/test_file_extractor.py b/tests/unit/test_bot/test_file_extractor.py new file mode 100644 index 00000000..0013035a --- /dev/null +++ b/tests/unit/test_bot/test_file_extractor.py @@ -0,0 +1,207 @@ +"""Tests for arbitrary file validation for ``send_file_to_user``.""" + +from os import stat_result +from pathlib import Path +from unittest.mock import patch + +import pytest + +from src.bot.utils.file_extractor import ( + MAX_CAPTION_CHARS, + MAX_FILE_SIZE_BYTES, + REJECTION_SURFACE_TO_USER, + FileAttachment, + validate_file_path, +) + + +@pytest.fixture +def approved_dir(tmp_path: Path) -> Path: + return tmp_path + + +@pytest.fixture +def work_dir(tmp_path: Path) -> Path: + d = tmp_path / "project" + d.mkdir() + return d + + +def _ok(result): + """Assert success: attachment set, reason is None.""" + attachment, reason = result + assert isinstance(attachment, FileAttachment) + assert reason is None + return attachment + + +def _fail(result, expected_reason: str): + """Assert failure with expected reason code.""" + attachment, reason = result + assert attachment is None + assert reason == expected_reason + + +class TestValidateFilePath: + def test_valid_pdf(self, work_dir: Path, approved_dir: Path): + f = work_dir / "report.pdf" + f.write_bytes(b"%PDF-1.4\n" + b"x" * 1024) + attachment = _ok(validate_file_path(str(f), approved_dir, caption="hello")) + assert attachment.path == f.resolve() + assert attachment.caption == "hello" + assert attachment.size_bytes == f.stat().st_size + assert attachment.original_reference == str(f) + + def test_valid_zip(self, work_dir: Path, approved_dir: Path): + f = work_dir / "logs.zip" + f.write_bytes(b"PK" + b"x" * 100) + _ok(validate_file_path(str(f), approved_dir)) + + def test_valid_csv_no_caption(self, work_dir: Path, approved_dir: Path): + f = work_dir / "data.csv" + f.write_text("a,b,c\n1,2,3\n") + attachment = _ok(validate_file_path(str(f), approved_dir)) + assert attachment.caption == "" + + def test_unicode_filename(self, work_dir: Path, approved_dir: Path): + f = work_dir / "файл с пробелами.pdf" + f.write_bytes(b"%PDF" + b"x" * 50) + attachment = _ok(validate_file_path(str(f), approved_dir)) + assert attachment.path.name == "файл с пробелами.pdf" + + def test_unicode_filename_cyrillic(self, work_dir: Path, approved_dir: Path): + f = work_dir / "отчёт.zip" + f.write_bytes(b"PK" + b"x" * 100) + attachment = _ok(validate_file_path(str(f), approved_dir)) + assert attachment.path.name == "отчёт.zip" + + def test_relative_path_rejected(self, approved_dir: Path): + _fail(validate_file_path("report.pdf", approved_dir), "not_absolute") + + def test_nonexistent_file(self, approved_dir: Path): + _fail( + validate_file_path(str(approved_dir / "missing.pdf"), approved_dir), + "not_a_file", + ) + + def test_empty_file_rejected(self, work_dir: Path, approved_dir: Path): + f = work_dir / "empty.pdf" + f.write_bytes(b"") + _fail(validate_file_path(str(f), approved_dir), "empty") + + def test_too_large_rejected(self, work_dir: Path, approved_dir: Path): + f = work_dir / "huge.bin" + f.write_bytes(b"x" * 100) + real_stat = f.stat() + fake = stat_result( + ( + real_stat.st_mode, + real_stat.st_ino, + real_stat.st_dev, + real_stat.st_nlink, + real_stat.st_uid, + real_stat.st_gid, + MAX_FILE_SIZE_BYTES + 1, + real_stat.st_atime, + real_stat.st_mtime, + real_stat.st_ctime, + ) + ) + with patch.object(Path, "stat", return_value=fake): + _fail(validate_file_path(str(f), approved_dir), "too_large") + + def test_path_traversal_rejected(self, tmp_path: Path): + approved = tmp_path / "project" + approved.mkdir() + outside = tmp_path / "secret.pdf" + outside.write_bytes(b"%PDF" + b"x" * 50) + _fail(validate_file_path(str(outside), approved), "outside_approved") + + def test_env_file_rejected(self, work_dir: Path, approved_dir: Path): + f = work_dir / ".env" + f.write_text("SECRET=xxx\n") + _fail(validate_file_path(str(f), approved_dir), "blocked_secret") + + def test_env_production_rejected(self, work_dir: Path, approved_dir: Path): + f = work_dir / ".env.production" + f.write_text("SECRET=xxx\n") + _fail(validate_file_path(str(f), approved_dir), "blocked_secret") + + def test_id_rsa_rejected(self, work_dir: Path, approved_dir: Path): + f = work_dir / "id_rsa" + f.write_bytes(b"-----BEGIN OPENSSH PRIVATE KEY-----\n") + _fail(validate_file_path(str(f), approved_dir), "blocked_secret") + + def test_pem_rejected(self, work_dir: Path, approved_dir: Path): + f = work_dir / "cert.pem" + f.write_bytes(b"-----BEGIN CERTIFICATE-----\n") + _fail(validate_file_path(str(f), approved_dir), "blocked_secret") + + def test_exe_rejected(self, work_dir: Path, approved_dir: Path): + f = work_dir / "malware.exe" + f.write_bytes(b"MZ" + b"x" * 100) + _fail(validate_file_path(str(f), approved_dir), "blocked_secret") + + +class TestRejectionSurfaceContract: + def test_security_reasons_surfaced(self): + assert "outside_approved" in REJECTION_SURFACE_TO_USER + assert "blocked_secret" in REJECTION_SURFACE_TO_USER + + def test_tool_duplicates_not_surfaced(self): + # These are already reported by the MCP tool itself, so the bot must + # not surface a duplicate summary. + for reason in ("too_large", "empty", "not_absolute", "not_a_file"): + assert reason not in REJECTION_SURFACE_TO_USER + + +class TestSymlinkEscape: + """Symlink inside approved_dir pointing outside must be rejected.""" + + def test_symlink_to_outside_rejected(self, tmp_path: Path): + approved = tmp_path / "project" + approved.mkdir() + outside = tmp_path / "secret.pdf" + outside.write_bytes(b"%PDF" + b"x" * 50) + link = approved / "innocent.pdf" + link.symlink_to(outside) + # resolve() follows the symlink, then relative_to catches the escape. + _fail(validate_file_path(str(link), approved), "outside_approved") + + def test_symlink_within_approved_ok(self, tmp_path: Path): + approved = tmp_path + real = approved / "real.pdf" + real.write_bytes(b"%PDF" + b"x" * 50) + link = approved / "alias.pdf" + link.symlink_to(real) + attachment = _ok(validate_file_path(str(link), approved)) + # resolved path points to the real file, and identity captured. + assert attachment.path == real.resolve() + assert attachment.inode == real.stat().st_ino + + +class TestIdentityCaptured: + def test_inode_and_device_recorded(self, work_dir: Path, approved_dir: Path): + f = work_dir / "report.pdf" + f.write_bytes(b"%PDF" + b"x" * 100) + attachment = _ok(validate_file_path(str(f), approved_dir)) + st = f.stat() + assert attachment.inode == st.st_ino + assert attachment.device == st.st_dev + + +class TestCaptionTruncation: + def test_long_caption_truncated_to_telegram_limit( + self, work_dir: Path, approved_dir: Path + ): + f = work_dir / "report.pdf" + f.write_bytes(b"%PDF" + b"x" * 100) + long_caption = "x" * (MAX_CAPTION_CHARS + 500) + attachment = _ok(validate_file_path(str(f), approved_dir, caption=long_caption)) + assert len(attachment.caption) == MAX_CAPTION_CHARS + + def test_short_caption_preserved(self, work_dir: Path, approved_dir: Path): + f = work_dir / "report.pdf" + f.write_bytes(b"%PDF" + b"x" * 100) + attachment = _ok(validate_file_path(str(f), approved_dir, caption="short")) + assert attachment.caption == "short" diff --git a/tests/unit/test_mcp/test_telegram_server.py b/tests/unit/test_mcp/test_telegram_server.py index c40f8fed..b138a797 100644 --- a/tests/unit/test_mcp/test_telegram_server.py +++ b/tests/unit/test_mcp/test_telegram_server.py @@ -1,10 +1,15 @@ """Tests for the Telegram MCP server tool functions.""" from pathlib import Path +from unittest.mock import patch import pytest -from src.mcp.telegram_server import send_image_to_user +from src.mcp.telegram_server import ( + MAX_FILE_SIZE_BYTES, + send_file_to_user, + send_image_to_user, +) @pytest.fixture @@ -55,3 +60,68 @@ async def test_case_insensitive_extension(self, tmp_path: Path) -> None: img.write_bytes(b"\x00" * 10) result = await send_image_to_user(str(img)) assert "Image queued for delivery" in result + + +class TestSendFileToUser: + async def test_valid_pdf(self, tmp_path: Path) -> None: + f = tmp_path / "report.pdf" + f.write_bytes(b"%PDF-1.4\n" + b"x" * 200) + result = await send_file_to_user(str(f)) + assert "File queued for delivery" in result + assert "report.pdf" in result + + async def test_valid_zip_with_caption(self, tmp_path: Path) -> None: + f = tmp_path / "logs.zip" + f.write_bytes(b"PK" + b"x" * 100) + result = await send_file_to_user(str(f), caption="logs") + assert "File queued for delivery" in result + + async def test_any_extension_accepted(self, tmp_path: Path) -> None: + for name in ["data.csv", "notes.txt", "archive.tar.gz", "binary.bin"]: + f = tmp_path / name + f.write_bytes(b"x" * 64) + result = await send_file_to_user(str(f)) + assert "File queued for delivery" in result, f"Failed for {name}" + + async def test_relative_path_rejected(self) -> None: + result = await send_file_to_user("relative/path/report.pdf") + assert "Error" in result + assert "absolute" in result + + async def test_missing_file_rejected(self, tmp_path: Path) -> None: + missing = tmp_path / "nonexistent.pdf" + result = await send_file_to_user(str(missing)) + assert "Error" in result + assert "not found" in result + + async def test_empty_file_rejected(self, tmp_path: Path) -> None: + f = tmp_path / "empty.pdf" + f.write_bytes(b"") + result = await send_file_to_user(str(f)) + assert "Error" in result + assert "empty" in result + + async def test_too_large_rejected(self, tmp_path: Path) -> None: + f = tmp_path / "big.bin" + f.write_bytes(b"x" * 100) + real_stat = f.stat() + from os import stat_result + + fake = stat_result( + ( + real_stat.st_mode, + real_stat.st_ino, + real_stat.st_dev, + real_stat.st_nlink, + real_stat.st_uid, + real_stat.st_gid, + MAX_FILE_SIZE_BYTES + 1, + real_stat.st_atime, + real_stat.st_mtime, + real_stat.st_ctime, + ) + ) + with patch.object(Path, "stat", return_value=fake): + result = await send_file_to_user(str(f)) + assert "Error" in result + assert "too large" in result