diff --git a/src/browser/components/AppLoader.tsx b/src/browser/components/AppLoader.tsx index 1a1f2a047c..8f93c98830 100644 --- a/src/browser/components/AppLoader.tsx +++ b/src/browser/components/AppLoader.tsx @@ -92,6 +92,11 @@ function AppLoaderInner() { getPRStatusStoreInstance().setClient(api ?? null); if (!workspaceContext.loading) { + // Tell the store which workspace is selected before syncing, so it only + // subscribes to onChat for that workspace (not all of them). + workspaceStoreInstance.setSelectedWorkspaceId( + workspaceContext.selectedWorkspace?.workspaceId ?? null + ); workspaceStoreInstance.syncWorkspaces(workspaceContext.workspaceMetadata); gitStatusStore.syncWorkspaces(workspaceContext.workspaceMetadata); @@ -107,6 +112,7 @@ function AppLoaderInner() { }, [ workspaceContext.loading, workspaceContext.workspaceMetadata, + workspaceContext.selectedWorkspace?.workspaceId, workspaceStoreInstance, gitStatusStore, backgroundBashStore, diff --git a/src/browser/components/ConnectionStatusToast.tsx b/src/browser/components/ConnectionStatusToast.tsx index 7bcbbf27de..9a336abe2f 100644 --- a/src/browser/components/ConnectionStatusToast.tsx +++ b/src/browser/components/ConnectionStatusToast.tsx @@ -32,7 +32,7 @@ export const ConnectionStatusToast: React.FC = ({ wr return null; } - if (apiState.status === "degraded" || apiState.status === "reconnecting") { + if (apiState.status === "reconnecting") { const content = (
= ({ wr > - {apiState.status === "degraded" ? ( - "Connection unstable — messages may be delayed" - ) : ( - <> - Reconnecting to server - {apiState.attempt > 1 && ` (attempt ${apiState.attempt})`}… - - )} + Reconnecting to server + {apiState.attempt > 1 && ` (attempt ${apiState.attempt})`}…
); diff --git a/src/browser/contexts/API.test.tsx b/src/browser/contexts/API.test.tsx index 271e6cab10..0cf359f7b1 100644 --- a/src/browser/contexts/API.test.tsx +++ b/src/browser/contexts/API.test.tsx @@ -1,63 +1,20 @@ import { act, cleanup, render, waitFor } from "@testing-library/react"; +import type { APIClient } from "@/browser/contexts/API"; import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"; import { GlobalWindow } from "happy-dom"; -// Mock WebSocket that we can control -class MockWebSocket { - static instances: MockWebSocket[] = []; - url: string; - readyState = 0; // CONNECTING - eventListeners = new Map void>>(); - - constructor(url: string) { - this.url = url; - MockWebSocket.instances.push(this); - } - - addEventListener(event: string, handler: (event?: unknown) => void) { - const handlers = this.eventListeners.get(event) ?? []; - handlers.push(handler); - this.eventListeners.set(event, handlers); - } - - close() { - this.readyState = 3; // CLOSED - } - - // Test helpers - simulateOpen() { - this.readyState = 1; // OPEN - this.eventListeners.get("open")?.forEach((h) => h()); - } - - simulateClose(code: number) { - this.readyState = 3; - this.eventListeners.get("close")?.forEach((h) => h({ code })); - } - - simulateError() { - this.eventListeners.get("error")?.forEach((h) => h()); - } - - static reset() { - MockWebSocket.instances = []; - } - - static lastInstance(): MockWebSocket | undefined { - return MockWebSocket.instances[MockWebSocket.instances.length - 1]; - } -} +// Control what ping() returns across tests. +let mockPing: (input: string) => Promise = () => Promise.resolve("pong"); -// Mock orpc client void mock.module("@/common/orpc/client", () => ({ createClient: () => ({ general: { - ping: () => Promise.resolve("pong"), + ping: (input: string) => mockPing(input), }, }), })); -void mock.module("@orpc/client/websocket", () => ({ +void mock.module("@orpc/client/fetch", () => ({ RPCLink: class {}, })); @@ -98,10 +55,7 @@ function APIStateObserver(props: { onState: (state: UseAPIResult) => void }) { return null; } -// Factory that creates MockWebSocket instances (injected via prop) -const createMockWebSocket = (url: string) => new MockWebSocket(url) as unknown as WebSocket; - -describe("API reconnection", () => { +describe("API connection (fetch transport)", () => { beforeEach(() => { // Minimal DOM setup required by @testing-library/react. // @@ -110,185 +64,126 @@ describe("API reconnection", () => { const happyWindow = new GlobalWindow({ url: "https://mux.example.com/" }); globalThis.window = happyWindow as unknown as Window & typeof globalThis; globalThis.document = happyWindow.document as unknown as Document; - MockWebSocket.reset(); + + // Default: ping succeeds. + mockPing = () => Promise.resolve("pong"); }); afterEach(() => { cleanup(); - MockWebSocket.reset(); globalThis.window = undefined as unknown as Window & typeof globalThis; globalThis.document = undefined as unknown as Document; }); - test("constructs WebSocket URL with app proxy prefix", () => { - window.location.href = "https://coder.example.com/@u/ws/apps/mux/?token=abc"; - - render( - - undefined} /> - - ); - - const ws1 = MockWebSocket.lastInstance(); - expect(ws1).toBeDefined(); - expect(ws1!.url).toBe("wss://coder.example.com/@u/ws/apps/mux/orpc/ws?token=abc"); - }); - - test("reconnects on close without showing auth_required when previously connected", async () => { + test("transitions to connected when ping succeeds", async () => { const states: string[] = []; render( - + states.push(s.status)} /> ); - const ws1 = MockWebSocket.lastInstance(); - expect(ws1).toBeDefined(); - - // Simulate successful connection (open + ping success) - await act(async () => { - ws1!.simulateOpen(); - // Wait for ping promise to resolve - await new Promise((r) => setTimeout(r, 10)); - }); - - expect(states).toContain("connected"); - - // Simulate server restart (close code 1006 = abnormal closure) - act(() => { - ws1!.simulateClose(1006); - }); - - // Should be "reconnecting", NOT "auth_required" - await waitFor(() => { - expect(states).toContain("reconnecting"); - }); - - expect(states.filter((s) => s === "auth_required")).toHaveLength(0); - - // New WebSocket should be created for reconnect attempt (after delay) await waitFor(() => { - expect(MockWebSocket.instances.length).toBeGreaterThan(1); + expect(states).toContain("connected"); }); }); - test("shows auth_required on close with auth error codes (4401)", async () => { + test("shows auth_required when ping returns an auth error", async () => { + mockPing = () => Promise.reject(new Error("401 Unauthorized")); + const states: string[] = []; render( - + states.push(s.status)} /> ); - const ws1 = MockWebSocket.lastInstance(); - expect(ws1).toBeDefined(); - - await act(async () => { - ws1!.simulateOpen(); - await new Promise((r) => setTimeout(r, 10)); - }); - - expect(states).toContain("connected"); - - act(() => { - ws1!.simulateClose(4401); - }); - await waitFor(() => { expect(states).toContain("auth_required"); }); }); - test("shows auth_required on close with auth error codes (1008)", async () => { + test("retries on first connection failure without showing auth_required", async () => { + // First call fails with a network error, subsequent calls succeed. + let callCount = 0; + mockPing = () => { + callCount++; + if (callCount === 1) { + return Promise.reject(new Error("Failed to fetch")); + } + return Promise.resolve("pong"); + }; + const states: string[] = []; render( - + states.push(s.status)} /> ); - const ws1 = MockWebSocket.lastInstance(); - expect(ws1).toBeDefined(); - - await act(async () => { - ws1!.simulateOpen(); - await new Promise((r) => setTimeout(r, 10)); + // Should retry via reconnection, not show auth_required. + await waitFor(() => { + expect(states).toContain("reconnecting"); }); - expect(states).toContain("connected"); - - act(() => { - ws1!.simulateClose(1008); - }); + expect(states.filter((s) => s === "auth_required")).toHaveLength(0); + // Eventually connects on retry. await waitFor(() => { - expect(states).toContain("auth_required"); + expect(states).toContain("connected"); }); }); - test("retries on first connection failure without showing auth_required", async () => { + test("uses pre-created client and skips connection flow", async () => { + const mockClient = { + general: { ping: () => Promise.resolve("pong") }, + } as unknown as APIClient; + const states: string[] = []; render( - + states.push(s.status)} /> ); - const ws1 = MockWebSocket.lastInstance(); - expect(ws1).toBeDefined(); - - // First connection fails - browser fires error then close. - act(() => { - ws1!.simulateError(); - ws1!.simulateClose(1006); - }); - + // Should immediately be connected without going through connecting. await waitFor(() => { - expect(states).toContain("reconnecting"); - }); - - expect(states.filter((s) => s === "auth_required")).toHaveLength(0); - - // Should create a new WebSocket for the reconnect attempt. - await waitFor(() => { - expect(MockWebSocket.instances.length).toBeGreaterThan(1); + expect(states[0]).toBe("connected"); }); }); - test("reconnects on connection loss when previously connected", async () => { - const states: string[] = []; + test("authenticate() triggers reconnection with new token", async () => { + mockPing = () => Promise.reject(new Error("401 Unauthorized")); + + const capturedStates: UseAPIResult[] = []; render( - - states.push(s.status)} /> + + capturedStates.push(s)} /> ); - const ws1 = MockWebSocket.lastInstance(); - expect(ws1).toBeDefined(); - - await act(async () => { - ws1!.simulateOpen(); - await new Promise((r) => setTimeout(r, 10)); + // Wait for auth_required state. + await waitFor(() => { + expect(capturedStates.some((s) => s.status === "auth_required")).toBe(true); }); - expect(states).toContain("connected"); + // Now make ping succeed and call authenticate. + mockPing = () => Promise.resolve("pong"); - // Connection lost after being connected - act(() => { - ws1!.simulateError(); - ws1!.simulateClose(1006); + await act(async () => { + const lastState = capturedStates[capturedStates.length - 1]; + expect(lastState.status).toBe("auth_required"); + lastState.authenticate("new-token"); + await new Promise((r) => setTimeout(r, 50)); }); await waitFor(() => { - expect(states).toContain("reconnecting"); + expect(capturedStates.some((s) => s.status === "connected")).toBe(true); }); - - const authRequiredAfterConnected = states.slice(states.indexOf("connected") + 1); - expect(authRequiredAfterConnected.filter((s) => s === "auth_required")).toHaveLength(0); }); }); diff --git a/src/browser/contexts/API.tsx b/src/browser/contexts/API.tsx index 4bbb9285ea..6e99c16a8c 100644 --- a/src/browser/contexts/API.tsx +++ b/src/browser/contexts/API.tsx @@ -1,14 +1,14 @@ import { createContext, + useCallback, useContext, useEffect, - useState, - useCallback, - useRef, useMemo, + useRef, + useState, } from "react"; import { createClient } from "@/common/orpc/client"; -import { RPCLink as WebSocketLink } from "@orpc/client/websocket"; +import { RPCLink } from "@orpc/client/fetch"; import { RPCLink as MessagePortLink } from "@orpc/client/message-port"; import { getStoredAuthToken, @@ -25,7 +25,6 @@ export type { APIClient }; export type APIState = | { status: "connecting"; api: null; error: null } | { status: "connected"; api: APIClient; error: null } - | { status: "degraded"; api: APIClient; error: null } // Connected but pings failing | { status: "reconnecting"; api: null; error: null; attempt: number } | { status: "auth_required"; api: null; error: string | null } | { status: "error"; api: null; error: string }; @@ -42,7 +41,6 @@ export type UseAPIResult = APIState & APIStateMethods; type ConnectionState = | { status: "connecting" } | { status: "connected"; client: APIClient; cleanup: () => void } - | { status: "degraded"; client: APIClient; cleanup: () => void } // Pings failing | { status: "reconnecting"; attempt: number } | { status: "auth_required"; error?: string } | { status: "error"; error: string }; @@ -55,8 +53,7 @@ const MAX_DELAY_MS = 10000; // Liveness check constants const LIVENESS_INTERVAL_MS = 5000; // Check every 5 seconds const LIVENESS_TIMEOUT_MS = 3000; // Ping must respond within 3 seconds -const CONSECUTIVE_FAILURES_FOR_DEGRADED = 2; // Mark degraded after N failures -const CONSECUTIVE_FAILURES_FOR_RECONNECT = 3; // Force reconnect after N failures (WS may be half-open) +const CONSECUTIVE_FAILURES_FOR_RECONNECT = 3; // Force reconnect after N consecutive failures const APIContext = createContext(null); @@ -64,19 +61,6 @@ interface APIProviderProps { children: React.ReactNode; /** Optional pre-created client. If provided, skips internal connection setup. */ client?: APIClient; - /** WebSocket factory for testing. Defaults to native WebSocket constructor. */ - createWebSocket?: (url: string) => WebSocket; -} - -function closeWebSocketSafely(ws: WebSocket) { - try { - // readyState: 0 = CONNECTING, 1 = OPEN, 2 = CLOSING, 3 = CLOSED - if (ws.readyState === 2 || ws.readyState === 3) return; - ws.close(); - } catch { - // Some browsers throw if close() is called while already closing/closed. - // Since our cleanup can be invoked from multiple code paths, treat close as idempotent. - } } function createElectronClient(): { client: APIClient; cleanup: () => void } { @@ -92,29 +76,22 @@ function createElectronClient(): { client: APIClient; cleanup: () => void } { }; } -function createBrowserClient( - authToken: string | null, - createWebSocket: (url: string) => WebSocket -): { +function createBrowserClient(authToken: string | null): { client: APIClient; cleanup: () => void; - ws: WebSocket; } { const apiBaseUrl = getBrowserBackendBaseUrl(); - const wsUrl = new URL(`${apiBaseUrl}/orpc/ws`); - wsUrl.protocol = wsUrl.protocol === "https:" ? "wss:" : "ws:"; - if (authToken) { - wsUrl.searchParams.set("token", authToken); - } - - const ws = createWebSocket(wsUrl.toString()); - const link = new WebSocketLink({ websocket: ws }); + const link = new RPCLink({ + url: `${apiBaseUrl}/orpc`, + headers: authToken ? { Authorization: `Bearer ${authToken}` } : {}, + }); return { client: createClient(link), - cleanup: () => closeWebSocketSafely(ws), - ws, + // HTTP/fetch transport has no persistent connection to close. + // eslint-disable-next-line @typescript-eslint/no-empty-function + cleanup: () => {}, }; } @@ -145,12 +122,6 @@ export const APIProvider = (props: APIProviderProps) => { const scheduleReconnectRef = useRef<(() => void) | null>(null); const consecutivePingFailuresRef = useRef(0); const connectionIdRef = useRef(0); - const forceReconnectInProgressRef = useRef(false); - - const wsFactory = useMemo( - () => props.createWebSocket ?? ((url: string) => new WebSocket(url)), - [props.createWebSocket] - ); const connect = useCallback( (token: string | null) => { @@ -172,8 +143,7 @@ export const APIProvider = (props: APIProviderProps) => { return; } - // Skip Electron detection if custom WebSocket factory provided (for testing) - if (!props.createWebSocket && window.api) { + if (window.api) { const { client, cleanup } = createElectronClient(); window.__ORPC_CLIENT__ = client; cleanupRef.current = cleanup; @@ -181,96 +151,51 @@ export const APIProvider = (props: APIProviderProps) => { return; } + // HTTP/fetch transport — verify reachability via auth-check ping. setState({ status: "connecting" }); - const { client, cleanup, ws } = createBrowserClient(token, wsFactory); - - ws.addEventListener("open", () => { - // Ignore stale connections (can happen if we force reconnect while the old socket is mid-flight). - if (connectionId !== connectionIdRef.current) { - cleanup(); - return; - } - - client.general - .ping("auth-check") - .then(() => { - // Ignore stale connections (e.g., auth-check returned after a new connect()). - if (connectionId !== connectionIdRef.current) { - cleanup(); - return; - } - - hasConnectedRef.current = true; - reconnectAttemptRef.current = 0; - consecutivePingFailuresRef.current = 0; - forceReconnectInProgressRef.current = false; - window.__ORPC_CLIENT__ = client; - cleanupRef.current = cleanup; - setState({ status: "connected", client, cleanup }); - }) - .catch((err: unknown) => { - if (connectionId !== connectionIdRef.current) { - cleanup(); - return; - } + const { client, cleanup } = createBrowserClient(token); + client.general + .ping("auth-check") + .then(() => { + // Ignore stale connections (e.g., auth-check returned after a new connect()). + if (connectionId !== connectionIdRef.current) { cleanup(); - forceReconnectInProgressRef.current = false; - const errMsg = err instanceof Error ? err.message : String(err); - const errMsgLower = errMsg.toLowerCase(); - const isAuthError = - errMsgLower.includes("unauthorized") || - errMsgLower.includes("401") || - errMsgLower.includes("auth token") || - errMsgLower.includes("authentication"); - if (isAuthError) { - clearStoredAuthToken(); - setState({ status: "auth_required", error: token ? "Invalid token" : undefined }); - } else { - setState({ status: "error", error: errMsg }); - } - }); - }); - - // Note: Browser fires 'error' before 'close', so we handle reconnection - // only in 'close' to avoid double-scheduling. The 'error' event just - // signals that something went wrong; 'close' provides the final state. - ws.addEventListener("error", () => { - // Error occurred - close event will follow and handle reconnection - // We don't call cleanup() here since close handler will do it - }); - - ws.addEventListener("close", (event) => { - cleanup(); - - // Ignore stale connections (can happen if we force reconnect while the old socket is mid-flight). - if (connectionId !== connectionIdRef.current) { - return; - } - - forceReconnectInProgressRef.current = false; - - // Auth-specific close codes - if (event.code === 1008 || event.code === 4401) { - clearStoredAuthToken(); - hasConnectedRef.current = false; // Reset - need fresh auth - setState({ status: "auth_required", error: "Authentication required" }); - return; - } - - // If we were previously connected, try to reconnect - if (hasConnectedRef.current) { - scheduleReconnectRef.current?.(); - return; - } + return; + } + + hasConnectedRef.current = true; + reconnectAttemptRef.current = 0; + consecutivePingFailuresRef.current = 0; + window.__ORPC_CLIENT__ = client; + cleanupRef.current = cleanup; + setState({ status: "connected", client, cleanup }); + }) + .catch((err: unknown) => { + if (connectionId !== connectionIdRef.current) { + cleanup(); + return; + } - // First connection failed. - // This can happen in dev-server mode if the UI boots before the backend is ready. - // Prefer retry/backoff over forcing the auth modal (auth will be detected via ping/close codes). - scheduleReconnectRef.current?.(); - }); + cleanup(); + const errMsg = err instanceof Error ? err.message : String(err); + const errMsgLower = errMsg.toLowerCase(); + const isAuthError = + errMsgLower.includes("unauthorized") || + errMsgLower.includes("401") || + errMsgLower.includes("auth token") || + errMsgLower.includes("authentication"); + + if (isAuthError) { + clearStoredAuthToken(); + setState({ status: "auth_required", error: token ? "Invalid token" : undefined }); + } else { + // Network error or backend not ready — retry with backoff. + scheduleReconnectRef.current?.(); + } + }); }, - [props.client, props.createWebSocket, wsFactory] + [props.client] ); // Schedule reconnection with exponential backoff @@ -308,56 +233,34 @@ export const APIProvider = (props: APIProviderProps) => { // eslint-disable-next-line react-hooks/exhaustive-deps }, []); - // Liveness check: periodic ping to detect degraded connections - // Only runs for browser WebSocket connections (not Electron or test clients) + // Liveness check: periodic ping to detect backend unavailability. + // Only runs for browser HTTP connections (not Electron or test clients). useEffect(() => { - // Only check liveness for connected/degraded browser connections - if (state.status !== "connected" && state.status !== "degraded") return; + if (state.status !== "connected") return; // Skip for Electron (MessagePort) and test clients (externally provided) - if (props.client || (!props.createWebSocket && window.api)) return; + if (props.client || window.api) return; const client = state.client; - const cleanup = state.cleanup; const checkLiveness = async () => { try { - // Race ping against timeout const pingPromise = client.general.ping("liveness"); const timeoutPromise = new Promise((_, reject) => setTimeout(() => reject(new Error("Ping timeout")), LIVENESS_TIMEOUT_MS) ); await Promise.race([pingPromise, timeoutPromise]); - - // Ping succeeded - reset failure count and restore connected state if degraded consecutivePingFailuresRef.current = 0; - forceReconnectInProgressRef.current = false; - if (state.status === "degraded") { - setState({ status: "connected", client, cleanup }); - } } catch { - // Ping failed consecutivePingFailuresRef.current++; - if ( - consecutivePingFailuresRef.current >= CONSECUTIVE_FAILURES_FOR_RECONNECT && - !forceReconnectInProgressRef.current - ) { - forceReconnectInProgressRef.current = true; + if (consecutivePingFailuresRef.current >= CONSECUTIVE_FAILURES_FOR_RECONNECT) { console.warn( `[APIProvider] Liveness ping failed ${consecutivePingFailuresRef.current} times; reconnecting...` ); - cleanup(); connect(authToken); return; } - - if ( - consecutivePingFailuresRef.current >= CONSECUTIVE_FAILURES_FOR_DEGRADED && - state.status === "connected" - ) { - setState({ status: "degraded", client, cleanup }); - } } }; @@ -365,7 +268,7 @@ export const APIProvider = (props: APIProviderProps) => { void checkLiveness(); }, LIVENESS_INTERVAL_MS); return () => clearInterval(intervalId); - }, [state, props.client, props.createWebSocket, connect, authToken]); + }, [state, props.client, connect, authToken]); const authenticate = useCallback( (token: string) => { @@ -388,8 +291,6 @@ export const APIProvider = (props: APIProviderProps) => { return { status: "connecting", api: null, error: null, ...base }; case "connected": return { status: "connected", api: state.client, error: null, ...base }; - case "degraded": - return { status: "degraded", api: state.client, error: null, ...base }; case "reconnecting": return { status: "reconnecting", api: null, error: null, attempt: state.attempt, ...base }; case "auth_required": diff --git a/src/browser/contexts/WorkspaceContext.test.tsx b/src/browser/contexts/WorkspaceContext.test.tsx index 180cae0ffb..7e48baf073 100644 --- a/src/browser/contexts/WorkspaceContext.test.tsx +++ b/src/browser/contexts/WorkspaceContext.test.tsx @@ -69,6 +69,11 @@ describe("WorkspaceContext", () => { workspace: { list: () => Promise.resolve(initialWorkspaces), }, + // Navigate to the workspace so it becomes the selected one — + // only the selected workspace gets an onChat subscription. + localStorage: { + [SELECTED_WORKSPACE_KEY]: JSON.stringify({ workspaceId: "ws-sync-load" }), + }, }); const ctx = await setup(); diff --git a/src/browser/contexts/WorkspaceContext.tsx b/src/browser/contexts/WorkspaceContext.tsx index 38041d599c..ddf68267ff 100644 --- a/src/browser/contexts/WorkspaceContext.tsx +++ b/src/browser/contexts/WorkspaceContext.tsx @@ -516,6 +516,13 @@ export function WorkspaceProvider(props: WorkspaceProviderProps) { } = useRouter(); const workspaceStore = useWorkspaceStoreRaw(); + + // Ref tracking the current workspace ID from the URL so the setWorkspaceMetadata + // callback can read it without adding currentWorkspaceId to its dependency array + // (which would destabilize the callback reference on every navigation). + const currentWorkspaceIdRef = useRef(currentWorkspaceId); + currentWorkspaceIdRef.current = currentWorkspaceId; + const [workspaceMetadata, setWorkspaceMetadataState] = useState< Map >(new Map()); @@ -523,11 +530,10 @@ export function WorkspaceProvider(props: WorkspaceProviderProps) { (update: SetStateAction>) => { setWorkspaceMetadataState((prev) => { const next = typeof update === "function" ? update(prev) : update; - // IMPORTANT: Sync the imperative WorkspaceStore first so hooks (AIView, - // LeftSidebar, etc.) never render with a selected workspace ID before - // the store has subscribed and created its aggregator. Otherwise the - // render path hits WorkspaceStore.assertGet() and throws the - // "Workspace not found - must call addWorkspace() first" assert. + // IMPORTANT: Tell the store which workspace is selected BEFORE syncing + // so it only subscribes to onChat for that one workspace. Reading from + // the ref ensures we always have the latest URL-derived selection. + workspaceStore.setSelectedWorkspaceId(currentWorkspaceIdRef.current ?? null); workspaceStore.syncWorkspaces(next); return next; }); @@ -799,6 +805,20 @@ export function WorkspaceProvider(props: WorkspaceProviderProps) { const selectedWorkspaceRef = useRef(selectedWorkspace); selectedWorkspaceRef.current = selectedWorkspace; + // Keep the WorkspaceStore in sync with the selected workspace. Must run + // synchronously during render (not in a useEffect) because child components + // call useWorkspaceState() which requires the aggregator to exist. + // If deferred to useEffect, there's a render gap where WorkspaceShell tries + // to access the aggregator before it's created → assertion failure. + // This follows the same pattern as setWorkspaceMetadata (line 531) which + // also calls store methods synchronously inside a React state updater. + const prevSelectedIdRef = useRef(null); + const currentSelectedId = selectedWorkspace?.workspaceId ?? null; + if (prevSelectedIdRef.current !== currentSelectedId) { + prevSelectedIdRef.current = currentSelectedId; + workspaceStore.setSelectedWorkspaceId(currentSelectedId); + } + // setSelectedWorkspace navigates to the workspace URL (or clears if null) const setSelectedWorkspace = useCallback( (update: SetStateAction) => { diff --git a/src/browser/stores/WorkspaceStore.test.ts b/src/browser/stores/WorkspaceStore.test.ts index c6d2696fff..ea2d63cb88 100644 --- a/src/browser/stores/WorkspaceStore.test.ts +++ b/src/browser/stores/WorkspaceStore.test.ts @@ -232,12 +232,31 @@ describe("WorkspaceStore", () => { runtimeConfig: DEFAULT_RUNTIME_CONFIG, }; + // Only the selected workspace gets an onChat subscription + store.setSelectedWorkspaceId("workspace-1"); const workspaceMap = new Map([[metadata1.id, metadata1]]); store.syncWorkspaces(workspaceMap); expect(mockOnChat).toHaveBeenCalledWith({ workspaceId: "workspace-1" }, expect.anything()); }); + it("should not subscribe to non-selected workspaces", () => { + const metadata1: FrontendWorkspaceMetadata = { + id: "workspace-1", + name: "workspace-1", + projectName: "project-1", + projectPath: "/project-1", + namedWorkspacePath: "/path/1", + createdAt: new Date().toISOString(), + runtimeConfig: DEFAULT_RUNTIME_CONFIG, + }; + + // No workspace selected — syncWorkspaces should store metadata but not subscribe + store.syncWorkspaces(new Map([[metadata1.id, metadata1]])); + + expect(mockOnChat).not.toHaveBeenCalled(); + }); + it("should remove deleted workspaces", () => { const metadata1: FrontendWorkspaceMetadata = { id: "workspace-1", diff --git a/src/browser/stores/WorkspaceStore.ts b/src/browser/stores/WorkspaceStore.ts index 6a2c4ced7f..40e114bb14 100644 --- a/src/browser/stores/WorkspaceStore.ts +++ b/src/browser/stores/WorkspaceStore.ts @@ -1,7 +1,11 @@ import assert from "@/common/utils/assert"; import type { MuxMessage, DisplayedMessage, QueuedMessage } from "@/common/types/message"; import type { FrontendWorkspaceMetadata } from "@/common/types/workspace"; -import type { WorkspaceChatMessage, WorkspaceStatsSnapshot } from "@/common/orpc/types"; +import type { + WorkspaceChatMessage, + WorkspaceStatsSnapshot, + WorkspaceActivitySnapshot, +} from "@/common/orpc/types"; import type { RouterClient } from "@orpc/server"; import type { AppRouter } from "@/node/orpc/router"; import type { TodoItem } from "@/common/types/tools"; @@ -151,6 +155,12 @@ export interface SessionTimingStats { byModel: Record; } +/** + * Shared empty array for suspended workspace sidebar state. + * Using a stable reference prevents useSyncExternalStore from detecting false changes. + */ +const EMPTY_SKILLS: LoadedSkill[] = []; + /** * Subset of WorkspaceState needed for sidebar display. * Subscribing to only these fields prevents re-renders when messages update. @@ -366,6 +376,18 @@ export class WorkspaceStore { private sessionUsage = new Map>(); + // --- Single-subscription architecture --- + // Only the selected workspace gets an onChat subscription (full chat replay). + // Non-selected workspaces use cached sidebar state + activity stream updates. + private selectedWorkspaceId: string | null = null; + + // Sidebar state snapshot for suspended (non-selected) workspaces. + // Populated when a workspace is suspended, updated by the activity stream. + private cachedSidebarState = new Map(); + + // Cleanup function for the activity stream subscription + private activityUnsubscribe: (() => void) | null = null; + // Idle compaction notification callbacks (called when backend signals idle compaction needed) private idleCompactionCallbacks = new Set<(workspaceId: string) => void>(); @@ -713,6 +735,224 @@ export class WorkspaceStore { this.subscribeToStats(workspaceId); } } + + // Restart activity stream subscription for sidebar updates on non-selected workspaces + this.subscribeToActivityStream(); + } + + /** + * Set the currently selected workspace. Only the selected workspace gets + * a full onChat subscription (chat history replay). Non-selected workspaces + * are suspended: their aggregator is freed and sidebar state is served from + * a cached snapshot updated by the activity stream. + */ + setSelectedWorkspaceId(workspaceId: string | null): void { + if (this.selectedWorkspaceId === workspaceId) return; + + const previousId = this.selectedWorkspaceId; + this.selectedWorkspaceId = workspaceId; + + // Suspend the previously-selected workspace (free aggregator + onChat sub) + if (previousId && previousId !== workspaceId) { + this.suspendWorkspace(previousId); + } + + // Subscribe to the newly-selected workspace (if not already subscribed) + if (workspaceId && !this.ipcUnsubscribers.has(workspaceId)) { + const metadata = this.workspaceMetadata.get(workspaceId); + if (metadata) { + this.addWorkspace(metadata); + } + } + } + + /** + * Suspend a workspace: cache its sidebar state, then tear down the + * onChat subscription and aggregator. Lighter than removeWorkspace — + * preserves metadata, recencyCache, sessionUsage, and cachedSidebarState. + */ + private suspendWorkspace(workspaceId: string): void { + // Cache sidebar state BEFORE freeing the aggregator + this.cacheSidebarStateFromAggregator(workspaceId); + + // Abort onChat subscription + const unsubscribe = this.ipcUnsubscribers.get(workspaceId); + if (unsubscribe) { + unsubscribe(); + this.ipcUnsubscribers.delete(workspaceId); + } + + // Cancel any pending idle bump for this workspace + this.cancelPendingIdleBump(workspaceId); + + // Free aggregator + transient state + this.aggregators.delete(workspaceId); + this.chatTransientState.delete(workspaceId); + this.pendingReplayReset.delete(workspaceId); + this.consumerManager.removeWorkspace(workspaceId); + + // Clean up MapStore entries that depend on the aggregator + this.states.delete(workspaceId); + this.usageStore.delete(workspaceId); + this.consumersStore.delete(workspaceId); + this.sidebarStateSourceState.delete(workspaceId); + + // Clean up stats subscription + const statsUnsub = this.statsUnsubscribers.get(workspaceId); + if (statsUnsub) { + statsUnsub(); + this.statsUnsubscribers.delete(workspaceId); + } + this.workspaceStats.delete(workspaceId); + this.statsStore.delete(workspaceId); + + // NOTE: Keep workspaceMetadata, cachedSidebarState, recencyCache, + // sessionUsage, previousSidebarValues, workspaceCreatedAt, sidebarStateCache + } + + /** + * Snapshot the current sidebar state from a live aggregator into the cache. + * Called just before suspending so the sidebar can continue to display + * meaningful state without the aggregator. + */ + private cacheSidebarStateFromAggregator(workspaceId: string): void { + if (!this.aggregators.has(workspaceId)) return; + try { + const state = this.getWorkspaceSidebarState(workspaceId); + this.cachedSidebarState.set(workspaceId, state); + } catch { + // Aggregator might be in a bad state; use whatever is already cached + } + } + + /** + * Subscribe to the backend activity stream for real-time sidebar updates + * on non-selected (suspended) workspaces. Updates cached sidebar state when + * streaming/model status changes. + */ + private subscribeToActivityStream(): void { + // Tear down any previous subscription + this.activityUnsubscribe?.(); + this.activityUnsubscribe = null; + + // Guard: client must expose the activity endpoints + if (!this.client?.workspace?.activity) return; + + const controller = new AbortController(); + this.activityUnsubscribe = () => controller.abort(); + + const activity = this.client.workspace.activity; + void (async () => { + let retryMs = 1000; + const maxRetryMs = 30_000; + + while (!controller.signal.aborted) { + try { + // Subscribe FIRST so no events are missed, then seed initial state. + // Events arriving between subscribe and list are deduplicated by + // updateCachedSidebarFromActivity's change detection. + const stream = await activity.subscribe(undefined, { + signal: controller.signal, + }); + + // Seed cached sidebar state from the current snapshot + const initial = await activity.list(); + if (controller.signal.aborted) return; + for (const [workspaceId, snapshot] of Object.entries(initial)) { + this.updateCachedSidebarFromActivity(workspaceId, snapshot); + } + + // Reset backoff on successful connection + retryMs = 1000; + + // Stream ongoing activity updates + for await (const event of stream) { + if (event.activity) { + this.updateCachedSidebarFromActivity(event.workspaceId, event.activity); + } + } + + // Stream ended normally — reconnect + } catch (error: unknown) { + if (controller.signal.aborted) return; + console.warn("[WorkspaceStore] Activity stream error, retrying:", error); + } + + // Exponential backoff before retrying + if (!controller.signal.aborted) { + await new Promise((resolve) => { + const timer = setTimeout(resolve, retryMs); + controller.signal.addEventListener( + "abort", + () => { + clearTimeout(timer); + resolve(); + }, + { once: true } + ); + }); + retryMs = Math.min(retryMs * 2, maxRetryMs); + } + } + })(); + } + + /** + * Apply an activity snapshot to the cached sidebar state for a workspace. + * Only updates non-selected (suspended) workspaces — workspaces with an + * active onChat subscription get their state from the aggregator. + */ + private updateCachedSidebarFromActivity( + workspaceId: string, + activity: WorkspaceActivitySnapshot + ): void { + // Active subscriptions get live state from the aggregator; skip them. + if (this.ipcUnsubscribers.has(workspaceId)) return; + + const existing = this.cachedSidebarState.get(workspaceId); + const canInterrupt = activity.streaming; + + if (existing) { + // Only update + notify if something actually changed + if ( + existing.canInterrupt === canInterrupt && + existing.currentModel === activity.lastModel && + existing.recencyTimestamp === activity.recency + ) { + return; + } + this.cachedSidebarState.set(workspaceId, { + ...existing, + canInterrupt, + currentModel: activity.lastModel, + recencyTimestamp: activity.recency, + // When streaming stops, clear the starting state + ...(canInterrupt ? {} : { isStarting: false }), + }); + } else { + // No cached state yet — create a minimal one from the activity snapshot + this.cachedSidebarState.set(workspaceId, { + canInterrupt, + isStarting: false, + awaitingUserQuestion: false, + currentModel: activity.lastModel, + recencyTimestamp: activity.recency, + loadedSkills: EMPTY_SKILLS, + agentStatus: undefined, + }); + } + + // Update recency cache so getWorkspaceRecency() reflects activity + const oldRecency = this.recencyCache.get(workspaceId); + if (activity.recency !== oldRecency) { + this.recencyCache.set(workspaceId, activity.recency); + this.derived.bump("recency"); + } + + // Invalidate sidebar caches so consumers re-render + this.sidebarStateCache.delete(workspaceId); + this.sidebarStateSourceState.delete(workspaceId); + this.states.bump(workspaceId); } /** @@ -995,6 +1235,17 @@ export class WorkspaceStore { * REQUIRES: Workspace must have been added via addWorkspace() first. */ getWorkspaceState(workspaceId: string): WorkspaceState { + // Ensure aggregator exists before accessing state. This handles race conditions + // during workspace switching (effect hasn't fired yet) and Storybook story + // transitions (singleton store outlives React tree remounts). The addWorkspace + // call is idempotent — returns immediately if already subscribed. + if (!this.aggregators.has(workspaceId)) { + const metadata = this.workspaceMetadata.get(workspaceId); + if (metadata) { + this.addWorkspace(metadata); + } + } + return this.states.get(workspaceId, () => { const aggregator = this.assertGet(workspaceId); @@ -1053,8 +1304,32 @@ export class WorkspaceStore { * Get sidebar state for a workspace (subset of full state). * Returns cached reference if values haven't changed. * This is critical for useSyncExternalStore - must return stable references. + * + * For suspended workspaces (no aggregator), returns the cached sidebar state + * snapshot, or a default if no cache exists. The activity stream keeps the + * cache up to date. */ getWorkspaceSidebarState(workspaceId: string): WorkspaceSidebarState { + // Suspended workspace — no aggregator, return cached state. + // IMPORTANT: Must return a referentially stable object for useSyncExternalStore. + // If no cache exists yet, seed it now so subsequent calls return the same reference. + if (!this.aggregators.has(workspaceId)) { + let cached = this.cachedSidebarState.get(workspaceId); + if (!cached) { + cached = { + canInterrupt: false, + isStarting: false, + awaitingUserQuestion: false, + currentModel: null, + recencyTimestamp: this.recencyCache.get(workspaceId) ?? null, + loadedSkills: EMPTY_SKILLS, + agentStatus: undefined, + }; + this.cachedSidebarState.set(workspaceId, cached); + } + return cached; + } + const fullState = this.getWorkspaceState(workspaceId); const isStarting = fullState.pendingStreamStartTime !== null && !fullState.canInterrupt; @@ -1144,16 +1419,30 @@ export class WorkspaceStore { /** * Get recency timestamps for all workspaces (for sorting in command palette). * Derived on-demand from individual workspace states. + * + * Includes both active (subscribed) and suspended workspaces. Active + * workspaces get recency from their aggregator; suspended workspaces + * use the recencyCache (maintained by the activity stream). */ getWorkspaceRecency(): Record { return this.derived.get("recency", () => { const timestamps: Record = {}; + + // Active workspaces: recency from aggregator via getWorkspaceState for (const workspaceId of this.aggregators.keys()) { const state = this.getWorkspaceState(workspaceId); if (state.recencyTimestamp !== null) { timestamps[workspaceId] = state.recencyTimestamp; } } + + // Suspended workspaces: recency from cache + for (const [workspaceId, recency] of this.recencyCache) { + if (!this.aggregators.has(workspaceId) && recency !== null) { + timestamps[workspaceId] = recency; + } + } + return timestamps; }) as Record; } @@ -1831,26 +2120,59 @@ export class WorkspaceStore { this.workspaceStats.delete(workspaceId); this.statsStore.delete(workspaceId); this.sessionUsage.delete(workspaceId); + this.cachedSidebarState.delete(workspaceId); } /** - * Sync workspaces with metadata - add new, remove deleted. + * Sync workspaces with metadata. + * + * Only the selected workspace (set via setSelectedWorkspaceId) gets a full + * onChat subscription. All other workspaces have their metadata stored but + * are kept in a suspended state — sidebar data comes from the cached + * sidebar snapshot + the activity stream. */ syncWorkspaces(workspaceMetadata: Map): void { const metadataIds = new Set(Array.from(workspaceMetadata.values()).map((m) => m.id)); - const currentIds = new Set(this.ipcUnsubscribers.keys()); + const subscribedIds = new Set(this.ipcUnsubscribers.keys()); - // Add new workspaces - for (const metadata of workspaceMetadata.values()) { - if (!currentIds.has(metadata.id)) { - this.addWorkspace(metadata); + // Update metadata for ALL workspaces (needed for name lookup, recency, etc.) + for (const [id, metadata] of workspaceMetadata) { + this.workspaceMetadata.set(id, metadata); + } + + // Only subscribe to onChat for the selected workspace + if ( + this.selectedWorkspaceId && + metadataIds.has(this.selectedWorkspaceId) && + !subscribedIds.has(this.selectedWorkspaceId) + ) { + const metadata = workspaceMetadata.get(this.selectedWorkspaceId); + assert(metadata, `Selected workspace ${this.selectedWorkspaceId} missing from metadata`); + this.addWorkspace(metadata); + } + + // Suspend any subscribed workspace that is no longer selected + for (const id of subscribedIds) { + if (id !== this.selectedWorkspaceId && metadataIds.has(id)) { + this.suspendWorkspace(id); } } - // Remove deleted workspaces - for (const workspaceId of currentIds) { - if (!metadataIds.has(workspaceId)) { - this.removeWorkspace(workspaceId); + // Fully remove workspaces that no longer exist at all + for (const id of subscribedIds) { + if (!metadataIds.has(id)) { + this.removeWorkspace(id); + } + } + + // Clean up metadata + cached state for workspaces that were removed + for (const id of this.workspaceMetadata.keys()) { + if (!metadataIds.has(id)) { + this.workspaceMetadata.delete(id); + this.cachedSidebarState.delete(id); + this.recencyCache.delete(id); + this.previousSidebarValues.delete(id); + this.sidebarStateCache.delete(id); } } } @@ -1885,6 +2207,11 @@ export class WorkspaceStore { this.previousSidebarValues.clear(); this.sidebarStateCache.clear(); this.workspaceCreatedAt.clear(); + this.cachedSidebarState.clear(); + + // Clean up activity stream subscription + this.activityUnsubscribe?.(); + this.activityUnsubscribe = null; } /** diff --git a/src/browser/utils/backendBaseUrl.ts b/src/browser/utils/backendBaseUrl.ts index 470bdb7ff4..9e7aae1b25 100644 --- a/src/browser/utils/backendBaseUrl.ts +++ b/src/browser/utils/backendBaseUrl.ts @@ -6,7 +6,7 @@ * * /@user//apps/ * - * In those cases, backend routes (ORPC WebSocket + /auth/*) also live under that + * In those cases, backend routes (ORPC + /auth/*) also live under that * prefix, so the frontend must include it when constructing URLs. */