From 2509c9917daf32dc39a4a6b45d27a65820fbd0db Mon Sep 17 00:00:00 2001 From: ethan Date: Wed, 11 Feb 2026 14:14:54 +1100 Subject: [PATCH 01/15] Add STOP message to agent_report tool result --- src/node/services/tools/agent_report.test.ts | 5 ++++- src/node/services/tools/agent_report.ts | 9 +++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/node/services/tools/agent_report.test.ts b/src/node/services/tools/agent_report.test.ts index c561485b12..5047dcc7a2 100644 --- a/src/node/services/tools/agent_report.test.ts +++ b/src/node/services/tools/agent_report.test.ts @@ -50,6 +50,9 @@ describe("agent_report tool", () => { tool.execute!({ reportMarkdown: "done", title: "t" }, mockToolCallOptions) ); - expect(result).toEqual({ success: true }); + expect(result).toEqual({ + success: true, + message: "Report submitted successfully. STOP. Do not generate any further output.", + }); }); }); diff --git a/src/node/services/tools/agent_report.ts b/src/node/services/tools/agent_report.ts index b6ce4124da..323fd2c338 100644 --- a/src/node/services/tools/agent_report.ts +++ b/src/node/services/tools/agent_report.ts @@ -9,7 +9,7 @@ export const createAgentReportTool: ToolFactory = (config: ToolConfiguration) => return tool({ description: TOOL_DEFINITIONS.agent_report.description, inputSchema: TOOL_DEFINITIONS.agent_report.schema, - execute: (): { success: true } => { + execute: (): { success: true; message: string } => { const workspaceId = requireWorkspaceId(config, "agent_report"); const taskService = requireTaskService(config, "agent_report"); @@ -22,7 +22,12 @@ export const createAgentReportTool: ToolFactory = (config: ToolConfiguration) => // Intentionally no side-effects. The backend orchestrator consumes the tool-call args // via persisted history/partial state once the tool call completes successfully. - return { success: true }; + // The stream continues after this so the SDK can record usage. + // The message tells the LLM to stop generating further output. + return { + success: true, + message: "Report submitted successfully. STOP. Do not generate any further output.", + }; }, }); }; From 9a22a1dba8f5c3abb346a744f7f10453292dd90c Mon Sep 17 00:00:00 2001 From: ethan Date: Wed, 11 Feb 2026 14:17:25 +1100 Subject: [PATCH 02/15] fix: log stream-end history result failures --- src/node/services/streamManager.ts | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index 8399c44145..d542396343 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -1837,10 +1837,23 @@ export class StreamManager extends EventEmitter { // CRITICAL: Delete partial.json before updating chat.jsonl // On successful completion, partial.json becomes stale and must be removed - await this.historyService.deletePartial(workspaceId as string); + const deleteResult = await this.historyService.deletePartial(workspaceId as string); + if (!deleteResult.success) { + workspaceLog.warn("Failed to delete partial on stream end", { + error: deleteResult.error, + }); + } // Update the placeholder message in chat.jsonl with final content - await this.historyService.updateHistory(workspaceId as string, finalAssistantMessage); + const updateResult = await this.historyService.updateHistory( + workspaceId as string, + finalAssistantMessage + ); + if (!updateResult.success) { + workspaceLog.warn("Failed to update history on stream end", { + error: updateResult.error, + }); + } // Update cumulative session usage (if service is available) // Wrapped in try-catch: usage recording is non-critical and shouldn't block stream completion From ed0283773696aa5659460e1b506a528340ca5820 Mon Sep 17 00:00:00 2001 From: ethan Date: Wed, 11 Feb 2026 14:25:34 +1100 Subject: [PATCH 03/15] fix task report finalization cleanup timing --- src/node/services/taskService.test.ts | 34 ++++++-- src/node/services/taskService.ts | 116 ++++++++++++-------------- 2 files changed, 82 insertions(+), 68 deletions(-) diff --git a/src/node/services/taskService.test.ts b/src/node/services/taskService.test.ts index 3269076db4..f999cc3768 100644 --- a/src/node/services/taskService.test.ts +++ b/src/node/services/taskService.test.ts @@ -1904,7 +1904,7 @@ describe("TaskService", () => { taskSettings: { maxParallelAgentTasks: 3, maxTaskNestingDepth: 3 }, }); - const { aiService, stopStream } = createAIServiceMocks(config); + const { aiService } = createAIServiceMocks(config); const { workspaceService, sendMessage, remove, emit } = createWorkspaceServiceMocks(); const { historyService, partialService, taskService } = createTaskServiceHarness(config, { aiService, @@ -1980,6 +1980,7 @@ describe("TaskService", () => { result: unknown; timestamp: number; }) => Promise; + handleStreamEnd: (event: StreamEndEvent) => Promise; }; await internal.handleAgentReport({ type: "tool-call-end", @@ -1991,10 +1992,9 @@ describe("TaskService", () => { timestamp: Date.now(), }); - expect(stopStream).toHaveBeenCalledWith( - childId, - expect.objectContaining({ abandonPartial: false }) - ); + // Simulate stream manager committing the final partial at natural stream end. + const commitChildPartial = await partialService.commitPartial(childId); + expect(commitChildPartial.success).toBe(true); const childMessages = await collectFullHistory(historyService, childId); // Ensure the in-flight assistant message (tool calls + agent_report) was committed to history @@ -2064,7 +2064,18 @@ describe("TaskService", () => { expect.objectContaining({ workspaceId: childId }) ); - expect(remove).toHaveBeenCalled(); + expect(remove).not.toHaveBeenCalled(); // Cleanup deferred to stream-end + + // Simulate stream ending naturally — cleanup runs now + await internal.handleStreamEnd({ + type: "stream-end", + workspaceId: childId, + messageId: "assistant-child-partial", + metadata: { model: "test-model" }, + parts: childPartial.parts as StreamEndEvent["parts"], + }); + + expect(remove).toHaveBeenCalled(); // NOW cleanup runs expect(sendMessage).toHaveBeenCalledWith( parentId, expect.stringContaining("sub-agent task(s) have completed"), @@ -2523,6 +2534,7 @@ describe("TaskService", () => { result: unknown; timestamp: number; }) => Promise; + handleStreamEnd: (event: StreamEndEvent) => Promise; }; await internal.handleAgentReport({ type: "tool-call-end", @@ -2564,6 +2576,16 @@ describe("TaskService", () => { expect(text).toContain(childId); } + expect(remove).not.toHaveBeenCalled(); // Cleanup deferred to stream-end + + await internal.handleStreamEnd({ + type: "stream-end", + workspaceId: childId, + messageId: "assistant-child-partial", + metadata: { model: "test-model" }, + parts: childPartial.parts as StreamEndEvent["parts"], + }); + expect(remove).toHaveBeenCalled(); expect(sendMessageMock).toHaveBeenCalledWith( parentId, diff --git a/src/node/services/taskService.ts b/src/node/services/taskService.ts index 5b6dcd4009..ec04dd699c 100644 --- a/src/node/services/taskService.ts +++ b/src/node/services/taskService.ts @@ -39,7 +39,7 @@ import type { RuntimeConfig } from "@/common/types/runtime"; import { AgentIdSchema } from "@/common/orpc/schemas"; import { GitPatchArtifactService } from "@/node/services/gitPatchArtifactService"; import type { ThinkingLevel } from "@/common/types/thinking"; -import type { ToolCallEndEvent, StreamEndEvent } from "@/common/types/stream"; +import type { ToolCallEndEvent, StreamAbortEvent, StreamEndEvent } from "@/common/types/stream"; import { isDynamicToolPart, type DynamicToolPart } from "@/common/types/toolParts"; import { AgentReportToolArgsSchema, @@ -139,26 +139,27 @@ interface CompletedAgentReportCacheEntry { ancestorWorkspaceIds: string[]; } -function isToolCallEndEvent(value: unknown): value is ToolCallEndEvent { +function isTypedWorkspaceEvent(value: unknown, type: string): boolean { return ( typeof value === "object" && value !== null && "type" in value && - (value as { type: unknown }).type === "tool-call-end" && + (value as { type: unknown }).type === type && "workspaceId" in value && typeof (value as { workspaceId: unknown }).workspaceId === "string" ); } +function isToolCallEndEvent(value: unknown): value is ToolCallEndEvent { + return isTypedWorkspaceEvent(value, "tool-call-end"); +} + function isStreamEndEvent(value: unknown): value is StreamEndEvent { - return ( - typeof value === "object" && - value !== null && - "type" in value && - (value as { type: unknown }).type === "stream-end" && - "workspaceId" in value && - typeof (value as { workspaceId: unknown }).workspaceId === "string" - ); + return isTypedWorkspaceEvent(value, "stream-end"); +} + +function isStreamAbortEvent(value: unknown): value is StreamAbortEvent { + return isTypedWorkspaceEvent(value, "stream-abort"); } function hasAncestorWorkspaceId( @@ -261,6 +262,18 @@ export class TaskService { log.error("TaskService.handleStreamEnd failed", { error }); }); }); + + this.aiService.on("stream-abort", (payload: unknown) => { + if (!isStreamAbortEvent(payload)) return; + + void this.workspaceEventLocks + .withLock(payload.workspaceId, async () => { + await this.handleStreamAbort(payload); + }) + .catch((error: unknown) => { + log.error("TaskService.handleStreamAbort failed", { error }); + }); + }); } // Prefer per-agent settings so tasks inherit the correct agent defaults; @@ -2057,7 +2070,10 @@ export class TaskService { } const status = entry.workspace.taskStatus; - if (status === "reported") return; + if (status === "reported") { + await this.cleanupReportedLeafTask(workspaceId); + return; + } // Never allow a task to finish/report while it still has active descendant tasks. // We'll auto-resume this task once the last descendant reports. @@ -2072,12 +2088,14 @@ export class TaskService { const reportArgs = this.findAgentReportArgsInParts(event.parts); if (reportArgs) { await this.finalizeAgentTaskReport(workspaceId, entry, reportArgs); + await this.cleanupReportedLeafTask(workspaceId); return; } // If a task stream ends without agent_report, request it once. if (status === "awaiting_report" && this.remindedAwaitingReport.has(workspaceId)) { await this.fallbackReportMissingAgentReport(entry); + await this.cleanupReportedLeafTask(workspaceId); return; } @@ -2099,6 +2117,20 @@ export class TaskService { ); } + /** + * Clean up reported tasks whose stream was aborted before ending naturally. + * Only acts on tasks already in "reported" state (handleAgentReport ran). + */ + private async handleStreamAbort(event: StreamAbortEvent): Promise { + const cfg = this.config.loadConfigOrDefault(); + const entry = findWorkspaceEntry(cfg, event.workspaceId); + if (!entry?.workspace.parentWorkspaceId) return; + + if (entry.workspace.taskStatus === "reported") { + await this.cleanupReportedLeafTask(event.workspaceId); + } + } + private async fallbackReportMissingAgentReport(entry: { projectPath: string; workspace: WorkspaceConfigEntry; @@ -2189,16 +2221,14 @@ export class TaskService { return; } - await this.finalizeAgentTaskReport(childWorkspaceId, childEntryBeforeReport, reportArgs, { - stopStream: true, - }); + await this.finalizeAgentTaskReport(childWorkspaceId, childEntryBeforeReport, reportArgs); + // Cleanup deferred to handleStreamEnd/handleStreamAbort — stream is still running. } private async finalizeAgentTaskReport( childWorkspaceId: string, childEntry: { projectPath: string; workspace: WorkspaceConfigEntry } | null | undefined, - reportArgs: { reportMarkdown: string; title?: string }, - options?: { stopStream?: boolean } + reportArgs: { reportMarkdown: string; title?: string } ): Promise { assert( childWorkspaceId.length > 0, @@ -2228,49 +2258,9 @@ export class TaskService { await this.emitWorkspaceMetadata(childWorkspaceId); - if (options?.stopStream) { - // `agent_report` is terminal. Stop the child stream immediately to prevent any further token - // usage and to ensure parallelism accounting never "frees" a slot while the stream is still - // active (Claude/Anthropic can emit tool calls before the final assistant block completes). - // - // Important: Do NOT abandon the partial assistant message here. The in-flight assistant block - // contains the tool calls (including the agent_report tool call) that should be archived into - // chat.jsonl for transcript viewing after workspace cleanup. - try { - const stopResult = await this.aiService.stopStream(childWorkspaceId, { - abandonPartial: false, - }); - if (!stopResult.success) { - log.debug("Failed to stop task stream after agent_report", { - workspaceId: childWorkspaceId, - error: stopResult.error, - }); - } - } catch (error: unknown) { - log.debug("Failed to stop task stream after agent_report (threw)", { - workspaceId: childWorkspaceId, - error, - }); - } - - // stopStream() forwards stream-abort asynchronously (after partial cleanup). Workspace cleanup - // may archive/delete session files immediately after this method returns, so commit the partial - // synchronously here (best-effort) to ensure tool calls are present in the archived transcript. - try { - const commitResult = await this.historyService.commitPartial(childWorkspaceId); - if (!commitResult.success) { - log.error("Failed to commit final partial to history after agent_report", { - workspaceId: childWorkspaceId, - error: commitResult.error, - }); - } - } catch (error: unknown) { - log.error("Failed to commit final partial to history after agent_report (threw)", { - workspaceId: childWorkspaceId, - error, - }); - } - } + // NOTE: Stream continues — we intentionally do NOT abort it. + // The agent_report tool result tells the LLM to stop. + // recordSessionUsage runs when the stream ends naturally. const cfgAfterReport = this.config.loadConfigOrDefault(); const latestChildEntry = findWorkspaceEntry(cfgAfterReport, childWorkspaceId) ?? childEntry; @@ -2356,8 +2346,10 @@ export class TaskService { // Free slot and start queued tasks. await this.maybeStartQueuedTasks(); - // Attempt cleanup of reported tasks (leaf-first). - await this.cleanupReportedLeafTask(childWorkspaceId); + // NOTE: Cleanup (cleanupReportedLeafTask) is NOT called here. + // Callers trigger cleanup after confirming the stream has terminated + // (handleStreamEnd/handleStreamAbort), ensuring usage is persisted first. + // The gitPatchArtifactService callback handles deferred cleanup independently. // Auto-resume any parent stream that was waiting on a task tool call (restart-safe). const postCfg = this.config.loadConfigOrDefault(); From f7482ef698f2c7cf0e71022a95eb2a0274d2fe87 Mon Sep 17 00:00:00 2001 From: ethan Date: Wed, 11 Feb 2026 16:31:16 +1100 Subject: [PATCH 04/15] refactor: two-phase completion for exec task lifecycle --- src/node/services/taskService.test.ts | 52 ++++++--- src/node/services/taskService.ts | 157 +++++++++++++++++--------- 2 files changed, 135 insertions(+), 74 deletions(-) diff --git a/src/node/services/taskService.test.ts b/src/node/services/taskService.test.ts index f999cc3768..e4264b982a 100644 --- a/src/node/services/taskService.test.ts +++ b/src/node/services/taskService.test.ts @@ -2137,7 +2137,8 @@ describe("TaskService", () => { taskSettings: { maxParallelAgentTasks: 3, maxTaskNestingDepth: 3 }, }); - const { aiService } = createAIServiceMocks(config); + const isStreaming = mock(() => true); + const { aiService } = createAIServiceMocks(config, { isStreaming }); const { workspaceService, remove } = createWorkspaceServiceMocks(); const { partialService, taskService } = createTaskServiceHarness(config, { aiService, @@ -2191,6 +2192,7 @@ describe("TaskService", () => { result: unknown; timestamp: number; }) => Promise; + handleStreamEnd: (event: StreamEndEvent) => Promise; }; const parentSessionDir = config.getSessionDir(parentId); @@ -2201,7 +2203,7 @@ describe("TaskService", () => { requestingWorkspaceId: parentId, }); - const handleReportPromise = internal.handleAgentReport({ + await internal.handleAgentReport({ type: "tool-call-end", workspaceId: childId, messageId: "assistant-child-partial", @@ -2214,14 +2216,20 @@ describe("TaskService", () => { const report = await waiter; expect(report).toEqual({ reportMarkdown: "Hello from child", title: "Result" }); - const artifactAfterWait = await readSubagentGitPatchArtifact(parentSessionDir, childId); - expect(artifactAfterWait).not.toBeNull(); - expect(["pending", "ready"]).toContain(artifactAfterWait!.status); + // Report handling is purely logical now; patch generation/cleanup start at stream termination. + expect(remove).not.toHaveBeenCalled(); + const artifactAfterReport = await readSubagentGitPatchArtifact(parentSessionDir, childId); + expect(artifactAfterReport).toBeNull(); - await handleReportPromise; + isStreaming.mockImplementation(() => false); - // Cleanup should be deferred until git-format-patch generation completes. - expect(remove).not.toHaveBeenCalled(); + await internal.handleStreamEnd({ + type: "stream-end", + workspaceId: childId, + messageId: "assistant-child-partial", + metadata: { model: "test-model" }, + parts: childPartial.parts as StreamEndEvent["parts"], + }); const start = Date.now(); let lastArtifact: unknown = null; @@ -2244,7 +2252,7 @@ describe("TaskService", () => { ); } - if (Date.now() - start > 10_000) { + if (Date.now() - start > 20_000) { throw new Error( `Timed out waiting for patch artifact generation (removeCalled=${remove.mock.calls.length > 0}, lastArtifact=${JSON.stringify(lastArtifact)})` ); @@ -2321,7 +2329,8 @@ describe("TaskService", () => { taskSettings: { maxParallelAgentTasks: 3, maxTaskNestingDepth: 3 }, }); - const { aiService } = createAIServiceMocks(config); + const isStreaming = mock(() => true); + const { aiService } = createAIServiceMocks(config, { isStreaming }); const { workspaceService, remove } = createWorkspaceServiceMocks(); const { partialService, taskService } = createTaskServiceHarness(config, { aiService, @@ -2375,6 +2384,7 @@ describe("TaskService", () => { result: unknown; timestamp: number; }) => Promise; + handleStreamEnd: (event: StreamEndEvent) => Promise; }; const parentSessionDir = config.getSessionDir(parentId); @@ -2385,7 +2395,7 @@ describe("TaskService", () => { requestingWorkspaceId: parentId, }); - const handleReportPromise = internal.handleAgentReport({ + await internal.handleAgentReport({ type: "tool-call-end", workspaceId: childId, messageId: "assistant-child-partial", @@ -2398,14 +2408,20 @@ describe("TaskService", () => { const report = await waiter; expect(report).toEqual({ reportMarkdown: "Hello from child", title: "Result" }); - const artifactAfterWait = await readSubagentGitPatchArtifact(parentSessionDir, childId); - expect(artifactAfterWait).not.toBeNull(); - expect(["pending", "ready"]).toContain(artifactAfterWait!.status); + // Report handling is purely logical now; patch generation/cleanup start at stream termination. + expect(remove).not.toHaveBeenCalled(); + const artifactAfterReport = await readSubagentGitPatchArtifact(parentSessionDir, childId); + expect(artifactAfterReport).toBeNull(); - await handleReportPromise; + isStreaming.mockImplementation(() => false); - // Cleanup should be deferred until git-format-patch generation completes. - expect(remove).not.toHaveBeenCalled(); + await internal.handleStreamEnd({ + type: "stream-end", + workspaceId: childId, + messageId: "assistant-child-partial", + metadata: { model: "test-model" }, + parts: childPartial.parts as StreamEndEvent["parts"], + }); const start = Date.now(); let lastArtifact: unknown = null; @@ -2428,7 +2444,7 @@ describe("TaskService", () => { ); } - if (Date.now() - start > 10_000) { + if (Date.now() - start > 20_000) { throw new Error( `Timed out waiting for patch artifact generation (removeCalled=${remove.mock.calls.length > 0}, lastArtifact=${JSON.stringify(lastArtifact)})` ); diff --git a/src/node/services/taskService.ts b/src/node/services/taskService.ts index ec04dd699c..205e62a352 100644 --- a/src/node/services/taskService.ts +++ b/src/node/services/taskService.ts @@ -414,7 +414,7 @@ export class TaskService { await this.gitPatchArtifactService.maybeStartGeneration( task.parentWorkspaceId, task.id!, - (wsId) => this.cleanupReportedLeafTask(wsId) + (wsId) => this.requestReportedTaskCleanupRecheck(wsId) ); } catch (error: unknown) { log.error("Failed to resume subagent git patch generation on startup", { @@ -2071,7 +2071,7 @@ export class TaskService { const status = entry.workspace.taskStatus; if (status === "reported") { - await this.cleanupReportedLeafTask(workspaceId); + await this.finalizeTerminationPhaseForReportedTask(workspaceId); return; } @@ -2088,14 +2088,14 @@ export class TaskService { const reportArgs = this.findAgentReportArgsInParts(event.parts); if (reportArgs) { await this.finalizeAgentTaskReport(workspaceId, entry, reportArgs); - await this.cleanupReportedLeafTask(workspaceId); + await this.finalizeTerminationPhaseForReportedTask(workspaceId); return; } // If a task stream ends without agent_report, request it once. if (status === "awaiting_report" && this.remindedAwaitingReport.has(workspaceId)) { await this.fallbackReportMissingAgentReport(entry); - await this.cleanupReportedLeafTask(workspaceId); + await this.finalizeTerminationPhaseForReportedTask(workspaceId); return; } @@ -2127,10 +2127,58 @@ export class TaskService { if (!entry?.workspace.parentWorkspaceId) return; if (entry.workspace.taskStatus === "reported") { - await this.cleanupReportedLeafTask(event.workspaceId); + await this.finalizeTerminationPhaseForReportedTask(event.workspaceId); + } + } + + private async finalizeTerminationPhaseForReportedTask(workspaceId: string): Promise { + assert( + workspaceId.length > 0, + "finalizeTerminationPhaseForReportedTask: workspaceId must be non-empty" + ); + + await this.maybeStartPatchGenerationForReportedTask(workspaceId); + await this.cleanupReportedLeafTask(workspaceId); + } + + private async maybeStartPatchGenerationForReportedTask(workspaceId: string): Promise { + assert( + workspaceId.length > 0, + "maybeStartPatchGenerationForReportedTask: workspaceId must be non-empty" + ); + + const cfg = this.config.loadConfigOrDefault(); + const parentWorkspaceId = findWorkspaceEntry(cfg, workspaceId)?.workspace.parentWorkspaceId; + if (!parentWorkspaceId) { + return; + } + + try { + await this.gitPatchArtifactService.maybeStartGeneration( + parentWorkspaceId, + workspaceId, + (wsId) => this.requestReportedTaskCleanupRecheck(wsId) + ); + } catch (error: unknown) { + log.error("Failed to start subagent git patch generation", { + parentWorkspaceId, + childWorkspaceId: workspaceId, + error, + }); } } + private requestReportedTaskCleanupRecheck(workspaceId: string): Promise { + assert( + workspaceId.length > 0, + "requestReportedTaskCleanupRecheck: workspaceId must be non-empty" + ); + + return this.workspaceEventLocks.withLock(workspaceId, async () => { + await this.cleanupReportedLeafTask(workspaceId); + }); + } + private async fallbackReportMissingAgentReport(entry: { projectPath: string; workspace: WorkspaceConfigEntry; @@ -2319,38 +2367,12 @@ export class TaskService { reportArgs ); - // Begin git-format-patch generation (best-effort). - // - // This must run before cleanup so the reported task workspace isn't deleted while we're still - // reading commits from it. - // - // It must also run before resolving waiters so an immediate `task_await` result after - // `agent_report` can include at least a pending artifact record. - try { - await this.gitPatchArtifactService.maybeStartGeneration( - parentWorkspaceId, - childWorkspaceId, - (wsId) => this.cleanupReportedLeafTask(wsId) - ); - } catch (error: unknown) { - log.error("Failed to start subagent git patch generation", { - parentWorkspaceId, - childWorkspaceId, - error, - }); - } - // Resolve foreground waiters. this.resolveWaiters(childWorkspaceId, reportArgs); // Free slot and start queued tasks. await this.maybeStartQueuedTasks(); - // NOTE: Cleanup (cleanupReportedLeafTask) is NOT called here. - // Callers trigger cleanup after confirming the stream has terminated - // (handleStreamEnd/handleStreamAbort), ensuring usage is persisted first. - // The gitPatchArtifactService callback handles deferred cleanup independently. - // Auto-resume any parent stream that was waiting on a task tool call (restart-safe). const postCfg = this.config.loadConfigOrDefault(); if (!findWorkspaceEntry(postCfg, parentWorkspaceId)) { @@ -2643,6 +2665,51 @@ export class TaskService { return true; } + private async canCleanupReportedTask( + workspaceId: string + ): Promise<{ ok: true; parentWorkspaceId: string } | { ok: false; reason: string }> { + assert(workspaceId.length > 0, "canCleanupReportedTask: workspaceId must be non-empty"); + + const config = this.config.loadConfigOrDefault(); + const entry = findWorkspaceEntry(config, workspaceId); + if (!entry) { + return { ok: false, reason: "workspace_not_found" }; + } + + const parentWorkspaceId = entry.workspace.parentWorkspaceId; + if (!parentWorkspaceId) { + return { ok: false, reason: "missing_parent_workspace" }; + } + + if (entry.workspace.taskStatus !== "reported") { + return { ok: false, reason: "task_not_reported" }; + } + + if (this.aiService.isStreaming(workspaceId)) { + log.debug("cleanupReportedLeafTask: deferring auto-delete; stream still active", { + workspaceId, + parentWorkspaceId, + }); + return { ok: false, reason: "still_streaming" }; + } + + if (this.hasActiveDescendantAgentTasks(config, workspaceId)) { + return { ok: false, reason: "has_active_descendants" }; + } + + const parentSessionDir = this.config.getSessionDir(parentWorkspaceId); + const patchArtifact = await readSubagentGitPatchArtifact(parentSessionDir, workspaceId); + if (patchArtifact?.status === "pending") { + log.debug("cleanupReportedLeafTask: deferring auto-delete; patch artifact pending", { + workspaceId, + parentWorkspaceId, + }); + return { ok: false, reason: "patch_pending" }; + } + + return { ok: true, parentWorkspaceId }; + } + private async cleanupReportedLeafTask(workspaceId: string): Promise { assert(workspaceId.length > 0, "cleanupReportedLeafTask: workspaceId must be non-empty"); @@ -2657,33 +2724,11 @@ export class TaskService { } visited.add(currentWorkspaceId); - const config = this.config.loadConfigOrDefault(); - const entry = findWorkspaceEntry(config, currentWorkspaceId); - if (!entry) return; - - const ws = entry.workspace; - const parentWorkspaceId = ws.parentWorkspaceId; - if (!parentWorkspaceId) return; - if (ws.taskStatus !== "reported") return; - - const hasChildren = this.listAgentTaskWorkspaces(config).some( - (t) => t.parentWorkspaceId === currentWorkspaceId - ); - const parentSessionDir = this.config.getSessionDir(parentWorkspaceId); - const patchArtifact = await readSubagentGitPatchArtifact( - parentSessionDir, - currentWorkspaceId - ); - if (patchArtifact?.status === "pending") { - log.debug("cleanupReportedLeafTask: deferring auto-delete; patch artifact pending", { - workspaceId: currentWorkspaceId, - parentWorkspaceId, - }); + const cleanupEligibility = await this.canCleanupReportedTask(currentWorkspaceId); + if (!cleanupEligibility.ok) { return; } - if (hasChildren) return; - const removeResult = await this.workspaceService.remove(currentWorkspaceId, true); if (!removeResult.success) { log.error("Failed to auto-delete reported task workspace", { @@ -2693,7 +2738,7 @@ export class TaskService { return; } - currentWorkspaceId = parentWorkspaceId; + currentWorkspaceId = cleanupEligibility.parentWorkspaceId; } log.error("cleanupReportedLeafTask: exceeded max parent traversal depth", { From b9b70da9817e4fde395b8560d47f50d9367c05b0 Mon Sep 17 00:00:00 2001 From: ethan Date: Wed, 11 Feb 2026 17:08:57 +1100 Subject: [PATCH 05/15] fix: enforce structural-leaf ordering for reported task cleanup --- src/node/services/taskService.ts | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/node/services/taskService.ts b/src/node/services/taskService.ts index 205e62a352..6a0954b99b 100644 --- a/src/node/services/taskService.ts +++ b/src/node/services/taskService.ts @@ -1569,6 +1569,16 @@ export class TaskService { return false; } + /** + * Topology predicate: does this workspace still have child agent-task nodes in config? + * Unlike hasActiveDescendantAgentTasks (which checks runtime activity for scheduling), + * this checks structural tree shape — any child node blocks parent deletion regardless + * of its status. + */ + private hasChildAgentTasks(index: AgentTaskIndex, workspaceId: string): boolean { + return (index.childrenByParent.get(workspaceId)?.length ?? 0) > 0; + } + private getTaskDepth( config: ReturnType, workspaceId: string @@ -2693,8 +2703,12 @@ export class TaskService { return { ok: false, reason: "still_streaming" }; } - if (this.hasActiveDescendantAgentTasks(config, workspaceId)) { - return { ok: false, reason: "has_active_descendants" }; + // Topology gate: a reported task can only be cleaned up when it is a structural leaf + // (has no child agent tasks in config). This is status-agnostic — even reported children + // block parent deletion, ensuring artifact rollup always targets an existing parent path. + const index = this.buildAgentTaskIndex(config); + if (this.hasChildAgentTasks(index, workspaceId)) { + return { ok: false, reason: "has_child_tasks" }; } const parentSessionDir = this.config.getSessionDir(parentWorkspaceId); @@ -2713,6 +2727,9 @@ export class TaskService { private async cleanupReportedLeafTask(workspaceId: string): Promise { assert(workspaceId.length > 0, "cleanupReportedLeafTask: workspaceId must be non-empty"); + // Lineage reduction: each iteration removes exactly one leaf node, then re-evaluates + // the parent on fresh config. The structural-leaf gate in canCleanupReportedTask ensures + // parents are only removed after all children are gone. let currentWorkspaceId = workspaceId; const visited = new Set(); for (let depth = 0; depth < 32; depth++) { From fac705336fa34d0c2c71c71a7672d608f12d0237 Mon Sep 17 00:00:00 2001 From: ethan Date: Wed, 11 Feb 2026 17:09:02 +1100 Subject: [PATCH 06/15] test: verify structural-leaf cleanup ordering for reported tasks --- src/node/services/taskService.test.ts | 158 ++++++++++++++++++++++++++ 1 file changed, 158 insertions(+) diff --git a/src/node/services/taskService.test.ts b/src/node/services/taskService.test.ts index e4264b982a..3e7ff6db52 100644 --- a/src/node/services/taskService.test.ts +++ b/src/node/services/taskService.test.ts @@ -2949,6 +2949,164 @@ describe("TaskService", () => { expect(childEntry?.runtimeConfig?.type).toBe("worktree"); }, 20_000); + test("sibling reported task blocks parent deletion during cleanup", async () => { + const config = await createTestConfig(rootDir); + + const projectPath = path.join(rootDir, "repo"); + const rootWorkspaceId = "root-111"; + const parentTaskId = "parent-222"; + const childTaskAId = "child-a-333"; + const childTaskBId = "child-b-444"; + + await config.saveConfig({ + projects: new Map([ + [ + projectPath, + { + workspaces: [ + { path: path.join(projectPath, "root"), id: rootWorkspaceId, name: "root" }, + { + path: path.join(projectPath, "parent-task"), + id: parentTaskId, + name: "agent_exec_parent", + parentWorkspaceId: rootWorkspaceId, + agentType: "exec", + taskStatus: "reported", + }, + { + path: path.join(projectPath, "child-task-a"), + id: childTaskAId, + name: "agent_explore_child_a", + parentWorkspaceId: parentTaskId, + agentType: "explore", + taskStatus: "reported", + }, + { + path: path.join(projectPath, "child-task-b"), + id: childTaskBId, + name: "agent_explore_child_b", + parentWorkspaceId: parentTaskId, + agentType: "explore", + taskStatus: "reported", + }, + ], + }, + ], + ]), + taskSettings: { maxParallelAgentTasks: 3, maxTaskNestingDepth: 3 }, + }); + + const isStreaming = mock(() => false); + const { aiService } = createAIServiceMocks(config, { isStreaming }); + const remove = mock(async (workspaceId: string): Promise> => { + await config.removeWorkspace(workspaceId); + return Ok(undefined); + }); + const { workspaceService } = createWorkspaceServiceMocks({ remove }); + const { taskService } = createTaskServiceHarness(config, { aiService, workspaceService }); + + const internal = taskService as unknown as { + handleStreamEnd: (event: StreamEndEvent) => Promise; + }; + + await internal.handleStreamEnd({ + type: "stream-end", + workspaceId: childTaskAId, + messageId: "assistant-child-a", + metadata: { model: "openai:gpt-4o-mini" }, + parts: [], + }); + + expect(remove).toHaveBeenCalledWith(childTaskAId, true); + + const removedWorkspaceIds = ( + remove as unknown as { mock: { calls: Array<[string, boolean]> } } + ).mock.calls.map((call) => call[0]); + expect(removedWorkspaceIds).not.toContain(parentTaskId); + }); + + test("parent deletes only after all sibling children are cleaned up", async () => { + const config = await createTestConfig(rootDir); + + const projectPath = path.join(rootDir, "repo"); + const rootWorkspaceId = "root-111"; + const parentTaskId = "parent-222"; + const childTaskAId = "child-a-333"; + const childTaskBId = "child-b-444"; + + await config.saveConfig({ + projects: new Map([ + [ + projectPath, + { + workspaces: [ + { path: path.join(projectPath, "root"), id: rootWorkspaceId, name: "root" }, + { + path: path.join(projectPath, "parent-task"), + id: parentTaskId, + name: "agent_exec_parent", + parentWorkspaceId: rootWorkspaceId, + agentType: "exec", + taskStatus: "reported", + }, + { + path: path.join(projectPath, "child-task-a"), + id: childTaskAId, + name: "agent_explore_child_a", + parentWorkspaceId: parentTaskId, + agentType: "explore", + taskStatus: "reported", + }, + { + path: path.join(projectPath, "child-task-b"), + id: childTaskBId, + name: "agent_explore_child_b", + parentWorkspaceId: parentTaskId, + agentType: "explore", + taskStatus: "reported", + }, + ], + }, + ], + ]), + taskSettings: { maxParallelAgentTasks: 3, maxTaskNestingDepth: 3 }, + }); + + const isStreaming = mock(() => false); + const { aiService } = createAIServiceMocks(config, { isStreaming }); + const remove = mock(async (workspaceId: string): Promise> => { + await config.removeWorkspace(workspaceId); + return Ok(undefined); + }); + const { workspaceService } = createWorkspaceServiceMocks({ remove }); + const { taskService } = createTaskServiceHarness(config, { aiService, workspaceService }); + + const internal = taskService as unknown as { + handleStreamEnd: (event: StreamEndEvent) => Promise; + }; + + await internal.handleStreamEnd({ + type: "stream-end", + workspaceId: childTaskAId, + messageId: "assistant-child-a", + metadata: { model: "openai:gpt-4o-mini" }, + parts: [], + }); + + await internal.handleStreamEnd({ + type: "stream-end", + workspaceId: childTaskBId, + messageId: "assistant-child-b", + metadata: { model: "openai:gpt-4o-mini" }, + parts: [], + }); + + expect(remove).toHaveBeenCalledTimes(3); + expect(remove).toHaveBeenNthCalledWith(1, childTaskAId, true); + expect(remove).toHaveBeenNthCalledWith(2, childTaskBId, true); + expect(remove).toHaveBeenNthCalledWith(3, parentTaskId, true); + }); + describe("parent auto-resume flood protection", () => { async function setupParentWithActiveChild(rootDirPath: string) { const config = await createTestConfig(rootDirPath); From e014cf704cda9afe13c0798ca332060d54ce2377 Mon Sep 17 00:00:00 2001 From: ethan Date: Wed, 11 Feb 2026 17:36:51 +1100 Subject: [PATCH 07/15] fix: deterministically stop streams after agent_report via stopWhen --- src/node/services/streamManager.test.ts | 45 +++++++++++++++++++++++-- src/node/services/streamManager.ts | 12 +++++-- src/node/services/taskService.ts | 5 +-- 3 files changed, 55 insertions(+), 7 deletions(-) diff --git a/src/node/services/streamManager.test.ts b/src/node/services/streamManager.test.ts index 19fc0a6655..4cef708b14 100644 --- a/src/node/services/streamManager.test.ts +++ b/src/node/services/streamManager.test.ts @@ -107,15 +107,56 @@ describe("StreamManager - stopWhen configuration", () => { if (!Array.isArray(stopWhen)) { throw new Error("Expected autonomous stopWhen to be an array of conditions"); } - expect(stopWhen).toHaveLength(2); + expect(stopWhen).toHaveLength(3); - const [maxStepCondition, queuedMessageCondition] = stopWhen; + const [maxStepCondition, queuedMessageCondition, agentReportCondition] = stopWhen; expect(maxStepCondition({ steps: new Array(99999) })).toBe(false); expect(maxStepCondition({ steps: new Array(100000) })).toBe(true); expect(queuedMessageCondition({ steps: [] })).toBe(false); queued = true; expect(queuedMessageCondition({ steps: [] })).toBe(true); + + expect( + agentReportCondition({ + steps: [{ toolCalls: [{ toolName: "agent_report" }] }], + }) + ).toBe(true); + }); + + test("stops after agent_report tool call in autonomous mode", () => { + const streamManager = new StreamManager(historyService); + const buildStopWhen = Reflect.get(streamManager, "createStopWhenCondition") as + | BuildStopWhenCondition + | undefined; + expect(typeof buildStopWhen).toBe("function"); + + const stopWhen = buildStopWhen!({ hasQueuedMessage: () => false }); + if (!Array.isArray(stopWhen)) { + throw new Error("Expected autonomous stopWhen to be an array of conditions"); + } + + const [, , reportStop] = stopWhen; + if (!reportStop) { + throw new Error("Expected autonomous stopWhen to include agent_report condition"); + } + + // Returns true when step contains agent_report tool call + expect( + reportStop({ + steps: [{ toolCalls: [{ toolName: "agent_report" }] }], + }) + ).toBe(true); + + // Returns false when step contains other tool calls + expect( + reportStop({ + steps: [{ toolCalls: [{ toolName: "bash" }] }], + }) + ).toBe(false); + + // Returns false when no steps + expect(reportStop({ steps: [] })).toBe(false); }); test("treats missing queued-message callback as not queued", () => { diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index d542396343..a33a3ab06a 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -4,6 +4,7 @@ import { PlatformPaths } from "@/common/utils/paths"; import { streamText, stepCountIs, + hasToolCall, type ModelMessage, type LanguageModel, type Tool, @@ -1019,9 +1020,14 @@ export class StreamManager extends EventEmitter { return stepCountIs(1); } - // Allow effectively unlimited autonomous steps while still yielding quickly - // when a queued user message should interrupt at the next step boundary. - return [stepCountIs(100000), () => request.hasQueuedMessage?.() ?? false]; + // For autonomous loops: cap steps, check for queued messages, and stop after + // agent_report so the stream ends naturally (preserving usage accounting) + // without allowing post-report tool execution. + return [ + stepCountIs(100000), + () => request.hasQueuedMessage?.() ?? false, + hasToolCall("agent_report"), + ]; } private createStreamResult( diff --git a/src/node/services/taskService.ts b/src/node/services/taskService.ts index 6a0954b99b..c90c607bf1 100644 --- a/src/node/services/taskService.ts +++ b/src/node/services/taskService.ts @@ -2317,8 +2317,9 @@ export class TaskService { await this.emitWorkspaceMetadata(childWorkspaceId); // NOTE: Stream continues — we intentionally do NOT abort it. - // The agent_report tool result tells the LLM to stop. - // recordSessionUsage runs when the stream ends naturally. + // Deterministic termination is enforced by stopWhen(hasToolCall("agent_report")) + // in StreamManager, which ends the stream at the step boundary (preserving usage + // accounting). recordSessionUsage runs when the stream ends naturally. const cfgAfterReport = this.config.loadConfigOrDefault(); const latestChildEntry = findWorkspaceEntry(cfgAfterReport, childWorkspaceId) ?? childEntry; From 683e05bf750cb88e2c5a07d8cea11f3971f13d88 Mon Sep 17 00:00:00 2001 From: ethan Date: Wed, 11 Feb 2026 17:42:22 +1100 Subject: [PATCH 08/15] chore: neutralize agent_report success message --- src/node/services/tools/agent_report.test.ts | 2 +- src/node/services/tools/agent_report.ts | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/node/services/tools/agent_report.test.ts b/src/node/services/tools/agent_report.test.ts index 5047dcc7a2..7b4d98388b 100644 --- a/src/node/services/tools/agent_report.test.ts +++ b/src/node/services/tools/agent_report.test.ts @@ -52,7 +52,7 @@ describe("agent_report tool", () => { expect(result).toEqual({ success: true, - message: "Report submitted successfully. STOP. Do not generate any further output.", + message: "Report submitted successfully.", }); }); }); diff --git a/src/node/services/tools/agent_report.ts b/src/node/services/tools/agent_report.ts index 323fd2c338..9253fc0f5f 100644 --- a/src/node/services/tools/agent_report.ts +++ b/src/node/services/tools/agent_report.ts @@ -22,11 +22,11 @@ export const createAgentReportTool: ToolFactory = (config: ToolConfiguration) => // Intentionally no side-effects. The backend orchestrator consumes the tool-call args // via persisted history/partial state once the tool call completes successfully. - // The stream continues after this so the SDK can record usage. - // The message tells the LLM to stop generating further output. + // The stream continues after this so the SDK can record usage, while StreamManager + // stopWhen(hasToolCall("agent_report")) deterministically ends the stream boundary. return { success: true, - message: "Report submitted successfully. STOP. Do not generate any further output.", + message: "Report submitted successfully.", }; }, }); From f11cb431078f056b62aa57a5f63e96b49bf4e768 Mon Sep 17 00:00:00 2001 From: ethan Date: Wed, 11 Feb 2026 18:36:45 +1100 Subject: [PATCH 09/15] refactor: make stream-end the single source of truth for agent_report lifecycle Move all report finalization from tool-call-end to handleStreamEnd, solving two bugs: - P1: patch artifact now created before waiters resolve - P2: task not marked reported until stream completes (no intra-step race) Remove handleAgentReport, readLatestAgentReportArgs, findAgentReportArgsInMessage. --- src/node/services/taskService.test.ts | 184 +++++--------------------- src/node/services/taskService.ts | 101 ++------------ 2 files changed, 43 insertions(+), 242 deletions(-) diff --git a/src/node/services/taskService.test.ts b/src/node/services/taskService.test.ts index 3e7ff6db52..e53d415124 100644 --- a/src/node/services/taskService.test.ts +++ b/src/node/services/taskService.test.ts @@ -1971,59 +1971,20 @@ describe("TaskService", () => { expect(writeChildPartial.success).toBe(true); const internal = taskService as unknown as { - handleAgentReport: (event: { - type: "tool-call-end"; - workspaceId: string; - messageId: string; - toolCallId: string; - toolName: string; - result: unknown; - timestamp: number; - }) => Promise; handleStreamEnd: (event: StreamEndEvent) => Promise; }; - await internal.handleAgentReport({ - type: "tool-call-end", - workspaceId: childId, - messageId: "assistant-child-partial", - toolCallId: "agent-report-call-1", - toolName: "agent_report", - result: { success: true }, - timestamp: Date.now(), - }); - // Simulate stream manager committing the final partial at natural stream end. + // Simulate stream manager committing the final partial right before natural stream end. const commitChildPartial = await partialService.commitPartial(childId); expect(commitChildPartial.success).toBe(true); - const childMessages = await collectFullHistory(historyService, childId); - // Ensure the in-flight assistant message (tool calls + agent_report) was committed to history - // before workspace cleanup archives the transcript. - expect(childMessages.length).toBeGreaterThan(1); - - const assistantMsg = childMessages.find((m) => m.id === "assistant-child-partial") ?? null; - expect(assistantMsg).not.toBeNull(); - if (assistantMsg) { - const toolPart = assistantMsg.parts.find( - (p) => - p && - typeof p === "object" && - "type" in p && - (p as { type?: unknown }).type === "dynamic-tool" && - "toolName" in p && - (p as { toolName?: unknown }).toolName === "agent_report" - ) as unknown as - | { - toolName: string; - state: string; - input?: unknown; - } - | undefined; - - expect(toolPart?.toolName).toBe("agent_report"); - expect(toolPart?.state).toBe("output-available"); - expect(JSON.stringify(toolPart?.input)).toContain("Hello from child"); - } + await internal.handleStreamEnd({ + type: "stream-end", + workspaceId: childId, + messageId: "assistant-child-partial", + metadata: { model: "test-model" }, + parts: childPartial.parts as StreamEndEvent["parts"], + }); const updatedChildPartial = await partialService.readPartial(childId); expect(updatedChildPartial).toBeNull(); @@ -2064,18 +2025,7 @@ describe("TaskService", () => { expect.objectContaining({ workspaceId: childId }) ); - expect(remove).not.toHaveBeenCalled(); // Cleanup deferred to stream-end - - // Simulate stream ending naturally — cleanup runs now - await internal.handleStreamEnd({ - type: "stream-end", - workspaceId: childId, - messageId: "assistant-child-partial", - metadata: { model: "test-model" }, - parts: childPartial.parts as StreamEndEvent["parts"], - }); - - expect(remove).toHaveBeenCalled(); // NOW cleanup runs + expect(remove).toHaveBeenCalled(); expect(sendMessage).toHaveBeenCalledWith( parentId, expect.stringContaining("sub-agent task(s) have completed"), @@ -2137,8 +2087,7 @@ describe("TaskService", () => { taskSettings: { maxParallelAgentTasks: 3, maxTaskNestingDepth: 3 }, }); - const isStreaming = mock(() => true); - const { aiService } = createAIServiceMocks(config, { isStreaming }); + const { aiService } = createAIServiceMocks(config); const { workspaceService, remove } = createWorkspaceServiceMocks(); const { partialService, taskService } = createTaskServiceHarness(config, { aiService, @@ -2183,15 +2132,6 @@ describe("TaskService", () => { expect(writeChildPartial.success).toBe(true); const internal = taskService as unknown as { - handleAgentReport: (event: { - type: "tool-call-end"; - workspaceId: string; - messageId: string; - toolCallId: string; - toolName: string; - result: unknown; - timestamp: number; - }) => Promise; handleStreamEnd: (event: StreamEndEvent) => Promise; }; @@ -2203,26 +2143,6 @@ describe("TaskService", () => { requestingWorkspaceId: parentId, }); - await internal.handleAgentReport({ - type: "tool-call-end", - workspaceId: childId, - messageId: "assistant-child-partial", - toolCallId: "agent-report-call-1", - toolName: "agent_report", - result: { success: true }, - timestamp: Date.now(), - }); - - const report = await waiter; - expect(report).toEqual({ reportMarkdown: "Hello from child", title: "Result" }); - - // Report handling is purely logical now; patch generation/cleanup start at stream termination. - expect(remove).not.toHaveBeenCalled(); - const artifactAfterReport = await readSubagentGitPatchArtifact(parentSessionDir, childId); - expect(artifactAfterReport).toBeNull(); - - isStreaming.mockImplementation(() => false); - await internal.handleStreamEnd({ type: "stream-end", workspaceId: childId, @@ -2231,6 +2151,14 @@ describe("TaskService", () => { parts: childPartial.parts as StreamEndEvent["parts"], }); + const report = await waiter; + expect(report).toEqual({ reportMarkdown: "Hello from child", title: "Result" }); + + const artifactAfterStreamEnd = await readSubagentGitPatchArtifact(parentSessionDir, childId); + expect( + artifactAfterStreamEnd?.status === "pending" || artifactAfterStreamEnd?.status === "ready" + ).toBe(true); + const start = Date.now(); let lastArtifact: unknown = null; while (true) { @@ -2329,8 +2257,7 @@ describe("TaskService", () => { taskSettings: { maxParallelAgentTasks: 3, maxTaskNestingDepth: 3 }, }); - const isStreaming = mock(() => true); - const { aiService } = createAIServiceMocks(config, { isStreaming }); + const { aiService } = createAIServiceMocks(config); const { workspaceService, remove } = createWorkspaceServiceMocks(); const { partialService, taskService } = createTaskServiceHarness(config, { aiService, @@ -2375,15 +2302,6 @@ describe("TaskService", () => { expect(writeChildPartial.success).toBe(true); const internal = taskService as unknown as { - handleAgentReport: (event: { - type: "tool-call-end"; - workspaceId: string; - messageId: string; - toolCallId: string; - toolName: string; - result: unknown; - timestamp: number; - }) => Promise; handleStreamEnd: (event: StreamEndEvent) => Promise; }; @@ -2395,26 +2313,6 @@ describe("TaskService", () => { requestingWorkspaceId: parentId, }); - await internal.handleAgentReport({ - type: "tool-call-end", - workspaceId: childId, - messageId: "assistant-child-partial", - toolCallId: "agent-report-call-1", - toolName: "agent_report", - result: { success: true }, - timestamp: Date.now(), - }); - - const report = await waiter; - expect(report).toEqual({ reportMarkdown: "Hello from child", title: "Result" }); - - // Report handling is purely logical now; patch generation/cleanup start at stream termination. - expect(remove).not.toHaveBeenCalled(); - const artifactAfterReport = await readSubagentGitPatchArtifact(parentSessionDir, childId); - expect(artifactAfterReport).toBeNull(); - - isStreaming.mockImplementation(() => false); - await internal.handleStreamEnd({ type: "stream-end", workspaceId: childId, @@ -2423,6 +2321,14 @@ describe("TaskService", () => { parts: childPartial.parts as StreamEndEvent["parts"], }); + const report = await waiter; + expect(report).toEqual({ reportMarkdown: "Hello from child", title: "Result" }); + + const artifactAfterStreamEnd = await readSubagentGitPatchArtifact(parentSessionDir, childId); + expect( + artifactAfterStreamEnd?.status === "pending" || artifactAfterStreamEnd?.status === "ready" + ).toBe(true); + const start = Date.now(); let lastArtifact: unknown = null; while (true) { @@ -2541,25 +2447,15 @@ describe("TaskService", () => { expect(writeChildPartial.success).toBe(true); const internal = taskService as unknown as { - handleAgentReport: (event: { - type: "tool-call-end"; - workspaceId: string; - messageId: string; - toolCallId: string; - toolName: string; - result: unknown; - timestamp: number; - }) => Promise; handleStreamEnd: (event: StreamEndEvent) => Promise; }; - await internal.handleAgentReport({ - type: "tool-call-end", + + await internal.handleStreamEnd({ + type: "stream-end", workspaceId: childId, messageId: "assistant-child-partial", - toolCallId: "agent-report-call-1", - toolName: "agent_report", - result: { success: true }, - timestamp: Date.now(), + metadata: { model: "test-model" }, + parts: childPartial.parts as StreamEndEvent["parts"], }); const parentMessages = await collectFullHistory(historyService, parentId); @@ -2592,16 +2488,6 @@ describe("TaskService", () => { expect(text).toContain(childId); } - expect(remove).not.toHaveBeenCalled(); // Cleanup deferred to stream-end - - await internal.handleStreamEnd({ - type: "stream-end", - workspaceId: childId, - messageId: "assistant-child-partial", - metadata: { model: "test-model" }, - parts: childPartial.parts as StreamEndEvent["parts"], - }); - expect(remove).toHaveBeenCalled(); expect(sendMessageMock).toHaveBeenCalledWith( parentId, @@ -2611,7 +2497,7 @@ describe("TaskService", () => { ); }); - test("uses agent_report from stream-end parts instead of fallback", async () => { + test("stream-end with agent_report parts finalizes report and triggers cleanup", async () => { const config = await createTestConfig(rootDir); const projectPath = path.join(rootDir, "repo"); @@ -2731,9 +2617,7 @@ describe("TaskService", () => { expect(ws?.taskStatus).toBe("reported"); expect(remove).toHaveBeenCalled(); - // sendMessage is called once for the "ended without agent_report" reminder - // and NOT for resumeStream (since "second attempt" was already simulated). - // The parent auto-resume sendMessage also fires since no active descendants remain. + // Parent auto-resume fires after the child report is finalized at stream-end. expect(sendMessage).toHaveBeenCalled(); }); diff --git a/src/node/services/taskService.ts b/src/node/services/taskService.ts index c90c607bf1..3f30ecdba5 100644 --- a/src/node/services/taskService.ts +++ b/src/node/services/taskService.ts @@ -39,7 +39,7 @@ import type { RuntimeConfig } from "@/common/types/runtime"; import { AgentIdSchema } from "@/common/orpc/schemas"; import { GitPatchArtifactService } from "@/node/services/gitPatchArtifactService"; import type { ThinkingLevel } from "@/common/types/thinking"; -import type { ToolCallEndEvent, StreamAbortEvent, StreamEndEvent } from "@/common/types/stream"; +import type { StreamAbortEvent, StreamEndEvent } from "@/common/types/stream"; import { isDynamicToolPart, type DynamicToolPart } from "@/common/types/toolParts"; import { AgentReportToolArgsSchema, @@ -150,10 +150,6 @@ function isTypedWorkspaceEvent(value: unknown, type: string): boolean { ); } -function isToolCallEndEvent(value: unknown): value is ToolCallEndEvent { - return isTypedWorkspaceEvent(value, "tool-call-end"); -} - function isStreamEndEvent(value: unknown): value is StreamEndEvent { return isTypedWorkspaceEvent(value, "stream-end"); } @@ -209,8 +205,8 @@ function getIsoNow(): string { } export class TaskService { - // Serialize stream-end/tool-call-end processing per workspace to avoid races (e.g. - // stream-end observing awaiting_report before agent_report handling flips the status). + // Serialize stream-end/stream-abort processing per workspace to avoid races when + // finalizing reported tasks and cleanup state transitions. private readonly workspaceEventLocks = new MutexMap(); private readonly mutex = new AsyncMutex(); private readonly pendingWaitersByTaskId = new Map(); @@ -236,21 +232,6 @@ export class TaskService { ) { this.gitPatchArtifactService = new GitPatchArtifactService(config); - this.aiService.on("tool-call-end", (payload: unknown) => { - if (!isToolCallEndEvent(payload)) return; - if (payload.toolName !== "agent_report") return; - // Ignore failed agent_report attempts (e.g. tool rejected due to active descendants). - if (!isSuccessfulToolResult(payload.result)) return; - - void this.workspaceEventLocks - .withLock(payload.workspaceId, async () => { - await this.handleAgentReport(payload); - }) - .catch((error: unknown) => { - log.error("TaskService.handleAgentReport failed", { error }); - }); - }); - this.aiService.on("stream-end", (payload: unknown) => { if (!isStreamEndEvent(payload)) return; @@ -1531,9 +1512,9 @@ export class TaskService { continue; } - // Defensive: a task may still be streaming even after it transitioned to another status - // (e.g. tool-call-end happened but the stream hasn't ended yet). Count it as active so we - // never exceed the configured parallel limit. + // Defensive: task status and runtime stream state can be briefly out of sync during + // termination/cleanup boundaries. Count streaming tasks as active so we never exceed + // the configured parallel limit. if (task.id && this.aiService.isStreaming(task.id)) { activeCount += 1; } @@ -2129,7 +2110,7 @@ export class TaskService { /** * Clean up reported tasks whose stream was aborted before ending naturally. - * Only acts on tasks already in "reported" state (handleAgentReport ran). + * Only acts on tasks already in "reported" state. */ private async handleStreamAbort(event: StreamAbortEvent): Promise { const cfg = this.config.loadConfigOrDefault(); @@ -2147,7 +2128,6 @@ export class TaskService { "finalizeTerminationPhaseForReportedTask: workspaceId must be non-empty" ); - await this.maybeStartPatchGenerationForReportedTask(workspaceId); await this.cleanupReportedLeafTask(workspaceId); } @@ -2252,37 +2232,6 @@ export class TaskService { return combined; } - private async handleAgentReport(event: ToolCallEndEvent): Promise { - const childWorkspaceId = event.workspaceId; - - if (!isSuccessfulToolResult(event.result)) { - return; - } - - const cfgBeforeReport = this.config.loadConfigOrDefault(); - const childEntryBeforeReport = findWorkspaceEntry(cfgBeforeReport, childWorkspaceId); - if (childEntryBeforeReport?.workspace.taskStatus === "reported") { - return; - } - - if (this.hasActiveDescendantAgentTasks(cfgBeforeReport, childWorkspaceId)) { - log.error("agent_report called while task has active descendants; ignoring", { - childWorkspaceId, - }); - return; - } - - // Read report payload from the tool-call input (persisted in partial/history). - const reportArgs = await this.readLatestAgentReportArgs(childWorkspaceId); - if (!reportArgs) { - log.error("agent_report tool-call args not found", { childWorkspaceId }); - return; - } - - await this.finalizeAgentTaskReport(childWorkspaceId, childEntryBeforeReport, reportArgs); - // Cleanup deferred to handleStreamEnd/handleStreamAbort — stream is still running. - } - private async finalizeAgentTaskReport( childWorkspaceId: string, childEntry: { projectPath: string; workspace: WorkspaceConfigEntry } | null | undefined, @@ -2371,6 +2320,8 @@ export class TaskService { } } + await this.maybeStartPatchGenerationForReportedTask(childWorkspaceId); + await this.deliverReportToParent( parentWorkspaceId, childWorkspaceId, @@ -2465,34 +2416,6 @@ export class TaskService { } } - private async readLatestAgentReportArgs( - workspaceId: string - ): Promise<{ reportMarkdown: string; title?: string } | null> { - const partial = await this.historyService.readPartial(workspaceId); - if (partial) { - const args = this.findAgentReportArgsInMessage(partial); - if (args) return args; - } - - // Only need recent messages to find agent_report — avoid full-file read. - // getLastMessages returns chronological order; scan in reverse for newest-first. - const historyResult = await this.historyService.getLastMessages(workspaceId, 20); - if (!historyResult.success) { - log.error("Failed to read history for agent_report args", { - workspaceId, - error: historyResult.error, - }); - return null; - } - - for (let i = historyResult.data.length - 1; i >= 0; i--) { - const args = this.findAgentReportArgsInMessage(historyResult.data[i]); - if (args) return args; - } - - return null; - } - private findAgentReportArgsInParts( parts: readonly unknown[] ): { reportMarkdown: string; title?: string } | null { @@ -2511,12 +2434,6 @@ export class TaskService { return null; } - private findAgentReportArgsInMessage( - msg: MuxMessage - ): { reportMarkdown: string; title?: string } | null { - return this.findAgentReportArgsInParts(msg.parts); - } - private async deliverReportToParent( parentWorkspaceId: string, childWorkspaceId: string, From 2caf51a2e1fdb95f1354a7e60d31c5cd15e79af3 Mon Sep 17 00:00:00 2001 From: ethan Date: Wed, 11 Feb 2026 20:01:19 +1100 Subject: [PATCH 10/15] fix: unify task stream termination finalization --- src/node/services/taskService.test.ts | 245 +++++++++++++++++++++++++- src/node/services/taskService.ts | 111 ++++++++++-- 2 files changed, 339 insertions(+), 17 deletions(-) diff --git a/src/node/services/taskService.test.ts b/src/node/services/taskService.test.ts index e53d415124..65aad50798 100644 --- a/src/node/services/taskService.test.ts +++ b/src/node/services/taskService.test.ts @@ -16,7 +16,7 @@ import type { WorkspaceForkParams } from "@/node/runtime/Runtime"; import { WorktreeRuntime } from "@/node/runtime/WorktreeRuntime"; import { createRuntime } from "@/node/runtime/runtimeFactory"; import { Ok, Err, type Result } from "@/common/types/result"; -import type { StreamEndEvent } from "@/common/types/stream"; +import type { StreamAbortEvent, StreamEndEvent } from "@/common/types/stream"; import { createMuxMessage, type MuxMessage } from "@/common/types/message"; import type { WorkspaceMetadata } from "@/common/types/workspace"; import type { AIService } from "@/node/services/aiService"; @@ -2621,6 +2621,249 @@ describe("TaskService", () => { expect(sendMessage).toHaveBeenCalled(); }); + test("stream-abort after agent_report finalizes report and triggers cleanup", async () => { + const config = await createTestConfig(rootDir); + + const projectPath = path.join(rootDir, "repo"); + const parentId = "parent-111"; + const childId = "child-222"; + + await config.saveConfig({ + projects: new Map([ + [ + projectPath, + { + workspaces: [ + { path: path.join(projectPath, "parent"), id: parentId, name: "parent" }, + { + path: path.join(projectPath, "child"), + id: childId, + name: "agent_explore_child", + parentWorkspaceId: parentId, + agentType: "explore", + taskStatus: "running", + taskModelString: "openai:gpt-4o-mini", + }, + ], + }, + ], + ]), + taskSettings: { maxParallelAgentTasks: 3, maxTaskNestingDepth: 3 }, + }); + + const { aiService } = createAIServiceMocks(config); + const { workspaceService, sendMessage, remove } = createWorkspaceServiceMocks(); + const { historyService, partialService, taskService } = createTaskServiceHarness(config, { + aiService, + workspaceService, + }); + + const parentPartial = createMuxMessage( + "assistant-parent-partial", + "assistant", + "Waiting on subagent…", + { timestamp: Date.now() }, + [ + { + type: "dynamic-tool", + toolCallId: "task-call-1", + toolName: "task", + input: { subagent_type: "explore", prompt: "do the thing", title: "Test task" }, + state: "input-available", + }, + ] + ); + const writeParentPartial = await partialService.writePartial(parentId, parentPartial); + expect(writeParentPartial.success).toBe(true); + + const childReportMessage = createMuxMessage( + "assistant-child-output", + "assistant", + "report body", + { timestamp: Date.now() }, + [ + { + type: "dynamic-tool", + toolCallId: "agent-report-call-1", + toolName: "agent_report", + input: { reportMarkdown: "Hello from child", title: "Result" }, + state: "output-available", + output: { success: true }, + }, + ] + ); + const appendChildHistory = await historyService.appendToHistory(childId, childReportMessage); + expect(appendChildHistory.success).toBe(true); + + const internal = taskService as unknown as { + handleStreamAbort: (event: StreamAbortEvent) => Promise; + }; + + await internal.handleStreamAbort({ + type: "stream-abort", + workspaceId: childId, + messageId: childReportMessage.id, + }); + + const sendCalls = (sendMessage as unknown as { mock: { calls: unknown[][] } }).mock.calls; + for (const call of sendCalls) { + const msg = call[1] as string; + expect(msg).not.toContain("agent_report"); + } + + const updatedParentPartial = await partialService.readPartial(parentId); + expect(updatedParentPartial).not.toBeNull(); + if (updatedParentPartial) { + const toolPart = updatedParentPartial.parts.find( + (p) => + p && + typeof p === "object" && + "type" in p && + (p as { type?: unknown }).type === "dynamic-tool" + ) as unknown as + | { + toolName: string; + state: string; + output?: unknown; + } + | undefined; + expect(toolPart?.toolName).toBe("task"); + expect(toolPart?.state).toBe("output-available"); + const outputJson = JSON.stringify(toolPart?.output); + expect(outputJson).toContain("Hello from child"); + expect(outputJson).toContain("Result"); + } + + const postCfg = config.loadConfigOrDefault(); + const ws = Array.from(postCfg.projects.values()) + .flatMap((p) => p.workspaces) + .find((w) => w.id === childId); + expect(ws?.taskStatus).toBe("reported"); + + expect(remove).toHaveBeenCalled(); + expect(sendMessage).toHaveBeenCalledWith( + parentId, + expect.stringContaining("sub-agent task(s) have completed"), + expect.any(Object), + expect.objectContaining({ skipAutoResumeReset: true, synthetic: true }) + ); + }); + + test("stream-abort does not finalize from stale messageId", async () => { + const config = await createTestConfig(rootDir); + + const projectPath = path.join(rootDir, "repo"); + const parentId = "parent-111"; + const childId = "child-222"; + + await config.saveConfig({ + projects: new Map([ + [ + projectPath, + { + workspaces: [ + { path: path.join(projectPath, "parent"), id: parentId, name: "parent" }, + { + path: path.join(projectPath, "child"), + id: childId, + name: "agent_explore_child", + parentWorkspaceId: parentId, + agentType: "explore", + taskStatus: "running", + taskModelString: "openai:gpt-4o-mini", + }, + ], + }, + ], + ]), + taskSettings: { maxParallelAgentTasks: 3, maxTaskNestingDepth: 3 }, + }); + + const { aiService } = createAIServiceMocks(config); + const { workspaceService, sendMessage, remove } = createWorkspaceServiceMocks(); + const { historyService, partialService, taskService } = createTaskServiceHarness(config, { + aiService, + workspaceService, + }); + + const parentPartial = createMuxMessage( + "assistant-parent-partial", + "assistant", + "Waiting on subagent…", + { timestamp: Date.now() }, + [ + { + type: "dynamic-tool", + toolCallId: "task-call-1", + toolName: "task", + input: { subagent_type: "explore", prompt: "do the thing", title: "Test task" }, + state: "input-available", + }, + ] + ); + const writeParentPartial = await partialService.writePartial(parentId, parentPartial); + expect(writeParentPartial.success).toBe(true); + + const oldChildReportMessage = createMuxMessage( + "assistant-child-old-report", + "assistant", + "old report", + { timestamp: Date.now() }, + [ + { + type: "dynamic-tool", + toolCallId: "agent-report-call-old", + toolName: "agent_report", + input: { reportMarkdown: "Old report", title: "Old" }, + state: "output-available", + output: { success: true }, + }, + ] + ); + const appendOldHistory = await historyService.appendToHistory(childId, oldChildReportMessage); + expect(appendOldHistory.success).toBe(true); + + const internal = taskService as unknown as { + handleStreamAbort: (event: StreamAbortEvent) => Promise; + }; + + await internal.handleStreamAbort({ + type: "stream-abort", + workspaceId: childId, + messageId: "assistant-child-newer-message", + }); + + const postCfg = config.loadConfigOrDefault(); + const ws = Array.from(postCfg.projects.values()) + .flatMap((p) => p.workspaces) + .find((w) => w.id === childId); + expect(ws?.taskStatus).toBe("running"); + + const unchangedParentPartial = await partialService.readPartial(parentId); + expect(unchangedParentPartial).not.toBeNull(); + if (unchangedParentPartial) { + const toolPart = unchangedParentPartial.parts.find( + (p) => + p && + typeof p === "object" && + "type" in p && + (p as { type?: unknown }).type === "dynamic-tool" + ) as unknown as + | { + toolName: string; + state: string; + output?: unknown; + } + | undefined; + expect(toolPart?.toolName).toBe("task"); + expect(toolPart?.state).toBe("input-available"); + expect(toolPart?.output).toBeUndefined(); + } + + expect(remove).not.toHaveBeenCalled(); + expect(sendMessage).not.toHaveBeenCalled(); + }); + test("missing agent_report triggers one reminder, then posts fallback output and cleans up", async () => { const config = await createTestConfig(rootDir); diff --git a/src/node/services/taskService.ts b/src/node/services/taskService.ts index 3f30ecdba5..65c005761e 100644 --- a/src/node/services/taskService.ts +++ b/src/node/services/taskService.ts @@ -61,6 +61,8 @@ export type TaskKind = "agent"; export type AgentTaskStatus = NonNullable; +type TaskTerminationSource = "stream-end" | "stream-abort"; + export interface TaskCreateArgs { parentWorkspaceId: string; kind: TaskKind; @@ -2060,43 +2062,82 @@ export class TaskService { return; } + await this.handleTaskTermination({ + workspaceId, + messageId: event.messageId, + source: "stream-end", + parts: event.parts, + }); + } + + /** + * Finalize task termination when a stream ends/aborts. + * + * Invariant: once we observe stream termination for a task message, we should + * recover any persisted `agent_report` output by messageId and complete cleanup. + * This prevents tasks from staying active when the stream aborts after + * `agent_report` succeeds but before `stream-end` fires. + */ + private async handleTaskTermination(params: { + workspaceId: string; + messageId: string; + source: TaskTerminationSource; + parts?: readonly unknown[]; + }): Promise { + const cfg = this.config.loadConfigOrDefault(); + const entry = findWorkspaceEntry(cfg, params.workspaceId); + if (!entry?.workspace.parentWorkspaceId) return; + const status = entry.workspace.taskStatus; if (status === "reported") { - await this.finalizeTerminationPhaseForReportedTask(workspaceId); + await this.finalizeTerminationPhaseForReportedTask(params.workspaceId); return; } // Never allow a task to finish/report while it still has active descendant tasks. // We'll auto-resume this task once the last descendant reports. - const hasActiveDescendants = this.hasActiveDescendantAgentTasks(cfg, workspaceId); + const hasActiveDescendants = this.hasActiveDescendantAgentTasks(cfg, params.workspaceId); if (hasActiveDescendants) { if (status === "awaiting_report") { - await this.setTaskStatus(workspaceId, "running"); + await this.setTaskStatus(params.workspaceId, "running"); } return; } - const reportArgs = this.findAgentReportArgsInParts(event.parts); + const terminationParts = await this.loadTerminationParts({ + workspaceId: params.workspaceId, + messageId: params.messageId, + inlineParts: params.parts, + }); + const reportArgs = this.findAgentReportArgsInParts(terminationParts); if (reportArgs) { - await this.finalizeAgentTaskReport(workspaceId, entry, reportArgs); - await this.finalizeTerminationPhaseForReportedTask(workspaceId); + await this.finalizeAgentTaskReport(params.workspaceId, entry, reportArgs); + await this.finalizeTerminationPhaseForReportedTask(params.workspaceId); + return; + } + + if (params.source === "stream-abort") { + log.debug("Task stream aborted without recoverable agent_report output", { + workspaceId: params.workspaceId, + messageId: params.messageId, + }); return; } // If a task stream ends without agent_report, request it once. - if (status === "awaiting_report" && this.remindedAwaitingReport.has(workspaceId)) { + if (status === "awaiting_report" && this.remindedAwaitingReport.has(params.workspaceId)) { await this.fallbackReportMissingAgentReport(entry); - await this.finalizeTerminationPhaseForReportedTask(workspaceId); + await this.finalizeTerminationPhaseForReportedTask(params.workspaceId); return; } - await this.setTaskStatus(workspaceId, "awaiting_report"); + await this.setTaskStatus(params.workspaceId, "awaiting_report"); - this.remindedAwaitingReport.add(workspaceId); + this.remindedAwaitingReport.add(params.workspaceId); const model = entry.workspace.taskModelString ?? defaultModel; await this.workspaceService.sendMessage( - workspaceId, + params.workspaceId, "Your stream ended without calling agent_report. Call agent_report exactly once now with your final report.", { model, @@ -2108,18 +2149,56 @@ export class TaskService { ); } + private async loadTerminationParts(params: { + workspaceId: string; + messageId: string; + inlineParts?: readonly unknown[]; + }): Promise { + if (params.inlineParts !== undefined) { + return params.inlineParts; + } + + // Regression fix: stream-abort can arrive after AIService persisted the final + // assistant message, so recover by messageId before declaring report missing. + const partial = await this.historyService.readPartial(params.workspaceId); + if (partial?.id === params.messageId) { + return partial.parts; + } + + const historyResult = await this.historyService.getLastMessages(params.workspaceId, 20); + if (!historyResult.success) { + log.error("Failed to read history while recovering task termination parts", { + workspaceId: params.workspaceId, + messageId: params.messageId, + error: historyResult.error, + }); + return []; + } + + for (let i = historyResult.data.length - 1; i >= 0; i--) { + const message = historyResult.data[i]; + if (message?.id === params.messageId) { + return message.parts; + } + } + + return []; + } + /** - * Clean up reported tasks whose stream was aborted before ending naturally. - * Only acts on tasks already in "reported" state. + * Handle task stream aborts via the same termination pipeline as stream-end. + * This allows aborts to finalize newly-produced `agent_report` output. */ private async handleStreamAbort(event: StreamAbortEvent): Promise { const cfg = this.config.loadConfigOrDefault(); const entry = findWorkspaceEntry(cfg, event.workspaceId); if (!entry?.workspace.parentWorkspaceId) return; - if (entry.workspace.taskStatus === "reported") { - await this.finalizeTerminationPhaseForReportedTask(event.workspaceId); - } + await this.handleTaskTermination({ + workspaceId: event.workspaceId, + messageId: event.messageId, + source: "stream-abort", + }); } private async finalizeTerminationPhaseForReportedTask(workspaceId: string): Promise { From 6c412b5497bd821d5597a3913171e2e2d359ced3 Mon Sep 17 00:00:00 2001 From: ethan Date: Wed, 11 Feb 2026 20:21:47 +1100 Subject: [PATCH 11/15] Revert "fix: unify task stream termination finalization" This reverts commit 11fc863de1176d9b560ccbd33112eae895dbf6fe. --- src/node/services/taskService.test.ts | 245 +------------------------- src/node/services/taskService.ts | 111 ++---------- 2 files changed, 17 insertions(+), 339 deletions(-) diff --git a/src/node/services/taskService.test.ts b/src/node/services/taskService.test.ts index 65aad50798..e53d415124 100644 --- a/src/node/services/taskService.test.ts +++ b/src/node/services/taskService.test.ts @@ -16,7 +16,7 @@ import type { WorkspaceForkParams } from "@/node/runtime/Runtime"; import { WorktreeRuntime } from "@/node/runtime/WorktreeRuntime"; import { createRuntime } from "@/node/runtime/runtimeFactory"; import { Ok, Err, type Result } from "@/common/types/result"; -import type { StreamAbortEvent, StreamEndEvent } from "@/common/types/stream"; +import type { StreamEndEvent } from "@/common/types/stream"; import { createMuxMessage, type MuxMessage } from "@/common/types/message"; import type { WorkspaceMetadata } from "@/common/types/workspace"; import type { AIService } from "@/node/services/aiService"; @@ -2621,249 +2621,6 @@ describe("TaskService", () => { expect(sendMessage).toHaveBeenCalled(); }); - test("stream-abort after agent_report finalizes report and triggers cleanup", async () => { - const config = await createTestConfig(rootDir); - - const projectPath = path.join(rootDir, "repo"); - const parentId = "parent-111"; - const childId = "child-222"; - - await config.saveConfig({ - projects: new Map([ - [ - projectPath, - { - workspaces: [ - { path: path.join(projectPath, "parent"), id: parentId, name: "parent" }, - { - path: path.join(projectPath, "child"), - id: childId, - name: "agent_explore_child", - parentWorkspaceId: parentId, - agentType: "explore", - taskStatus: "running", - taskModelString: "openai:gpt-4o-mini", - }, - ], - }, - ], - ]), - taskSettings: { maxParallelAgentTasks: 3, maxTaskNestingDepth: 3 }, - }); - - const { aiService } = createAIServiceMocks(config); - const { workspaceService, sendMessage, remove } = createWorkspaceServiceMocks(); - const { historyService, partialService, taskService } = createTaskServiceHarness(config, { - aiService, - workspaceService, - }); - - const parentPartial = createMuxMessage( - "assistant-parent-partial", - "assistant", - "Waiting on subagent…", - { timestamp: Date.now() }, - [ - { - type: "dynamic-tool", - toolCallId: "task-call-1", - toolName: "task", - input: { subagent_type: "explore", prompt: "do the thing", title: "Test task" }, - state: "input-available", - }, - ] - ); - const writeParentPartial = await partialService.writePartial(parentId, parentPartial); - expect(writeParentPartial.success).toBe(true); - - const childReportMessage = createMuxMessage( - "assistant-child-output", - "assistant", - "report body", - { timestamp: Date.now() }, - [ - { - type: "dynamic-tool", - toolCallId: "agent-report-call-1", - toolName: "agent_report", - input: { reportMarkdown: "Hello from child", title: "Result" }, - state: "output-available", - output: { success: true }, - }, - ] - ); - const appendChildHistory = await historyService.appendToHistory(childId, childReportMessage); - expect(appendChildHistory.success).toBe(true); - - const internal = taskService as unknown as { - handleStreamAbort: (event: StreamAbortEvent) => Promise; - }; - - await internal.handleStreamAbort({ - type: "stream-abort", - workspaceId: childId, - messageId: childReportMessage.id, - }); - - const sendCalls = (sendMessage as unknown as { mock: { calls: unknown[][] } }).mock.calls; - for (const call of sendCalls) { - const msg = call[1] as string; - expect(msg).not.toContain("agent_report"); - } - - const updatedParentPartial = await partialService.readPartial(parentId); - expect(updatedParentPartial).not.toBeNull(); - if (updatedParentPartial) { - const toolPart = updatedParentPartial.parts.find( - (p) => - p && - typeof p === "object" && - "type" in p && - (p as { type?: unknown }).type === "dynamic-tool" - ) as unknown as - | { - toolName: string; - state: string; - output?: unknown; - } - | undefined; - expect(toolPart?.toolName).toBe("task"); - expect(toolPart?.state).toBe("output-available"); - const outputJson = JSON.stringify(toolPart?.output); - expect(outputJson).toContain("Hello from child"); - expect(outputJson).toContain("Result"); - } - - const postCfg = config.loadConfigOrDefault(); - const ws = Array.from(postCfg.projects.values()) - .flatMap((p) => p.workspaces) - .find((w) => w.id === childId); - expect(ws?.taskStatus).toBe("reported"); - - expect(remove).toHaveBeenCalled(); - expect(sendMessage).toHaveBeenCalledWith( - parentId, - expect.stringContaining("sub-agent task(s) have completed"), - expect.any(Object), - expect.objectContaining({ skipAutoResumeReset: true, synthetic: true }) - ); - }); - - test("stream-abort does not finalize from stale messageId", async () => { - const config = await createTestConfig(rootDir); - - const projectPath = path.join(rootDir, "repo"); - const parentId = "parent-111"; - const childId = "child-222"; - - await config.saveConfig({ - projects: new Map([ - [ - projectPath, - { - workspaces: [ - { path: path.join(projectPath, "parent"), id: parentId, name: "parent" }, - { - path: path.join(projectPath, "child"), - id: childId, - name: "agent_explore_child", - parentWorkspaceId: parentId, - agentType: "explore", - taskStatus: "running", - taskModelString: "openai:gpt-4o-mini", - }, - ], - }, - ], - ]), - taskSettings: { maxParallelAgentTasks: 3, maxTaskNestingDepth: 3 }, - }); - - const { aiService } = createAIServiceMocks(config); - const { workspaceService, sendMessage, remove } = createWorkspaceServiceMocks(); - const { historyService, partialService, taskService } = createTaskServiceHarness(config, { - aiService, - workspaceService, - }); - - const parentPartial = createMuxMessage( - "assistant-parent-partial", - "assistant", - "Waiting on subagent…", - { timestamp: Date.now() }, - [ - { - type: "dynamic-tool", - toolCallId: "task-call-1", - toolName: "task", - input: { subagent_type: "explore", prompt: "do the thing", title: "Test task" }, - state: "input-available", - }, - ] - ); - const writeParentPartial = await partialService.writePartial(parentId, parentPartial); - expect(writeParentPartial.success).toBe(true); - - const oldChildReportMessage = createMuxMessage( - "assistant-child-old-report", - "assistant", - "old report", - { timestamp: Date.now() }, - [ - { - type: "dynamic-tool", - toolCallId: "agent-report-call-old", - toolName: "agent_report", - input: { reportMarkdown: "Old report", title: "Old" }, - state: "output-available", - output: { success: true }, - }, - ] - ); - const appendOldHistory = await historyService.appendToHistory(childId, oldChildReportMessage); - expect(appendOldHistory.success).toBe(true); - - const internal = taskService as unknown as { - handleStreamAbort: (event: StreamAbortEvent) => Promise; - }; - - await internal.handleStreamAbort({ - type: "stream-abort", - workspaceId: childId, - messageId: "assistant-child-newer-message", - }); - - const postCfg = config.loadConfigOrDefault(); - const ws = Array.from(postCfg.projects.values()) - .flatMap((p) => p.workspaces) - .find((w) => w.id === childId); - expect(ws?.taskStatus).toBe("running"); - - const unchangedParentPartial = await partialService.readPartial(parentId); - expect(unchangedParentPartial).not.toBeNull(); - if (unchangedParentPartial) { - const toolPart = unchangedParentPartial.parts.find( - (p) => - p && - typeof p === "object" && - "type" in p && - (p as { type?: unknown }).type === "dynamic-tool" - ) as unknown as - | { - toolName: string; - state: string; - output?: unknown; - } - | undefined; - expect(toolPart?.toolName).toBe("task"); - expect(toolPart?.state).toBe("input-available"); - expect(toolPart?.output).toBeUndefined(); - } - - expect(remove).not.toHaveBeenCalled(); - expect(sendMessage).not.toHaveBeenCalled(); - }); - test("missing agent_report triggers one reminder, then posts fallback output and cleans up", async () => { const config = await createTestConfig(rootDir); diff --git a/src/node/services/taskService.ts b/src/node/services/taskService.ts index 65c005761e..3f30ecdba5 100644 --- a/src/node/services/taskService.ts +++ b/src/node/services/taskService.ts @@ -61,8 +61,6 @@ export type TaskKind = "agent"; export type AgentTaskStatus = NonNullable; -type TaskTerminationSource = "stream-end" | "stream-abort"; - export interface TaskCreateArgs { parentWorkspaceId: string; kind: TaskKind; @@ -2062,82 +2060,43 @@ export class TaskService { return; } - await this.handleTaskTermination({ - workspaceId, - messageId: event.messageId, - source: "stream-end", - parts: event.parts, - }); - } - - /** - * Finalize task termination when a stream ends/aborts. - * - * Invariant: once we observe stream termination for a task message, we should - * recover any persisted `agent_report` output by messageId and complete cleanup. - * This prevents tasks from staying active when the stream aborts after - * `agent_report` succeeds but before `stream-end` fires. - */ - private async handleTaskTermination(params: { - workspaceId: string; - messageId: string; - source: TaskTerminationSource; - parts?: readonly unknown[]; - }): Promise { - const cfg = this.config.loadConfigOrDefault(); - const entry = findWorkspaceEntry(cfg, params.workspaceId); - if (!entry?.workspace.parentWorkspaceId) return; - const status = entry.workspace.taskStatus; if (status === "reported") { - await this.finalizeTerminationPhaseForReportedTask(params.workspaceId); + await this.finalizeTerminationPhaseForReportedTask(workspaceId); return; } // Never allow a task to finish/report while it still has active descendant tasks. // We'll auto-resume this task once the last descendant reports. - const hasActiveDescendants = this.hasActiveDescendantAgentTasks(cfg, params.workspaceId); + const hasActiveDescendants = this.hasActiveDescendantAgentTasks(cfg, workspaceId); if (hasActiveDescendants) { if (status === "awaiting_report") { - await this.setTaskStatus(params.workspaceId, "running"); + await this.setTaskStatus(workspaceId, "running"); } return; } - const terminationParts = await this.loadTerminationParts({ - workspaceId: params.workspaceId, - messageId: params.messageId, - inlineParts: params.parts, - }); - const reportArgs = this.findAgentReportArgsInParts(terminationParts); + const reportArgs = this.findAgentReportArgsInParts(event.parts); if (reportArgs) { - await this.finalizeAgentTaskReport(params.workspaceId, entry, reportArgs); - await this.finalizeTerminationPhaseForReportedTask(params.workspaceId); - return; - } - - if (params.source === "stream-abort") { - log.debug("Task stream aborted without recoverable agent_report output", { - workspaceId: params.workspaceId, - messageId: params.messageId, - }); + await this.finalizeAgentTaskReport(workspaceId, entry, reportArgs); + await this.finalizeTerminationPhaseForReportedTask(workspaceId); return; } // If a task stream ends without agent_report, request it once. - if (status === "awaiting_report" && this.remindedAwaitingReport.has(params.workspaceId)) { + if (status === "awaiting_report" && this.remindedAwaitingReport.has(workspaceId)) { await this.fallbackReportMissingAgentReport(entry); - await this.finalizeTerminationPhaseForReportedTask(params.workspaceId); + await this.finalizeTerminationPhaseForReportedTask(workspaceId); return; } - await this.setTaskStatus(params.workspaceId, "awaiting_report"); + await this.setTaskStatus(workspaceId, "awaiting_report"); - this.remindedAwaitingReport.add(params.workspaceId); + this.remindedAwaitingReport.add(workspaceId); const model = entry.workspace.taskModelString ?? defaultModel; await this.workspaceService.sendMessage( - params.workspaceId, + workspaceId, "Your stream ended without calling agent_report. Call agent_report exactly once now with your final report.", { model, @@ -2149,56 +2108,18 @@ export class TaskService { ); } - private async loadTerminationParts(params: { - workspaceId: string; - messageId: string; - inlineParts?: readonly unknown[]; - }): Promise { - if (params.inlineParts !== undefined) { - return params.inlineParts; - } - - // Regression fix: stream-abort can arrive after AIService persisted the final - // assistant message, so recover by messageId before declaring report missing. - const partial = await this.historyService.readPartial(params.workspaceId); - if (partial?.id === params.messageId) { - return partial.parts; - } - - const historyResult = await this.historyService.getLastMessages(params.workspaceId, 20); - if (!historyResult.success) { - log.error("Failed to read history while recovering task termination parts", { - workspaceId: params.workspaceId, - messageId: params.messageId, - error: historyResult.error, - }); - return []; - } - - for (let i = historyResult.data.length - 1; i >= 0; i--) { - const message = historyResult.data[i]; - if (message?.id === params.messageId) { - return message.parts; - } - } - - return []; - } - /** - * Handle task stream aborts via the same termination pipeline as stream-end. - * This allows aborts to finalize newly-produced `agent_report` output. + * Clean up reported tasks whose stream was aborted before ending naturally. + * Only acts on tasks already in "reported" state. */ private async handleStreamAbort(event: StreamAbortEvent): Promise { const cfg = this.config.loadConfigOrDefault(); const entry = findWorkspaceEntry(cfg, event.workspaceId); if (!entry?.workspace.parentWorkspaceId) return; - await this.handleTaskTermination({ - workspaceId: event.workspaceId, - messageId: event.messageId, - source: "stream-abort", - }); + if (entry.workspace.taskStatus === "reported") { + await this.finalizeTerminationPhaseForReportedTask(event.workspaceId); + } } private async finalizeTerminationPhaseForReportedTask(workspaceId: string): Promise { From c3ad8d32ecaadf51b92e439c09f87eb712310bf6 Mon Sep 17 00:00:00 2001 From: ethan Date: Wed, 11 Feb 2026 20:27:01 +1100 Subject: [PATCH 12/15] chore: remove unused task stream-abort handler --- src/node/services/taskService.ts | 34 ++------------------------------ 1 file changed, 2 insertions(+), 32 deletions(-) diff --git a/src/node/services/taskService.ts b/src/node/services/taskService.ts index 3f30ecdba5..d7f0eb3e5f 100644 --- a/src/node/services/taskService.ts +++ b/src/node/services/taskService.ts @@ -39,7 +39,7 @@ import type { RuntimeConfig } from "@/common/types/runtime"; import { AgentIdSchema } from "@/common/orpc/schemas"; import { GitPatchArtifactService } from "@/node/services/gitPatchArtifactService"; import type { ThinkingLevel } from "@/common/types/thinking"; -import type { StreamAbortEvent, StreamEndEvent } from "@/common/types/stream"; +import type { StreamEndEvent } from "@/common/types/stream"; import { isDynamicToolPart, type DynamicToolPart } from "@/common/types/toolParts"; import { AgentReportToolArgsSchema, @@ -154,10 +154,6 @@ function isStreamEndEvent(value: unknown): value is StreamEndEvent { return isTypedWorkspaceEvent(value, "stream-end"); } -function isStreamAbortEvent(value: unknown): value is StreamAbortEvent { - return isTypedWorkspaceEvent(value, "stream-abort"); -} - function hasAncestorWorkspaceId( entry: { ancestorWorkspaceIds?: unknown } | null | undefined, ancestorWorkspaceId: string @@ -205,7 +201,7 @@ function getIsoNow(): string { } export class TaskService { - // Serialize stream-end/stream-abort processing per workspace to avoid races when + // Serialize stream-end processing per workspace to avoid races when // finalizing reported tasks and cleanup state transitions. private readonly workspaceEventLocks = new MutexMap(); private readonly mutex = new AsyncMutex(); @@ -243,18 +239,6 @@ export class TaskService { log.error("TaskService.handleStreamEnd failed", { error }); }); }); - - this.aiService.on("stream-abort", (payload: unknown) => { - if (!isStreamAbortEvent(payload)) return; - - void this.workspaceEventLocks - .withLock(payload.workspaceId, async () => { - await this.handleStreamAbort(payload); - }) - .catch((error: unknown) => { - log.error("TaskService.handleStreamAbort failed", { error }); - }); - }); } // Prefer per-agent settings so tasks inherit the correct agent defaults; @@ -2108,20 +2092,6 @@ export class TaskService { ); } - /** - * Clean up reported tasks whose stream was aborted before ending naturally. - * Only acts on tasks already in "reported" state. - */ - private async handleStreamAbort(event: StreamAbortEvent): Promise { - const cfg = this.config.loadConfigOrDefault(); - const entry = findWorkspaceEntry(cfg, event.workspaceId); - if (!entry?.workspace.parentWorkspaceId) return; - - if (entry.workspace.taskStatus === "reported") { - await this.finalizeTerminationPhaseForReportedTask(event.workspaceId); - } - } - private async finalizeTerminationPhaseForReportedTask(workspaceId: string): Promise { assert( workspaceId.length > 0, From 1f59b700e43b5d8abb31c4d94bbf7450aee7f43f Mon Sep 17 00:00:00 2001 From: ethan Date: Wed, 11 Feb 2026 20:49:52 +1100 Subject: [PATCH 13/15] fix: update stale comment in task_await referencing removed handleAgentReport --- src/node/services/tools/task_await.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/node/services/tools/task_await.ts b/src/node/services/tools/task_await.ts index 852c502c39..51bc274e06 100644 --- a/src/node/services/tools/task_await.ts +++ b/src/node/services/tools/task_await.ts @@ -66,9 +66,9 @@ export const createTaskAwaitTool: ToolFactory = (config: ToolConfiguration) => { } ).filterDescendantAgentTaskIds; - // Read patch artifacts lazily (after waiting) to avoid stale results. Patch generation is - // started after `resolveWaiters` in `handleAgentReport`, so reading once up-front can miss - // artifacts that appear while we're awaiting reports. + // Read patch artifacts lazily (after waiting) to avoid stale results. Patch generation + // runs asynchronously (started in `finalizeAgentTaskReport` before waiters resolve), so + // the artifact may still be "pending" at read time — task_apply_git_patch does a fresh read. const readGitFormatPatchArtifact = async (childTaskId: string) => { if (!config.workspaceSessionDir) return null; return await readSubagentGitPatchArtifact(config.workspaceSessionDir, childTaskId); From 85e3c13764e7494c700a3b61aede1fe5e734fe53 Mon Sep 17 00:00:00 2001 From: ethan Date: Thu, 12 Feb 2026 00:03:53 +1100 Subject: [PATCH 14/15] =?UTF-8?q?=F0=9F=A4=96=20fix:=20stop=20autonomous?= =?UTF-8?q?=20streams=20only=20after=20successful=20agent=5Freport?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Prevent deadlock when agent_report is called before descendant tasks finish. - change StreamManager autonomous stopWhen to check last-step toolResults for a successful agent_report result - keep failed agent_report calls in-stream so the model can recover instead of ending in idle-running state - update streamManager stopWhen tests to cover success-result vs call-only behavior - refresh stale comments that referenced old hasToolCall semantics --- _Generated with `mux` • Model: `openai:gpt-5.3-codex` • Thinking: `xhigh` • Cost: `4.85`_ --- src/node/services/streamManager.test.ts | 21 ++++++++++++++------- src/node/services/streamManager.ts | 14 ++++++++++---- src/node/services/taskService.ts | 6 +++--- src/node/services/tools/agent_report.ts | 2 +- 4 files changed, 28 insertions(+), 15 deletions(-) diff --git a/src/node/services/streamManager.test.ts b/src/node/services/streamManager.test.ts index 4cef708b14..64a3834738 100644 --- a/src/node/services/streamManager.test.ts +++ b/src/node/services/streamManager.test.ts @@ -119,12 +119,12 @@ describe("StreamManager - stopWhen configuration", () => { expect( agentReportCondition({ - steps: [{ toolCalls: [{ toolName: "agent_report" }] }], + steps: [{ toolResults: [{ toolName: "agent_report" }] }], }) ).toBe(true); }); - test("stops after agent_report tool call in autonomous mode", () => { + test("stops only after successful agent_report tool result in autonomous mode", () => { const streamManager = new StreamManager(historyService); const buildStopWhen = Reflect.get(streamManager, "createStopWhenCondition") as | BuildStopWhenCondition @@ -141,21 +141,28 @@ describe("StreamManager - stopWhen configuration", () => { throw new Error("Expected autonomous stopWhen to include agent_report condition"); } - // Returns true when step contains agent_report tool call + // Returns true when step contains successful agent_report tool result. expect( reportStop({ - steps: [{ toolCalls: [{ toolName: "agent_report" }] }], + steps: [{ toolResults: [{ toolName: "agent_report" }] }], }) ).toBe(true); - // Returns false when step contains other tool calls + // Returns false when step only contains agent_report tool call (no successful result yet). + expect( + reportStop({ + steps: [{ toolCalls: [{ toolName: "agent_report" }] }], + }) + ).toBe(false); + + // Returns false when step contains other tool results. expect( reportStop({ - steps: [{ toolCalls: [{ toolName: "bash" }] }], + steps: [{ toolResults: [{ toolName: "bash" }] }], }) ).toBe(false); - // Returns false when no steps + // Returns false when no steps. expect(reportStop({ steps: [] })).toBe(false); }); diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index a33a3ab06a..ff9da7b4d6 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -4,7 +4,6 @@ import { PlatformPaths } from "@/common/utils/paths"; import { streamText, stepCountIs, - hasToolCall, type ModelMessage, type LanguageModel, type Tool, @@ -1021,12 +1020,19 @@ export class StreamManager extends EventEmitter { } // For autonomous loops: cap steps, check for queued messages, and stop after - // agent_report so the stream ends naturally (preserving usage accounting) - // without allowing post-report tool execution. + // a successful agent_report result so the stream ends naturally (preserving + // usage accounting) without allowing post-report tool execution. + const hasSuccessfulAgentReportResult: ReturnType = ({ steps }) => { + const lastStep = steps[steps.length - 1]; + return ( + lastStep?.toolResults?.some((toolResult) => toolResult.toolName === "agent_report") ?? false + ); + }; + return [ stepCountIs(100000), () => request.hasQueuedMessage?.() ?? false, - hasToolCall("agent_report"), + hasSuccessfulAgentReportResult, ]; } diff --git a/src/node/services/taskService.ts b/src/node/services/taskService.ts index d7f0eb3e5f..8c617dd256 100644 --- a/src/node/services/taskService.ts +++ b/src/node/services/taskService.ts @@ -2236,9 +2236,9 @@ export class TaskService { await this.emitWorkspaceMetadata(childWorkspaceId); // NOTE: Stream continues — we intentionally do NOT abort it. - // Deterministic termination is enforced by stopWhen(hasToolCall("agent_report")) - // in StreamManager, which ends the stream at the step boundary (preserving usage - // accounting). recordSessionUsage runs when the stream ends naturally. + // Deterministic termination is enforced by StreamManager stopWhen logic that + // waits for a successful agent_report tool result at the step boundary + // (preserving usage accounting). recordSessionUsage runs when the stream ends naturally. const cfgAfterReport = this.config.loadConfigOrDefault(); const latestChildEntry = findWorkspaceEntry(cfgAfterReport, childWorkspaceId) ?? childEntry; diff --git a/src/node/services/tools/agent_report.ts b/src/node/services/tools/agent_report.ts index 9253fc0f5f..3cae8f0c82 100644 --- a/src/node/services/tools/agent_report.ts +++ b/src/node/services/tools/agent_report.ts @@ -23,7 +23,7 @@ export const createAgentReportTool: ToolFactory = (config: ToolConfiguration) => // Intentionally no side-effects. The backend orchestrator consumes the tool-call args // via persisted history/partial state once the tool call completes successfully. // The stream continues after this so the SDK can record usage, while StreamManager - // stopWhen(hasToolCall("agent_report")) deterministically ends the stream boundary. + // stops autonomous loops once it observes a successful agent_report tool result. return { success: true, message: "Report submitted successfully.", From b3f8d17c328dec9cec9205dc36f43109bc21b52e Mon Sep 17 00:00:00 2001 From: ethan Date: Thu, 12 Feb 2026 01:50:10 +1100 Subject: [PATCH 15/15] =?UTF-8?q?=F0=9F=A4=96=20fix:=20require=20successfu?= =?UTF-8?q?l=20agent=5Freport=20result=20before=20stop?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tighten autonomous stopWhen semantics so failed agent_report results do not terminate the stream. - require agent_report tool result output.success === true before stopWhen ends autonomous loops - keep failed agent_report attempts in-stream so the model can recover within the same run - extend stopWhen tests to cover success:true vs success:false result cases - refresh related comments to reflect success-gated behavior --- _Generated with `mux` • Model: `openai:gpt-5.3-codex` • Thinking: `xhigh` • Cost: `4.85`_ --- src/node/services/streamManager.test.ts | 13 ++++++++++--- src/node/services/streamManager.ts | 19 ++++++++++++++++--- src/node/services/taskService.ts | 5 +++-- src/node/services/tools/agent_report.ts | 2 +- 4 files changed, 30 insertions(+), 9 deletions(-) diff --git a/src/node/services/streamManager.test.ts b/src/node/services/streamManager.test.ts index 64a3834738..d57684cbd3 100644 --- a/src/node/services/streamManager.test.ts +++ b/src/node/services/streamManager.test.ts @@ -119,7 +119,7 @@ describe("StreamManager - stopWhen configuration", () => { expect( agentReportCondition({ - steps: [{ toolResults: [{ toolName: "agent_report" }] }], + steps: [{ toolResults: [{ toolName: "agent_report", output: { success: true } }] }], }) ).toBe(true); }); @@ -144,10 +144,17 @@ describe("StreamManager - stopWhen configuration", () => { // Returns true when step contains successful agent_report tool result. expect( reportStop({ - steps: [{ toolResults: [{ toolName: "agent_report" }] }], + steps: [{ toolResults: [{ toolName: "agent_report", output: { success: true } }] }], }) ).toBe(true); + // Returns false when step contains failed agent_report output. + expect( + reportStop({ + steps: [{ toolResults: [{ toolName: "agent_report", output: { success: false } }] }], + }) + ).toBe(false); + // Returns false when step only contains agent_report tool call (no successful result yet). expect( reportStop({ @@ -158,7 +165,7 @@ describe("StreamManager - stopWhen configuration", () => { // Returns false when step contains other tool results. expect( reportStop({ - steps: [{ toolResults: [{ toolName: "bash" }] }], + steps: [{ toolResults: [{ toolName: "bash", output: { success: true } }] }], }) ).toBe(false); diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index ff9da7b4d6..6ec40761f0 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -1020,12 +1020,25 @@ export class StreamManager extends EventEmitter { } // For autonomous loops: cap steps, check for queued messages, and stop after - // a successful agent_report result so the stream ends naturally (preserving - // usage accounting) without allowing post-report tool execution. + // a successful agent_report result (`output.success === true`) so the stream ends + // naturally (preserving usage accounting) without allowing post-report tool execution. + const isSuccessfulAgentReportOutput = (value: unknown): boolean => { + return ( + typeof value === "object" && + value !== null && + "success" in value && + (value as { success?: unknown }).success === true + ); + }; + const hasSuccessfulAgentReportResult: ReturnType = ({ steps }) => { const lastStep = steps[steps.length - 1]; return ( - lastStep?.toolResults?.some((toolResult) => toolResult.toolName === "agent_report") ?? false + lastStep?.toolResults?.some( + (toolResult) => + toolResult.toolName === "agent_report" && + isSuccessfulAgentReportOutput(toolResult.output) + ) ?? false ); }; diff --git a/src/node/services/taskService.ts b/src/node/services/taskService.ts index 8c617dd256..b5445e126e 100644 --- a/src/node/services/taskService.ts +++ b/src/node/services/taskService.ts @@ -2237,8 +2237,9 @@ export class TaskService { // NOTE: Stream continues — we intentionally do NOT abort it. // Deterministic termination is enforced by StreamManager stopWhen logic that - // waits for a successful agent_report tool result at the step boundary - // (preserving usage accounting). recordSessionUsage runs when the stream ends naturally. + // waits for an agent_report tool result where output.success === true at the + // step boundary (preserving usage accounting). recordSessionUsage runs when + // the stream ends naturally. const cfgAfterReport = this.config.loadConfigOrDefault(); const latestChildEntry = findWorkspaceEntry(cfgAfterReport, childWorkspaceId) ?? childEntry; diff --git a/src/node/services/tools/agent_report.ts b/src/node/services/tools/agent_report.ts index 3cae8f0c82..99f949a1cb 100644 --- a/src/node/services/tools/agent_report.ts +++ b/src/node/services/tools/agent_report.ts @@ -23,7 +23,7 @@ export const createAgentReportTool: ToolFactory = (config: ToolConfiguration) => // Intentionally no side-effects. The backend orchestrator consumes the tool-call args // via persisted history/partial state once the tool call completes successfully. // The stream continues after this so the SDK can record usage, while StreamManager - // stops autonomous loops once it observes a successful agent_report tool result. + // stops autonomous loops once it observes agent_report with output.success === true. return { success: true, message: "Report submitted successfully.",