diff --git a/server/claude-sdk.js b/server/claude-sdk.js index 7239f81a4d..8fc3ddf13b 100644 --- a/server/claude-sdk.js +++ b/server/claude-sdk.js @@ -469,6 +469,83 @@ async function loadMcpConfig(cwd) { } } +/** + * Runs the SDK query iteration loop with retry on stale session errors. + * If a resume session fails with "No conversation found", falls back to a new session. + */ +async function runSDKQueryLoop(queryInstance, sdkOptions, prompt, sessionRef, originalSessionId, sessionCreatedSent, tempImagePaths, tempDir, ws, sessionsService) { + let currentInstance = queryInstance; + let hasRetried = false; + + while (true) { + try { + for await (const message of currentInstance) { + // Capture session ID from first message + if (message.session_id && !sessionRef.id) { + sessionRef.id = message.session_id; + addSession(sessionRef.id, currentInstance, tempImagePaths, tempDir, ws); + + if (ws.setSessionId && typeof ws.setSessionId === 'function') { + ws.setSessionId(sessionRef.id); + } + + // Send session_created: for new sessions OR when retry created a fresh session + const isNewOrRetrySession = (!originalSessionId && !sessionCreatedSent) || (hasRetried && sessionRef.id !== originalSessionId && !sessionCreatedSent); + if (isNewOrRetrySession) { + sessionCreatedSent = true; + ws.send(createNormalizedMessage({ kind: 'session_created', newSessionId: sessionRef.id, sessionId: sessionRef.id, provider: 'claude' })); + } + } + + const transformedMessage = transformMessage(message); + const sid = sessionRef.id || originalSessionId || null; + + const normalized = sessionsService.normalizeMessage('claude', transformedMessage, sid); + for (const msg of normalized) { + if (transformedMessage.parentToolUseId && !msg.parentToolUseId) { + msg.parentToolUseId = transformedMessage.parentToolUseId; + } + ws.send(msg); + } + + if (message.type === 'result') { + const tokenBudgetData = extractTokenBudget(message); + if (tokenBudgetData) { + ws.send(createNormalizedMessage({ kind: 'status', text: 'token_budget', tokenBudget: tokenBudgetData, sessionId: sessionRef.id || originalSessionId || null, provider: 'claude' })); + } + } + } + return; // Loop completed successfully + } catch (loopError) { + const errorMsg = loopError?.message || String(loopError); + if (!hasRetried && /no conversation found/i.test(errorMsg) && sdkOptions.resume) { + // Stale session: fall back to creating a new one + console.log(`[WARN] Resume session ${sdkOptions.resume} not found, creating new session`); + if (sessionRef.id) removeSession(sessionRef.id); + sessionRef.id = null; + + delete sdkOptions.resume; + process.env.CLAUDE_CODE_STREAM_CLOSE_TIMEOUT = '300000'; + + let retryInstance; + try { + retryInstance = query({ prompt, options: sdkOptions }); + } catch (hookError) { + console.warn('Retry query with hooks failed:', hookError?.message || hookError); + delete sdkOptions.hooks; + retryInstance = query({ prompt, options: sdkOptions }); + } + delete process.env.CLAUDE_CODE_STREAM_CLOSE_TIMEOUT; + + currentInstance = retryInstance; + hasRetried = true; + continue; // Retry the loop + } + throw loopError; + } + } +} + /** * Executes a Claude query using the SDK * @param {string} command - User prompt/command @@ -478,7 +555,7 @@ async function loadMcpConfig(cwd) { */ async function queryClaudeSDK(command, options = {}, ws) { const { sessionId, sessionSummary } = options; - let capturedSessionId = sessionId; + const sessionRef = { id: sessionId }; // Use object ref for mutation across retries let sessionCreatedSent = false; let tempImagePaths = []; let tempDir = null; @@ -514,13 +591,13 @@ async function queryClaudeSDK(command, options = {}, ws) { const message = typeof input?.message === 'string' ? input.message : 'Claude requires your attention.'; emitNotification(createNotificationEvent({ provider: 'claude', - sessionId: capturedSessionId || sessionId || null, + sessionId: sessionRef.id || sessionId || null, kind: 'action_required', code: 'agent.notification', meta: { message, sessionName: sessionSummary }, severity: 'warning', requiresUserAction: true, - dedupeKey: `claude:hook:notification:${capturedSessionId || sessionId || 'none'}:${message}` + dedupeKey: `claude:hook:notification:${sessionRef.id || sessionId || 'none'}:${message}` })); return {}; }] @@ -551,29 +628,29 @@ async function queryClaudeSDK(command, options = {}, ws) { } const requestId = createRequestId(); - ws.send(createNormalizedMessage({ kind: 'permission_request', requestId, toolName, input, sessionId: capturedSessionId || sessionId || null, provider: 'claude' })); + ws.send(createNormalizedMessage({ kind: 'permission_request', requestId, toolName, input, sessionId: sessionRef.id || sessionId || null, provider: 'claude' })); emitNotification(createNotificationEvent({ provider: 'claude', - sessionId: capturedSessionId || sessionId || null, + sessionId: sessionRef.id || sessionId || null, kind: 'action_required', code: 'permission.required', meta: { toolName, sessionName: sessionSummary }, severity: 'warning', requiresUserAction: true, - dedupeKey: `claude:permission:${capturedSessionId || sessionId || 'none'}:${requestId}` + dedupeKey: `claude:permission:${sessionRef.id || sessionId || 'none'}:${requestId}` })); const decision = await waitForToolApproval(requestId, { timeoutMs: requiresInteraction ? 0 : undefined, signal: context?.signal, metadata: { - _sessionId: capturedSessionId || sessionId || null, + _sessionId: sessionRef.id || sessionId || null, _toolName: toolName, _input: input, _receivedAt: new Date(), }, onCancel: (reason) => { - ws.send(createNormalizedMessage({ kind: 'permission_cancelled', requestId, reason, sessionId: capturedSessionId || sessionId || null, provider: 'claude' })); + ws.send(createNormalizedMessage({ kind: 'permission_cancelled', requestId, reason, sessionId: sessionRef.id || sessionId || null, provider: 'claude' })); } }); if (!decision) { @@ -627,75 +704,24 @@ async function queryClaudeSDK(command, options = {}, ws) { delete process.env.CLAUDE_CODE_STREAM_CLOSE_TIMEOUT; } - // Track the query instance for abort capability - if (capturedSessionId) { - addSession(capturedSessionId, queryInstance, tempImagePaths, tempDir, ws); - } - // Process streaming messages - console.log('Starting async generator loop for session:', capturedSessionId || 'NEW'); - for await (const message of queryInstance) { - // Capture session ID from first message - if (message.session_id && !capturedSessionId) { - - capturedSessionId = message.session_id; - addSession(capturedSessionId, queryInstance, tempImagePaths, tempDir, ws); - - // Set session ID on writer - if (ws.setSessionId && typeof ws.setSessionId === 'function') { - ws.setSessionId(capturedSessionId); - } - - // Send session-created event only once for new sessions - if (!sessionId && !sessionCreatedSent) { - sessionCreatedSent = true; - ws.send(createNormalizedMessage({ kind: 'session_created', newSessionId: capturedSessionId, sessionId: capturedSessionId, provider: 'claude' })); - } - } else { - // session_id already captured - } - - // Transform and normalize message via adapter - const transformedMessage = transformMessage(message); - const sid = capturedSessionId || sessionId || null; - - // Use adapter to normalize SDK events into NormalizedMessage[] - const normalized = sessionsService.normalizeMessage('claude', transformedMessage, sid); - for (const msg of normalized) { - // Preserve parentToolUseId from SDK wrapper for subagent tool grouping - if (transformedMessage.parentToolUseId && !msg.parentToolUseId) { - msg.parentToolUseId = transformedMessage.parentToolUseId; - } - ws.send(msg); - } - - // Extract and send token budget updates from result messages - if (message.type === 'result') { - const models = Object.keys(message.modelUsage || {}); - if (models.length > 0) { - // Model info available in result message - } - const tokenBudgetData = extractTokenBudget(message); - if (tokenBudgetData) { - ws.send(createNormalizedMessage({ kind: 'status', text: 'token_budget', tokenBudget: tokenBudgetData, sessionId: capturedSessionId || sessionId || null, provider: 'claude' })); - } - } - } + console.log('Starting async generator loop for session:', sessionRef.id || 'NEW'); + await runSDKQueryLoop(queryInstance, sdkOptions, finalCommand, sessionRef, sessionId, sessionCreatedSent, tempImagePaths, tempDir, ws, sessionsService); // Clean up session on completion - if (capturedSessionId) { - removeSession(capturedSessionId); + if (sessionRef.id) { + removeSession(sessionRef.id); } // Clean up temporary image files await cleanupTempFiles(tempImagePaths, tempDir); // Send completion event - ws.send(createNormalizedMessage({ kind: 'complete', exitCode: 0, isNewSession: !sessionId && !!command, sessionId: capturedSessionId, provider: 'claude' })); + ws.send(createNormalizedMessage({ kind: 'complete', exitCode: 0, isNewSession: !sessionId && !!command, sessionId: sessionRef.id || sessionId || null, provider: 'claude' })); notifyRunStopped({ userId: ws?.userId || null, provider: 'claude', - sessionId: capturedSessionId || sessionId || null, + sessionId: sessionRef.id || sessionId || null, sessionName: sessionSummary, stopReason: 'completed' }); @@ -705,8 +731,8 @@ async function queryClaudeSDK(command, options = {}, ws) { console.error('SDK query error:', error); // Clean up session on error - if (capturedSessionId) { - removeSession(capturedSessionId); + if (sessionRef.id) { + removeSession(sessionRef.id); } // Clean up temporary image files on error @@ -719,11 +745,11 @@ async function queryClaudeSDK(command, options = {}, ws) { : error.message; // Send error to WebSocket - ws.send(createNormalizedMessage({ kind: 'error', content: errorContent, sessionId: capturedSessionId || sessionId || null, provider: 'claude' })); + ws.send(createNormalizedMessage({ kind: 'error', content: errorContent, sessionId: sessionRef.id || sessionId || null, provider: 'claude' })); notifyRunFailed({ userId: ws?.userId || null, provider: 'claude', - sessionId: capturedSessionId || sessionId || null, + sessionId: sessionRef.id || sessionId || null, sessionName: sessionSummary, error }); diff --git a/server/openai-codex.js b/server/openai-codex.js index 5a7a9007ea..fea1ee5907 100644 --- a/server/openai-codex.js +++ b/server/openai-codex.js @@ -224,15 +224,27 @@ export async function queryCodex(command, options = {}, ws) { model }; - // Start or resume thread + // Start or resume thread — with fallback to new thread if stale + let resumeFailed = false; if (sessionId) { - thread = codex.resumeThread(sessionId, threadOptions); + try { + thread = codex.resumeThread(sessionId, threadOptions); + } catch (resumeError) { + const errorMsg = resumeError?.message || String(resumeError); + if (/thread/i.test(errorMsg) && /not found|no conversation/i.test(errorMsg)) { + console.log(`[WARN] Resume thread ${sessionId} not found, starting new thread`); + thread = codex.startThread(threadOptions); + resumeFailed = true; + } else { + throw resumeError; + } + } } else { thread = codex.startThread(threadOptions); } - // Get the thread ID - currentSessionId = thread.id || sessionId || `codex-${Date.now()}`; + // Get the thread ID — avoid falling back to stale sessionId after failed resume + currentSessionId = thread.id || (resumeFailed ? null : sessionId) || `codex-${Date.now()}`; // Track the session activeCodexSessions.set(currentSessionId, {