diff --git a/autobot-backend/api/mcp_registry.py b/autobot-backend/api/mcp_registry.py index b2549808c..31e4d38f3 100644 --- a/autobot-backend/api/mcp_registry.py +++ b/autobot-backend/api/mcp_registry.py @@ -304,6 +304,19 @@ class MCPRegistryStats(BaseModel): "list_available_metrics", ], ), + ( + "redis_mcp", + "Redis Data & Operations - Direct Redis access, vector search, server ops", + "/api/redis/mcp/tools", + [ + "data_access", + "vector_search", + "hybrid_search", + "ops_intelligence", + "stream_health", + "rbac_filtering", + ], + ), ] diff --git a/autobot-backend/api/redis_mcp/__init__.py b/autobot-backend/api/redis_mcp/__init__.py new file mode 100644 index 000000000..87d697c0f --- /dev/null +++ b/autobot-backend/api/redis_mcp/__init__.py @@ -0,0 +1,20 @@ +# AutoBot - AI-Powered Automation Platform +# Copyright (c) 2025 mrveiss +# Author: mrveiss +""" +Redis MCP Bridge — Agent-facing Redis access, vector search, ops intelligence. + +Issue #2511: 11th native MCP bridge providing 25 tools across 3 categories: +- Data Access (15 tools): get/set, hash, list, sorted set, stream, scan, type, ttl, delete +- Vector Search (4 tools): create_index, vector_search, hybrid_search, index_info +- Ops Intelligence (6 tools): server_info, dbsize, memory_stats, stream_health, + client_list, slowlog + +RBAC Model: +- Users: Read all + write autobot:agent:* namespace only, read-only ops +- Admins: Full access, destructive ops require approval +""" + +from api.redis_mcp.router import router + +__all__ = ["router"] diff --git a/autobot-backend/api/redis_mcp/data_access.py b/autobot-backend/api/redis_mcp/data_access.py new file mode 100644 index 000000000..1f3421bf8 --- /dev/null +++ b/autobot-backend/api/redis_mcp/data_access.py @@ -0,0 +1,297 @@ +# AutoBot - AI-Powered Automation Platform +# Copyright (c) 2025 mrveiss +# Author: mrveiss +""" +Data Access tool handlers for Redis MCP Bridge (15 tools). 
+ +Issue #2511: get/set, hash, list, sorted set, stream, scan, type, ttl, delete. +All handlers use autobot_shared.redis_client — no direct redis.Redis(). +""" + +import logging +from typing import Any, Dict, List, Optional + +from type_defs.common import Metadata + +from autobot_shared.redis_client import get_redis_client + +logger = logging.getLogger(__name__) + +# Maximum keys returned by scan to prevent unbounded responses +_SCAN_MAX_KEYS = 100 + + +def _decode(value): + """Decode bytes to UTF-8 string, pass through other types.""" + return value.decode("utf-8") if isinstance(value, bytes) else value + + +async def _get_client(database: str = "main"): + """Get an async Redis client for the given database.""" + return await get_redis_client(async_client=True, database=database) + + +# --------------------------------------------------------------------------- +# String operations +# --------------------------------------------------------------------------- + + +async def handle_redis_get(key: str, database: str = "main") -> Metadata: + """Get a string value by key.""" + client = await _get_client(database) + value = await client.get(key) + return { + "status": "success", + "key": key, + "value": value.decode("utf-8") if isinstance(value, bytes) else value, + "exists": value is not None, + } + + +async def handle_redis_set( + key: str, + value: str, + ttl: Optional[int] = None, + database: str = "main", +) -> Metadata: + """Set a string value with optional TTL.""" + client = await _get_client(database) + if ttl and ttl > 0: + await client.setex(key, ttl, value) + else: + await client.set(key, value) + return {"status": "success", "key": key, "ttl": ttl} + + +async def handle_redis_delete(keys: List[str], database: str = "main") -> Metadata: + """Delete one or more keys.""" + client = await _get_client(database) + deleted = await client.delete(*keys) + return { + "status": "success", + "deleted_count": deleted, + "keys": keys, + } + + +# 
--------------------------------------------------------------------------- +# Hash operations +# --------------------------------------------------------------------------- + + +async def handle_redis_hget(key: str, field: str, database: str = "main") -> Metadata: + """Get a single hash field.""" + client = await _get_client(database) + value = await client.hget(key, field) + return { + "status": "success", + "key": key, + "field": field, + "value": value.decode("utf-8") if isinstance(value, bytes) else value, + "exists": value is not None, + } + + +async def handle_redis_hgetall(key: str, database: str = "main") -> Metadata: + """Get all fields and values from a hash.""" + client = await _get_client(database) + raw = await client.hgetall(key) + data = { + (k.decode("utf-8") if isinstance(k, bytes) else k): ( + v.decode("utf-8") if isinstance(v, bytes) else v + ) + for k, v in raw.items() + } + return { + "status": "success", + "key": key, + "fields": data, + "field_count": len(data), + } + + +async def handle_redis_hset( + key: str, mapping: Dict[str, Any], database: str = "main" +) -> Metadata: + """Set one or more hash fields.""" + client = await _get_client(database) + # Convert all values to strings for Redis + str_mapping = {k: str(v) for k, v in mapping.items()} + added = await client.hset(key, mapping=str_mapping) + return { + "status": "success", + "key": key, + "fields_added": added, + } + + +# --------------------------------------------------------------------------- +# List operations +# --------------------------------------------------------------------------- + + +async def handle_redis_lrange( + key: str, start: int = 0, stop: int = -1, database: str = "main" +) -> Metadata: + """Get a range of list elements.""" + client = await _get_client(database) + raw = await client.lrange(key, start, stop) + items = [v.decode("utf-8") if isinstance(v, bytes) else v for v in raw] + return { + "status": "success", + "key": key, + "items": items, + "count": 
len(items), + } + + +async def handle_redis_lpush( + key: str, values: List[str], database: str = "main" +) -> Metadata: + """Push values to the left of a list.""" + client = await _get_client(database) + length = await client.lpush(key, *values) + return {"status": "success", "key": key, "list_length": length} + + +async def handle_redis_rpush( + key: str, values: List[str], database: str = "main" +) -> Metadata: + """Push values to the right of a list.""" + client = await _get_client(database) + length = await client.rpush(key, *values) + return {"status": "success", "key": key, "list_length": length} + + +# --------------------------------------------------------------------------- +# Sorted set operations +# --------------------------------------------------------------------------- + + +async def handle_redis_zrange( + key: str, + start: int = 0, + stop: int = -1, + withscores: bool = False, + database: str = "main", +) -> Metadata: + """Get a range from a sorted set.""" + client = await _get_client(database) + raw = await client.zrange(key, start, stop, withscores=withscores) + if withscores: + items = [ + { + "member": m.decode("utf-8") if isinstance(m, bytes) else m, + "score": s, + } + for m, s in raw + ] + else: + items = [v.decode("utf-8") if isinstance(v, bytes) else v for v in raw] + return { + "status": "success", + "key": key, + "items": items, + "count": len(items), + } + + +# --------------------------------------------------------------------------- +# Stream operations +# --------------------------------------------------------------------------- + + +async def handle_redis_xrange( + key: str, + start: str = "-", + end: str = "+", + count: Optional[int] = None, + database: str = "main", +) -> Metadata: + """Read stream entries.""" + client = await _get_client(database) + kwargs: Dict[str, Any] = {} + if count is not None: + kwargs["count"] = count + raw = await client.xrange(key, min=start, max=end, **kwargs) + entries = [ + {"id": _decode(eid), 
"fields": {_decode(k): _decode(v) for k, v in f.items()}} + for eid, f in raw + ] + return { + "status": "success", + "key": key, + "entries": entries, + "count": len(entries), + } + + +async def handle_redis_xadd( + key: str, + fields: Dict[str, str], + maxlen: Optional[int] = None, + database: str = "main", +) -> Metadata: + """Add an entry to a stream.""" + client = await _get_client(database) + kwargs: Dict[str, Any] = {} + if maxlen is not None: + kwargs["maxlen"] = maxlen + kwargs["approximate"] = True + entry_id = await client.xadd(key, fields, **kwargs) + decoded_id = entry_id.decode("utf-8") if isinstance(entry_id, bytes) else entry_id + return {"status": "success", "key": key, "entry_id": decoded_id} + + +# --------------------------------------------------------------------------- +# Key inspection operations +# --------------------------------------------------------------------------- + + +async def handle_redis_scan_keys( + pattern: str = "*", + count: int = 100, + database: str = "main", +) -> Metadata: + """Scan keys matching a pattern (bounded to _SCAN_MAX_KEYS).""" + client = await _get_client(database) + keys: List[str] = [] + cursor = 0 + scan_count = min(count, _SCAN_MAX_KEYS) + while True: + cursor, batch = await client.scan( + cursor=cursor, match=pattern, count=scan_count + ) + for k in batch: + keys.append(k.decode("utf-8") if isinstance(k, bytes) else k) + if len(keys) >= _SCAN_MAX_KEYS: + break + if cursor == 0 or len(keys) >= _SCAN_MAX_KEYS: + break + return { + "status": "success", + "pattern": pattern, + "keys": keys, + "count": len(keys), + "truncated": len(keys) >= _SCAN_MAX_KEYS, + } + + +async def handle_redis_type(key: str, database: str = "main") -> Metadata: + """Get the data type of a key.""" + client = await _get_client(database) + key_type = await client.type(key) + decoded = key_type.decode("utf-8") if isinstance(key_type, bytes) else key_type + return {"status": "success", "key": key, "type": decoded} + + +async def 
handle_redis_ttl(key: str, database: str = "main") -> Metadata: + """Get the TTL of a key.""" + client = await _get_client(database) + ttl_val = await client.ttl(key) + return { + "status": "success", + "key": key, + "ttl": ttl_val, + "has_expiry": ttl_val >= 0, + } diff --git a/autobot-backend/api/redis_mcp/ops_intelligence.py b/autobot-backend/api/redis_mcp/ops_intelligence.py new file mode 100644 index 000000000..b7ae2054a --- /dev/null +++ b/autobot-backend/api/redis_mcp/ops_intelligence.py @@ -0,0 +1,186 @@ +# AutoBot - AI-Powered Automation Platform +# Copyright (c) 2025 mrveiss +# Author: mrveiss +""" +Ops Intelligence tool handlers for Redis MCP Bridge (6 tools). + +Issue #2511: server_info, dbsize, memory_stats, stream_health, client_list, slowlog. +Agents can chain these tools for composite diagnostics. +""" + +import logging +from typing import Any, Dict, List, Optional + +from type_defs.common import Metadata + +from autobot_shared.redis_client import get_redis_client + +logger = logging.getLogger(__name__) + + +async def _get_client(database: str = "main"): + """Get an async Redis client for ops queries.""" + return await get_redis_client(async_client=True, database=database) + + +async def handle_redis_server_info( + section: Optional[str] = None, database: str = "main" +) -> Metadata: + """Get Redis server stats.""" + client = await _get_client(database) + if section: + info = await client.info(section) + else: + info = await client.info() + # info() returns a dict — stringify nested values for JSON + cleaned = _stringify_info(info) + return {"status": "success", "info": cleaned} + + +async def handle_redis_dbsize(database: str = "main") -> Metadata: + """Get key count in the current database.""" + client = await _get_client(database) + size = await client.dbsize() + return {"status": "success", "database": database, "key_count": size} + + +async def handle_redis_memory_stats(database: str = "main") -> Metadata: + """Get detailed memory analysis.""" 
+ client = await _get_client(database) + info = await client.info("memory") + return { + "status": "success", + "used_memory_human": info.get("used_memory_human", "unknown"), + "used_memory_peak_human": info.get("used_memory_peak_human", "unknown"), + "used_memory_rss_human": info.get("used_memory_rss_human", "unknown"), + "mem_fragmentation_ratio": info.get("mem_fragmentation_ratio"), + "mem_allocator": info.get("mem_allocator", "unknown"), + "total_system_memory_human": info.get("total_system_memory_human", "unknown"), + } + + +async def handle_redis_stream_health(key: str, database: str = "main") -> Metadata: + """Check stream health: length, groups, pending, last ID.""" + client = await _get_client(database) + try: + info = await client.xinfo_stream(key) + except Exception as e: + if "no such key" in str(e).lower(): + return { + "status": "error", + "message": f"Stream '{key}' does not exist", + "code": "STREAM_NOT_FOUND", + } + raise + + result: Dict[str, Any] = { + "status": "success", + "key": key, + "length": info.get("length", 0), + "first_entry": _format_stream_entry(info.get("first-entry")), + "last_entry": _format_stream_entry(info.get("last-entry")), + } + + # Consumer group info + try: + groups = await client.xinfo_groups(key) + result["groups"] = [ + { + "name": _decode(g.get("name")), + "consumers": g.get("consumers", 0), + "pending": g.get("pending", 0), + "last_delivered_id": _decode(g.get("last-delivered-id")), + } + for g in groups + ] + except Exception: + result["groups"] = [] + + return result + + +async def handle_redis_client_list(database: str = "main") -> Metadata: + """List connected Redis clients (admin only).""" + client = await _get_client(database) + clients_raw = await client.client_list() + # Limit to first 50 clients to avoid huge responses + clients = clients_raw[:50] + summary = [ + { + "id": c.get("id"), + "addr": c.get("addr"), + "name": c.get("name", ""), + "age": c.get("age"), + "idle": c.get("idle"), + "db": c.get("db"), + 
"cmd": c.get("cmd"), + } + for c in clients + ] + return { + "status": "success", + "clients": summary, + "total": len(clients_raw), + "shown": len(summary), + } + + +async def handle_redis_slowlog(count: int = 10, database: str = "main") -> Metadata: + """Get recent slow queries (admin only).""" + client = await _get_client(database) + entries = await client.slowlog_get(count) + formatted: List[Dict[str, Any]] = [] + for entry in entries: + formatted.append( + { + "id": entry.get("id"), + "timestamp": entry.get("start_time"), + "duration_us": entry.get("duration"), + "command": _decode(entry.get("command", b"")), + "client_addr": _decode(entry.get("client_address", b"")), + } + ) + return { + "status": "success", + "entries": formatted, + "count": len(formatted), + } + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _decode(value: Any) -> Any: + """Decode bytes to str if needed.""" + if isinstance(value, bytes): + return value.decode("utf-8") + return value + + +def _stringify_info(info: dict) -> dict: + """Ensure all values in a Redis INFO dict are JSON-serializable.""" + result = {} + for k, v in info.items(): + if isinstance(v, dict): + result[k] = _stringify_info(v) + elif isinstance(v, bytes): + result[k] = v.decode("utf-8") + else: + result[k] = v + return result + + +def _format_stream_entry(entry) -> Optional[Dict[str, Any]]: + """Format a stream entry tuple (id, fields) into a dict.""" + if not entry: + return None + entry_id, fields = entry + decoded_id = _decode(entry_id) + decoded_fields = ( + {_decode(k): _decode(v) for k, v in fields.items()} + if isinstance(fields, dict) + else {} + ) + return {"id": decoded_id, "fields": decoded_fields} diff --git a/autobot-backend/api/redis_mcp/rbac.py b/autobot-backend/api/redis_mcp/rbac.py new file mode 100644 index 000000000..24eead160 --- /dev/null +++ 
b/autobot-backend/api/redis_mcp/rbac.py @@ -0,0 +1,105 @@ +# AutoBot - AI-Powered Automation Platform +# Copyright (c) 2025 mrveiss +# Author: mrveiss +""" +RBAC filtering for Redis MCP Bridge tools. + +Issue #2511: Role-based permission filtering for Redis MCP tools. + +Roles: +- user: Read all + write autobot:agent:* namespace only, no client_list/slowlog +- admin: Full access, destructive ops (delete) require approval +""" + +import logging +from enum import Enum +from typing import Optional + +logger = logging.getLogger(__name__) + +# Namespace that non-admin users are allowed to write to +AGENT_NAMESPACE_PREFIX = "autobot:agent:" + + +class ToolAccess(str, Enum): + """Access levels for Redis MCP tools.""" + + READ = "read" + SCOPED_WRITE = "scoped" # Write only to autobot:agent:* namespace + FULL_WRITE = "full" + APPROVAL_REQUIRED = "approval" + BLOCKED = "blocked" + + +# Tool access matrix: tool_name -> (user_access, admin_access) +TOOL_ACCESS_MATRIX = { + # Data Access — read tools + "redis_get": (ToolAccess.READ, ToolAccess.READ), + "redis_hget": (ToolAccess.READ, ToolAccess.READ), + "redis_hgetall": (ToolAccess.READ, ToolAccess.READ), + "redis_lrange": (ToolAccess.READ, ToolAccess.READ), + "redis_zrange": (ToolAccess.READ, ToolAccess.READ), + "redis_xrange": (ToolAccess.READ, ToolAccess.READ), + "redis_scan_keys": (ToolAccess.READ, ToolAccess.READ), + "redis_type": (ToolAccess.READ, ToolAccess.READ), + "redis_ttl": (ToolAccess.READ, ToolAccess.READ), + # Data Access — write tools + "redis_set": (ToolAccess.SCOPED_WRITE, ToolAccess.FULL_WRITE), + "redis_hset": (ToolAccess.SCOPED_WRITE, ToolAccess.FULL_WRITE), + "redis_lpush": (ToolAccess.SCOPED_WRITE, ToolAccess.FULL_WRITE), + "redis_rpush": (ToolAccess.SCOPED_WRITE, ToolAccess.FULL_WRITE), + "redis_xadd": (ToolAccess.SCOPED_WRITE, ToolAccess.FULL_WRITE), + "redis_delete": (ToolAccess.SCOPED_WRITE, ToolAccess.APPROVAL_REQUIRED), + # Vector Search — index creation restricted for users (#2511) + 
"redis_vector_create_index": (ToolAccess.SCOPED_WRITE, ToolAccess.FULL_WRITE), + "redis_vector_search": (ToolAccess.READ, ToolAccess.READ), + "redis_hybrid_search": (ToolAccess.READ, ToolAccess.READ), + "redis_vector_index_info": (ToolAccess.READ, ToolAccess.READ), + # Ops Intelligence + "redis_server_info": (ToolAccess.READ, ToolAccess.READ), + "redis_dbsize": (ToolAccess.READ, ToolAccess.READ), + "redis_memory_stats": (ToolAccess.READ, ToolAccess.READ), + "redis_stream_health": (ToolAccess.READ, ToolAccess.READ), + "redis_client_list": (ToolAccess.BLOCKED, ToolAccess.READ), + "redis_slowlog": (ToolAccess.BLOCKED, ToolAccess.READ), +} + + +def get_tool_access(tool_name: str, is_admin: bool) -> ToolAccess: + """Return the access level for a tool given the user's role.""" + entry = TOOL_ACCESS_MATRIX.get(tool_name) + if entry is None: + return ToolAccess.BLOCKED + user_access, admin_access = entry + return admin_access if is_admin else user_access + + +def check_tool_permission(tool_name: str, is_admin: bool) -> tuple[bool, Optional[str]]: + """Check if a tool call is permitted. + + Returns: + (allowed, error_message) — allowed is True if the call can proceed. + """ + access = get_tool_access(tool_name, is_admin) + if access == ToolAccess.BLOCKED: + return False, f"Tool '{tool_name}' is not available for your role" + return True, None + + +def validate_key_namespace( + key: str, is_admin: bool, access: ToolAccess +) -> tuple[bool, Optional[str]]: + """Validate that a key write is within the allowed namespace. + + Scoped-write users can only write to keys prefixed with autobot:agent:*. + Admins with full access have no restriction. 
+ """ + if access == ToolAccess.FULL_WRITE: + return True, None + if access == ToolAccess.SCOPED_WRITE and not is_admin: + if not key.startswith(AGENT_NAMESPACE_PREFIX): + return False, ( + f"Write denied: key '{key}' is outside the " + f"allowed namespace '{AGENT_NAMESPACE_PREFIX}*'" + ) + return True, None diff --git a/autobot-backend/api/redis_mcp/router.py b/autobot-backend/api/redis_mcp/router.py new file mode 100644 index 000000000..104b452ee --- /dev/null +++ b/autobot-backend/api/redis_mcp/router.py @@ -0,0 +1,439 @@ +# AutoBot - AI-Powered Automation Platform +# Copyright (c) 2025 mrveiss +# Author: mrveiss +""" +FastAPI router for Redis MCP Bridge. + +Issue #2511: Provides /api/redis/mcp/tools listing and per-tool POST endpoints. +Integrates RBAC filtering and routes to the appropriate handler module. +""" + +import logging +from typing import Any, Dict, List + +from api.redis_mcp.data_access import ( + handle_redis_delete, + handle_redis_get, + handle_redis_hget, + handle_redis_hgetall, + handle_redis_hset, + handle_redis_lpush, + handle_redis_lrange, + handle_redis_rpush, + handle_redis_scan_keys, + handle_redis_set, + handle_redis_ttl, + handle_redis_type, + handle_redis_xadd, + handle_redis_xrange, + handle_redis_zrange, +) +from api.redis_mcp.ops_intelligence import ( + handle_redis_client_list, + handle_redis_dbsize, + handle_redis_memory_stats, + handle_redis_server_info, + handle_redis_slowlog, + handle_redis_stream_health, +) +from api.redis_mcp.rbac import ( + ToolAccess, + check_tool_permission, + get_tool_access, + validate_key_namespace, +) +from api.redis_mcp.tools import get_all_tools +from api.redis_mcp.vector_search import ( + handle_redis_hybrid_search, + handle_redis_vector_create_index, + handle_redis_vector_index_info, + handle_redis_vector_search, +) +from auth_middleware import get_current_user +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel, Field +from type_defs.common import Metadata + +from 
autobot_shared.error_boundaries import ErrorCategory, with_error_handling
+from autobot_shared.redis_management.types import DATABASE_MAPPING
+
+logger = logging.getLogger(__name__)
+
+# Valid database names for parameter validation (#2511)
+_VALID_DATABASES = frozenset(DATABASE_MAPPING.keys())
+
+router = APIRouter(
+    tags=["redis_mcp", "mcp"],
+)
+
+
+# ---------------------------------------------------------------------------
+# Request models
+# ---------------------------------------------------------------------------
+
+
+class RedisToolCallRequest(BaseModel):
+    """Generic tool call request for dispatch endpoint."""
+
+    tool_name: str = ""
+    arguments: Dict[str, Any] = Field(default_factory=dict)
+
+
+# ---------------------------------------------------------------------------
+# Tool listing endpoint
+# ---------------------------------------------------------------------------
+
+
+@router.get("/mcp/tools")
+@with_error_handling(
+    category=ErrorCategory.SERVER_ERROR,
+    operation="get_redis_mcp_tools",
+    error_code_prefix="REDIS_MCP",
+)
+async def get_redis_mcp_tools(
+    current_user: dict = Depends(get_current_user),
+) -> List[dict]:
+    """List all Redis MCP tools available to the current user."""
+    is_admin = _is_admin(current_user)
+    all_tools = get_all_tools()
+    # Filter out tools blocked for this role
+    visible = []
+    for tool in all_tools:
+        allowed, _ = check_tool_permission(tool.name, is_admin)
+        if allowed:
+            visible.append(tool.model_dump())
+    return visible
+
+
+# ---------------------------------------------------------------------------
+# Unified dispatch endpoint
+# ---------------------------------------------------------------------------
+
+
+@router.post("/mcp/call")
+@with_error_handling(
+    category=ErrorCategory.SERVER_ERROR,
+    operation="call_redis_mcp_tool",
+    error_code_prefix="REDIS_MCP",
+)
+async def call_redis_mcp_tool(
+    request: RedisToolCallRequest,
+    current_user: dict = Depends(get_current_user),
+) -> Metadata:
+    
"""Dispatch a Redis MCP tool call with RBAC enforcement.""" + tool_name = request.tool_name + args = request.arguments + is_admin = _is_admin(current_user) + + # RBAC check + allowed, error_msg = check_tool_permission(tool_name, is_admin) + if not allowed: + raise HTTPException(status_code=403, detail=error_msg) + + access = get_tool_access(tool_name, is_admin) + + # Approval gate for destructive admin ops (Issue #2622) + # If the caller provides approved=true (after user confirmation), skip the gate. + if access == ToolAccess.APPROVAL_REQUIRED and not args.get("approved"): + return { + "status": "approval_required", + "tool": tool_name, + "message": ( + f"Tool '{tool_name}' requires explicit approval. " + "Confirm to proceed with this destructive operation." + ), + "arguments": args, + } + + # Namespace validation for write tools — validate ALL keys (#2511) + keys = _extract_keys(args) + if keys and access in (ToolAccess.SCOPED_WRITE, ToolAccess.FULL_WRITE): + for key in keys: + valid, ns_error = validate_key_namespace(key, is_admin, access) + if not valid: + raise HTTPException(status_code=403, detail=ns_error) + + # Database parameter validation (#2511) + db = args.get("database") + if db and db not in _VALID_DATABASES: + raise HTTPException( + status_code=400, + detail=f"Invalid database '{db}'. 
Valid: {sorted(_VALID_DATABASES)}",
+        )
+
+    # Route to handler
+    handler = _TOOL_HANDLERS.get(tool_name)
+    if handler is None:
+        raise HTTPException(
+            status_code=400, detail=f"Unknown Redis MCP tool: {tool_name}"
+        )
+    return await handler(args)
+
+
+# ---------------------------------------------------------------------------
+# Per-tool POST endpoints (for MCP registry endpoint pattern)
+# ---------------------------------------------------------------------------
+
+
+@router.post("/mcp/{tool_name}")
+@with_error_handling(
+    category=ErrorCategory.SERVER_ERROR,
+    operation="redis_mcp_tool_endpoint",
+    error_code_prefix="REDIS_MCP",
+)
+async def redis_mcp_tool_endpoint(
+    tool_name: str,
+    request: RedisToolCallRequest,
+    current_user: dict = Depends(get_current_user),
+) -> Metadata:
+    """Individual tool endpoint for MCP registry compatibility."""
+    request.tool_name = tool_name
+    return await call_redis_mcp_tool(request, current_user)
+
+
+# ---------------------------------------------------------------------------
+# Handler dispatch table
+# ---------------------------------------------------------------------------
+
+
+async def _wrap_data_get(args: dict) -> Metadata:
+    return await handle_redis_get(
+        key=args["key"], database=args.get("database", "main")
+    )
+
+
+async def _wrap_data_set(args: dict) -> Metadata:
+    return await handle_redis_set(
+        key=args["key"],
+        value=args["value"],
+        ttl=args.get("ttl"),
+        database=args.get("database", "main"),
+    )
+
+
+async def _wrap_data_delete(args: dict) -> Metadata:
+    return await handle_redis_delete(
+        keys=args["keys"], database=args.get("database", "main")
+    )
+
+
+async def _wrap_data_hget(args: dict) -> Metadata:
+    return await handle_redis_hget(
+        key=args["key"],
+        field=args["field"],
+        database=args.get("database", "main"),
+    )
+
+
+async def _wrap_data_hgetall(args: dict) -> Metadata:
+    return await handle_redis_hgetall(
+        key=args["key"], database=args.get("database", "main")
+    )
+
+
+async def 
_wrap_data_hset(args: dict) -> Metadata: + return await handle_redis_hset( + key=args["key"], + mapping=args["mapping"], + database=args.get("database", "main"), + ) + + +async def _wrap_data_lrange(args: dict) -> Metadata: + return await handle_redis_lrange( + key=args["key"], + start=args.get("start", 0), + stop=args.get("stop", -1), + database=args.get("database", "main"), + ) + + +async def _wrap_data_lpush(args: dict) -> Metadata: + return await handle_redis_lpush( + key=args["key"], + values=args["values"], + database=args.get("database", "main"), + ) + + +async def _wrap_data_rpush(args: dict) -> Metadata: + return await handle_redis_rpush( + key=args["key"], + values=args["values"], + database=args.get("database", "main"), + ) + + +async def _wrap_data_zrange(args: dict) -> Metadata: + return await handle_redis_zrange( + key=args["key"], + start=args.get("start", 0), + stop=args.get("stop", -1), + withscores=args.get("withscores", False), + database=args.get("database", "main"), + ) + + +async def _wrap_data_xrange(args: dict) -> Metadata: + return await handle_redis_xrange( + key=args["key"], + start=args.get("start", "-"), + end=args.get("end", "+"), + count=args.get("count"), + database=args.get("database", "main"), + ) + + +async def _wrap_data_xadd(args: dict) -> Metadata: + return await handle_redis_xadd( + key=args["key"], + fields=args["fields"], + maxlen=args.get("maxlen"), + database=args.get("database", "main"), + ) + + +async def _wrap_data_scan_keys(args: dict) -> Metadata: + return await handle_redis_scan_keys( + pattern=args.get("pattern", "*"), + count=args.get("count", 100), + database=args.get("database", "main"), + ) + + +async def _wrap_data_type(args: dict) -> Metadata: + return await handle_redis_type( + key=args["key"], database=args.get("database", "main") + ) + + +async def _wrap_data_ttl(args: dict) -> Metadata: + return await handle_redis_ttl( + key=args["key"], database=args.get("database", "main") + ) + + +async def 
_wrap_vector_create_index(args: dict) -> Metadata: + return await handle_redis_vector_create_index( + index_name=args.get("index_name", "idx:agent_memory"), + prefix=args.get("prefix", "autobot:agent:memory:"), + vector_field=args.get("vector_field", "embedding"), + dimensions=args.get("dimensions", 1536), + distance_metric=args.get("distance_metric", "COSINE"), + extra_fields=args.get("extra_fields"), + database=args.get("database", "vectors"), + ) + + +async def _wrap_vector_search(args: dict) -> Metadata: + return await handle_redis_vector_search( + query_vector=args.get("query_vector"), + query_text=args.get("query_text"), + index_name=args.get("index_name", "idx:agent_memory"), + top_k=args.get("top_k", 10), + return_fields=args.get("return_fields"), + database=args.get("database", "vectors"), + ) + + +async def _wrap_hybrid_search(args: dict) -> Metadata: + return await handle_redis_hybrid_search( + query_vector=args.get("query_vector"), + query_text=args.get("query_text"), + filter_expression=args.get("filter_expression", ""), + index_name=args.get("index_name", "idx:agent_memory"), + top_k=args.get("top_k", 10), + return_fields=args.get("return_fields"), + database=args.get("database", "vectors"), + ) + + +async def _wrap_vector_index_info(args: dict) -> Metadata: + return await handle_redis_vector_index_info( + index_name=args.get("index_name", "idx:agent_memory"), + database=args.get("database", "vectors"), + ) + + +async def _wrap_ops_server_info(args: dict) -> Metadata: + return await handle_redis_server_info( + section=args.get("section"), + database=args.get("database", "main"), + ) + + +async def _wrap_ops_dbsize(args: dict) -> Metadata: + return await handle_redis_dbsize(database=args.get("database", "main")) + + +async def _wrap_ops_memory_stats(args: dict) -> Metadata: + return await handle_redis_memory_stats(database=args.get("database", "main")) + + +async def _wrap_ops_stream_health(args: dict) -> Metadata: + return await 
handle_redis_stream_health( + key=args["key"], database=args.get("database", "main") + ) + + +async def _wrap_ops_client_list(args: dict) -> Metadata: + return await handle_redis_client_list(database=args.get("database", "main")) + + +async def _wrap_ops_slowlog(args: dict) -> Metadata: + return await handle_redis_slowlog( + count=args.get("count", 10), + database=args.get("database", "main"), + ) + + +_TOOL_HANDLERS = { + # Data Access + "redis_get": _wrap_data_get, + "redis_set": _wrap_data_set, + "redis_delete": _wrap_data_delete, + "redis_hget": _wrap_data_hget, + "redis_hgetall": _wrap_data_hgetall, + "redis_hset": _wrap_data_hset, + "redis_lrange": _wrap_data_lrange, + "redis_lpush": _wrap_data_lpush, + "redis_rpush": _wrap_data_rpush, + "redis_zrange": _wrap_data_zrange, + "redis_xrange": _wrap_data_xrange, + "redis_xadd": _wrap_data_xadd, + "redis_scan_keys": _wrap_data_scan_keys, + "redis_type": _wrap_data_type, + "redis_ttl": _wrap_data_ttl, + # Vector Search + "redis_vector_create_index": _wrap_vector_create_index, + "redis_vector_search": _wrap_vector_search, + "redis_hybrid_search": _wrap_hybrid_search, + "redis_vector_index_info": _wrap_vector_index_info, + # Ops Intelligence + "redis_server_info": _wrap_ops_server_info, + "redis_dbsize": _wrap_ops_dbsize, + "redis_memory_stats": _wrap_ops_memory_stats, + "redis_stream_health": _wrap_ops_stream_health, + "redis_client_list": _wrap_ops_client_list, + "redis_slowlog": _wrap_ops_slowlog, +} + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _is_admin(user: dict) -> bool: + """Check if the user has admin role.""" + role = user.get("role", "user") + return role in ("admin", "superadmin") + + +def _extract_keys(args: dict) -> list[str]: + """Extract all keys from tool arguments for namespace validation.""" + if "key" in args: + return [args["key"]] + if "keys" in args and 
args["keys"]: + return list(args["keys"]) + return [] diff --git a/autobot-backend/api/redis_mcp/tools.py b/autobot-backend/api/redis_mcp/tools.py new file mode 100644 index 000000000..ad650b17e --- /dev/null +++ b/autobot-backend/api/redis_mcp/tools.py @@ -0,0 +1,644 @@ +# AutoBot - AI-Powered Automation Platform +# Copyright (c) 2025 mrveiss +# Author: mrveiss +""" +MCP tool definitions (schemas and descriptions) for Redis MCP Bridge. + +Issue #2511: 25 tool definitions across 3 categories. +Follows the pattern established by filesystem_mcp.py (Issue #620 refactoring). +""" + +from __future__ import annotations + +from pydantic import BaseModel +from type_defs.common import JSONObject + + +class MCPTool(BaseModel): + """Standard MCP tool definition.""" + + name: str + description: str + input_schema: JSONObject + + +# --------------------------------------------------------------------------- +# Data Access Tools (15) +# --------------------------------------------------------------------------- + + +def _tool_redis_get() -> MCPTool: + return MCPTool( + name="redis_get", + description="Get a string value by key from Redis.", + input_schema={ + "type": "object", + "properties": { + "key": {"type": "string", "description": "Redis key to retrieve"}, + "database": { + "type": "string", + "description": "Named database (default: main)", + "default": "main", + }, + }, + "required": ["key"], + }, + ) + + +def _tool_redis_set() -> MCPTool: + return MCPTool( + name="redis_set", + description="Set a string value with optional TTL. 
Users: autobot:agent:* only.", + input_schema={ + "type": "object", + "properties": { + "key": {"type": "string", "description": "Redis key to set"}, + "value": {"type": "string", "description": "Value to store"}, + "ttl": { + "type": "integer", + "description": "Optional TTL in seconds", + }, + "database": { + "type": "string", + "description": "Named database (default: main)", + "default": "main", + }, + }, + "required": ["key", "value"], + }, + ) + + +def _tool_redis_delete() -> MCPTool: + return MCPTool( + name="redis_delete", + description="Delete one or more keys. Users: autobot:agent:* only. Admins: approval required.", + input_schema={ + "type": "object", + "properties": { + "keys": { + "type": "array", + "items": {"type": "string"}, + "description": "List of keys to delete", + }, + "database": { + "type": "string", + "description": "Named database (default: main)", + "default": "main", + }, + }, + "required": ["keys"], + }, + ) + + +def _tool_redis_hget() -> MCPTool: + return MCPTool( + name="redis_hget", + description="Get a single field from a Redis hash.", + input_schema={ + "type": "object", + "properties": { + "key": {"type": "string", "description": "Hash key"}, + "field": {"type": "string", "description": "Field name"}, + "database": {"type": "string", "default": "main"}, + }, + "required": ["key", "field"], + }, + ) + + +def _tool_redis_hgetall() -> MCPTool: + return MCPTool( + name="redis_hgetall", + description="Get all fields and values from a Redis hash.", + input_schema={ + "type": "object", + "properties": { + "key": {"type": "string", "description": "Hash key"}, + "database": {"type": "string", "default": "main"}, + }, + "required": ["key"], + }, + ) + + +def _tool_redis_hset() -> MCPTool: + return MCPTool( + name="redis_hset", + description="Set one or more fields in a Redis hash. 
Users: autobot:agent:* only.", + input_schema={ + "type": "object", + "properties": { + "key": {"type": "string", "description": "Hash key"}, + "mapping": { + "type": "object", + "description": "Field-value pairs to set", + }, + "database": {"type": "string", "default": "main"}, + }, + "required": ["key", "mapping"], + }, + ) + + +def _tool_redis_lrange() -> MCPTool: + return MCPTool( + name="redis_lrange", + description="Get a range of elements from a Redis list.", + input_schema={ + "type": "object", + "properties": { + "key": {"type": "string", "description": "List key"}, + "start": { + "type": "integer", + "description": "Start index (0-based)", + "default": 0, + }, + "stop": { + "type": "integer", + "description": "Stop index (-1 for all)", + "default": -1, + }, + "database": {"type": "string", "default": "main"}, + }, + "required": ["key"], + }, + ) + + +def _tool_redis_lpush() -> MCPTool: + return MCPTool( + name="redis_lpush", + description="Push values to the left of a Redis list. Users: autobot:agent:* only.", + input_schema={ + "type": "object", + "properties": { + "key": {"type": "string", "description": "List key"}, + "values": { + "type": "array", + "items": {"type": "string"}, + "description": "Values to push", + }, + "database": {"type": "string", "default": "main"}, + }, + "required": ["key", "values"], + }, + ) + + +def _tool_redis_rpush() -> MCPTool: + return MCPTool( + name="redis_rpush", + description="Push values to the right of a Redis list. 
Users: autobot:agent:* only.", + input_schema={ + "type": "object", + "properties": { + "key": {"type": "string", "description": "List key"}, + "values": { + "type": "array", + "items": {"type": "string"}, + "description": "Values to push", + }, + "database": {"type": "string", "default": "main"}, + }, + "required": ["key", "values"], + }, + ) + + +def _tool_redis_zrange() -> MCPTool: + return MCPTool( + name="redis_zrange", + description="Get a range of elements from a sorted set, with optional scores.", + input_schema={ + "type": "object", + "properties": { + "key": {"type": "string", "description": "Sorted set key"}, + "start": {"type": "integer", "default": 0}, + "stop": {"type": "integer", "default": -1}, + "withscores": { + "type": "boolean", + "description": "Include scores", + "default": False, + }, + "database": {"type": "string", "default": "main"}, + }, + "required": ["key"], + }, + ) + + +def _tool_redis_xrange() -> MCPTool: + return MCPTool( + name="redis_xrange", + description="Read entries from a Redis stream.", + input_schema={ + "type": "object", + "properties": { + "key": {"type": "string", "description": "Stream key"}, + "start": { + "type": "string", + "description": "Start ID (default: '-' for earliest)", + "default": "-", + }, + "end": { + "type": "string", + "description": "End ID (default: '+' for latest)", + "default": "+", + }, + "count": { + "type": "integer", + "description": "Max entries to return", + }, + "database": {"type": "string", "default": "main"}, + }, + "required": ["key"], + }, + ) + + +def _tool_redis_xadd() -> MCPTool: + return MCPTool( + name="redis_xadd", + description="Add an entry to a Redis stream. 
Users: autobot:agent:* only.", + input_schema={ + "type": "object", + "properties": { + "key": {"type": "string", "description": "Stream key"}, + "fields": { + "type": "object", + "description": "Field-value pairs for the stream entry", + }, + "maxlen": { + "type": "integer", + "description": "Optional max stream length (approximate trim)", + }, + "database": {"type": "string", "default": "main"}, + }, + "required": ["key", "fields"], + }, + ) + + +def _tool_redis_scan_keys() -> MCPTool: + return MCPTool( + name="redis_scan_keys", + description="Scan keys matching a glob pattern. Returns up to 100 keys per call.", + input_schema={ + "type": "object", + "properties": { + "pattern": { + "type": "string", + "description": "Glob pattern (e.g. 'autobot:agent:*')", + "default": "*", + }, + "count": { + "type": "integer", + "description": "Hint for keys per scan iteration", + "default": 100, + }, + "database": {"type": "string", "default": "main"}, + }, + }, + ) + + +def _tool_redis_type() -> MCPTool: + return MCPTool( + name="redis_type", + description="Get the data type of a Redis key.", + input_schema={ + "type": "object", + "properties": { + "key": {"type": "string", "description": "Redis key"}, + "database": {"type": "string", "default": "main"}, + }, + "required": ["key"], + }, + ) + + +def _tool_redis_ttl() -> MCPTool: + return MCPTool( + name="redis_ttl", + description="Get the remaining TTL (in seconds) of a Redis key. 
-1 = no expiry, -2 = key missing.", + input_schema={ + "type": "object", + "properties": { + "key": {"type": "string", "description": "Redis key"}, + "database": {"type": "string", "default": "main"}, + }, + "required": ["key"], + }, + ) + + +# --------------------------------------------------------------------------- +# Vector Search Tools (4) +# --------------------------------------------------------------------------- + + +def _tool_redis_vector_create_index() -> MCPTool: + return MCPTool( + name="redis_vector_create_index", + description=( + "Create a RediSearch vector index on hash keys using HNSW algorithm. " + "Default: idx:agent_memory on autobot:agent:memory:* with 1536 dims." + ), + input_schema={ + "type": "object", + "properties": { + "index_name": { + "type": "string", + "description": "Index name", + "default": "idx:agent_memory", + }, + "prefix": { + "type": "string", + "description": "Key prefix to index", + "default": "autobot:agent:memory:", + }, + "vector_field": { + "type": "string", + "description": "Field name for the vector", + "default": "embedding", + }, + "dimensions": { + "type": "integer", + "description": "Vector dimensions", + "default": 1536, + }, + "distance_metric": { + "type": "string", + "enum": ["COSINE", "L2", "IP"], + "description": "Distance metric", + "default": "COSINE", + }, + "extra_fields": { + "type": "array", + "items": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "type": { + "type": "string", + "enum": ["TEXT", "TAG", "NUMERIC"], + }, + }, + "required": ["name", "type"], + }, + "description": "Additional schema fields (TEXT, TAG, NUMERIC)", + }, + "database": {"type": "string", "default": "vectors"}, + }, + }, + ) + + +def _tool_redis_vector_search() -> MCPTool: + return MCPTool( + name="redis_vector_search", + description=( + "Similarity search using a RediSearch index. " + "Provide query_text (auto-embedded) or query_vector (raw floats)." 
+ ), + input_schema={ + "type": "object", + "properties": { + "index_name": { + "type": "string", + "description": "Index to search", + "default": "idx:agent_memory", + }, + "query_text": { + "type": "string", + "description": "Text to embed and search (alternative to query_vector)", + }, + "query_vector": { + "type": "array", + "items": {"type": "number"}, + "description": "Raw embedding vector (alternative to query_text)", + }, + "top_k": { + "type": "integer", + "description": "Number of results", + "default": 10, + }, + "return_fields": { + "type": "array", + "items": {"type": "string"}, + "description": "Fields to return (default: all)", + }, + "database": {"type": "string", "default": "vectors"}, + }, + }, + ) + + +def _tool_redis_hybrid_search() -> MCPTool: + return MCPTool( + name="redis_hybrid_search", + description=( + "Combined vector + filter query using RediSearch. " + "Provide query_text (auto-embedded) or query_vector (raw floats)." + ), + input_schema={ + "type": "object", + "properties": { + "index_name": { + "type": "string", + "default": "idx:agent_memory", + }, + "query_text": { + "type": "string", + "description": "Text to embed and search (alternative to query_vector)", + }, + "query_vector": { + "type": "array", + "items": {"type": "number"}, + "description": "Raw embedding vector (alternative to query_text)", + }, + "filter_expression": { + "type": "string", + "description": "RediSearch filter (e.g. 
'@agent_id:{agent_42}')", + }, + "top_k": {"type": "integer", "default": 10}, + "return_fields": { + "type": "array", + "items": {"type": "string"}, + }, + "database": {"type": "string", "default": "vectors"}, + }, + "required": ["filter_expression"], + }, + ) + + +def _tool_redis_vector_index_info() -> MCPTool: + return MCPTool( + name="redis_vector_index_info", + description="Get schema, key count, and stats for a RediSearch index.", + input_schema={ + "type": "object", + "properties": { + "index_name": { + "type": "string", + "default": "idx:agent_memory", + }, + "database": {"type": "string", "default": "vectors"}, + }, + }, + ) + + +# --------------------------------------------------------------------------- +# Ops Intelligence Tools (6) +# --------------------------------------------------------------------------- + + +def _tool_redis_server_info() -> MCPTool: + return MCPTool( + name="redis_server_info", + description="Get Redis server stats: memory, clients, replication, keyspace.", + input_schema={ + "type": "object", + "properties": { + "section": { + "type": "string", + "description": "Optional INFO section (e.g. 
'memory', 'clients', 'stats')", + }, + "database": {"type": "string", "default": "main"}, + }, + }, + ) + + +def _tool_redis_dbsize() -> MCPTool: + return MCPTool( + name="redis_dbsize", + description="Get the number of keys in the current database.", + input_schema={ + "type": "object", + "properties": { + "database": {"type": "string", "default": "main"}, + }, + }, + ) + + +def _tool_redis_memory_stats() -> MCPTool: + return MCPTool( + name="redis_memory_stats", + description="Get detailed memory analysis: used, peak, fragmentation ratio, allocator stats.", + input_schema={ + "type": "object", + "properties": { + "database": {"type": "string", "default": "main"}, + }, + }, + ) + + +def _tool_redis_stream_health() -> MCPTool: + return MCPTool( + name="redis_stream_health", + description="Check stream health: length, consumer groups, pending entries, last entry ID.", + input_schema={ + "type": "object", + "properties": { + "key": {"type": "string", "description": "Stream key to inspect"}, + "database": {"type": "string", "default": "main"}, + }, + "required": ["key"], + }, + ) + + +def _tool_redis_client_list() -> MCPTool: + return MCPTool( + name="redis_client_list", + description="List connected Redis clients. Admin only.", + input_schema={ + "type": "object", + "properties": { + "database": {"type": "string", "default": "main"}, + }, + }, + ) + + +def _tool_redis_slowlog() -> MCPTool: + return MCPTool( + name="redis_slowlog", + description="Get recent slow queries from Redis slowlog. 
Admin only.", + input_schema={ + "type": "object", + "properties": { + "count": { + "type": "integer", + "description": "Number of entries to return", + "default": 10, + }, + "database": {"type": "string", "default": "main"}, + }, + }, + ) + + +# --------------------------------------------------------------------------- +# Grouped accessors (Issue #620 pattern) +# --------------------------------------------------------------------------- + + +def get_data_access_tools() -> list[MCPTool]: + """Return all 15 data access tool definitions.""" + return [ + _tool_redis_get(), + _tool_redis_set(), + _tool_redis_delete(), + _tool_redis_hget(), + _tool_redis_hgetall(), + _tool_redis_hset(), + _tool_redis_lrange(), + _tool_redis_lpush(), + _tool_redis_rpush(), + _tool_redis_zrange(), + _tool_redis_xrange(), + _tool_redis_xadd(), + _tool_redis_scan_keys(), + _tool_redis_type(), + _tool_redis_ttl(), + ] + + +def get_vector_search_tools() -> list[MCPTool]: + """Return all 4 vector search tool definitions.""" + return [ + _tool_redis_vector_create_index(), + _tool_redis_vector_search(), + _tool_redis_hybrid_search(), + _tool_redis_vector_index_info(), + ] + + +def get_ops_intelligence_tools() -> list[MCPTool]: + """Return all 6 ops intelligence tool definitions.""" + return [ + _tool_redis_server_info(), + _tool_redis_dbsize(), + _tool_redis_memory_stats(), + _tool_redis_stream_health(), + _tool_redis_client_list(), + _tool_redis_slowlog(), + ] + + +def get_all_tools() -> list[MCPTool]: + """Return all 25 Redis MCP tool definitions.""" + tools: list[MCPTool] = [] + tools.extend(get_data_access_tools()) + tools.extend(get_vector_search_tools()) + tools.extend(get_ops_intelligence_tools()) + return tools diff --git a/autobot-backend/api/redis_mcp/vector_search.py b/autobot-backend/api/redis_mcp/vector_search.py new file mode 100644 index 000000000..36d95d9c2 --- /dev/null +++ b/autobot-backend/api/redis_mcp/vector_search.py @@ -0,0 +1,297 @@ +# AutoBot - AI-Powered Automation 
Platform +# Copyright (c) 2025 mrveiss +# Author: mrveiss +""" +Vector Search tool handlers for Redis MCP Bridge (4 tools). + +Issue #2511: RediSearch vector index creation, similarity search, +hybrid search, and index info — using Redis Stack 7.4.0 FT.* commands. +Issue #2623: Transparent text-to-embedding conversion via NPU/Ollama fallback. +""" + +import logging +import re +import struct +from typing import Any, Dict, List, Optional + +from type_defs.common import Metadata + +from autobot_shared.redis_client import get_redis_client + +logger = logging.getLogger(__name__) + +# Upper bound for KNN top_k to prevent resource exhaustion (#2511) +_MAX_TOP_K = 500 + +# Allow only safe RediSearch filter tokens: alphanumerics, field refs (@), +# comparisons, parentheses, spaces, colons, hyphens, underscores, dots. +_SAFE_FILTER_PATTERN = re.compile(r"^[@\w\s:.\-()=<>|&*{}\[\],\"']+$") + + +async def _text_to_embedding(text: str) -> List[float]: + """Convert text to embedding using the existing RAG pipeline (Issue #2623). + + Uses NPU worker with Ollama fallback — same pipeline as knowledge base. 
+ """ + from knowledge.facts import _generate_embedding_with_npu_fallback + + return await _generate_embedding_with_npu_fallback(text) + + +async def _get_client(database: str = "vectors"): + """Get an async Redis client for vector operations.""" + return await get_redis_client(async_client=True, database=database) + + +def _float_list_to_bytes(vector: List[float]) -> bytes: + """Pack a list of floats into a binary blob for RediSearch KNN queries.""" + return struct.pack(f"{len(vector)}f", *vector) + + +# --------------------------------------------------------------------------- +# Index management +# --------------------------------------------------------------------------- + + +def _build_index_schema( + vector_field: str, + dimensions: int, + distance_metric: str, + extra_fields: Optional[List[Dict[str, str]]] = None, +) -> List[Any]: + """Build FT.CREATE SCHEMA arguments for a vector index (#2511).""" + schema: List[Any] = [] + if extra_fields: + for field in extra_fields: + schema.extend([field["name"], field["type"]]) + schema.extend( + [ + vector_field, + "VECTOR", + "HNSW", + "6", + "TYPE", + "FLOAT32", + "DIM", + str(dimensions), + "DISTANCE_METRIC", + distance_metric, + ] + ) + return schema + + +async def handle_redis_vector_create_index( + index_name: str = "idx:agent_memory", + prefix: str = "autobot:agent:memory:", + vector_field: str = "embedding", + dimensions: int = 1536, + distance_metric: str = "COSINE", + extra_fields: Optional[List[Dict[str, str]]] = None, + database: str = "vectors", +) -> Metadata: + """Create a RediSearch vector index using HNSW.""" + client = await _get_client(database) + schema_args = _build_index_schema( + vector_field, dimensions, distance_metric, extra_fields + ) + try: + await client.execute_command( + "FT.CREATE", + index_name, + "ON", + "HASH", + "PREFIX", + "1", + prefix, + "SCHEMA", + *schema_args, + ) + return { + "status": "success", + "index_name": index_name, + "prefix": prefix, + "dimensions": dimensions, + 
"distance_metric": distance_metric, + } + except Exception as e: + if "Index already exists" in str(e): + return { + "status": "success", + "index_name": index_name, + "message": "Index already exists", + } + raise + + +async def _execute_vector_query( + query_str: str, + query_vector: Optional[List[float]], + query_text: Optional[str], + index_name: str, + top_k: int, + return_fields: Optional[List[str]], + database: str, + extra_meta: Optional[Dict[str, Any]] = None, +) -> Metadata: + """Shared KNN query execution for vector and hybrid search (#2511).""" + if query_vector is None and query_text is None: + return { + "status": "error", + "message": "Provide either query_text or query_vector", + "code": "MISSING_QUERY", + } + top_k = min(max(1, top_k), _MAX_TOP_K) + if query_vector is None: + query_vector = await _text_to_embedding(query_text) + client = await _get_client(database) + blob = _float_list_to_bytes(query_vector) + + cmd_args = [ + "FT.SEARCH", + index_name, + query_str, + "PARAMS", + "2", + "BLOB", + blob, + "SORTBY", + "score", + "DIALECT", + "2", + ] + if return_fields: + cmd_args.extend(["RETURN", str(len(return_fields) + 1), "score"]) + cmd_args.extend(return_fields) + + raw = await client.execute_command(*cmd_args) + results = _parse_ft_search_results(raw) + meta: Dict[str, Any] = { + "status": "success", + "index_name": index_name, + "results": results, + "count": len(results), + } + if extra_meta: + meta.update(extra_meta) + return meta + + +async def handle_redis_vector_search( + query_vector: Optional[List[float]] = None, + query_text: Optional[str] = None, + index_name: str = "idx:agent_memory", + top_k: int = 10, + return_fields: Optional[List[str]] = None, + database: str = "vectors", +) -> Metadata: + """Similarity search by embedding vector or text (Issue #2623).""" + top_k = min(max(1, top_k), _MAX_TOP_K) + query_str = f"*=>[KNN {top_k} @embedding $BLOB AS score]" + return await _execute_vector_query( + query_str, + query_vector, + 
query_text, + index_name, + top_k, + return_fields, + database, + ) + + +async def handle_redis_hybrid_search( + query_vector: Optional[List[float]] = None, + query_text: Optional[str] = None, + filter_expression: str = "", + index_name: str = "idx:agent_memory", + top_k: int = 10, + return_fields: Optional[List[str]] = None, + database: str = "vectors", +) -> Metadata: + """Vector + filter combined query (Issue #2623: accepts query_text).""" + if filter_expression and not _SAFE_FILTER_PATTERN.match(filter_expression): + return { + "status": "error", + "message": "Invalid filter_expression: contains disallowed characters", + "code": "INVALID_FILTER", + } + top_k = min(max(1, top_k), _MAX_TOP_K) + pre = f"({filter_expression})" if filter_expression else "*" + query_str = f"{pre}=>[KNN {top_k} @embedding $BLOB AS score]" + return await _execute_vector_query( + query_str, + query_vector, + query_text, + index_name, + top_k, + return_fields, + database, + extra_meta={"filter": filter_expression} if filter_expression else None, + ) + + +async def handle_redis_vector_index_info( + index_name: str = "idx:agent_memory", + database: str = "vectors", +) -> Metadata: + """Get index schema and stats.""" + client = await _get_client(database) + try: + raw = await client.execute_command("FT.INFO", index_name) + info = _parse_ft_info(raw) + return {"status": "success", "index_name": index_name, "info": info} + except Exception as e: + if "Unknown Index name" in str(e): + return { + "status": "error", + "message": f"Index '{index_name}' does not exist", + "code": "INDEX_NOT_FOUND", + } + raise + + +# --------------------------------------------------------------------------- +# Result parsers +# --------------------------------------------------------------------------- + + +def _parse_ft_search_results(raw) -> List[Dict[str, Any]]: + """Parse FT.SEARCH response into a list of result dicts.""" + if not raw or not isinstance(raw, (list, tuple)): + return [] + total = raw[0] if 
isinstance(raw[0], int) else int(raw[0]) + if total == 0: + return [] + results = [] + i = 1 + while i < len(raw) - 1: + doc_id = raw[i] + if isinstance(doc_id, bytes): + doc_id = doc_id.decode("utf-8") + fields_raw = raw[i + 1] + fields = _pairs_to_dict(fields_raw) + fields["_id"] = doc_id + results.append(fields) + i += 2 + return results + + +def _pairs_to_dict(flat_list) -> Dict[str, Any]: + """Convert a flat [key, value, key, value, ...] list to a dict.""" + result: Dict[str, Any] = {} + if not flat_list: + return result + for idx in range(0, len(flat_list) - 1, 2): + key = flat_list[idx] + val = flat_list[idx + 1] + if isinstance(key, bytes): + key = key.decode("utf-8") + if isinstance(val, bytes): + val = val.decode("utf-8", errors="replace") + result[key] = val + return result + + +def _parse_ft_info(raw) -> Dict[str, Any]: + """Parse FT.INFO flat list response into a readable dict.""" + return _pairs_to_dict(raw) diff --git a/autobot-backend/chat_workflow/tool_handler.py b/autobot-backend/chat_workflow/tool_handler.py index 04ddb1bcd..27aa12a6f 100644 --- a/autobot-backend/chat_workflow/tool_handler.py +++ b/autobot-backend/chat_workflow/tool_handler.py @@ -210,6 +210,43 @@ def _create_execution_result( + } + + +def _build_mcp_approval_message( + tool_name: str, + bridge: str, + raw_result: dict, + execution_results: list[dict[str, Any]], +) -> WorkflowMessage: + """Build a WorkflowMessage for MCP bridge approval requests (Issue #2622).""" + approval_msg = raw_result.get("message", "This operation requires approval.") + execution_results.append( + { + "tool": tool_name, + "bridge": bridge, + "result": approval_msg, + "status": "approval_required", + } + ) + logger.info( + "[Issue #2622] MCP approval required: tool=%s bridge=%s", + tool_name, + bridge, + ) + return WorkflowMessage( + type="tool_result", + content=( + f"[{bridge}] **Approval required:** {approval_msg}\n" + "Ask the user to confirm, then retry with `approved: true` " + "in the arguments."
+ ), + metadata={ + "tool_name": tool_name, + "bridge": bridge, + "mcp_dispatch": True, + "approval_required": True, + }, + ) + + async def _try_mcp_dispatch( tool_name: str, tool_call: dict[str, Any], @@ -230,8 +267,6 @@ async def _try_mcp_dispatch( from services.mcp_dispatch import get_mcp_dispatcher dispatcher = get_mcp_dispatcher() - # Lazy-load cache on first call; if cache is already loaded and tool is - # absent, skip a network round-trip. if not dispatcher._cache_loaded: await dispatcher.refresh_tool_cache() @@ -243,8 +278,15 @@ async def _try_mcp_dispatch( mcp_result = await dispatcher.dispatch(tool_name, arguments, role=role) bridge = mcp_result.get("bridge", "unknown") success = mcp_result.get("success", False) - result_text = str(mcp_result.get("result", "")) + raw_result = mcp_result.get("result", "") + + # Issue #2622: Detect approval_required from MCP bridges + if isinstance(raw_result, dict) and raw_result.get("status") == "approval_required": + return _build_mcp_approval_message( + tool_name, bridge, raw_result, execution_results + ) + result_text = str(raw_result) execution_results.append( { "tool": tool_name, diff --git a/autobot-backend/initialization/router_registry/core_routers.py b/autobot-backend/initialization/router_registry/core_routers.py index 6bf069a60..745588ca2 100644 --- a/autobot-backend/initialization/router_registry/core_routers.py +++ b/autobot-backend/initialization/router_registry/core_routers.py @@ -55,6 +55,7 @@ from api.prometheus_mcp import router as prometheus_mcp_router from api.prompts import router as prompts_router from api.redis import router as redis_router +from api.redis_mcp import router as redis_mcp_router # Issue #2511 from api.sequential_thinking_mcp import router as sequential_thinking_mcp_router from api.service_messages import router as service_messages_router from api.settings import router as settings_router @@ -266,6 +267,7 @@ def _get_mcp_routers() -> list: ["prometheus_mcp", "mcp"], "prometheus_mcp", 
), + (redis_mcp_router, "/redis", ["redis_mcp", "mcp"], "redis_mcp"), # Issue #2511 ]