diff --git a/agent/core/session_persistence.py b/agent/core/session_persistence.py index 760b5934..d1aa7549 100644 --- a/agent/core/session_persistence.py +++ b/agent/core/session_persistence.py @@ -65,6 +65,10 @@ class NoopSessionStore: async def init(self) -> None: return None + async def maybe_reconnect(self) -> bool: + """Return True when the store is usable; subclasses may retry a failed init.""" + return self.enabled + async def close(self) -> None: return None @@ -130,6 +134,16 @@ async def init(self) -> None: self.client = None self.db = None + async def maybe_reconnect(self) -> bool: + """Retry a failed init so one Mongo blip at boot doesn't disable + persistence (and with it the idle reaper) until the next restart.""" + if self.enabled: + return True + await self.init() + if self.enabled: + logger.info("Mongo session persistence recovered after earlier failure") + return self.enabled + async def close(self) -> None: if self.client is not None: await self.client.close() @@ -251,6 +265,34 @@ async def save_snapshot( raise RuntimeError("session store not ready") return now = _now() + if raise_on_error: + await self._write_snapshot_messages( + session_id=session_id, + messages=messages, + now=now, + raise_on_error=True, + ) + await self.upsert_session( + session_id=session_id, + user_id=user_id, + model=model, + title=title, + created_at=created_at, + runtime_state=runtime_state, + status=status, + message_count=len(messages), + turn_count=turn_count, + pending_approval=pending_approval, + notification_destinations=notification_destinations, + usage_window_started_at=usage_window_started_at, + inference_billing_session_id=inference_billing_session_id, + auto_approval_enabled=auto_approval_enabled, + auto_approval_cost_cap_usd=auto_approval_cost_cap_usd, + auto_approval_estimated_spend_usd=auto_approval_estimated_spend_usd, + usage_warning_next_threshold_usd=usage_warning_next_threshold_usd, + ) + return + await self.upsert_session( session_id=session_id, user_id=user_id, @@ -270,6 +312,21 @@ async def save_snapshot( auto_approval_estimated_spend_usd=auto_approval_estimated_spend_usd, usage_warning_next_threshold_usd=usage_warning_next_threshold_usd, ) + await self._write_snapshot_messages( + session_id=session_id, + messages=messages, + now=now, + raise_on_error=False, + ) + + async def _write_snapshot_messages( + self, + *, + session_id: str, + messages: list[dict[str, Any]], + now: datetime, + raise_on_error: bool, + ) -> None: ops: list[Any] = [] for idx, raw in enumerate(messages): ops.append( diff --git a/backend/session_manager.py b/backend/session_manager.py index cf277770..e6b11cb2 100644 --- a/backend/session_manager.py +++ b/backend/session_manager.py @@ -198,6 +198,15 @@ def __init__(self, message: str, error_type: str = "global") -> None: REAPER_INTERVAL_S: float = float(os.environ.get("REAPER_INTERVAL_S", "300")) REAP_TEARDOWN_TIMEOUT_S: float = float(os.environ.get("REAP_TEARDOWN_TIMEOUT_S", "30")) REAPER_IDLE = timedelta(minutes=REAPER_IDLE_MINUTES) +# Sessions parked on a real tool-permission prompt keep their slot longer than +# plain idle ones (the user may genuinely want to "approve later"), but not +# forever — an unanswered prompt must not pin a capacity slot indefinitely. +# Acknowledgement-type prompts (usage-threshold / YOLO-cap), which are created +# automatically at turn end with no user action, use the normal idle window. +REAPER_TOOL_APPROVAL_IDLE_MINUTES: float = float( + os.environ.get("REAPER_TOOL_APPROVAL_IDLE_MINUTES", "60") +) +REAPER_TOOL_APPROVAL_IDLE = timedelta(minutes=REAPER_TOOL_APPROVAL_IDLE_MINUTES) class SessionManager: @@ -251,6 +260,32 @@ def _count_user_sessions(self, user_id: str) -> int: 1 for s in self.sessions.values() if s.user_id == user_id and s.is_active ) + def _user_slot_breakdown(self, user_id: str) -> str: + """Summarize what holds a user's live slots, for the capacity error. + + Call under the manager lock so the breakdown matches the count that + triggered the error. + """ + tool_approvals = ack_prompts = processing = idle = 0 + for s in self.sessions.values(): + if s.user_id != user_id or not s.is_active: + continue + pending = s.session.pending_approval + if is_usage_threshold_pending(pending) or is_yolo_budget_pending(pending): + ack_prompts += 1 + elif pending: + tool_approvals += 1 + elif s.is_processing: + processing += 1 + else: + idle += 1 + return ( + f"{tool_approvals} awaiting tool approval, " + f"{ack_prompts} usage/cost prompts, " + f"{processing} still processing, " + f"{idle} idle" + ) + @staticmethod def _touch(agent_session: "AgentSession") -> None: """Stamp genuine activity so the idle reaper's clock resets. @@ -358,13 +393,45 @@ def _pending_tools_for_api(session: Session) -> list[dict[str, Any]] | None: ) return result + @staticmethod + def _pending_approval_docs( + pending_approval: Any, + *, + session_id: str | None = None, + ) -> list[dict[str, Any]]: + """Return persisted pending approval docs that are safe to inspect.""" + if not pending_approval: + return [] + if not isinstance(pending_approval, list): + logger.warning( + "Dropping malformed pending approval for %s: expected list, got %s", + session_id or "", + type(pending_approval).__name__, + ) + return [] + docs: list[dict[str, Any]] = [] + for raw in pending_approval: + if isinstance(raw, dict): + docs.append(raw) + else: + logger.warning( + "Dropping malformed pending approval item for %s: expected dict, got %s", + session_id or "", + type(raw).__name__, + ) + return docs + def _restore_pending_approval( - self, session: Session, pending_approval: list[dict[str, Any]] | None + self, session: Session, pending_approval: Any ) -> None: - if not pending_approval: + docs = self._pending_approval_docs( + pending_approval, + session_id=getattr(session, "session_id", None), + ) + if not docs: session.pending_approval = None return - first = pending_approval[0] + first = docs[0] if isinstance(first, dict) and first.get("kind") in { USAGE_THRESHOLD_TOOL_NAME, YOLO_BUDGET_TOOL_NAME, @@ -374,7 +441,7 @@ def _restore_pending_approval( from litellm import ChatCompletionMessageToolCall as ToolCall restored = [] - for raw in pending_approval: + for raw in docs: try: if "function" in raw: restored.append(ToolCall(**raw)) @@ -395,18 +462,19 @@ def _restore_pending_approval( @staticmethod def _pending_docs_for_api( - pending_approval: list[dict[str, Any]] | None, + pending_approval: Any, ) -> list[dict[str, Any]] | None: - if not pending_approval: + docs = SessionManager._pending_approval_docs(pending_approval) + if not docs: return None - first = pending_approval[0] + first = docs[0] if isinstance(first, dict) and first.get("kind") == USAGE_THRESHOLD_TOOL_NAME: return [usage_threshold_pending_to_tool(first)] if isinstance(first, dict) and first.get("kind") == YOLO_BUDGET_TOOL_NAME: return [yolo_budget_pending_to_tool(first)] result: list[dict[str, Any]] = [] - for raw in pending_approval: - if "function" in raw: + for raw in docs: + if "function" in raw and isinstance(raw.get("function"), dict): function = raw.get("function") or {} try: args = json.loads(function.get("arguments") or "{}") @@ -439,6 +507,133 @@ def _runtime_state(agent_session: AgentSession) -> str: return "ended" return "idle" + @staticmethod + def _reaper_verdict(agent_session: AgentSession, now: datetime) -> str: + """Classify a live session for the reaper. + + Returns an ``evict_*`` reason when the session should be torn down + this sweep, a ``skip_*`` label when it holds a slot for a known + reason, or ``"skip"`` when it is simply fresh. ``is_reaping`` is + deliberately not consulted here: callers handle it (_reap_one + re-checks the verdict after setting the flag itself). + + ``is_processing`` is evaluated before ``pending_approval``: when both + are true (an in-flight exec-approval continuation), in-flight work + keeps the slot until it finishes, fails, or is interrupted explicitly. + """ + if agent_session.user_id == "dev" or not agent_session.is_active: + return "skip" + if agent_session.is_processing: + return "skip_processing" + queue_empty = agent_session.submission_queue.empty() + idle_for = now - agent_session.last_active_at + pending = agent_session.session.pending_approval + if pending: + if is_usage_threshold_pending(pending) or is_yolo_budget_pending(pending): + if idle_for >= REAPER_IDLE and queue_empty: + return "evict_pending_ack" + return "skip_pending_ack" + if idle_for >= REAPER_TOOL_APPROVAL_IDLE and queue_empty: + return "evict_pending_tool" + return "skip_pending_tool" + if idle_for >= REAPER_IDLE and queue_empty: + return "evict_idle" + return "skip" + + @staticmethod + def _tool_call_identity(raw: Any) -> tuple[str | None, str | None]: + """Return (tool_call_id, tool_name) from a pending approval tool call.""" + if hasattr(raw, "function"): + function = getattr(raw, "function", None) + return getattr(raw, "id", None), getattr(function, "name", None) + if not isinstance(raw, dict): + return None, None + if isinstance(raw.get("function"), dict): + function = raw.get("function") or {} + return raw.get("id"), function.get("name") + return raw.get("tool_call_id"), raw.get("tool") + + @staticmethod + def _append_context_message(session: Session, message: Any) -> None: + add_message = getattr(session.context_manager, "add_message", None) + if callable(add_message): + add_message(message) + return + session.context_manager.items.append(message) + + async def _expire_pending_tool_approval( + self, + agent_session: AgentSession, + ) -> None: + """Expire stale real tool approvals before reaping their sandbox. + + Every assistant tool call needs a matching tool result for future LLM + requests to remain valid. We synthesize those results before clearing + the pending approval so the session restores as idle, not actionable. + """ + pending = agent_session.session.pending_approval + if ( + not isinstance(pending, dict) + or is_usage_threshold_pending(pending) + or is_yolo_budget_pending(pending) + ): + return + + from litellm import Message + + tool_calls = pending.get("tool_calls") or [] + expired = 0 + for raw in tool_calls: + tool_call_id, tool_name = self._tool_call_identity(raw) + if not tool_call_id or not tool_name: + logger.warning( + "Dropping malformed pending tool approval while reaping %s", + agent_session.session_id, + ) + continue + content = ( + "Tool approval expired after inactivity before execution. " + "The live session was released and its sandbox was reset." + ) + self._append_context_message( + agent_session.session, + Message( + role="tool", + content=content, + tool_call_id=tool_call_id, + name=tool_name, + ), + ) + expired += 1 + await agent_session.session.send_event( + Event( + event_type="tool_state_change", + data={ + "tool_call_id": tool_call_id, + "tool": tool_name, + "state": "abandoned", + "reason": "expired_inactive", + }, + ) + ) + agent_session.session.pending_approval = None + logger.info( + "Expired %d pending approval tool(s) before reaping %s", + expired, + agent_session.session_id, + ) + + def _reaper_verdict_still_matches( + self, + agent_session: AgentSession, + original_verdict: str, + now: datetime, + ) -> bool: + current_verdict = self._reaper_verdict(agent_session, now) + if original_verdict == "evict_pending_tool": + return current_verdict == "evict_idle" + return current_verdict == original_verdict + @staticmethod def _auto_approval_summary(session: Session) -> dict[str, Any]: if hasattr(session, "auto_approval_policy_summary"): @@ -1296,9 +1491,12 @@ async def create_session( if user_count >= MAX_SESSIONS_PER_USER: raise SessionCapacityError( f"You have reached the maximum of {MAX_SESSIONS_PER_USER} " - f"live sessions. Close an existing session, or wait " - f"{REAPER_IDLE_MINUTES:g} minutes after your last activity " - "for an idle session to be released.", + "live sessions. Close an existing session, or wait " + f"{REAPER_IDLE_MINUTES:g} minutes for idle or usage/cost " + "prompt sessions to be released, or " + f"{REAPER_TOOL_APPROVAL_IDLE_MINUTES:g} minutes for " + "unanswered tool approvals to expire. Currently held: " + f"{self._user_slot_breakdown(user_id)}.", error_type="per_user", ) self._pending_creates += 1 @@ -1523,80 +1721,118 @@ async def _reaper_loop(self) -> None: logger.error("Idle-session reaper sweep failed: %s", e) async def _reap_idle_sessions(self) -> None: - """Select idle candidates under the lock, then tear each down. - - Candidates are non-dev sessions that are live, not processing, not - awaiting tool approval (those are "approve later", not idle — reaping - would destroy the sandbox the approved tool needs), and untouched for - the idle window. We only snapshot IDs under the lock; the actual - teardown in _reap_one re-acquires it, because tearing a session down - while holding the lock would deadlock (the lock is non-reentrant). + """Classify live sessions under the lock, then tear down evictees. + + Each session gets a verdict from _reaper_verdict: plain idle or + approval prompts unanswered past their window. We only snapshot + (id, verdict) pairs under the lock; the actual teardown in _reap_one + re-acquires it, because tearing a session down while holding the lock + would deadlock (the lock is non-reentrant). """ # Reaping is only safe when sessions stay resumable from Mongo. With no - # store, eviction would destroy non-dev chats outright, so don't reap. - if not getattr(self._store(), "enabled", False): - return + # store, eviction would destroy non-dev chats outright, so don't reap — + # but give a Mongo store that failed its boot-time init a chance to + # recover instead of staying disabled until the next restart. The sweep + # interval is the retry backoff. + store = self._store() + if not getattr(store, "enabled", False): + if not await store.maybe_reconnect(): + return - cutoff = datetime.utcnow() - REAPER_IDLE + now = datetime.utcnow() + counters = { + "evicted_idle": 0, + "evicted_pending_ack": 0, + "evicted_pending_tool": 0, + "skipped_processing": 0, + "skipped_pending_tool_within_window": 0, + "skipped_pending_ack_within_window": 0, + "aborted": 0, + } + candidates: list[tuple[str, str]] = [] async with self._lock: - candidates = [ - agent_session.session_id - for agent_session in self.sessions.values() - if agent_session.is_active - and not agent_session.is_processing - and not agent_session.is_reaping - and agent_session.user_id != "dev" - and not agent_session.session.pending_approval - and agent_session.last_active_at <= cutoff - ] - if not candidates: - return - - reaped = 0 - for session_id in candidates: + for agent_session in self.sessions.values(): + if agent_session.is_reaping: + continue + verdict = self._reaper_verdict(agent_session, now) + if verdict.startswith("evict_"): + candidates.append((agent_session.session_id, verdict)) + elif verdict == "skip_processing": + counters["skipped_processing"] += 1 + elif verdict == "skip_pending_tool": + counters["skipped_pending_tool_within_window"] += 1 + elif verdict == "skip_pending_ack": + counters["skipped_pending_ack_within_window"] += 1 + + for session_id, verdict in candidates: try: - if await self._reap_one(session_id, cutoff): - reaped += 1 + if await self._reap_one(session_id, verdict=verdict, now=now): + counters["evicted_" + verdict.removeprefix("evict_")] += 1 + else: + counters["aborted"] += 1 except Exception as e: logger.warning("Failed to reap idle session %s: %s", session_id, e) - if reaped: - logger.info("Reaped %d idle session(s)", reaped) - - async def _reap_one(self, session_id: str, cutoff: datetime) -> bool: - """Tear down one idle session, leaving it resumable from Mongo. - - Re-checks every idle condition under the lock (a user may have become - active in the gap since selection), marks the session reaping, persists - a resumable snapshot outside the lock, then does one final locked - re-check before eviction. The runtime task is cancelled *outside* the - lock: its own ``finally`` frees the sandbox, and its identity-gated - persist no-ops because the session is already popped — so it can't - overwrite our resumable snapshot with ``"ended"`` and there's no - deadlock. Returns True if the session was reaped. + counters["aborted"] += 1 + if ( + counters["evicted_idle"] + or counters["evicted_pending_ack"] + or counters["evicted_pending_tool"] + or counters["aborted"] + ): + logger.info( + "Reaper sweep: evicted_idle=%d evicted_pending_ack=%d " + "evicted_pending_tool=%d skipped_processing=%d " + "skipped_pending_tool_within_window=%d " + "skipped_pending_ack_within_window=%d aborted=%d", + counters["evicted_idle"], + counters["evicted_pending_ack"], + counters["evicted_pending_tool"], + counters["skipped_processing"], + counters["skipped_pending_tool_within_window"], + counters["skipped_pending_ack_within_window"], + counters["aborted"], + ) + + async def _reap_one(self, session_id: str, *, verdict: str, now: datetime) -> bool: + """Tear down one evictable session, leaving it resumable from Mongo. + + Re-checks the verdict under the lock (a user may have become active — + or answered a prompt — in the gap since selection), marks the session + reaping, persists a resumable snapshot outside the lock, then does one + final locked re-check before eviction. Real tool approvals are expired + before snapshotting so they restore as idle after the sandbox is torn + down. The runtime task is cancelled *outside* the lock: its own + ``finally`` frees the sandbox, and its identity-gated persist no-ops + because the session is already popped — so it can't overwrite our + resumable snapshot with ``"ended"`` and there's no deadlock. Returns + True if the session was reaped. """ async with self._lock: agent_session = self.sessions.get(session_id) if ( agent_session is None - or not agent_session.is_active - or agent_session.is_processing or agent_session.is_reaping - or agent_session.session.pending_approval - or agent_session.last_active_at > cutoff - or not agent_session.submission_queue.empty() + or self._reaper_verdict(agent_session, now) != verdict ): return False agent_session.is_reaping = True # Persist a resumable snapshot *before* eviction so a concurrent reopen # reloads clean state. status="active" (never "ended") keeps it a normal - # chat in the sidebar. Do this outside the manager lock: Mongo writes can - # take network round trips, and is_reaping=True is enough to block submit - # from enqueueing while the snapshot is in flight. + # chat in the sidebar; runtime_state never persists as "processing", or + # an evicted row would render as processing forever in list_sessions. + # Do this outside the manager lock: Mongo writes can take network round + # trips, and is_reaping=True is enough to block submit from enqueueing + # while the snapshot is in flight. try: + if verdict == "evict_pending_tool": + await self._expire_pending_tool_approval(agent_session) + runtime_state = ( + "waiting_approval" if agent_session.session.pending_approval else "idle" + ) await self.persist_session_snapshot( agent_session, - runtime_state="idle", + runtime_state=runtime_state, status="active", raise_on_error=True, ) @@ -1615,13 +1851,7 @@ async def _reap_one(self, session_id: str, cutoff: datetime) -> bool: current = self.sessions.get(session_id) if current is not agent_session: return False - if ( - not agent_session.is_active - or agent_session.is_processing - or agent_session.session.pending_approval - or agent_session.last_active_at > cutoff - or not agent_session.submission_queue.empty() - ): + if not self._reaper_verdict_still_matches(agent_session, verdict, now): agent_session.is_reaping = False return False self.sessions.pop(session_id, None) @@ -1671,7 +1901,7 @@ async def _run_session( session = agent_session.session - # Start event broadcaster task + # Start event broadcaster task. broadcaster = EventBroadcaster(event_queue) agent_session.broadcaster = broadcaster broadcast_task = asyncio.create_task(broadcaster.run()) diff --git a/tests/unit/test_session_manager_persistence.py b/tests/unit/test_session_manager_persistence.py index c85a683b..3839ed70 100644 --- a/tests/unit/test_session_manager_persistence.py +++ b/tests/unit/test_session_manager_persistence.py @@ -252,6 +252,34 @@ def test_usage_threshold_pending_approval_serializes_and_restores(): assert restored.pending_approval == pending +def test_malformed_pending_approval_docs_are_dropped_for_api_and_restore(): + manager = _manager_with_store(NoopSessionStore()) + + assert manager._pending_docs_for_api({"not": "a-list"}) is None + assert manager._pending_docs_for_api([None, {"function": "not-a-dict"}]) is None + assert manager._pending_docs_for_api( + [ + object(), + { + "tool": "bash", + "tool_call_id": "tc-1", + "arguments": {"command": "echo hi"}, + }, + ] + ) == [ + { + "tool": "bash", + "tool_call_id": "tc-1", + "arguments": {"command": "echo hi"}, + } + ] + + restored = FakeRuntimeSession() + manager._restore_pending_approval(restored, {"not": "a-list"}) + + assert restored.pending_approval is None + + def test_usage_spend_prefers_hf_current_session_over_telemetry(): spend, source = SessionManager._usage_spend_from_response( { @@ -1329,6 +1357,31 @@ async def test_lazy_restore_preserves_pending_approval_tool_calls(): await _cancel_runtime_tasks(manager) +@pytest.mark.asyncio +async def test_lazy_restore_drops_malformed_pending_approval(): + store = RestoreStore( + metadata={ + "session_id": "bad-approval-session", + "user_id": "owner", + "model": "test-model", + "pending_approval": {"not": "a-list"}, + } + ) + manager = _manager_with_store(store) + stop = _install_fake_runtime(manager) + + try: + restored = await manager.ensure_session_loaded( + "bad-approval-session", user_id="owner" + ) + + assert restored is not None + assert restored.session.pending_approval is None + finally: + stop.set() + await _cancel_runtime_tasks(manager) + + @pytest.mark.asyncio async def test_lazy_restore_preserves_auto_approval_policy(): store = RestoreStore( @@ -1496,6 +1549,7 @@ async def list_sessions(self, user_id: str, **_: Any) -> list[dict[str, Any]]: "user_id": "bob", "model": "m", "created_at": datetime.now(UTC), + "pending_approval": {"not": "a-list"}, }, ] return [] @@ -1507,6 +1561,8 @@ async def list_sessions(self, user_id: str, **_: Any) -> list[dict[str, Any]]: assert store.seen_user_id == "dev" assert {session["session_id"] for session in sessions} == {"s1", "s2"} + malformed = next(session for session in sessions if session["session_id"] == "s2") + assert malformed["pending_approval"] is None yolo = next(session for session in sessions if session["session_id"] == "s1") assert yolo["auto_approval"] == { "enabled": True, diff --git a/tests/unit/test_session_persistence.py b/tests/unit/test_session_persistence.py index 5c5e2207..0953411c 100644 --- a/tests/unit/test_session_persistence.py +++ b/tests/unit/test_session_persistence.py @@ -3,6 +3,7 @@ from datetime import datetime import pytest +from pymongo.errors import PyMongoError from agent.core.session_persistence import ( MongoSessionStore, @@ -37,6 +38,50 @@ def test_unsafe_message_payload_is_replaced_with_marker(): assert marker["ml_intern_persistence_error"] == "message_too_large_or_invalid" +class _RecordingSessions: + def __init__(self) -> None: + self.update_calls = [] + + async def update_one(self, *args, **kwargs): + self.update_calls.append((args, kwargs)) + + +class _FailingSessionMessages: + async def bulk_write(self, *args, **kwargs): + raise PyMongoError("message write failed") + + +class _FailingSnapshotDB: + def __init__(self) -> None: + self.sessions = _RecordingSessions() + self.session_messages = _FailingSessionMessages() + + +def _store_with_snapshot_db(db) -> MongoSessionStore: + s = MongoSessionStore.__new__(MongoSessionStore) + s.enabled = True + s.db = db + return s + + +@pytest.mark.asyncio +async def test_strict_snapshot_does_not_update_metadata_when_message_write_fails(): + db = _FailingSnapshotDB() + store = _store_with_snapshot_db(db) + + with pytest.raises(PyMongoError): + await store.save_snapshot( + session_id="s1", + user_id="u1", + model="m", + messages=[{"role": "user", "content": "hello"}], + runtime_state="idle", + raise_on_error=True, + ) + + assert db.sessions.update_calls == [] + + # ── mark_pro_seen ───────────────────────────────────────────────────────── diff --git a/tests/unit/test_session_reaper.py b/tests/unit/test_session_reaper.py index 0a60f3ef..2515632f 100644 --- a/tests/unit/test_session_reaper.py +++ b/tests/unit/test_session_reaper.py @@ -8,6 +8,7 @@ from __future__ import annotations import asyncio +import logging import sys from datetime import datetime, timedelta from pathlib import Path @@ -21,7 +22,10 @@ sys.path.insert(0, str(_BACKEND_DIR)) import session_manager as sm # noqa: E402 -from agent.core.session_persistence import NoopSessionStore # noqa: E402 +from agent.core.session_persistence import ( # noqa: E402 + MongoSessionStore, + NoopSessionStore, +) from session_manager import ( # noqa: E402 AgentSession, Operation, @@ -36,6 +40,11 @@ def test_reaper_idle_default_is_fifteen_minutes(): assert sm.REAPER_IDLE == timedelta(minutes=15) +def test_reaper_window_defaults(): + assert sm.REAPER_TOOL_APPROVAL_IDLE_MINUTES == 60 + assert sm.REAPER_TOOL_APPROVAL_IDLE == timedelta(minutes=60) + + class RecordingStore(NoopSessionStore): """Captures every save_snapshot call so tests can assert persistence.""" @@ -51,6 +60,14 @@ def snapshots_for(self, session_id: str) -> list[dict[str, Any]]: return [s for s in self.snapshots if s.get("session_id") == session_id] +class FakeContextManager: + def __init__(self) -> None: + self.items: list[Any] = [] + + def add_message(self, message: Any, token_count: int | None = None) -> None: + self.items.append(message) + + class FakeSession: """Minimal Session stand-in supporting both persistence and _run_session.""" @@ -62,7 +79,7 @@ def __init__( ) -> None: self.hf_token = hf_token self.user_plan = user_plan - self.context_manager = SimpleNamespace(items=[]) + self.context_manager = FakeContextManager() self.pending_approval: Any = None self.turn_count = 0 self.config = SimpleNamespace(model_name="test-model", save_sessions=False) @@ -71,9 +88,14 @@ def __init__( self.auto_approval_cost_cap_usd = None self.auto_approval_estimated_spend_usd = 0.0 self.is_running = True + self.cancel_called = False + self.events: list[Any] = [] async def send_event(self, event: Any) -> None: - return None + self.events.append(event) + + def cancel(self) -> None: + self.cancel_called = True class FakeToolRouter: @@ -216,20 +238,25 @@ async def fake_cleanup(session: Any) -> None: [ # Fresh: touched just now. {"last_active_at": None}, - # Currently processing a turn. + # Processing work is never reaped while in flight. { "last_active_at": datetime.utcnow() - timedelta(hours=5), "is_processing": True, }, - # Awaiting tool approval ("approve later", not idle). + # Awaiting tool approval, still inside the 60-min grace window. { - "last_active_at": datetime.utcnow() - timedelta(hours=5), - "pending_approval": {"tool_calls": [object()]}, + "last_active_at": datetime.utcnow() - timedelta(minutes=30), + "pending_approval": {"tool_calls": [{"id": "tc-1"}]}, + }, + # Acknowledgement prompt raised moments ago. + { + "last_active_at": datetime.utcnow() - timedelta(minutes=5), + "pending_approval": {"kind": "usage_threshold", "tool_call_id": "u1"}, }, # Dev sessions are never reaped. {"last_active_at": datetime.utcnow() - timedelta(hours=5), "user_id": "dev"}, ], - ids=["fresh", "processing", "pending_approval", "dev"], + ids=["fresh", "processing", "pending_tool_in_window", "pending_ack_fresh", "dev"], ) async def test_reaper_spares(kwargs): manager = _manager() @@ -256,8 +283,9 @@ async def test_reap_aborts_when_message_enqueued_first(): manager.sessions["racing"] = agent_session agent_session.submission_queue.put_nowait(object()) - cutoff = datetime.utcnow() - sm.REAPER_IDLE - reaped = await manager._reap_one("racing", cutoff) + reaped = await manager._reap_one( + "racing", verdict="evict_idle", now=datetime.utcnow() + ) assert reaped is False assert "racing" in manager.sessions @@ -297,11 +325,13 @@ async def fake_cleanup(session: Any) -> None: agent_session = await _start_real_run_session( manager, "longturn", last_active_at=datetime.utcnow() ) + process_started = asyncio.Event() async def fake_process(session: Any, submission: Any) -> bool: # Simulate a turn that has been running far longer than the idle # window before it completes. agent_session.last_active_at = datetime.utcnow() - timedelta(hours=3) + process_started.set() return True monkeypatch.setattr(sm, "process_submission", fake_process) @@ -310,12 +340,17 @@ async def fake_process(session: Any, submission: Any) -> bool: sm.Submission(id="s1", operation=Operation(op_type=OpType.USER_INPUT, data={})) ) + await asyncio.wait_for(process_started.wait(), timeout=2.0) # Wait for the turn-finish stamp to land. for _ in range(200): await asyncio.sleep(0.01) - if datetime.utcnow() - agent_session.last_active_at < timedelta(minutes=1): + if ( + not agent_session.is_processing + and datetime.utcnow() - agent_session.last_active_at < timedelta(minutes=1) + ): break + assert not agent_session.is_processing assert datetime.utcnow() - agent_session.last_active_at < timedelta(minutes=1) await _cancel_tasks(manager) @@ -388,9 +423,14 @@ async def test_per_user_cap_frees_up_after_slot_reclaimed(): message = str(exc.value) assert f"maximum of {sm.MAX_SESSIONS_PER_USER} live sessions" in message assert "Close an existing session" in message - assert f"wait {sm.REAPER_IDLE_MINUTES:g} minutes" in message - assert "after your last activity" in message - assert "idle session to be released" in message + assert ( + f"wait {sm.REAPER_IDLE_MINUTES:g} minutes for idle or usage/cost " + "prompt sessions" + ) in message + assert ( + f"{sm.REAPER_TOOL_APPROVAL_IDLE_MINUTES:g} minutes for unanswered " + "tool approvals" + ) in message # Reclaiming a slot (the reaper evicts an idle session) frees capacity. manager.sessions.pop("owner-0") @@ -439,8 +479,9 @@ async def save_snapshot(self, **kwargs: Any) -> None: ) manager.sessions["idle"] = agent_session - cutoff = datetime.utcnow() - sm.REAPER_IDLE - reaped = await manager._reap_one("idle", cutoff) + reaped = await manager._reap_one( + "idle", verdict="evict_idle", now=datetime.utcnow() + ) assert reaped is False assert "idle" in manager.sessions @@ -470,8 +511,9 @@ async def save_snapshot( ) manager.sessions["idle"] = agent_session - cutoff = datetime.utcnow() - sm.REAPER_IDLE - reaped = await manager._reap_one("idle", cutoff) + reaped = await manager._reap_one( + "idle", verdict="evict_idle", now=datetime.utcnow() + ) assert reaped is False assert "idle" in manager.sessions @@ -501,8 +543,9 @@ async def slow_cleanup(session: Any) -> None: "restore-race", last_active_at=datetime.utcnow() - timedelta(hours=3), ) - cutoff = datetime.utcnow() - sm.REAPER_IDLE - reap_task = asyncio.create_task(manager._reap_one("restore-race", cutoff)) + reap_task = asyncio.create_task( + manager._reap_one("restore-race", verdict="evict_idle", now=datetime.utcnow()) + ) for _ in range(100): await asyncio.sleep(0.01) @@ -549,9 +592,10 @@ async def slow_cleanup(session: Any) -> None: agent_session = await _start_real_run_session( manager, "slow", last_active_at=datetime.utcnow() - timedelta(hours=3) ) - cutoff = datetime.utcnow() - sm.REAPER_IDLE - reap_task = asyncio.create_task(manager._reap_one("slow", cutoff)) + reap_task = asyncio.create_task( + manager._reap_one("slow", verdict="evict_idle", now=datetime.utcnow()) + ) # Let _reap_one persist + pop, then enter the teardown wait (the session # task is stuck in slow_cleanup, so the wait won't complete on its own). for _ in range(100): @@ -569,3 +613,295 @@ async def slow_cleanup(session: Any) -> None: release.set() if agent_session.task is not None: await asyncio.gather(agent_session.task, return_exceptions=True) + + +# ── Pending-approval eviction windows ──────────────────────────────────── + + +@pytest.mark.asyncio +@pytest.mark.parametrize("kind", ["usage_threshold", "yolo_budget"]) +async def test_reaper_evicts_pending_ack_after_idle_window(kind): + """Acknowledgement prompts (usage-threshold / YOLO-cap) are auto-created + at turn end with no user action; they must not pin a slot past the normal + idle window, and must stay answerable after restore.""" + manager = _manager() + + async def fake_cleanup(session: Any) -> None: + return None + + manager._cleanup_sandbox = fake_cleanup # type: ignore[method-assign] + + agent_session = await _start_real_run_session( + manager, + "ack", + last_active_at=datetime.utcnow() - timedelta(minutes=20), + ) + pending = {"kind": kind, "tool_call_id": "p1", "continuation": "continue_agent"} + agent_session.session.pending_approval = pending + + await manager._reap_idle_sessions() + + assert "ack" not in manager.sessions + snapshots = manager.persistence_store.snapshots_for("ack") + assert snapshots, "eviction must persist a resumable snapshot" + assert all(s["status"] == "active" for s in snapshots) + assert snapshots[-1]["runtime_state"] == "waiting_approval" + assert snapshots[-1]["pending_approval"] == [pending] + restored = FakeSession() + manager._restore_pending_approval(restored, snapshots[-1]["pending_approval"]) + assert restored.pending_approval == pending + + +@pytest.mark.asyncio +async def test_reaper_evicts_tool_approval_after_long_window(): + """Real tool-permission prompts expire before reaping. + + They must not remain actionable after their sandbox is torn down. + """ + manager = _manager() + + async def fake_cleanup(session: Any) -> None: + return None + + manager._cleanup_sandbox = fake_cleanup # type: ignore[method-assign] + from litellm import ChatCompletionMessageToolCall as ToolCall + + agent_session = await _start_real_run_session( + manager, + "tool-approval", + last_active_at=datetime.utcnow() - timedelta(hours=2), + ) + tool_call = ToolCall( + id="tc-1", + type="function", + function={"name": "create_file", "arguments": '{"path":"app.py"}'}, + ) + agent_session.session.pending_approval = {"tool_calls": [tool_call]} + serialized = manager._serialize_pending_approval(agent_session.session) + restored = FakeSession() + manager._restore_pending_approval(restored, serialized) + restored_tool_calls = restored.pending_approval["tool_calls"] + assert restored_tool_calls[0].id == "tc-1" + assert restored_tool_calls[0].function.name == "create_file" + + await manager._reap_idle_sessions() + + assert "tool-approval" not in manager.sessions + assert agent_session.session.pending_approval is None + snapshots = manager.persistence_store.snapshots_for("tool-approval") + assert snapshots + assert all(s["status"] == "active" for s in snapshots) + assert snapshots[-1]["runtime_state"] == "idle" + assert snapshots[-1]["pending_approval"] == [] + [expired_msg] = agent_session.session.context_manager.items + assert expired_msg.role == "tool" + assert expired_msg.tool_call_id == "tc-1" + assert expired_msg.name == "create_file" + assert "expired after inactivity" in expired_msg.content + [event] = [ + event + for event in agent_session.session.events + if event.event_type == "tool_state_change" + ] + assert event.event_type == "tool_state_change" + assert event.data["state"] == "abandoned" + assert event.data["reason"] == "expired_inactive" + + +@pytest.mark.asyncio +async def test_reaper_spares_quiet_processing_session(): + """Processing work is preserved even if it has been quiet for a long time.""" + manager = _manager() + agent_session = _make_agent_session( + "busy", + last_active_at=datetime.utcnow() - timedelta(hours=5), + is_processing=True, + ) + manager.sessions["busy"] = agent_session + + await manager._reap_idle_sessions() + + assert "busy" in manager.sessions + assert agent_session.is_reaping is False + + +@pytest.mark.asyncio +async def test_reaper_spares_quiet_processing_session_with_queued_messages(): + """Accepted queued submissions are not dropped by processing-session reaps.""" + manager = _manager() + + async def fake_cleanup(session: Any) -> None: + return None + + manager._cleanup_sandbox = fake_cleanup # type: ignore[method-assign] + + agent_session = _make_agent_session( + "busy-queued", + last_active_at=datetime.utcnow() - timedelta(hours=5), + is_processing=True, + ) + manager.sessions["busy-queued"] = agent_session + agent_session.submission_queue.put_nowait(object()) + + await manager._reap_idle_sessions() + + assert "busy-queued" in manager.sessions + assert agent_session.submission_queue.qsize() == 1 + assert agent_session.is_reaping is False + assert agent_session.session.cancel_called is False + assert manager.persistence_store.snapshots_for("busy-queued") == [] + + +# ── Verdict re-check ───────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_reap_one_aborts_on_verdict_mismatch(): + """A session whose state changes between selection and teardown (e.g. the + prompt was answered) must abort this reap cycle.""" + manager = _manager() + agent_session = _make_agent_session( + "answered", + last_active_at=datetime.utcnow() - timedelta(hours=2), + pending_approval={"tool_calls": [{"id": "tc-1"}]}, + ) + manager.sessions["answered"] = agent_session + + now = datetime.utcnow() + assert manager._reaper_verdict(agent_session, now) == "evict_pending_tool" + + # Approval answered in the gap between selection and teardown. + agent_session.session.pending_approval = None + reaped = await manager._reap_one("answered", verdict="evict_pending_tool", now=now) + + assert reaped is False + assert "answered" in manager.sessions + assert agent_session.is_reaping is False + + +# ── Mongo store retry ──────────────────────────────────────────────────── + + +class FlippableMongoStore(MongoSessionStore): + """MongoSessionStore whose init() flips enabled without touching the + network, to exercise the sweep's maybe_reconnect path.""" + + def __init__(self, *, recovers: bool) -> None: + super().__init__("mongodb://unreachable.invalid", "testdb") + self.recovers = recovers + self.init_calls = 0 + self.snapshots: list[dict[str, Any]] = [] + + async def init(self) -> None: + self.init_calls += 1 + self.enabled = self.recovers + + async def save_snapshot(self, **kwargs: Any) -> None: + self.snapshots.append(kwargs) + + +@pytest.mark.asyncio +async def test_reaper_retries_disabled_mongo_store(): + """A Mongo store that failed its boot-time init is retried at sweep time, + so one boot blip no longer disables reaping until the next restart.""" + manager = _manager() + + async def fake_cleanup(session: Any) -> None: + return None + + manager._cleanup_sandbox = fake_cleanup # type: ignore[method-assign] + store = FlippableMongoStore(recovers=True) + manager.persistence_store = store + manager.sessions["idle"] = _make_agent_session( + "idle", last_active_at=datetime.utcnow() - timedelta(hours=1) + ) + + await manager._reap_idle_sessions() + + assert store.init_calls == 1 + assert "idle" not in manager.sessions + assert store.snapshots + + +@pytest.mark.asyncio +async def test_reaper_stays_noop_while_mongo_store_down(): + manager = _manager() + store = FlippableMongoStore(recovers=False) + manager.persistence_store = store + agent_session = _make_agent_session( + "idle", last_active_at=datetime.utcnow() - timedelta(hours=1) + ) + manager.sessions["idle"] = agent_session + + await manager._reap_idle_sessions() + + assert store.init_calls == 1 + assert "idle" in manager.sessions + assert agent_session.is_reaping is False + + +# ── Capacity-error breakdown ───────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_capacity_error_message_breaks_down_held_slots(): + manager = _manager() + sessions = [] + for i in range(3): + sessions.append( + _make_agent_session( + f"tool-{i}", pending_approval={"tool_calls": [{"id": f"t{i}"}]} + ) + ) + for i in range(2): + sessions.append( + _make_agent_session( + f"ack-{i}", + pending_approval={"kind": "usage_threshold", "tool_call_id": f"a{i}"}, + ) + ) + sessions.append(_make_agent_session("busy", is_processing=True)) + for i in range(4): + sessions.append(_make_agent_session(f"idle-{i}")) + assert len(sessions) == sm.MAX_SESSIONS_PER_USER + for agent_session in sessions: + manager.sessions[agent_session.session_id] = agent_session + + with pytest.raises(SessionCapacityError) as exc: + await manager.create_session(user_id="owner") + + message = str(exc.value) + assert f"maximum of {sm.MAX_SESSIONS_PER_USER} live sessions" in message + assert "Close an existing session" in message + assert "Currently held: 3 awaiting tool approval, 2 usage/cost prompts" in message + assert "1 still processing, 4 idle." in message + + +# ── Sweep observability ────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_sweep_log_emitted_only_when_nonzero(caplog): + manager = _manager() + manager.sessions["busy-only"] = _make_agent_session("busy-only", is_processing=True) + + with caplog.at_level(logging.INFO): + await manager._reap_idle_sessions() + assert "Reaper sweep:" not in caplog.text + manager.sessions.clear() + + async def fake_cleanup(session: Any) -> None: + return None + + manager._cleanup_sandbox = fake_cleanup # type: ignore[method-assign] + manager.sessions["idle"] = _make_agent_session( + "idle", last_active_at=datetime.utcnow() - timedelta(hours=1) + ) + manager.sessions["busy"] = _make_agent_session("busy", is_processing=True) + + with caplog.at_level(logging.INFO): + await manager._reap_idle_sessions() + + assert "Reaper sweep:" in caplog.text + assert "evicted_idle=1" in caplog.text + assert "skipped_processing=1" in caplog.text