Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 95 additions & 69 deletions server/claude-sdk.js
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,83 @@ async function loadMcpConfig(cwd) {
}
}

/**
* Runs the SDK query iteration loop with retry on stale session errors.
* If a resume session fails with "No conversation found", falls back to a new session.
*
* Each streamed message is transformed, normalized through
* sessionsService.normalizeMessage('claude', ...) and forwarded with ws.send().
* The retry happens at most once (guarded by hasRetried) and only when
* sdkOptions.resume is set and the error text matches /no conversation found/i.
*
* @param {object} queryInstance - Initial async-iterable returned by query(); consumed with for-await.
* @param {object} sdkOptions - Options handed to query() on retry. Mutated in place: `resume` is
*   deleted before the retry, and `hooks` is deleted if the first retry call to query() throws.
* @param {*} prompt - Prompt passed to query() when a retry instance is created.
* @param {{id: *}} sessionRef - Shared holder for the active session id. `id` is filled from the
*   first message carrying session_id when it is empty, and reset to null before a retry.
* @param {*} originalSessionId - Session id used as the fallback id for outgoing messages and to
*   decide whether session_created must be sent (presumably the id the caller asked to resume).
* @param {boolean} sessionCreatedSent - Whether session_created was already sent. This is a plain
*   parameter, so assignments made here are local to this function.
* @param {*} tempImagePaths - Forwarded unchanged to addSession().
* @param {*} tempDir - Forwarded unchanged to addSession().
* @param {object} ws - Writer used for output; send() is required, setSessionId() is optional.
* @param {object} sessionsService - Provides normalizeMessage(provider, message, sessionId).
* @returns {Promise<void>} Resolves once the (possibly retried) stream has been fully consumed.
* @throws Rethrows any loop error that does not qualify for the one-time stale-session retry.
*/
async function runSDKQueryLoop(queryInstance, sdkOptions, prompt, sessionRef, originalSessionId, sessionCreatedSent, tempImagePaths, tempDir, ws, sessionsService) {
let currentInstance = queryInstance;
let hasRetried = false;

// Runs at most twice: the first pass, plus one retry after a stale-resume error.
while (true) {
try {
for await (const message of currentInstance) {
// Capture session ID from first message
// NOTE(review): this branch is skipped when sessionRef.id is already set on entry
// (resume case), so addSession() is not called from here for that case — confirm
// the caller registers resumed sessions if abort tracking is still required.
if (message.session_id && !sessionRef.id) {
sessionRef.id = message.session_id;
addSession(sessionRef.id, currentInstance, tempImagePaths, tempDir, ws);

if (ws.setSessionId && typeof ws.setSessionId === 'function') {
ws.setSessionId(sessionRef.id);
}

// Send session_created: for new sessions OR when retry created a fresh session
const isNewOrRetrySession = (!originalSessionId && !sessionCreatedSent) || (hasRetried && sessionRef.id !== originalSessionId && !sessionCreatedSent);
if (isNewOrRetrySession) {
sessionCreatedSent = true;
ws.send(createNormalizedMessage({ kind: 'session_created', newSessionId: sessionRef.id, sessionId: sessionRef.id, provider: 'claude' }));
}
}

const transformedMessage = transformMessage(message);
// Prefer the captured id; fall back to the caller-supplied id, then null.
const sid = sessionRef.id || originalSessionId || null;

const normalized = sessionsService.normalizeMessage('claude', transformedMessage, sid);
for (const msg of normalized) {
// Copy parentToolUseId from the transformed message onto normalized messages that lack one.
if (transformedMessage.parentToolUseId && !msg.parentToolUseId) {
msg.parentToolUseId = transformedMessage.parentToolUseId;
}
ws.send(msg);
}

// For result messages, emit a token_budget status when extractTokenBudget() yields data.
if (message.type === 'result') {
const tokenBudgetData = extractTokenBudget(message);
if (tokenBudgetData) {
ws.send(createNormalizedMessage({ kind: 'status', text: 'token_budget', tokenBudget: tokenBudgetData, sessionId: sessionRef.id || originalSessionId || null, provider: 'claude' }));
}
}
}
return; // Loop completed successfully
Comment thread
coderabbitai[bot] marked this conversation as resolved.
} catch (loopError) {
const errorMsg = loopError?.message || String(loopError);
// Retry only once, and only for a resume attempt whose error text matches the stale-session pattern.
if (!hasRetried && /no conversation found/i.test(errorMsg) && sdkOptions.resume) {
// Stale session: fall back to creating a new one
console.log(`[WARN] Resume session ${sdkOptions.resume} not found, creating new session`);
// Drop the stale registration and clear the id so the retry's first message is captured as a new session.
if (sessionRef.id) removeSession(sessionRef.id);
sessionRef.id = null;

delete sdkOptions.resume;
// NOTE(review): the env var is set here and deleted unconditionally below, so a value
// that was present before this retry is not restored afterwards.
process.env.CLAUDE_CODE_STREAM_CLOSE_TIMEOUT = '300000';

let retryInstance;
try {
retryInstance = query({ prompt, options: sdkOptions });
} catch (hookError) {
// If query() throws while being constructed, drop the hooks option and try once more.
console.warn('Retry query with hooks failed:', hookError?.message || hookError);
delete sdkOptions.hooks;
retryInstance = query({ prompt, options: sdkOptions });
}
delete process.env.CLAUDE_CODE_STREAM_CLOSE_TIMEOUT;
Comment on lines +527 to +538
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Retry path clobbers any user-set CLAUDE_CODE_STREAM_CLOSE_TIMEOUT instead of save/restore.

Outer queryClaudeSDK correctly snapshots prevStreamTimeout at line 680 and restores it at lines 700–705. The retry inside runSDKQueryLoop does neither: line 528 unconditionally writes '300000', and line 538 unconditionally deletes the var — so if a user (or test) had CLAUDE_CODE_STREAM_CLOSE_TIMEOUT set in the host env, that value is lost as a side effect of a stale-resume retry. Mirror the outer save/restore to keep behavior symmetric:

🛠️ Save/restore around the retry query construction
         delete sdkOptions.resume;
-        process.env.CLAUDE_CODE_STREAM_CLOSE_TIMEOUT = '300000';
-
-        let retryInstance;
-        try {
-          retryInstance = query({ prompt, options: sdkOptions });
-        } catch (hookError) {
-          console.warn('Retry query with hooks failed:', hookError?.message || hookError);
-          delete sdkOptions.hooks;
-          retryInstance = query({ prompt, options: sdkOptions });
-        }
-        delete process.env.CLAUDE_CODE_STREAM_CLOSE_TIMEOUT;
+        const prevRetryTimeout = process.env.CLAUDE_CODE_STREAM_CLOSE_TIMEOUT;
+        process.env.CLAUDE_CODE_STREAM_CLOSE_TIMEOUT = '300000';
+        let retryInstance;
+        try {
+          try {
+            retryInstance = query({ prompt, options: sdkOptions });
+          } catch (hookError) {
+            console.warn('Retry query with hooks failed:', hookError?.message || hookError);
+            delete sdkOptions.hooks;
+            retryInstance = query({ prompt, options: sdkOptions });
+          }
+        } finally {
+          if (prevRetryTimeout !== undefined) {
+            process.env.CLAUDE_CODE_STREAM_CLOSE_TIMEOUT = prevRetryTimeout;
+          } else {
+            delete process.env.CLAUDE_CODE_STREAM_CLOSE_TIMEOUT;
+          }
+        }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/claude-sdk.js` around lines 527 - 538, The retry path in
runSDKQueryLoop unconditionally sets and deletes
process.env.CLAUDE_CODE_STREAM_CLOSE_TIMEOUT, clobbering any pre-existing value;
fix by snapshotting the existing value (e.g., const prevStreamTimeout =
process.env.CLAUDE_CODE_STREAM_CLOSE_TIMEOUT) before setting it to '300000',
perform the retry construction (retryInstance = query({ prompt, options:
sdkOptions })) inside try/catch, and then restore the original value in a
finally-style step (if prevStreamTimeout is undefined delete the env var,
otherwise restore process.env.CLAUDE_CODE_STREAM_CLOSE_TIMEOUT =
prevStreamTimeout); apply this around both the initial try and the catch retry
so the env is always restored, referencing the existing retryInstance,
sdkOptions, and query(...) call and mirroring the snapshot/restore logic used by
queryClaudeSDK/prevStreamTimeout.


// Switch iteration to the fresh instance; hasRetried prevents a second fallback.
currentInstance = retryInstance;
hasRetried = true;
continue; // Retry the loop
}
// Not a retryable stale-session error (or already retried): propagate to the caller.
throw loopError;
}
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/**
* Executes a Claude query using the SDK
* @param {string} command - User prompt/command
Expand All @@ -478,7 +555,7 @@ async function loadMcpConfig(cwd) {
*/
async function queryClaudeSDK(command, options = {}, ws) {
const { sessionId, sessionSummary } = options;
let capturedSessionId = sessionId;
const sessionRef = { id: sessionId }; // Use object ref for mutation across retries
let sessionCreatedSent = false;
let tempImagePaths = [];
let tempDir = null;
Expand Down Expand Up @@ -514,13 +591,13 @@ async function queryClaudeSDK(command, options = {}, ws) {
const message = typeof input?.message === 'string' ? input.message : 'Claude requires your attention.';
emitNotification(createNotificationEvent({
provider: 'claude',
sessionId: capturedSessionId || sessionId || null,
sessionId: sessionRef.id || sessionId || null,
kind: 'action_required',
code: 'agent.notification',
meta: { message, sessionName: sessionSummary },
severity: 'warning',
requiresUserAction: true,
dedupeKey: `claude:hook:notification:${capturedSessionId || sessionId || 'none'}:${message}`
dedupeKey: `claude:hook:notification:${sessionRef.id || sessionId || 'none'}:${message}`
}));
return {};
}]
Expand Down Expand Up @@ -551,29 +628,29 @@ async function queryClaudeSDK(command, options = {}, ws) {
}

const requestId = createRequestId();
ws.send(createNormalizedMessage({ kind: 'permission_request', requestId, toolName, input, sessionId: capturedSessionId || sessionId || null, provider: 'claude' }));
ws.send(createNormalizedMessage({ kind: 'permission_request', requestId, toolName, input, sessionId: sessionRef.id || sessionId || null, provider: 'claude' }));
emitNotification(createNotificationEvent({
provider: 'claude',
sessionId: capturedSessionId || sessionId || null,
sessionId: sessionRef.id || sessionId || null,
kind: 'action_required',
code: 'permission.required',
meta: { toolName, sessionName: sessionSummary },
severity: 'warning',
requiresUserAction: true,
dedupeKey: `claude:permission:${capturedSessionId || sessionId || 'none'}:${requestId}`
dedupeKey: `claude:permission:${sessionRef.id || sessionId || 'none'}:${requestId}`
}));

const decision = await waitForToolApproval(requestId, {
timeoutMs: requiresInteraction ? 0 : undefined,
signal: context?.signal,
metadata: {
_sessionId: capturedSessionId || sessionId || null,
_sessionId: sessionRef.id || sessionId || null,
_toolName: toolName,
_input: input,
_receivedAt: new Date(),
},
onCancel: (reason) => {
ws.send(createNormalizedMessage({ kind: 'permission_cancelled', requestId, reason, sessionId: capturedSessionId || sessionId || null, provider: 'claude' }));
ws.send(createNormalizedMessage({ kind: 'permission_cancelled', requestId, reason, sessionId: sessionRef.id || sessionId || null, provider: 'claude' }));
}
});
if (!decision) {
Expand Down Expand Up @@ -627,75 +704,24 @@ async function queryClaudeSDK(command, options = {}, ws) {
delete process.env.CLAUDE_CODE_STREAM_CLOSE_TIMEOUT;
}

// Track the query instance for abort capability
if (capturedSessionId) {
addSession(capturedSessionId, queryInstance, tempImagePaths, tempDir, ws);
}

// Process streaming messages
console.log('Starting async generator loop for session:', capturedSessionId || 'NEW');
for await (const message of queryInstance) {
// Capture session ID from first message
if (message.session_id && !capturedSessionId) {

capturedSessionId = message.session_id;
addSession(capturedSessionId, queryInstance, tempImagePaths, tempDir, ws);

// Set session ID on writer
if (ws.setSessionId && typeof ws.setSessionId === 'function') {
ws.setSessionId(capturedSessionId);
}

// Send session-created event only once for new sessions
if (!sessionId && !sessionCreatedSent) {
sessionCreatedSent = true;
ws.send(createNormalizedMessage({ kind: 'session_created', newSessionId: capturedSessionId, sessionId: capturedSessionId, provider: 'claude' }));
}
} else {
// session_id already captured
}

// Transform and normalize message via adapter
const transformedMessage = transformMessage(message);
const sid = capturedSessionId || sessionId || null;

// Use adapter to normalize SDK events into NormalizedMessage[]
const normalized = sessionsService.normalizeMessage('claude', transformedMessage, sid);
for (const msg of normalized) {
// Preserve parentToolUseId from SDK wrapper for subagent tool grouping
if (transformedMessage.parentToolUseId && !msg.parentToolUseId) {
msg.parentToolUseId = transformedMessage.parentToolUseId;
}
ws.send(msg);
}

// Extract and send token budget updates from result messages
if (message.type === 'result') {
const models = Object.keys(message.modelUsage || {});
if (models.length > 0) {
// Model info available in result message
}
const tokenBudgetData = extractTokenBudget(message);
if (tokenBudgetData) {
ws.send(createNormalizedMessage({ kind: 'status', text: 'token_budget', tokenBudget: tokenBudgetData, sessionId: capturedSessionId || sessionId || null, provider: 'claude' }));
}
}
}
console.log('Starting async generator loop for session:', sessionRef.id || 'NEW');
await runSDKQueryLoop(queryInstance, sdkOptions, finalCommand, sessionRef, sessionId, sessionCreatedSent, tempImagePaths, tempDir, ws, sessionsService);

// Clean up session on completion
if (capturedSessionId) {
removeSession(capturedSessionId);
if (sessionRef.id) {
removeSession(sessionRef.id);
}

// Clean up temporary image files
await cleanupTempFiles(tempImagePaths, tempDir);

// Send completion event
ws.send(createNormalizedMessage({ kind: 'complete', exitCode: 0, isNewSession: !sessionId && !!command, sessionId: capturedSessionId, provider: 'claude' }));
ws.send(createNormalizedMessage({ kind: 'complete', exitCode: 0, isNewSession: !sessionId && !!command, sessionId: sessionRef.id || sessionId || null, provider: 'claude' }));
notifyRunStopped({
userId: ws?.userId || null,
provider: 'claude',
sessionId: capturedSessionId || sessionId || null,
sessionId: sessionRef.id || sessionId || null,
sessionName: sessionSummary,
stopReason: 'completed'
});
Expand All @@ -705,8 +731,8 @@ async function queryClaudeSDK(command, options = {}, ws) {
console.error('SDK query error:', error);

// Clean up session on error
if (capturedSessionId) {
removeSession(capturedSessionId);
if (sessionRef.id) {
removeSession(sessionRef.id);
}

// Clean up temporary image files on error
Expand All @@ -719,11 +745,11 @@ async function queryClaudeSDK(command, options = {}, ws) {
: error.message;

// Send error to WebSocket
ws.send(createNormalizedMessage({ kind: 'error', content: errorContent, sessionId: capturedSessionId || sessionId || null, provider: 'claude' }));
ws.send(createNormalizedMessage({ kind: 'error', content: errorContent, sessionId: sessionRef.id || sessionId || null, provider: 'claude' }));
notifyRunFailed({
userId: ws?.userId || null,
provider: 'claude',
sessionId: capturedSessionId || sessionId || null,
sessionId: sessionRef.id || sessionId || null,
sessionName: sessionSummary,
error
});
Expand Down
20 changes: 16 additions & 4 deletions server/openai-codex.js
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,27 @@ export async function queryCodex(command, options = {}, ws) {
model
};

// Start or resume thread
// Start or resume thread — with fallback to new thread if stale
let resumeFailed = false;
if (sessionId) {
thread = codex.resumeThread(sessionId, threadOptions);
try {
thread = codex.resumeThread(sessionId, threadOptions);
} catch (resumeError) {
const errorMsg = resumeError?.message || String(resumeError);
if (/thread/i.test(errorMsg) && /not found|no conversation/i.test(errorMsg)) {
console.log(`[WARN] Resume thread ${sessionId} not found, starting new thread`);
thread = codex.startThread(threadOptions);
resumeFailed = true;
} else {
throw resumeError;
}
}
} else {
thread = codex.startThread(threadOptions);
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Get the thread ID
currentSessionId = thread.id || sessionId || `codex-${Date.now()}`;
// Get the thread ID — avoid falling back to stale sessionId after failed resume
currentSessionId = thread.id || (resumeFailed ? null : sessionId) || `codex-${Date.now()}`;

// Track the session
activeCodexSessions.set(currentSessionId, {
Expand Down