diff --git a/src/node/services/streamManager.test.ts b/src/node/services/streamManager.test.ts index 19fc0a6655..d57684cbd3 100644 --- a/src/node/services/streamManager.test.ts +++ b/src/node/services/streamManager.test.ts @@ -107,15 +107,70 @@ describe("StreamManager - stopWhen configuration", () => { if (!Array.isArray(stopWhen)) { throw new Error("Expected autonomous stopWhen to be an array of conditions"); } - expect(stopWhen).toHaveLength(2); + expect(stopWhen).toHaveLength(3); - const [maxStepCondition, queuedMessageCondition] = stopWhen; + const [maxStepCondition, queuedMessageCondition, agentReportCondition] = stopWhen; expect(maxStepCondition({ steps: new Array(99999) })).toBe(false); expect(maxStepCondition({ steps: new Array(100000) })).toBe(true); expect(queuedMessageCondition({ steps: [] })).toBe(false); queued = true; expect(queuedMessageCondition({ steps: [] })).toBe(true); + + expect( + agentReportCondition({ + steps: [{ toolResults: [{ toolName: "agent_report", output: { success: true } }] }], + }) + ).toBe(true); + }); + + test("stops only after successful agent_report tool result in autonomous mode", () => { + const streamManager = new StreamManager(historyService); + const buildStopWhen = Reflect.get(streamManager, "createStopWhenCondition") as + | BuildStopWhenCondition + | undefined; + expect(typeof buildStopWhen).toBe("function"); + + const stopWhen = buildStopWhen!({ hasQueuedMessage: () => false }); + if (!Array.isArray(stopWhen)) { + throw new Error("Expected autonomous stopWhen to be an array of conditions"); + } + + const [, , reportStop] = stopWhen; + if (!reportStop) { + throw new Error("Expected autonomous stopWhen to include agent_report condition"); + } + + // Returns true when step contains successful agent_report tool result. + expect( + reportStop({ + steps: [{ toolResults: [{ toolName: "agent_report", output: { success: true } }] }], + }) + ).toBe(true); + + // Returns false when step contains failed agent_report output. + expect( + reportStop({ + steps: [{ toolResults: [{ toolName: "agent_report", output: { success: false } }] }], + }) + ).toBe(false); + + // Returns false when step only contains agent_report tool call (no successful result yet). + expect( + reportStop({ + steps: [{ toolCalls: [{ toolName: "agent_report" }] }], + }) + ).toBe(false); + + // Returns false when step contains other tool results. + expect( + reportStop({ + steps: [{ toolResults: [{ toolName: "bash", output: { success: true } }] }], + }) + ).toBe(false); + + // Returns false when no steps. + expect(reportStop({ steps: [] })).toBe(false); }); test("treats missing queued-message callback as not queued", () => { diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index 8399c44145..6ec40761f0 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -1019,9 +1019,34 @@ export class StreamManager extends EventEmitter { return stepCountIs(1); } - // Allow effectively unlimited autonomous steps while still yielding quickly - // when a queued user message should interrupt at the next step boundary. - return [stepCountIs(100000), () => request.hasQueuedMessage?.() ?? false]; + // For autonomous loops: cap steps, check for queued messages, and stop after + // a successful agent_report result (`output.success === true`) so the stream ends + // naturally (preserving usage accounting) without allowing post-report tool execution. + const isSuccessfulAgentReportOutput = (value: unknown): boolean => { + return ( + typeof value === "object" && + value !== null && + "success" in value && + (value as { success?: unknown }).success === true + ); + }; + + const hasSuccessfulAgentReportResult: ReturnType = ({ steps }) => { + const lastStep = steps[steps.length - 1]; + return ( + lastStep?.toolResults?.some( + (toolResult) => + toolResult.toolName === "agent_report" && + isSuccessfulAgentReportOutput(toolResult.output) + ) ?? false + ); + }; + + return [ + stepCountIs(100000), + () => request.hasQueuedMessage?.() ?? false, + hasSuccessfulAgentReportResult, + ]; } private createStreamResult( @@ -1837,10 +1862,23 @@ export class StreamManager extends EventEmitter { // CRITICAL: Delete partial.json before updating chat.jsonl // On successful completion, partial.json becomes stale and must be removed - await this.historyService.deletePartial(workspaceId as string); + const deleteResult = await this.historyService.deletePartial(workspaceId as string); + if (!deleteResult.success) { + workspaceLog.warn("Failed to delete partial on stream end", { + error: deleteResult.error, + }); + } // Update the placeholder message in chat.jsonl with final content - await this.historyService.updateHistory(workspaceId as string, finalAssistantMessage); + const updateResult = await this.historyService.updateHistory( + workspaceId as string, + finalAssistantMessage + ); + if (!updateResult.success) { + workspaceLog.warn("Failed to update history on stream end", { + error: updateResult.error, + }); + } // Update cumulative session usage (if service is available) // Wrapped in try-catch: usage recording is non-critical and shouldn't block stream completion diff --git a/src/node/services/taskService.test.ts b/src/node/services/taskService.test.ts index 3269076db4..e53d415124 100644 --- a/src/node/services/taskService.test.ts +++ b/src/node/services/taskService.test.ts @@ -1904,7 +1904,7 @@ describe("TaskService", () => { taskSettings: { maxParallelAgentTasks: 3, maxTaskNestingDepth: 3 }, }); - const { aiService, stopStream } = createAIServiceMocks(config); + const { aiService } = createAIServiceMocks(config); const { workspaceService, sendMessage, remove, emit } = createWorkspaceServiceMocks(); const { historyService, partialService, taskService } = createTaskServiceHarness(config, { aiService, @@ -1971,60 +1971,21 @@ describe("TaskService", () => { expect(writeChildPartial.success).toBe(true); const internal = taskService as unknown as { - handleAgentReport: (event: { - type: "tool-call-end"; - workspaceId: string; - messageId: string; - toolCallId: string; - toolName: string; - result: unknown; - timestamp: number; - }) => Promise; + handleStreamEnd: (event: StreamEndEvent) => Promise; }; - await internal.handleAgentReport({ - type: "tool-call-end", + + // Simulate stream manager committing the final partial right before natural stream end. + const commitChildPartial = await partialService.commitPartial(childId); + expect(commitChildPartial.success).toBe(true); + + await internal.handleStreamEnd({ + type: "stream-end", workspaceId: childId, messageId: "assistant-child-partial", - toolCallId: "agent-report-call-1", - toolName: "agent_report", - result: { success: true }, - timestamp: Date.now(), + metadata: { model: "test-model" }, + parts: childPartial.parts as StreamEndEvent["parts"], }); - expect(stopStream).toHaveBeenCalledWith( - childId, - expect.objectContaining({ abandonPartial: false }) - ); - - const childMessages = await collectFullHistory(historyService, childId); - // Ensure the in-flight assistant message (tool calls + agent_report) was committed to history - // before workspace cleanup archives the transcript. - expect(childMessages.length).toBeGreaterThan(1); - - const assistantMsg = childMessages.find((m) => m.id === "assistant-child-partial") ?? null; - expect(assistantMsg).not.toBeNull(); - if (assistantMsg) { - const toolPart = assistantMsg.parts.find( - (p) => - p && - typeof p === "object" && - "type" in p && - (p as { type?: unknown }).type === "dynamic-tool" && - "toolName" in p && - (p as { toolName?: unknown }).toolName === "agent_report" - ) as unknown as - | { - toolName: string; - state: string; - input?: unknown; - } - | undefined; - - expect(toolPart?.toolName).toBe("agent_report"); - expect(toolPart?.state).toBe("output-available"); - expect(JSON.stringify(toolPart?.input)).toContain("Hello from child"); - } - const updatedChildPartial = await partialService.readPartial(childId); expect(updatedChildPartial).toBeNull(); @@ -2171,15 +2132,7 @@ describe("TaskService", () => { expect(writeChildPartial.success).toBe(true); const internal = taskService as unknown as { - handleAgentReport: (event: { - type: "tool-call-end"; - workspaceId: string; - messageId: string; - toolCallId: string; - toolName: string; - result: unknown; - timestamp: number; - }) => Promise; + handleStreamEnd: (event: StreamEndEvent) => Promise; }; const parentSessionDir = config.getSessionDir(parentId); @@ -2190,27 +2143,21 @@ describe("TaskService", () => { requestingWorkspaceId: parentId, }); - const handleReportPromise = internal.handleAgentReport({ - type: "tool-call-end", + await internal.handleStreamEnd({ + type: "stream-end", workspaceId: childId, messageId: "assistant-child-partial", - toolCallId: "agent-report-call-1", - toolName: "agent_report", - result: { success: true }, - timestamp: Date.now(), + metadata: { model: "test-model" }, + parts: childPartial.parts as StreamEndEvent["parts"], }); const report = await waiter; expect(report).toEqual({ reportMarkdown: "Hello from child", title: "Result" }); - const artifactAfterWait = await readSubagentGitPatchArtifact(parentSessionDir, childId); - expect(artifactAfterWait).not.toBeNull(); - expect(["pending", "ready"]).toContain(artifactAfterWait!.status); - - await handleReportPromise; - - // Cleanup should be deferred until git-format-patch generation completes. - expect(remove).not.toHaveBeenCalled(); + const artifactAfterStreamEnd = await readSubagentGitPatchArtifact(parentSessionDir, childId); + expect( + artifactAfterStreamEnd?.status === "pending" || artifactAfterStreamEnd?.status === "ready" + ).toBe(true); const start = Date.now(); let lastArtifact: unknown = null; @@ -2233,7 +2180,7 @@ describe("TaskService", () => { ); } - if (Date.now() - start > 10_000) { + if (Date.now() - start > 20_000) { throw new Error( `Timed out waiting for patch artifact generation (removeCalled=${remove.mock.calls.length > 0}, lastArtifact=${JSON.stringify(lastArtifact)})` ); @@ -2355,15 +2302,7 @@ describe("TaskService", () => { expect(writeChildPartial.success).toBe(true); const internal = taskService as unknown as { - handleAgentReport: (event: { - type: "tool-call-end"; - workspaceId: string; - messageId: string; - toolCallId: string; - toolName: string; - result: unknown; - timestamp: number; - }) => Promise; + handleStreamEnd: (event: StreamEndEvent) => Promise; }; const parentSessionDir = config.getSessionDir(parentId); @@ -2374,27 +2313,21 @@ describe("TaskService", () => { requestingWorkspaceId: parentId, }); - const handleReportPromise = internal.handleAgentReport({ - type: "tool-call-end", + await internal.handleStreamEnd({ + type: "stream-end", workspaceId: childId, messageId: "assistant-child-partial", - toolCallId: "agent-report-call-1", - toolName: "agent_report", - result: { success: true }, - timestamp: Date.now(), + metadata: { model: "test-model" }, + parts: childPartial.parts as StreamEndEvent["parts"], }); const report = await waiter; expect(report).toEqual({ reportMarkdown: "Hello from child", title: "Result" }); - const artifactAfterWait = await readSubagentGitPatchArtifact(parentSessionDir, childId); - expect(artifactAfterWait).not.toBeNull(); - expect(["pending", "ready"]).toContain(artifactAfterWait!.status); - - await handleReportPromise; - - // Cleanup should be deferred until git-format-patch generation completes. - expect(remove).not.toHaveBeenCalled(); + const artifactAfterStreamEnd = await readSubagentGitPatchArtifact(parentSessionDir, childId); + expect( + artifactAfterStreamEnd?.status === "pending" || artifactAfterStreamEnd?.status === "ready" + ).toBe(true); const start = Date.now(); let lastArtifact: unknown = null; @@ -2417,7 +2350,7 @@ describe("TaskService", () => { ); } - if (Date.now() - start > 10_000) { + if (Date.now() - start > 20_000) { throw new Error( `Timed out waiting for patch artifact generation (removeCalled=${remove.mock.calls.length > 0}, lastArtifact=${JSON.stringify(lastArtifact)})` ); @@ -2514,24 +2447,15 @@ describe("TaskService", () => { expect(writeChildPartial.success).toBe(true); const internal = taskService as unknown as { - handleAgentReport: (event: { - type: "tool-call-end"; - workspaceId: string; - messageId: string; - toolCallId: string; - toolName: string; - result: unknown; - timestamp: number; - }) => Promise; + handleStreamEnd: (event: StreamEndEvent) => Promise; }; - await internal.handleAgentReport({ - type: "tool-call-end", + + await internal.handleStreamEnd({ + type: "stream-end", workspaceId: childId, messageId: "assistant-child-partial", - toolCallId: "agent-report-call-1", - toolName: "agent_report", - result: { success: true }, - timestamp: Date.now(), + metadata: { model: "test-model" }, + parts: childPartial.parts as StreamEndEvent["parts"], }); const parentMessages = await collectFullHistory(historyService, parentId); @@ -2573,7 +2497,7 @@ describe("TaskService", () => { ); }); - test("uses agent_report from stream-end parts instead of fallback", async () => { + test("stream-end with agent_report parts finalizes report and triggers cleanup", async () => { const config = await createTestConfig(rootDir); const projectPath = path.join(rootDir, "repo"); @@ -2693,9 +2617,7 @@ describe("TaskService", () => { expect(ws?.taskStatus).toBe("reported"); expect(remove).toHaveBeenCalled(); - // sendMessage is called once for the "ended without agent_report" reminder - // and NOT for resumeStream (since "second attempt" was already simulated). - // The parent auto-resume sendMessage also fires since no active descendants remain. + // Parent auto-resume fires after the child report is finalized at stream-end. expect(sendMessage).toHaveBeenCalled(); }); @@ -2911,6 +2833,164 @@ describe("TaskService", () => { expect(childEntry?.runtimeConfig?.type).toBe("worktree"); }, 20_000); + test("sibling reported task blocks parent deletion during cleanup", async () => { + const config = await createTestConfig(rootDir); + + const projectPath = path.join(rootDir, "repo"); + const rootWorkspaceId = "root-111"; + const parentTaskId = "parent-222"; + const childTaskAId = "child-a-333"; + const childTaskBId = "child-b-444"; + + await config.saveConfig({ + projects: new Map([ + [ + projectPath, + { + workspaces: [ + { path: path.join(projectPath, "root"), id: rootWorkspaceId, name: "root" }, + { + path: path.join(projectPath, "parent-task"), + id: parentTaskId, + name: "agent_exec_parent", + parentWorkspaceId: rootWorkspaceId, + agentType: "exec", + taskStatus: "reported", + }, + { + path: path.join(projectPath, "child-task-a"), + id: childTaskAId, + name: "agent_explore_child_a", + parentWorkspaceId: parentTaskId, + agentType: "explore", + taskStatus: "reported", + }, + { + path: path.join(projectPath, "child-task-b"), + id: childTaskBId, + name: "agent_explore_child_b", + parentWorkspaceId: parentTaskId, + agentType: "explore", + taskStatus: "reported", + }, + ], + }, + ], + ]), + taskSettings: { maxParallelAgentTasks: 3, maxTaskNestingDepth: 3 }, + }); + + const isStreaming = mock(() => false); + const { aiService } = createAIServiceMocks(config, { isStreaming }); + const remove = mock(async (workspaceId: string): Promise> => { + await config.removeWorkspace(workspaceId); + return Ok(undefined); + }); + const { workspaceService } = createWorkspaceServiceMocks({ remove }); + const { taskService } = createTaskServiceHarness(config, { aiService, workspaceService }); + + const internal = taskService as unknown as { + handleStreamEnd: (event: StreamEndEvent) => Promise; + }; + + await internal.handleStreamEnd({ + type: "stream-end", + workspaceId: childTaskAId, + messageId: "assistant-child-a", + metadata: { model: "openai:gpt-4o-mini" }, + parts: [], + }); + + expect(remove).toHaveBeenCalledWith(childTaskAId, true); + + const removedWorkspaceIds = ( + remove as unknown as { mock: { calls: Array<[string, boolean]> } } + ).mock.calls.map((call) => call[0]); + expect(removedWorkspaceIds).not.toContain(parentTaskId); + }); + + test("parent deletes only after all sibling children are cleaned up", async () => { + const config = await createTestConfig(rootDir); + + const projectPath = path.join(rootDir, "repo"); + const rootWorkspaceId = "root-111"; + const parentTaskId = "parent-222"; + const childTaskAId = "child-a-333"; + const childTaskBId = "child-b-444"; + + await config.saveConfig({ + projects: new Map([ + [ + projectPath, + { + workspaces: [ + { path: path.join(projectPath, "root"), id: rootWorkspaceId, name: "root" }, + { + path: path.join(projectPath, "parent-task"), + id: parentTaskId, + name: "agent_exec_parent", + parentWorkspaceId: rootWorkspaceId, + agentType: "exec", + taskStatus: "reported", + }, + { + path: path.join(projectPath, "child-task-a"), + id: childTaskAId, + name: "agent_explore_child_a", + parentWorkspaceId: parentTaskId, + agentType: "explore", + taskStatus: "reported", + }, + { + path: path.join(projectPath, "child-task-b"), + id: childTaskBId, + name: "agent_explore_child_b", + parentWorkspaceId: parentTaskId, + agentType: "explore", + taskStatus: "reported", + }, + ], + }, + ], + ]), + taskSettings: { maxParallelAgentTasks: 3, maxTaskNestingDepth: 3 }, + }); + + const isStreaming = mock(() => false); + const { aiService } = createAIServiceMocks(config, { isStreaming }); + const remove = mock(async (workspaceId: string): Promise> => { + await config.removeWorkspace(workspaceId); + return Ok(undefined); + }); + const { workspaceService } = createWorkspaceServiceMocks({ remove }); + const { taskService } = createTaskServiceHarness(config, { aiService, workspaceService }); + + const internal = taskService as unknown as { + handleStreamEnd: (event: StreamEndEvent) => Promise; + }; + + await internal.handleStreamEnd({ + type: "stream-end", + workspaceId: childTaskAId, + messageId: "assistant-child-a", + metadata: { model: "openai:gpt-4o-mini" }, + parts: [], + }); + + await internal.handleStreamEnd({ + type: "stream-end", + workspaceId: childTaskBId, + messageId: "assistant-child-b", + metadata: { model: "openai:gpt-4o-mini" }, + parts: [], + }); + + expect(remove).toHaveBeenCalledTimes(3); + expect(remove).toHaveBeenNthCalledWith(1, childTaskAId, true); + expect(remove).toHaveBeenNthCalledWith(2, childTaskBId, true); + expect(remove).toHaveBeenNthCalledWith(3, parentTaskId, true); + }); + describe("parent auto-resume flood protection", () => { async function setupParentWithActiveChild(rootDirPath: string) { const config = await createTestConfig(rootDirPath); diff --git a/src/node/services/taskService.ts b/src/node/services/taskService.ts index 5b6dcd4009..b5445e126e 100644 --- a/src/node/services/taskService.ts +++ b/src/node/services/taskService.ts @@ -39,7 +39,7 @@ import type { RuntimeConfig } from "@/common/types/runtime"; import { AgentIdSchema } from "@/common/orpc/schemas"; import { GitPatchArtifactService } from "@/node/services/gitPatchArtifactService"; import type { ThinkingLevel } from "@/common/types/thinking"; -import type { ToolCallEndEvent, StreamEndEvent } from "@/common/types/stream"; +import type { StreamEndEvent } from "@/common/types/stream"; import { isDynamicToolPart, type DynamicToolPart } from "@/common/types/toolParts"; import { AgentReportToolArgsSchema, @@ -139,26 +139,19 @@ interface CompletedAgentReportCacheEntry { ancestorWorkspaceIds: string[]; } -function isToolCallEndEvent(value: unknown): value is ToolCallEndEvent { +function isTypedWorkspaceEvent(value: unknown, type: string): boolean { return ( typeof value === "object" && value !== null && "type" in value && - (value as { type: unknown }).type === "tool-call-end" && + (value as { type: unknown }).type === type && "workspaceId" in value && typeof (value as { workspaceId: unknown }).workspaceId === "string" ); } function isStreamEndEvent(value: unknown): value is StreamEndEvent { - return ( - typeof value === "object" && - value !== null && - "type" in value && - (value as { type: unknown }).type === "stream-end" && - "workspaceId" in value && - typeof (value as { workspaceId: unknown }).workspaceId === "string" - ); + return isTypedWorkspaceEvent(value, "stream-end"); } function hasAncestorWorkspaceId( @@ -208,8 +201,8 @@ function getIsoNow(): string { } export class TaskService { - // Serialize stream-end/tool-call-end processing per workspace to avoid races (e.g. - // stream-end observing awaiting_report before agent_report handling flips the status). + // Serialize stream-end processing per workspace to avoid races when + // finalizing reported tasks and cleanup state transitions. private readonly workspaceEventLocks = new MutexMap(); private readonly mutex = new AsyncMutex(); private readonly pendingWaitersByTaskId = new Map(); @@ -235,21 +228,6 @@ export class TaskService { ) { this.gitPatchArtifactService = new GitPatchArtifactService(config); - this.aiService.on("tool-call-end", (payload: unknown) => { - if (!isToolCallEndEvent(payload)) return; - if (payload.toolName !== "agent_report") return; - // Ignore failed agent_report attempts (e.g. tool rejected due to active descendants). - if (!isSuccessfulToolResult(payload.result)) return; - - void this.workspaceEventLocks - .withLock(payload.workspaceId, async () => { - await this.handleAgentReport(payload); - }) - .catch((error: unknown) => { - log.error("TaskService.handleAgentReport failed", { error }); - }); - }); - this.aiService.on("stream-end", (payload: unknown) => { if (!isStreamEndEvent(payload)) return; @@ -401,7 +379,7 @@ export class TaskService { await this.gitPatchArtifactService.maybeStartGeneration( task.parentWorkspaceId, task.id!, - (wsId) => this.cleanupReportedLeafTask(wsId) + (wsId) => this.requestReportedTaskCleanupRecheck(wsId) ); } catch (error: unknown) { log.error("Failed to resume subagent git patch generation on startup", { @@ -1518,9 +1496,9 @@ export class TaskService { continue; } - // Defensive: a task may still be streaming even after it transitioned to another status - // (e.g. tool-call-end happened but the stream hasn't ended yet). Count it as active so we - // never exceed the configured parallel limit. + // Defensive: task status and runtime stream state can be briefly out of sync during + // termination/cleanup boundaries. Count streaming tasks as active so we never exceed + // the configured parallel limit. if (task.id && this.aiService.isStreaming(task.id)) { activeCount += 1; } @@ -1556,6 +1534,16 @@ export class TaskService { return false; } + /** + * Topology predicate: does this workspace still have child agent-task nodes in config? + * Unlike hasActiveDescendantAgentTasks (which checks runtime activity for scheduling), + * this checks structural tree shape — any child node blocks parent deletion regardless + * of its status. + */ + private hasChildAgentTasks(index: AgentTaskIndex, workspaceId: string): boolean { + return (index.childrenByParent.get(workspaceId)?.length ?? 0) > 0; + } + private getTaskDepth( config: ReturnType, workspaceId: string @@ -2057,7 +2045,10 @@ export class TaskService { } const status = entry.workspace.taskStatus; - if (status === "reported") return; + if (status === "reported") { + await this.finalizeTerminationPhaseForReportedTask(workspaceId); + return; + } // Never allow a task to finish/report while it still has active descendant tasks. // We'll auto-resume this task once the last descendant reports. @@ -2072,12 +2063,14 @@ export class TaskService { const reportArgs = this.findAgentReportArgsInParts(event.parts); if (reportArgs) { await this.finalizeAgentTaskReport(workspaceId, entry, reportArgs); + await this.finalizeTerminationPhaseForReportedTask(workspaceId); return; } // If a task stream ends without agent_report, request it once. if (status === "awaiting_report" && this.remindedAwaitingReport.has(workspaceId)) { await this.fallbackReportMissingAgentReport(entry); + await this.finalizeTerminationPhaseForReportedTask(workspaceId); return; } @@ -2099,6 +2092,53 @@ export class TaskService { ); } + private async finalizeTerminationPhaseForReportedTask(workspaceId: string): Promise { + assert( + workspaceId.length > 0, + "finalizeTerminationPhaseForReportedTask: workspaceId must be non-empty" + ); + + await this.cleanupReportedLeafTask(workspaceId); + } + + private async maybeStartPatchGenerationForReportedTask(workspaceId: string): Promise { + assert( + workspaceId.length > 0, + "maybeStartPatchGenerationForReportedTask: workspaceId must be non-empty" + ); + + const cfg = this.config.loadConfigOrDefault(); + const parentWorkspaceId = findWorkspaceEntry(cfg, workspaceId)?.workspace.parentWorkspaceId; + if (!parentWorkspaceId) { + return; + } + + try { + await this.gitPatchArtifactService.maybeStartGeneration( + parentWorkspaceId, + workspaceId, + (wsId) => this.requestReportedTaskCleanupRecheck(wsId) + ); + } catch (error: unknown) { + log.error("Failed to start subagent git patch generation", { + parentWorkspaceId, + childWorkspaceId: workspaceId, + error, + }); + } + } + + private requestReportedTaskCleanupRecheck(workspaceId: string): Promise { + assert( + workspaceId.length > 0, + "requestReportedTaskCleanupRecheck: workspaceId must be non-empty" + ); + + return this.workspaceEventLocks.withLock(workspaceId, async () => { + await this.cleanupReportedLeafTask(workspaceId); + }); + } + private async fallbackReportMissingAgentReport(entry: { projectPath: string; workspace: WorkspaceConfigEntry; @@ -2162,43 +2202,10 @@ export class TaskService { return combined; } - private async handleAgentReport(event: ToolCallEndEvent): Promise { - const childWorkspaceId = event.workspaceId; - - if (!isSuccessfulToolResult(event.result)) { - return; - } - - const cfgBeforeReport = this.config.loadConfigOrDefault(); - const childEntryBeforeReport = findWorkspaceEntry(cfgBeforeReport, childWorkspaceId); - if (childEntryBeforeReport?.workspace.taskStatus === "reported") { - return; - } - - if (this.hasActiveDescendantAgentTasks(cfgBeforeReport, childWorkspaceId)) { - log.error("agent_report called while task has active descendants; ignoring", { - childWorkspaceId, - }); - return; - } - - // Read report payload from the tool-call input (persisted in partial/history). - const reportArgs = await this.readLatestAgentReportArgs(childWorkspaceId); - if (!reportArgs) { - log.error("agent_report tool-call args not found", { childWorkspaceId }); - return; - } - - await this.finalizeAgentTaskReport(childWorkspaceId, childEntryBeforeReport, reportArgs, { - stopStream: true, - }); - } - private async finalizeAgentTaskReport( childWorkspaceId: string, childEntry: { projectPath: string; workspace: WorkspaceConfigEntry } | null | undefined, - reportArgs: { reportMarkdown: string; title?: string }, - options?: { stopStream?: boolean } + reportArgs: { reportMarkdown: string; title?: string } ): Promise { assert( childWorkspaceId.length > 0, @@ -2228,49 +2235,11 @@ export class TaskService { await this.emitWorkspaceMetadata(childWorkspaceId); - if (options?.stopStream) { - // `agent_report` is terminal. Stop the child stream immediately to prevent any further token - // usage and to ensure parallelism accounting never "frees" a slot while the stream is still - // active (Claude/Anthropic can emit tool calls before the final assistant block completes). - // - // Important: Do NOT abandon the partial assistant message here. The in-flight assistant block - // contains the tool calls (including the agent_report tool call) that should be archived into - // chat.jsonl for transcript viewing after workspace cleanup. - try { - const stopResult = await this.aiService.stopStream(childWorkspaceId, { - abandonPartial: false, - }); - if (!stopResult.success) { - log.debug("Failed to stop task stream after agent_report", { - workspaceId: childWorkspaceId, - error: stopResult.error, - }); - } - } catch (error: unknown) { - log.debug("Failed to stop task stream after agent_report (threw)", { - workspaceId: childWorkspaceId, - error, - }); - } - - // stopStream() forwards stream-abort asynchronously (after partial cleanup). Workspace cleanup - // may archive/delete session files immediately after this method returns, so commit the partial - // synchronously here (best-effort) to ensure tool calls are present in the archived transcript. - try { - const commitResult = await this.historyService.commitPartial(childWorkspaceId); - if (!commitResult.success) { - log.error("Failed to commit final partial to history after agent_report", { - workspaceId: childWorkspaceId, - error: commitResult.error, - }); - } - } catch (error: unknown) { - log.error("Failed to commit final partial to history after agent_report (threw)", { - workspaceId: childWorkspaceId, - error, - }); - } - } + // NOTE: Stream continues — we intentionally do NOT abort it. + // Deterministic termination is enforced by StreamManager stopWhen logic that + // waits for an agent_report tool result where output.success === true at the + // step boundary (preserving usage accounting). recordSessionUsage runs when + // the stream ends naturally. const cfgAfterReport = this.config.loadConfigOrDefault(); const latestChildEntry = findWorkspaceEntry(cfgAfterReport, childWorkspaceId) ?? childEntry; @@ -2322,6 +2291,8 @@ export class TaskService { } } + await this.maybeStartPatchGenerationForReportedTask(childWorkspaceId); + await this.deliverReportToParent( parentWorkspaceId, childWorkspaceId, @@ -2329,36 +2300,12 @@ export class TaskService { reportArgs ); - // Begin git-format-patch generation (best-effort). - // - // This must run before cleanup so the reported task workspace isn't deleted while we're still - // reading commits from it. - // - // It must also run before resolving waiters so an immediate `task_await` result after - // `agent_report` can include at least a pending artifact record. - try { - await this.gitPatchArtifactService.maybeStartGeneration( - parentWorkspaceId, - childWorkspaceId, - (wsId) => this.cleanupReportedLeafTask(wsId) - ); - } catch (error: unknown) { - log.error("Failed to start subagent git patch generation", { - parentWorkspaceId, - childWorkspaceId, - error, - }); - } - // Resolve foreground waiters. this.resolveWaiters(childWorkspaceId, reportArgs); // Free slot and start queued tasks. await this.maybeStartQueuedTasks(); - // Attempt cleanup of reported tasks (leaf-first). - await this.cleanupReportedLeafTask(childWorkspaceId); - // Auto-resume any parent stream that was waiting on a task tool call (restart-safe). const postCfg = this.config.loadConfigOrDefault(); if (!findWorkspaceEntry(postCfg, parentWorkspaceId)) { @@ -2440,34 +2387,6 @@ export class TaskService { } } - private async readLatestAgentReportArgs( - workspaceId: string - ): Promise<{ reportMarkdown: string; title?: string } | null> { - const partial = await this.historyService.readPartial(workspaceId); - if (partial) { - const args = this.findAgentReportArgsInMessage(partial); - if (args) return args; - } - - // Only need recent messages to find agent_report — avoid full-file read. - // getLastMessages returns chronological order; scan in reverse for newest-first. - const historyResult = await this.historyService.getLastMessages(workspaceId, 20); - if (!historyResult.success) { - log.error("Failed to read history for agent_report args", { - workspaceId, - error: historyResult.error, - }); - return null; - } - - for (let i = historyResult.data.length - 1; i >= 0; i--) { - const args = this.findAgentReportArgsInMessage(historyResult.data[i]); - if (args) return args; - } - - return null; - } - private findAgentReportArgsInParts( parts: readonly unknown[] ): { reportMarkdown: string; title?: string } | null { @@ -2486,12 +2405,6 @@ export class TaskService { return null; } - private findAgentReportArgsInMessage( - msg: MuxMessage - ): { reportMarkdown: string; title?: string } | null { - return this.findAgentReportArgsInParts(msg.parts); - } - private async deliverReportToParent( parentWorkspaceId: string, childWorkspaceId: string, @@ -2651,9 +2564,61 @@ export class TaskService { return true; } + private async canCleanupReportedTask( + workspaceId: string + ): Promise<{ ok: true; parentWorkspaceId: string } | { ok: false; reason: string }> { + assert(workspaceId.length > 0, "canCleanupReportedTask: workspaceId must be non-empty"); + + const config = this.config.loadConfigOrDefault(); + const entry = findWorkspaceEntry(config, workspaceId); + if (!entry) { + return { ok: false, reason: "workspace_not_found" }; + } + + const parentWorkspaceId = entry.workspace.parentWorkspaceId; + if (!parentWorkspaceId) { + return { ok: false, reason: "missing_parent_workspace" }; + } + + if (entry.workspace.taskStatus !== "reported") { + return { ok: false, reason: "task_not_reported" }; + } + + if (this.aiService.isStreaming(workspaceId)) { + log.debug("cleanupReportedLeafTask: deferring auto-delete; stream still active", { + workspaceId, + parentWorkspaceId, + }); + return { ok: false, reason: "still_streaming" }; + } + + // Topology gate: a reported task can only be cleaned up when it is a structural leaf + // (has no child agent tasks in config). This is status-agnostic — even reported children + // block parent deletion, ensuring artifact rollup always targets an existing parent path. + const index = this.buildAgentTaskIndex(config); + if (this.hasChildAgentTasks(index, workspaceId)) { + return { ok: false, reason: "has_child_tasks" }; + } + + const parentSessionDir = this.config.getSessionDir(parentWorkspaceId); + const patchArtifact = await readSubagentGitPatchArtifact(parentSessionDir, workspaceId); + if (patchArtifact?.status === "pending") { + log.debug("cleanupReportedLeafTask: deferring auto-delete; patch artifact pending", { + workspaceId, + parentWorkspaceId, + }); + return { ok: false, reason: "patch_pending" }; + } + + return { ok: true, parentWorkspaceId }; + } + private async cleanupReportedLeafTask(workspaceId: string): Promise { assert(workspaceId.length > 0, "cleanupReportedLeafTask: workspaceId must be non-empty"); + // Lineage reduction: each iteration removes exactly one leaf node, then re-evaluates + // the parent on fresh config. The structural-leaf gate in canCleanupReportedTask ensures + // parents are only removed after all children are gone. let currentWorkspaceId = workspaceId; const visited = new Set(); for (let depth = 0; depth < 32; depth++) { @@ -2665,33 +2630,11 @@ export class TaskService { } visited.add(currentWorkspaceId); - const config = this.config.loadConfigOrDefault(); - const entry = findWorkspaceEntry(config, currentWorkspaceId); - if (!entry) return; - - const ws = entry.workspace; - const parentWorkspaceId = ws.parentWorkspaceId; - if (!parentWorkspaceId) return; - if (ws.taskStatus !== "reported") return; - - const hasChildren = this.listAgentTaskWorkspaces(config).some( - (t) => t.parentWorkspaceId === currentWorkspaceId - ); - const parentSessionDir = this.config.getSessionDir(parentWorkspaceId); - const patchArtifact = await readSubagentGitPatchArtifact( - parentSessionDir, - currentWorkspaceId - ); - if (patchArtifact?.status === "pending") { - log.debug("cleanupReportedLeafTask: deferring auto-delete; patch artifact pending", { - workspaceId: currentWorkspaceId, - parentWorkspaceId, - }); + const cleanupEligibility = await this.canCleanupReportedTask(currentWorkspaceId); + if (!cleanupEligibility.ok) { return; } - if (hasChildren) return; - const removeResult = await this.workspaceService.remove(currentWorkspaceId, true); if (!removeResult.success) { log.error("Failed to auto-delete reported task workspace", { @@ -2701,7 +2644,7 @@ export class TaskService { return; } - currentWorkspaceId = parentWorkspaceId; + currentWorkspaceId = cleanupEligibility.parentWorkspaceId; } log.error("cleanupReportedLeafTask: exceeded max parent traversal depth", { diff --git a/src/node/services/tools/agent_report.test.ts b/src/node/services/tools/agent_report.test.ts index c561485b12..7b4d98388b 100644 --- a/src/node/services/tools/agent_report.test.ts +++ b/src/node/services/tools/agent_report.test.ts @@ -50,6 +50,9 @@ describe("agent_report tool", () => { tool.execute!({ reportMarkdown: "done", title: "t" }, mockToolCallOptions) ); - expect(result).toEqual({ success: true }); + expect(result).toEqual({ + success: true, + message: "Report submitted successfully.", + }); }); }); diff --git a/src/node/services/tools/agent_report.ts b/src/node/services/tools/agent_report.ts index b6ce4124da..99f949a1cb 100644 --- a/src/node/services/tools/agent_report.ts +++ b/src/node/services/tools/agent_report.ts @@ -9,7 +9,7 @@ export const createAgentReportTool: ToolFactory = (config: ToolConfiguration) => return tool({ description: TOOL_DEFINITIONS.agent_report.description, inputSchema: TOOL_DEFINITIONS.agent_report.schema, - execute: (): { success: true } => { + execute: (): { success: true; message: string } => { const workspaceId = requireWorkspaceId(config, "agent_report"); const taskService = requireTaskService(config, "agent_report"); @@ -22,7 +22,12 @@ export const createAgentReportTool: ToolFactory = (config: ToolConfiguration) => // Intentionally no side-effects. The backend orchestrator consumes the tool-call args // via persisted history/partial state once the tool call completes successfully. - return { success: true }; + // The stream continues after this so the SDK can record usage, while StreamManager + // stops autonomous loops once it observes agent_report with output.success === true. + return { + success: true, + message: "Report submitted successfully.", + }; }, }); }; diff --git a/src/node/services/tools/task_await.ts b/src/node/services/tools/task_await.ts index 852c502c39..51bc274e06 100644 --- a/src/node/services/tools/task_await.ts +++ b/src/node/services/tools/task_await.ts @@ -66,9 +66,9 @@ export const createTaskAwaitTool: ToolFactory = (config: ToolConfiguration) => { } ).filterDescendantAgentTaskIds; - // Read patch artifacts lazily (after waiting) to avoid stale results. Patch generation is - // started after `resolveWaiters` in `handleAgentReport`, so reading once up-front can miss - // artifacts that appear while we're awaiting reports. + // Read patch artifacts lazily (after waiting) to avoid stale results. Patch generation + // runs asynchronously (started in `finalizeAgentTaskReport` before waiters resolve), so + // the artifact may still be "pending" at read time — task_apply_git_patch does a fresh read. const readGitFormatPatchArtifact = async (childTaskId: string) => { if (!config.workspaceSessionDir) return null; return await readSubagentGitPatchArtifact(config.workspaceSessionDir, childTaskId);