diff --git a/server/claude-sdk.js b/server/claude-sdk.js index 7239f81a4d..9149e25546 100644 --- a/server/claude-sdk.js +++ b/server/claude-sdk.js @@ -240,7 +240,7 @@ function addSession(sessionId, queryInstance, tempImagePaths = [], tempDir = nul status: 'active', tempImagePaths, tempDir, - writer + ws: writer && writer.isWebSocketWriter ? writer.ws : (writer && typeof writer.send === 'function' ? writer : null) }); } @@ -476,7 +476,7 @@ async function loadMcpConfig(cwd) { * @param {Object} ws - WebSocket connection * @returns {Promise} */ -async function queryClaudeSDK(command, options = {}, ws) { +async function queryClaudeSDK(command, options = {}, writer) { const { sessionId, sessionSummary } = options; let capturedSessionId = sessionId; let sessionCreatedSent = false; @@ -485,8 +485,8 @@ async function queryClaudeSDK(command, options = {}, ws) { const emitNotification = (event) => { notifyUserIfEnabled({ - userId: ws?.userId || null, - writer: ws, + userId: writer?.userId || null, + writer: writer, event }); }; @@ -551,7 +551,7 @@ async function queryClaudeSDK(command, options = {}, ws) { } const requestId = createRequestId(); - ws.send(createNormalizedMessage({ kind: 'permission_request', requestId, toolName, input, sessionId: capturedSessionId || sessionId || null, provider: 'claude' })); + writer.send(createNormalizedMessage({ kind: 'permission_request', requestId, toolName, input, sessionId: capturedSessionId || sessionId || null, provider: 'claude' })); emitNotification(createNotificationEvent({ provider: 'claude', sessionId: capturedSessionId || sessionId || null, @@ -573,7 +573,7 @@ async function queryClaudeSDK(command, options = {}, ws) { _receivedAt: new Date(), }, onCancel: (reason) => { - ws.send(createNormalizedMessage({ kind: 'permission_cancelled', requestId, reason, sessionId: capturedSessionId || sessionId || null, provider: 'claude' })); + writer.send(createNormalizedMessage({ kind: 'permission_cancelled', requestId, reason, sessionId: capturedSessionId || sessionId || null, provider: 'claude' })); } }); if (!decision) { @@ -629,7 +629,7 @@ async function queryClaudeSDK(command, options = {}, ws) { // Track the query instance for abort capability if (capturedSessionId) { - addSession(capturedSessionId, queryInstance, tempImagePaths, tempDir, ws); + addSession(capturedSessionId, queryInstance, tempImagePaths, tempDir, writer); } // Process streaming messages @@ -639,17 +639,17 @@ async function queryClaudeSDK(command, options = {}, ws) { if (message.session_id && !capturedSessionId) { capturedSessionId = message.session_id; - addSession(capturedSessionId, queryInstance, tempImagePaths, tempDir, ws); + addSession(capturedSessionId, queryInstance, tempImagePaths, tempDir, writer); // Set session ID on writer - if (ws.setSessionId && typeof ws.setSessionId === 'function') { - ws.setSessionId(capturedSessionId); + if (writer && typeof writer.setSessionId === 'function') { + writer.setSessionId(capturedSessionId); } // Send session-created event only once for new sessions if (!sessionId && !sessionCreatedSent) { sessionCreatedSent = true; - ws.send(createNormalizedMessage({ kind: 'session_created', newSessionId: capturedSessionId, sessionId: capturedSessionId, provider: 'claude' })); + writer.send(createNormalizedMessage({ kind: 'session_created', newSessionId: capturedSessionId, sessionId: capturedSessionId, provider: 'claude' })); } } else { // session_id already captured @@ -666,7 +666,7 @@ async function queryClaudeSDK(command, options = {}, ws) { if (transformedMessage.parentToolUseId && !msg.parentToolUseId) { msg.parentToolUseId = transformedMessage.parentToolUseId; } - ws.send(msg); + writer.send(msg); } // Extract and send token budget updates from result messages @@ -677,7 +677,7 @@ async function queryClaudeSDK(command, options = {}, ws) { } const tokenBudgetData = extractTokenBudget(message); if (tokenBudgetData) { - ws.send(createNormalizedMessage({ kind: 'status', text: 'token_budget', tokenBudget: tokenBudgetData, sessionId: capturedSessionId || sessionId || null, provider: 'claude' })); + writer.send(createNormalizedMessage({ kind: 'status', text: 'token_budget', tokenBudget: tokenBudgetData, sessionId: capturedSessionId || sessionId || null, provider: 'claude' })); } } } @@ -691,9 +691,9 @@ async function queryClaudeSDK(command, options = {}, ws) { await cleanupTempFiles(tempImagePaths, tempDir); // Send completion event - ws.send(createNormalizedMessage({ kind: 'complete', exitCode: 0, isNewSession: !sessionId && !!command, sessionId: capturedSessionId, provider: 'claude' })); + writer.send(createNormalizedMessage({ kind: 'complete', exitCode: 0, isNewSession: !sessionId && !!command, sessionId: capturedSessionId, provider: 'claude' })); notifyRunStopped({ - userId: ws?.userId || null, + userId: writer?.userId || null, provider: 'claude', sessionId: capturedSessionId || sessionId || null, sessionName: sessionSummary, @@ -719,9 +719,9 @@ async function queryClaudeSDK(command, options = {}, ws) { : error.message; // Send error to WebSocket - ws.send(createNormalizedMessage({ kind: 'error', content: errorContent, sessionId: capturedSessionId || sessionId || null, provider: 'claude' })); + writer.send(createNormalizedMessage({ kind: 'error', content: errorContent, sessionId: capturedSessionId || sessionId || null, provider: 'claude' })); notifyRunFailed({ - userId: ws?.userId || null, + userId: writer?.userId || null, provider: 'claude', sessionId: capturedSessionId || sessionId || null, sessionName: sessionSummary, @@ -765,6 +765,23 @@ async function abortClaudeSDKSession(sessionId) { } } +/** + * Aborts all sessions associated with a specific WebSocket + * @param {WebSocket} ws - The WebSocket to match + * @returns {number} - Number of sessions aborted + */ +function abortClaudeSDKSessionsForWebSocket(ws) { + let count = 0; + for (const [id, session] of activeSessions.entries()) { + if (session.ws === ws && session.status === 'active') { + console.log(`[Claude] Aborting orphaned session ${id} due to WebSocket disconnect`); + abortClaudeSDKSession(id); + count++; + } + } + return count; +} + /** * Checks if an SDK session is currently active * @param {string} sessionId - Session identifier @@ -780,7 +797,13 @@ function isClaudeSDKSessionActive(sessionId) { * @returns {Array} Array of active session IDs */ function getActiveClaudeSDKSessions() { - return getAllSessions(); + const activeIds = []; + for (const [id, session] of activeSessions.entries()) { + if (session.status === 'active') { + activeIds.push(id); + } + } + return activeIds; } /** @@ -820,10 +843,11 @@ function reconnectSessionWriter(sessionId, newRawWs) { return true; } -// Export public API +// Export public API - Fixed duplicate export issue export { queryClaudeSDK, abortClaudeSDKSession, + abortClaudeSDKSessionsForWebSocket, isClaudeSDKSessionActive, getActiveClaudeSDKSessions, resolveToolApproval, diff --git a/server/cursor-cli.js b/server/cursor-cli.js index 66af16ef50..2a0b495e9a 100644 --- a/server/cursor-cli.js +++ b/server/cursor-cli.js @@ -1,5 +1,6 @@ import { spawn } from 'child_process'; import crossSpawn from 'cross-spawn'; +import { StringDecoder } from 'string_decoder'; import { notifyRunFailed, notifyRunStopped } from './services/notification-orchestrator.js'; import { sessionsService } from './modules/providers/services/sessions.service.js'; import { providerAuthService } from './modules/providers/services/provider-auth.service.js'; @@ -25,7 +26,7 @@ function isWorkspaceTrustPrompt(text = '') { return WORKSPACE_TRUST_PATTERNS.some((pattern) => pattern.test(text)); } -async function spawnCursor(command, options = {}, ws) { +async function spawnCursor(command, options = {}, writer) { return new Promise(async (resolve, reject) => { const { sessionId, projectPath, cwd, resume, toolsSettings, skipPermissions, model, sessionSummary } = options; let capturedSessionId = sessionId; // Track session ID throughout the process @@ -97,7 +98,7 @@ async function spawnCursor(command, options = {}, ws) { const finalSessionId = capturedSessionId || sessionId || processKey; if (code === 0 && !error) { notifyRunStopped({ - userId: ws?.userId || null, + userId: writer?.userId || null, provider: 'cursor', sessionId: finalSessionId, sessionName: sessionSummary, @@ -107,7 +108,7 @@ async function spawnCursor(command, options = {}, ws) { } notifyRunFailed({ - userId: ws?.userId || null, + userId: writer?.userId || null, provider: 'cursor', sessionId: finalSessionId, sessionName: sessionSummary, @@ -129,6 +130,11 @@ async function spawnCursor(command, options = {}, ws) { env: { ...process.env } // Inherit all environment variables }); + // Store WebSocket reference for cleanup on disconnect + if (writer && writer.ws) { + cursorProcess.ws = writer.ws; + } + activeCursorProcesses.set(processKey, cursorProcess); const shouldSuppressForTrustRetry = (text) => { @@ -168,14 +174,12 @@ async function spawnCursor(command, options = {}, ws) { } // Set session ID on writer (for API endpoint compatibility) - if (ws.setSessionId && typeof ws.setSessionId === 'function') { - ws.setSessionId(capturedSessionId); - } + writer && typeof writer.setSessionId === 'function' && writer.setSessionId(capturedSessionId); // Send session-created event only once for new sessions if (!sessionId && !sessionCreatedSent) { sessionCreatedSent = true; - ws.send(createNormalizedMessage({ kind: 'session_created', newSessionId: capturedSessionId, model: response.model, cwd: response.cwd, sessionId: capturedSessionId, provider: 'cursor' })); + writer.send(createNormalizedMessage({ kind: 'session_created', newSessionId: capturedSessionId, model: response.model, cwd: response.cwd, sessionId: capturedSessionId, provider: 'cursor' })); } } @@ -191,7 +195,7 @@ async function spawnCursor(command, options = {}, ws) { // Accumulate assistant message chunks if (response.message && response.message.content && response.message.content.length > 0) { const normalized = sessionsService.normalizeMessage('cursor', response, capturedSessionId || sessionId || null); - for (const msg of normalized) ws.send(msg); + for (const msg of normalized) writer.send(msg); } break; @@ -199,7 +203,7 @@ async function spawnCursor(command, options = {}, ws) { // Session complete โ€” send stream end + lifecycle complete with result payload console.log('Cursor session result:', response); const resultText = typeof response.result === 'string' ? response.result : ''; - ws.send(createNormalizedMessage({ + writer.send(createNormalizedMessage({ kind: 'complete', exitCode: response.subtype === 'success' ? 0 : 1, resultText, @@ -221,13 +225,16 @@ async function spawnCursor(command, options = {}, ws) { // If not JSON, send as stream delta via adapter const normalized = sessionsService.normalizeMessage('cursor', line, capturedSessionId || sessionId || null); - for (const msg of normalized) ws.send(msg); + for (const msg of normalized) writer.send(msg); } }; + const stdoutDecoder = new StringDecoder('utf8'); + const stderrDecoder = new StringDecoder('utf8'); + // Handle stdout (streaming JSON responses) cursorProcess.stdout.on('data', (data) => { - const rawOutput = data.toString(); + const rawOutput = stdoutDecoder.write(data); console.log('Cursor CLI stdout:', rawOutput); // Stream chunks can split JSON objects across packets; keep trailing partial line. @@ -242,14 +249,14 @@ async function spawnCursor(command, options = {}, ws) { // Handle stderr cursorProcess.stderr.on('data', (data) => { - const stderrText = data.toString(); + const stderrText = stderrDecoder.write(data); console.error('Cursor CLI stderr:', stderrText); if (shouldSuppressForTrustRetry(stderrText)) { return; } - ws.send(createNormalizedMessage({ kind: 'error', content: stderrText, sessionId: capturedSessionId || sessionId || null, provider: 'cursor' })); + writer.send(createNormalizedMessage({ kind: 'error', content: stderrText, sessionId: capturedSessionId || sessionId || null, provider: 'cursor' })); }); // Handle process completion @@ -276,7 +283,7 @@ async function spawnCursor(command, options = {}, ws) { return; } - ws.send(createNormalizedMessage({ kind: 'complete', exitCode: code, isNewSession: !sessionId && !!command, sessionId: finalSessionId, provider: 'cursor' })); + writer.send(createNormalizedMessage({ kind: 'complete', exitCode: code, isNewSession: !sessionId && !!command, sessionId: finalSessionId, provider: 'cursor' })); if (code === 0) { notifyTerminalState({ code }); @@ -301,7 +308,7 @@ async function spawnCursor(command, options = {}, ws) { ? 'Cursor CLI is not installed. Please install it from https://cursor.com' : error.message; - ws.send(createNormalizedMessage({ kind: 'error', content: errorContent, sessionId: capturedSessionId || sessionId || null, provider: 'cursor' })); + writer.send(createNormalizedMessage({ kind: 'error', content: errorContent, sessionId: capturedSessionId || sessionId || null, provider: 'cursor' })); notifyTerminalState({ error }); settleOnce(() => reject(error)); @@ -326,6 +333,18 @@ function abortCursorSession(sessionId) { return false; } +function abortCursorSessionsForWebSocket(ws) { + let count = 0; + for (const [sessionId, proc] of activeCursorProcesses.entries()) { + if (proc.ws === ws) { + console.log(`[Cursor] Aborting orphaned session ${sessionId} due to WebSocket disconnect`); + abortCursorSession(sessionId); + count++; + } + } + return count; +} + function isCursorSessionActive(sessionId) { return activeCursorProcesses.has(sessionId); } @@ -337,6 +356,7 @@ function getActiveCursorSessions() { export { spawnCursor, abortCursorSession, + abortCursorSessionsForWebSocket, isCursorSessionActive, getActiveCursorSessions }; diff --git a/server/gemini-cli.js b/server/gemini-cli.js index 2e68a9388f..229d639228 100644 --- a/server/gemini-cli.js +++ b/server/gemini-cli.js @@ -1,5 +1,6 @@ import { spawn } from 'child_process'; import crossSpawn from 'cross-spawn'; +import { StringDecoder } from 'string_decoder'; // Use cross-spawn on Windows for correct .cmd resolution (same pattern as cursor-cli.js) const spawnFunction = process.platform === 'win32' ? crossSpawn : spawn; @@ -14,7 +15,7 @@ import { createNormalizedMessage } from './shared/utils.js'; let activeGeminiProcesses = new Map(); // Track active processes by session ID -async function spawnGemini(command, options = {}, ws) { +async function spawnGemini(command, options = {}, writer) { const { sessionId, projectPath, cwd, toolsSettings, permissionMode, images, sessionSummary } = options; let capturedSessionId = sessionId; // Track session ID throughout the process let sessionCreatedSent = false; // Track if we've already sent session-created event @@ -187,7 +188,7 @@ async function spawnGemini(command, options = {}, ws) { const finalSessionId = capturedSessionId || sessionId || processKey; if (code === 0 && !error) { notifyRunStopped({ - userId: ws?.userId || null, + userId: writer?.userId || null, provider: 'gemini', sessionId: finalSessionId, sessionName: sessionSummary, @@ -197,7 +198,7 @@ async function spawnGemini(command, options = {}, ws) { } notifyRunFailed({ - userId: ws?.userId || null, + userId: writer?.userId || null, provider: 'gemini', sessionId: finalSessionId, sessionName: sessionSummary, @@ -209,6 +210,11 @@ async function spawnGemini(command, options = {}, ws) { geminiProcess.tempImagePaths = tempImagePaths; geminiProcess.tempDir = tempDir; + // Store WebSocket reference for cleanup on disconnect + if (writer && writer.ws) { + geminiProcess.ws = writer.ws; + } + // Store process reference for potential abort const processKey = capturedSessionId || sessionId || Date.now().toString(); activeGeminiProcesses.set(processKey, geminiProcess); @@ -226,9 +232,9 @@ async function spawnGemini(command, options = {}, ws) { const startTimeout = () => { if (timeout) clearTimeout(timeout); timeout = setTimeout(() => { - const socketSessionId = typeof ws.getSessionId === 'function' ? ws.getSessionId() : (capturedSessionId || sessionId || processKey); + const socketSessionId = writer && typeof writer.getSessionId === 'function' ? writer.getSessionId() : (capturedSessionId || sessionId || processKey); terminalFailureReason = `Gemini CLI timeout - no response received for ${timeoutMs / 1000} seconds`; - ws.send(createNormalizedMessage({ kind: 'error', content: terminalFailureReason, sessionId: socketSessionId, provider: 'gemini' })); + writer.send(createNormalizedMessage({ kind: 'error', content: terminalFailureReason, sessionId: socketSessionId, provider: 'gemini' })); try { geminiProcess.kill('SIGTERM'); } catch (e) { } @@ -244,8 +250,8 @@ async function spawnGemini(command, options = {}, ws) { // Create response handler for NDJSON buffering let responseHandler; - if (ws) { - responseHandler = new GeminiResponseHandler(ws, { + if (writer) { + responseHandler = new GeminiResponseHandler(writer, { onContentFragment: (content) => { if (assistantBlocks.length > 0 && assistantBlocks[assistantBlocks.length - 1].type === 'text') { assistantBlocks[assistantBlocks.length - 1].text += content; @@ -287,9 +293,12 @@ async function spawnGemini(command, options = {}, ws) { }); } + const stdoutDecoder = new StringDecoder('utf8'); + const stderrDecoder = new StringDecoder('utf8'); + // Handle stdout geminiProcess.stdout.on('data', (data) => { - const rawOutput = data.toString(); + const rawOutput = stdoutDecoder.write(data); startTimeout(); // Re-arm the timeout // For new sessions, create a session ID FIRST @@ -311,10 +320,10 @@ async function spawnGemini(command, options = {}, ws) { activeGeminiProcesses.set(capturedSessionId, geminiProcess); } - ws.setSessionId && typeof ws.setSessionId === 'function' && ws.setSessionId(capturedSessionId); + writer && typeof writer.setSessionId === 'function' && writer.setSessionId(capturedSessionId); - ws.send(createNormalizedMessage({ kind: 'session_created', newSessionId: capturedSessionId, sessionId: capturedSessionId, provider: 'gemini' })); - } + writer.send(createNormalizedMessage({ kind: 'session_created', newSessionId: capturedSessionId, sessionId: capturedSessionId, provider: 'gemini' })); + } if (responseHandler) { responseHandler.processData(rawOutput); @@ -325,14 +334,14 @@ async function spawnGemini(command, options = {}, ws) { } else { assistantBlocks.push({ type: 'text', text: rawOutput }); } - const socketSessionId = typeof ws.getSessionId === 'function' ? ws.getSessionId() : (capturedSessionId || sessionId); - ws.send(createNormalizedMessage({ kind: 'stream_delta', content: rawOutput, sessionId: socketSessionId, provider: 'gemini' })); + const socketSessionId = writer && typeof writer.getSessionId === 'function' ? writer.getSessionId() : (capturedSessionId || sessionId); + writer.send(createNormalizedMessage({ kind: 'stream_delta', content: rawOutput, sessionId: socketSessionId, provider: 'gemini' })); } }); // Handle stderr geminiProcess.stderr.on('data', (data) => { - const errorMsg = data.toString(); + const errorMsg = stderrDecoder.write(data); // Filter out deprecation warnings and "Loaded cached credentials" message if (errorMsg.includes('[DEP0040]') || @@ -342,8 +351,8 @@ async function spawnGemini(command, options = {}, ws) { return; } - const socketSessionId = typeof ws.getSessionId === 'function' ? ws.getSessionId() : (capturedSessionId || sessionId); - ws.send(createNormalizedMessage({ kind: 'error', content: errorMsg, sessionId: socketSessionId, provider: 'gemini' })); + const socketSessionId = writer && typeof writer.getSessionId === 'function' ? writer.getSessionId() : (capturedSessionId || sessionId); + writer.send(createNormalizedMessage({ kind: 'error', content: errorMsg, sessionId: socketSessionId, provider: 'gemini' })); }); // Handle process completion @@ -365,7 +374,7 @@ async function spawnGemini(command, options = {}, ws) { sessionManager.addMessage(finalSessionId, 'assistant', assistantBlocks); } - ws.send(createNormalizedMessage({ kind: 'complete', exitCode: code, isNewSession: !sessionId && !!command, sessionId: finalSessionId, provider: 'gemini' })); + writer.send(createNormalizedMessage({ kind: 'complete', exitCode: code, isNewSession: !sessionId && !!command, sessionId: finalSessionId, provider: 'gemini' })); // Clean up temporary image files if any if (geminiProcess.tempImagePaths && geminiProcess.tempImagePaths.length > 0) { @@ -385,8 +394,8 @@ async function spawnGemini(command, options = {}, ws) { if (code === 127) { const installed = await providerAuthService.isProviderInstalled('gemini'); if (!installed) { - const socketSessionId = typeof ws.getSessionId === 'function' ? ws.getSessionId() : finalSessionId; - ws.send(createNormalizedMessage({ kind: 'error', content: 'Gemini CLI is not installed. Please install it first: https://github.com/google-gemini/gemini-cli', sessionId: socketSessionId, provider: 'gemini' })); + const socketSessionId = writer && typeof writer.getSessionId === 'function' ? writer.getSessionId() : finalSessionId; + writer.send(createNormalizedMessage({ kind: 'error', content: 'Gemini CLI is not installed. Please install it first: https://github.com/google-gemini/gemini-cli', sessionId: socketSessionId, provider: 'gemini' })); } } @@ -410,8 +419,8 @@ async function spawnGemini(command, options = {}, ws) { ? 'Gemini CLI is not installed. Please install it first: https://github.com/google-gemini/gemini-cli' : error.message; - const errorSessionId = typeof ws.getSessionId === 'function' ? ws.getSessionId() : finalSessionId; - ws.send(createNormalizedMessage({ kind: 'error', content: errorContent, sessionId: errorSessionId, provider: 'gemini' })); + const errorSessionId = writer && typeof writer.getSessionId === 'function' ? writer.getSessionId() : finalSessionId; + writer.send(createNormalizedMessage({ kind: 'error', content: errorContent, sessionId: errorSessionId, provider: 'gemini' })); notifyTerminalState({ error }); reject(error); @@ -453,6 +462,18 @@ function abortGeminiSession(sessionId) { return false; } +function abortGeminiSessionsForWebSocket(ws) { + let count = 0; + for (const [sessionId, proc] of activeGeminiProcesses.entries()) { + if (proc.ws === ws) { + console.log(`[Gemini] Aborting orphaned session ${sessionId} due to WebSocket disconnect`); + abortGeminiSession(sessionId); + count++; + } + } + return count; +} + function isGeminiSessionActive(sessionId) { return activeGeminiProcesses.has(sessionId); } @@ -464,6 +485,7 @@ function getActiveGeminiSessions() { export { spawnGemini, abortGeminiSession, + abortGeminiSessionsForWebSocket, isGeminiSessionActive, getActiveGeminiSessions }; diff --git a/server/gemini-response-handler.js b/server/gemini-response-handler.js index b0a1748550..843eb5e0b0 100644 --- a/server/gemini-response-handler.js +++ b/server/gemini-response-handler.js @@ -34,7 +34,7 @@ class GeminiResponseHandler { } handleEvent(event) { - const sid = typeof this.ws.getSessionId === 'function' ? this.ws.getSessionId() : null; + const sid = this.ws && typeof this.ws.getSessionId === 'function' ? this.ws.getSessionId() : null; if (event.type === 'init') { if (this.onInit) { @@ -58,7 +58,9 @@ class GeminiResponseHandler { // Normalize via adapter and send all resulting messages const normalized = sessionsService.normalizeMessage('gemini', event, sid); for (const msg of normalized) { - this.ws.send(msg); + if (this.ws) { + this.ws.send(msg); + } } } diff --git a/server/index.js b/server/index.js index 62d85130ba..35fc8c0243 100755 --- a/server/index.js +++ b/server/index.js @@ -248,6 +248,22 @@ const wss = new WebSocketServer({ } }); +// Setup heartbeat interval to detect dead connections +const heartbeatInterval = setInterval(() => { + wss.clients.forEach((ws) => { + if (ws.isAlive === false) { + console.log('๐Ÿ”Œ Terminating dead WebSocket connection'); + return ws.terminate(); + } + ws.isAlive = false; + ws.ping(); + }); +}, 30000); // 30 seconds + +wss.on('close', () => { + clearInterval(heartbeatInterval); +}); + // Make WebSocket server available to routes app.locals.wss = wss; @@ -1339,6 +1355,10 @@ wss.on('connection', (ws, request) => { const url = request.url; console.log('[INFO] Client connected to:', url); + // Initial liveness state for heartbeat + ws.isAlive = true; + ws.on('pong', () => { ws.isAlive = true; }); + // Parse URL to get pathname without query parameters const urlObj = new URL(url, 'http://localhost'); const pathname = urlObj.pathname; @@ -1371,9 +1391,15 @@ class WebSocketWriter { } send(data) { - if (this.ws.readyState === 1) { // WebSocket.OPEN - this.ws.send(JSON.stringify(data)); + try { + if (this.ws && this.ws.readyState === 1) { // WebSocket.OPEN + this.ws.send(JSON.stringify(data)); + return true; + } + } catch (error) { + console.error('[WebSocketWriter] Error sending message:', error.message); } + return false; } updateWebSocket(newRawWs) { @@ -1534,6 +1560,22 @@ function handleChatConnection(ws, request) { console.log('๐Ÿ”Œ Chat client disconnected'); // Remove from connected clients connectedClients.delete(ws); + + // Abort any active sessions associated with this specific WebSocket + try { + const counts = { + claude: abortClaudeSDKSessionsForWebSocket(ws), + cursor: abortCursorSessionsForWebSocket(ws), + codex: abortCodexSessionsForWebSocket(ws), + gemini: abortGeminiSessionsForWebSocket(ws) + }; + const total = counts.claude + counts.cursor + counts.codex + counts.gemini; + if (total > 0) { + console.log(`๐Ÿงน Cleaned up ${total} orphaned sessions (Claude:${counts.claude}, Cursor:${counts.cursor}, Codex:${counts.codex}, Gemini:${counts.gemini})`); + } + } catch (err) { + console.error('[Cleanup] Error during session cleanup:', err.message); + } }); } @@ -2312,6 +2354,9 @@ async function startServer() { // Initialize authentication database await initializeDatabase(); + // Ensure Gemini session store is loaded from disk before accepting requests + await sessionManager.ready; + // Configure Web Push (VAPID keys) configureWebPush(); diff --git a/server/openai-codex.js b/server/openai-codex.js index 5a7a9007ea..ad841e3023 100644 --- a/server/openai-codex.js +++ b/server/openai-codex.js @@ -192,7 +192,7 @@ function mapPermissionModeToCodexOptions(permissionMode) { * @param {object} options - Options including cwd, sessionId, model, permissionMode * @param {WebSocket|object} ws - WebSocket connection or response writer */ -export async function queryCodex(command, options = {}, ws) { +async function queryCodex(command, options = {}, ws) { const { sessionId, sessionSummary, @@ -240,7 +240,8 @@ export async function queryCodex(command, options = {}, ws) { codex, status: 'running', abortController, - startedAt: new Date().toISOString() + startedAt: new Date().toISOString(), + ws: ws && ws.isWebSocketWriter ? ws.ws : ws }); // Send session created event @@ -344,7 +345,7 @@ export async function queryCodex(command, options = {}, ws) { * @param {string} sessionId - Session ID to abort * @returns {boolean} - Whether abort was successful */ -export function abortCodexSession(sessionId) { +function abortCodexSession(sessionId) { const session = activeCodexSessions.get(sessionId); if (!session) { @@ -361,12 +362,29 @@ export function abortCodexSession(sessionId) { return true; } +/** + * Abort all Codex sessions associated with a specific WebSocket + * @param {WebSocket} ws - The WebSocket to match + * @returns {number} - Number of sessions aborted + */ +export function abortCodexSessionsForWebSocket(ws) { + let count = 0; + for (const [id, session] of activeCodexSessions.entries()) { + if (session.ws === ws && session.status === 'running') { + console.log(`[Codex] Aborting orphaned session ${id} due to WebSocket disconnect`); + abortCodexSession(id); + count++; + } + } + return count; +} + /** * Check if a session is active * @param {string} sessionId - Session ID to check * @returns {boolean} - Whether session is active */ -export function isCodexSessionActive(sessionId) { +function isCodexSessionActive(sessionId) { const session = activeCodexSessions.get(sessionId); return session?.status === 'running'; } @@ -375,7 +393,7 @@ export function isCodexSessionActive(sessionId) { * Get all active sessions * @returns {Array} - Array of active session info */ -export function getActiveCodexSessions() { +function getActiveCodexSessions() { const sessions = []; for (const [id, session] of activeCodexSessions.entries()) { @@ -391,6 +409,13 @@ export function getActiveCodexSessions() { return sessions; } +export { + queryCodex, + abortCodexSession, + isCodexSessionActive, + getActiveCodexSessions +}; + /** * Helper to send message via WebSocket or writer * @param {WebSocket|object} ws - WebSocket or response writer @@ -398,12 +423,14 @@ export function getActiveCodexSessions() { */ function sendMessage(ws, data) { try { - if (ws.isSSEStreamWriter || ws.isWebSocketWriter) { + if (ws && (ws.isSSEStreamWriter || ws.isWebSocketWriter)) { // Writer handles stringification (SSEStreamWriter or WebSocketWriter) ws.send(data); - } else if (typeof ws.send === 'function') { + } else if (ws && typeof ws.send === 'function') { // Raw WebSocket - stringify here - ws.send(JSON.stringify(data)); + if (ws.readyState === 1) { // WebSocket.OPEN + ws.send(JSON.stringify(data)); + } } } catch (error) { console.error('[Codex] Error sending message:', error); diff --git a/src/components/sidebar/view/subcomponents/SidebarHeader.tsx b/src/components/sidebar/view/subcomponents/SidebarHeader.tsx index 551c00956b..b6f34e6917 100644 --- a/src/components/sidebar/view/subcomponents/SidebarHeader.tsx +++ b/src/components/sidebar/view/subcomponents/SidebarHeader.tsx @@ -1,9 +1,10 @@ -import { Folder, FolderPlus, MessageSquare, Plus, RefreshCw, Search, X, PanelLeftClose } from 'lucide-react'; +import { Folder, FolderPlus, MessageSquare, Plus, RefreshCw, Search, X, PanelLeftClose, Globe } from 'lucide-react'; import type { TFunction } from 'i18next'; import { Button, Input } from '../../../../shared/view/ui'; import { IS_PLATFORM } from '../../../../constants/config'; import { cn } from '../../../../lib/utils'; import GitHubStarBadge from './GitHubStarBadge'; +import { useWebSocket } from '../../../../contexts/WebSocketContext'; type SearchMode = 'projects' | 'conversations'; @@ -40,12 +41,21 @@ export default function SidebarHeader({ onCollapseSidebar, t, }: SidebarHeaderProps) { + const { isConnected } = useWebSocket(); + const LogoBlock = () => (
-
+
+

{t('app.title')}