Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 57 additions & 2 deletions src/node/services/streamManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,70 @@ describe("StreamManager - stopWhen configuration", () => {
if (!Array.isArray(stopWhen)) {
throw new Error("Expected autonomous stopWhen to be an array of conditions");
}
expect(stopWhen).toHaveLength(2);
expect(stopWhen).toHaveLength(3);

const [maxStepCondition, queuedMessageCondition] = stopWhen;
const [maxStepCondition, queuedMessageCondition, agentReportCondition] = stopWhen;
expect(maxStepCondition({ steps: new Array(99999) })).toBe(false);
expect(maxStepCondition({ steps: new Array(100000) })).toBe(true);

expect(queuedMessageCondition({ steps: [] })).toBe(false);
queued = true;
expect(queuedMessageCondition({ steps: [] })).toBe(true);

expect(
agentReportCondition({
steps: [{ toolResults: [{ toolName: "agent_report", output: { success: true } }] }],
})
).toBe(true);
});

test("stops only after successful agent_report tool result in autonomous mode", () => {
  // Builds one step holding a single tool result with the given tool name and success flag.
  const stepWithToolResult = (toolName: string, success: boolean) => ({
    toolResults: [{ toolName, output: { success } }],
  });

  // The stop-condition factory is looked up reflectively on the instance.
  const manager = new StreamManager(historyService);
  const createStopWhen = Reflect.get(manager, "createStopWhenCondition") as
    | BuildStopWhenCondition
    | undefined;
  expect(typeof createStopWhen).toBe("function");

  const conditions = createStopWhen!({ hasQueuedMessage: () => false });
  if (!Array.isArray(conditions)) {
    throw new Error("Expected autonomous stopWhen to be an array of conditions");
  }

  // The agent_report condition is the third entry of the returned array.
  const agentReportStop = conditions[2];
  if (!agentReportStop) {
    throw new Error("Expected autonomous stopWhen to include agent_report condition");
  }

  // A successful agent_report tool result stops the loop.
  expect(agentReportStop({ steps: [stepWithToolResult("agent_report", true)] })).toBe(true);

  // An agent_report result whose output reports failure does not stop it.
  expect(agentReportStop({ steps: [stepWithToolResult("agent_report", false)] })).toBe(false);

  // A bare agent_report tool call (no result yet) does not stop it.
  expect(
    agentReportStop({
      steps: [{ toolCalls: [{ toolName: "agent_report" }] }],
    })
  ).toBe(false);

  // Successful results from other tools are ignored.
  expect(agentReportStop({ steps: [stepWithToolResult("bash", true)] })).toBe(false);

  // With no steps at all there is nothing to stop on.
  expect(agentReportStop({ steps: [] })).toBe(false);
});

test("treats missing queued-message callback as not queued", () => {
Expand Down
48 changes: 43 additions & 5 deletions src/node/services/streamManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1019,9 +1019,34 @@ export class StreamManager extends EventEmitter {
return stepCountIs(1);
}

// Allow effectively unlimited autonomous steps while still yielding quickly
// when a queued user message should interrupt at the next step boundary.
return [stepCountIs(100000), () => request.hasQueuedMessage?.() ?? false];
// For autonomous loops: cap steps, check for queued messages, and stop after
// a successful agent_report result (`output.success === true`) so the stream ends
// naturally (preserving usage accounting) without allowing post-report tool execution.
// Returns true only for a non-null object whose `success` property is strictly `true`.
const isSuccessfulAgentReportOutput = (value: unknown): boolean => {
  // Anything that is not a real object (primitives, undefined, null) cannot carry a success flag.
  if (typeof value !== "object" || value === null) {
    return false;
  }
  if (!("success" in value)) {
    return false;
  }
  const { success } = value as { success?: unknown };
  return success === true;
};

// Stop condition: true when the most recent step contains an `agent_report`
// tool result whose output passes isSuccessfulAgentReportOutput. Earlier steps are not inspected.
const hasSuccessfulAgentReportResult: ReturnType<typeof stepCountIs> = ({ steps }) => {
  const latestToolResults = steps[steps.length - 1]?.toolResults;
  // No steps yet, or the latest step has no tool results: nothing to stop on.
  if (latestToolResults == null) {
    return false;
  }
  return latestToolResults.some(
    (result) =>
      result.toolName === "agent_report" && isSuccessfulAgentReportOutput(result.output)
  );
};

return [
stepCountIs(100000),
() => request.hasQueuedMessage?.() ?? false,
hasSuccessfulAgentReportResult,
];
}

private createStreamResult(
Expand Down Expand Up @@ -1837,10 +1862,23 @@ export class StreamManager extends EventEmitter {

// CRITICAL: Delete partial.json before updating chat.jsonl
// On successful completion, partial.json becomes stale and must be removed
await this.historyService.deletePartial(workspaceId as string);
const deleteResult = await this.historyService.deletePartial(workspaceId as string);
if (!deleteResult.success) {
workspaceLog.warn("Failed to delete partial on stream end", {
error: deleteResult.error,
});
}

// Update the placeholder message in chat.jsonl with final content
await this.historyService.updateHistory(workspaceId as string, finalAssistantMessage);
const updateResult = await this.historyService.updateHistory(
workspaceId as string,
finalAssistantMessage
);
if (!updateResult.success) {
workspaceLog.warn("Failed to update history on stream end", {
error: updateResult.error,
});
}

// Update cumulative session usage (if service is available)
// Wrapped in try-catch: usage recording is non-critical and shouldn't block stream completion
Expand Down
Loading
Loading