diff --git a/apps/web-evals/src/app/runs/[id]/run.tsx b/apps/web-evals/src/app/runs/[id]/run.tsx index a4b39100245..41581a21c41 100644 --- a/apps/web-evals/src/app/runs/[id]/run.tsx +++ b/apps/web-evals/src/app/runs/[id]/run.tsx @@ -242,7 +242,7 @@ function formatLogContent(log: string): React.ReactNode[] { export function Run({ run }: { run: Run }) { const runStatus = useRunStatus(run) - const { tasks, tokenUsage, usageUpdatedAt, heartbeat, runners } = runStatus + const { tasks, tokenUsage, toolUsage, usageUpdatedAt, heartbeat, runners } = runStatus const [selectedTask, setSelectedTask] = useState(null) const [taskLog, setTaskLog] = useState(null) @@ -336,37 +336,70 @@ export function Run({ run }: { run: Run }) { ) const taskMetrics: Record = useMemo(() => { + // Reference usageUpdatedAt to trigger recomputation when Map contents change + void usageUpdatedAt const metrics: Record = {} tasks?.forEach((task) => { - const usage = tokenUsage.get(task.id) - - if (task.finishedAt && task.taskMetrics) { - metrics[task.id] = task.taskMetrics - } else if (usage) { + const streamingUsage = tokenUsage.get(task.id) + const dbMetrics = task.taskMetrics + + // For finished tasks, prefer DB values but fall back to streaming values + // This handles race conditions during timeout where DB might not have latest data + if (task.finishedAt) { + // Check if DB metrics have meaningful values (not just default/empty) + const dbHasData = dbMetrics && (dbMetrics.tokensIn > 0 || dbMetrics.tokensOut > 0 || dbMetrics.cost > 0) + if (dbHasData) { + metrics[task.id] = dbMetrics + } else if (streamingUsage) { + // Fall back to streaming values if DB is empty/stale + metrics[task.id] = { + tokensIn: streamingUsage.totalTokensIn, + tokensOut: streamingUsage.totalTokensOut, + tokensContext: streamingUsage.contextTokens, + duration: streamingUsage.duration ?? 
0, + cost: streamingUsage.totalCost, + } + } + } else if (streamingUsage) { + // For running tasks, use streaming values metrics[task.id] = { - tokensIn: usage.totalTokensIn, - tokensOut: usage.totalTokensOut, - tokensContext: usage.contextTokens, - duration: usage.duration ?? 0, - cost: usage.totalCost, + tokensIn: streamingUsage.totalTokensIn, + tokensOut: streamingUsage.totalTokensOut, + tokensContext: streamingUsage.contextTokens, + duration: streamingUsage.duration ?? 0, + cost: streamingUsage.totalCost, } } }) return metrics - // eslint-disable-next-line react-hooks/exhaustive-deps }, [tasks, tokenUsage, usageUpdatedAt]) // Collect all unique tool names from all tasks and sort by total attempts const toolColumns = useMemo(() => { + // Reference usageUpdatedAt to trigger recomputation when Map contents change + void usageUpdatedAt if (!tasks) return [] const toolTotals = new Map() for (const task of tasks) { - if (task.taskMetrics?.toolUsage) { - for (const [toolName, usage] of Object.entries(task.taskMetrics.toolUsage)) { + // Get both DB and streaming values + const dbToolUsage = task.taskMetrics?.toolUsage + const streamingToolUsage = toolUsage.get(task.id) + + // For finished tasks, prefer DB values but fall back to streaming values + // For running tasks, use streaming values + // This handles race conditions during timeout where DB might not have latest data + const taskToolUsage = task.finishedAt + ? dbToolUsage && Object.keys(dbToolUsage).length > 0 + ? dbToolUsage + : streamingToolUsage + : streamingToolUsage + + if (taskToolUsage) { + for (const [toolName, usage] of Object.entries(taskToolUsage)) { const tool = toolName as ToolName const current = toolTotals.get(tool) ?? 
0 toolTotals.set(tool, current + usage.attempts) @@ -378,10 +411,13 @@ export function Run({ run }: { run: Run }) { return Array.from(toolTotals.entries()) .sort((a, b) => b[1] - a[1]) .map(([name]): ToolName => name) - }, [tasks]) + // toolUsage ref is stable; usageUpdatedAt triggers recomputation when Map contents change + }, [tasks, toolUsage, usageUpdatedAt]) // Compute aggregate stats const stats = useMemo(() => { + // Reference usageUpdatedAt to trigger recomputation when Map contents change + void usageUpdatedAt if (!tasks) return null const passed = tasks.filter((t) => t.passed === true).length @@ -393,8 +429,8 @@ export function Run({ run }: { run: Run }) { let totalCost = 0 let totalDuration = 0 - // Aggregate tool usage from completed tasks - const toolUsage: ToolUsage = {} + // Aggregate tool usage from all tasks (both finished and running) + const toolUsageAggregate: ToolUsage = {} for (const task of tasks) { const metrics = taskMetrics[task.id] @@ -405,15 +441,24 @@ export function Run({ run }: { run: Run }) { totalDuration += metrics.duration } - // Aggregate tool usage from finished tasks with taskMetrics - if (task.finishedAt && task.taskMetrics?.toolUsage) { - for (const [key, usage] of Object.entries(task.taskMetrics.toolUsage)) { + // Aggregate tool usage: prefer DB values for finished tasks, fall back to streaming values + // This handles race conditions during timeout where DB might not have latest data + const dbToolUsage = task.taskMetrics?.toolUsage + const streamingToolUsage = toolUsage.get(task.id) + const taskToolUsage = task.finishedAt + ? dbToolUsage && Object.keys(dbToolUsage).length > 0 + ? 
dbToolUsage + : streamingToolUsage + : streamingToolUsage + + if (taskToolUsage) { + for (const [key, usage] of Object.entries(taskToolUsage)) { const tool = key as keyof ToolUsage - if (!toolUsage[tool]) { - toolUsage[tool] = { attempts: 0, failures: 0 } + if (!toolUsageAggregate[tool]) { + toolUsageAggregate[tool] = { attempts: 0, failures: 0 } } - toolUsage[tool].attempts += usage.attempts - toolUsage[tool].failures += usage.failures + toolUsageAggregate[tool].attempts += usage.attempts + toolUsageAggregate[tool].failures += usage.failures } } } @@ -427,13 +472,15 @@ export function Run({ run }: { run: Run }) { totalTokensOut, totalCost, totalDuration, - toolUsage, + toolUsage: toolUsageAggregate, } - // eslint-disable-next-line react-hooks/exhaustive-deps - }, [tasks, taskMetrics, tokenUsage, usageUpdatedAt]) + // Map refs are stable; usageUpdatedAt triggers recomputation when Map contents change + }, [tasks, taskMetrics, toolUsage, usageUpdatedAt]) // Calculate elapsed time (wall-clock time from run creation to completion or now) const elapsedTime = useMemo(() => { + // Reference usageUpdatedAt to trigger recomputation for live elapsed time updates + void usageUpdatedAt if (!tasks || tasks.length === 0) return null const startTime = new Date(run.createdAt).getTime() @@ -452,7 +499,6 @@ export function Run({ run }: { run: Run }) { // If still running, use current time return Date.now() - startTime - // eslint-disable-next-line react-hooks/exhaustive-deps }, [tasks, run.createdAt, run.taskMetricsId, usageUpdatedAt]) return ( @@ -655,7 +701,14 @@ export function Run({ run }: { run: Run }) { {formatTokens(taskMetrics[task.id]!.tokensContext)} {toolColumns.map((toolName) => { - const usage = task.taskMetrics?.toolUsage?.[toolName] + // Use DB values for finished tasks, but fall back to streaming values + // if DB values are missing (handles race condition during timeout) + const dbUsage = task.taskMetrics?.toolUsage?.[toolName] + const streamingUsage = 
toolUsage.get(task.id)?.[toolName] + const usage = task.finishedAt + ? (dbUsage ?? streamingUsage) + : streamingUsage + const successRate = usage && usage.attempts > 0 ? ((usage.attempts - usage.failures) / usage.attempts) * 100 diff --git a/apps/web-evals/src/hooks/use-run-status.ts b/apps/web-evals/src/hooks/use-run-status.ts index 5ad9e19ff36..cc7bc8f56b7 100644 --- a/apps/web-evals/src/hooks/use-run-status.ts +++ b/apps/web-evals/src/hooks/use-run-status.ts @@ -1,7 +1,7 @@ import { useState, useCallback, useRef } from "react" import { useQuery, keepPreviousData } from "@tanstack/react-query" -import { type TokenUsage, RooCodeEventName, taskEventSchema } from "@roo-code/types" +import { type TokenUsage, type ToolUsage, RooCodeEventName, taskEventSchema } from "@roo-code/types" import type { Run, Task, TaskMetrics } from "@roo-code/evals" import { getHeartbeat } from "@/actions/heartbeat" @@ -15,6 +15,7 @@ export type RunStatus = { runners: string[] | undefined tasks: (Task & { taskMetrics: TaskMetrics | null })[] | undefined tokenUsage: Map + toolUsage: Map usageUpdatedAt: number | undefined } @@ -23,6 +24,7 @@ export const useRunStatus = (run: Run): RunStatus => { const [usageUpdatedAt, setUsageUpdatedAt] = useState() const tokenUsage = useRef>(new Map()) + const toolUsage = useRef>(new Map()) const startTimes = useRef>(new Map()) const { data: heartbeat } = useQuery({ @@ -78,6 +80,12 @@ export const useRunStatus = (run: Run): RunStatus => { const startTime = startTimes.current.get(taskId) const duration = startTime ? 
Date.now() - startTime : undefined tokenUsage.current.set(taskId, { ...payload[1], duration }) + + // Track tool usage from streaming updates + if (payload[2]) { + toolUsage.current.set(taskId, payload[2]) + } + setUsageUpdatedAt(Date.now()) break } @@ -96,6 +104,7 @@ export const useRunStatus = (run: Run): RunStatus => { runners, tasks, tokenUsage: tokenUsage.current, + toolUsage: toolUsage.current, usageUpdatedAt, } } diff --git a/packages/evals/src/cli/runTask.ts b/packages/evals/src/cli/runTask.ts index 65b9633338c..5f737c1ad5d 100644 --- a/packages/evals/src/cli/runTask.ts +++ b/packages/evals/src/cli/runTask.ts @@ -13,6 +13,7 @@ import { RooCodeEventName, IpcMessageType, EVALS_SETTINGS, + type ToolUsage, } from "@roo-code/types" import { IpcClient } from "@roo-code/ipc" @@ -277,6 +278,8 @@ export const runTask = async ({ run, task, publish, logger, jobToken }: RunTaskO let taskMetricsId: number | undefined let rooTaskId: string | undefined let isClientDisconnected = false + // Track accumulated tool usage across task instances (handles rehydration after abort) + const accumulatedToolUsage: ToolUsage = {} const ignoreEvents: Record<"broadcast" | "log", RooCodeEventName[]> = { broadcast: [RooCodeEventName.Message], @@ -373,6 +376,27 @@ export const runTask = async ({ run, task, publish, logger, jobToken }: RunTaskO const { totalCost, totalTokensIn, totalTokensOut, contextTokens, totalCacheWrites, totalCacheReads } = payload[1] + // For both TaskTokenUsageUpdated and TaskCompleted: toolUsage is payload[2] + const incomingToolUsage: ToolUsage = payload[2] ?? {} + + // Merge incoming tool usage with accumulated data using MAX strategy. 
+ // This handles the case where a task is rehydrated after abort: + // - Empty rehydrated data won't overwrite existing: max(5, 0) = 5 + // - Legitimate restart with additional work is captured: max(5, 8) = 8 + // Each task instance tracks its own cumulative values, so we take the max + // to preserve the highest values seen across all instances. + for (const [toolName, usage] of Object.entries(incomingToolUsage)) { + const existing = accumulatedToolUsage[toolName as keyof ToolUsage] + if (existing) { + accumulatedToolUsage[toolName as keyof ToolUsage] = { + attempts: Math.max(existing.attempts, usage.attempts), + failures: Math.max(existing.failures, usage.failures), + } + } else { + accumulatedToolUsage[toolName as keyof ToolUsage] = { ...usage } + } + } + await updateTaskMetrics(taskMetricsId, { cost: totalCost, tokensIn: totalTokensIn, @@ -381,14 +405,10 @@ export const runTask = async ({ run, task, publish, logger, jobToken }: RunTaskO duration, cacheWrites: totalCacheWrites ?? 0, cacheReads: totalCacheReads ?? 
0, + toolUsage: accumulatedToolUsage, }) } - if (eventName === RooCodeEventName.TaskCompleted && taskMetricsId) { - const toolUsage = payload[2] - await updateTaskMetrics(taskMetricsId, { toolUsage }) - } - if (eventName === RooCodeEventName.TaskAborted) { taskAbortedAt = Date.now() } diff --git a/packages/types/src/events.ts b/packages/types/src/events.ts index 5e4415db207..5743ac29407 100644 --- a/packages/types/src/events.ts +++ b/packages/types/src/events.ts @@ -102,7 +102,7 @@ export const rooCodeEventsSchema = z.object({ [RooCodeEventName.TaskUserMessage]: z.tuple([z.string()]), [RooCodeEventName.TaskToolFailed]: z.tuple([z.string(), toolNamesSchema, z.string()]), - [RooCodeEventName.TaskTokenUsageUpdated]: z.tuple([z.string(), tokenUsageSchema]), + [RooCodeEventName.TaskTokenUsageUpdated]: z.tuple([z.string(), tokenUsageSchema, toolUsageSchema]), [RooCodeEventName.ModeChanged]: z.tuple([z.string()]), [RooCodeEventName.ProviderProfileChanged]: z.tuple([z.object({ name: z.string(), provider: z.string() })]), diff --git a/packages/types/src/task.ts b/packages/types/src/task.ts index 0067bfb96d6..3f6a0aa581c 100644 --- a/packages/types/src/task.ts +++ b/packages/types/src/task.ts @@ -78,7 +78,7 @@ export type TaskProviderEvents = { [RooCodeEventName.TaskUserMessage]: [taskId: string] - [RooCodeEventName.TaskTokenUsageUpdated]: [taskId: string, tokenUsage: TokenUsage] + [RooCodeEventName.TaskTokenUsageUpdated]: [taskId: string, tokenUsage: TokenUsage, toolUsage: ToolUsage] [RooCodeEventName.ModeChanged]: [mode: string] [RooCodeEventName.ProviderProfileChanged]: [config: { name: string; provider?: string }] @@ -159,5 +159,5 @@ export type TaskEvents = { // Task Analytics [RooCodeEventName.TaskToolFailed]: [taskId: string, tool: ToolName, error: string] - [RooCodeEventName.TaskTokenUsageUpdated]: [taskId: string, tokenUsage: TokenUsage] + [RooCodeEventName.TaskTokenUsageUpdated]: [taskId: string, tokenUsage: TokenUsage, toolUsage: ToolUsage] } diff --git 
a/src/__tests__/nested-delegation-resume.spec.ts b/src/__tests__/nested-delegation-resume.spec.ts index 61e61c88387..0c97ab5e2bf 100644 --- a/src/__tests__/nested-delegation-resume.spec.ts +++ b/src/__tests__/nested-delegation-resume.spec.ts @@ -180,6 +180,7 @@ describe("Nested delegation resume (A → B → C)", () => { clineMessages: [], userMessageContent: [], consecutiveMistakeCount: 0, + emitFinalTokenUsageUpdate: vi.fn(), } as unknown as Task const blockC = { @@ -223,6 +224,7 @@ describe("Nested delegation resume (A → B → C)", () => { clineMessages: [], userMessageContent: [], consecutiveMistakeCount: 0, + emitFinalTokenUsageUpdate: vi.fn(), } as unknown as Task const blockB = { diff --git a/src/core/task/Task.ts b/src/core/task/Task.ts index d63527aecde..488562e8f9a 100644 --- a/src/core/task/Task.ts +++ b/src/core/task/Task.ts @@ -8,6 +8,7 @@ import { AskIgnoredError } from "./AskIgnoredError" import { Anthropic } from "@anthropic-ai/sdk" import OpenAI from "openai" +import debounce from "lodash.debounce" import delay from "delay" import pWaitFor from "p-wait-for" import { serializeError } from "serialize-error" @@ -63,7 +64,7 @@ import { combineApiRequests } from "../../shared/combineApiRequests" import { combineCommandSequences } from "../../shared/combineCommandSequences" import { t } from "../../i18n" import { ClineApiReqCancelReason, ClineApiReqInfo } from "../../shared/ExtensionMessage" -import { getApiMetrics, hasTokenUsageChanged } from "../../shared/getApiMetrics" +import { getApiMetrics, hasTokenUsageChanged, hasToolUsageChanged } from "../../shared/getApiMetrics" import { ClineAskResponse } from "../../shared/WebviewMessage" import { defaultModeSlug, getModeBySlug, getGroupName } from "../../shared/modes" import { DiffStrategy, type ToolUse, type ToolParamName, toolParamNames } from "../../shared/tools" @@ -324,6 +325,13 @@ export class Task extends EventEmitter implements TaskLike { private tokenUsageSnapshot?: TokenUsage private 
tokenUsageSnapshotAt?: number + // Tool Usage Cache + private toolUsageSnapshot?: ToolUsage + + // Token Usage Throttling - Debounced emit function + private readonly TOKEN_USAGE_EMIT_INTERVAL_MS = 2000 // 2 seconds + private debouncedEmitTokenUsage: ReturnType + // Cloud Sync Tracking private cloudSyncedMessageTimestamps: Set = new Set() @@ -500,6 +508,28 @@ export class Task extends EventEmitter implements TaskLike { this.todoList = initialTodos } + // Initialize debounced token usage emit function + // Uses debounce with maxWait to achieve throttle-like behavior: + // - leading: true - Emit immediately on first call + // - trailing: true - Emit final state when updates stop + // - maxWait - Ensures at most one emit per interval during rapid updates (throttle behavior) + this.debouncedEmitTokenUsage = debounce( + (tokenUsage: TokenUsage, toolUsage: ToolUsage) => { + const tokenChanged = hasTokenUsageChanged(tokenUsage, this.tokenUsageSnapshot) + const toolChanged = hasToolUsageChanged(toolUsage, this.toolUsageSnapshot) + + if (tokenChanged || toolChanged) { + this.emit(RooCodeEventName.TaskTokenUsageUpdated, this.taskId, tokenUsage, toolUsage) + this.tokenUsageSnapshot = tokenUsage + this.tokenUsageSnapshotAt = this.clineMessages.at(-1)?.ts + // Deep copy tool usage for snapshot + this.toolUsageSnapshot = JSON.parse(JSON.stringify(toolUsage)) + } + }, + this.TOKEN_USAGE_EMIT_INTERVAL_MS, + { leading: true, trailing: true, maxWait: this.TOKEN_USAGE_EMIT_INTERVAL_MS }, + ) + onCreated?.(this) if (startTask) { @@ -920,11 +950,12 @@ export class Task extends EventEmitter implements TaskLike { initialStatus: this.initialStatus, }) - if (hasTokenUsageChanged(tokenUsage, this.tokenUsageSnapshot)) { - this.emit(RooCodeEventName.TaskTokenUsageUpdated, this.taskId, tokenUsage) - this.tokenUsageSnapshot = undefined - this.tokenUsageSnapshotAt = undefined - } + // Emit token/tool usage updates using debounced function + // The debounce with maxWait ensures: + // - Immediate 
first emit (leading: true) + // - At most one emit per interval during rapid updates (maxWait) + // - Final state is emitted when updates stop (trailing: true) + this.debouncedEmitTokenUsage(tokenUsage, this.toolUsage) await this.providerRef.deref()?.updateTaskHistory(historyItem) } catch (error) { @@ -1842,6 +1873,17 @@ export class Task extends EventEmitter implements TaskLike { } } + /** + * Force emit a final token usage update, ignoring throttle. + * Called before task completion or abort to ensure final stats are captured. + * Triggers the debounce with current values and immediately flushes to ensure emit. + */ + public emitFinalTokenUsageUpdate(): void { + const tokenUsage = this.getTokenUsage() + this.debouncedEmitTokenUsage(tokenUsage, this.toolUsage) + this.debouncedEmitTokenUsage.flush() + } + public async abortTask(isAbandoned = false) { // Aborting task @@ -1851,6 +1893,10 @@ export class Task extends EventEmitter implements TaskLike { } this.abort = true + + // Force final token usage update before abort event + this.emitFinalTokenUsageUpdate() + this.emit(RooCodeEventName.TaskAborted) try { diff --git a/src/core/task/__tests__/Task.throttle.test.ts b/src/core/task/__tests__/Task.throttle.test.ts new file mode 100644 index 00000000000..1d5911be9f9 --- /dev/null +++ b/src/core/task/__tests__/Task.throttle.test.ts @@ -0,0 +1,619 @@ +import { RooCodeEventName, ProviderSettings, TokenUsage, ToolUsage } from "@roo-code/types" + +import { Task } from "../Task" +import { ClineProvider } from "../../webview/ClineProvider" +import { hasToolUsageChanged, hasTokenUsageChanged } from "../../../shared/getApiMetrics" + +// Mock dependencies +vi.mock("../../webview/ClineProvider") +vi.mock("../../../integrations/terminal/TerminalRegistry", () => ({ + TerminalRegistry: { + releaseTerminalsForTask: vi.fn(), + }, +})) +vi.mock("../../ignore/RooIgnoreController") +vi.mock("../../protect/RooProtectedController") +vi.mock("../../context-tracking/FileContextTracker") 
+vi.mock("../../../services/browser/UrlContentFetcher") +vi.mock("../../../services/browser/BrowserSession") +vi.mock("../../../integrations/editor/DiffViewProvider") +vi.mock("../../tools/ToolRepetitionDetector") +vi.mock("../../../api", () => ({ + buildApiHandler: vi.fn(() => ({ + getModel: () => ({ info: {}, id: "test-model" }), + })), +})) + +// Mock TelemetryService +vi.mock("@roo-code/telemetry", () => ({ + TelemetryService: { + instance: { + captureTaskCreated: vi.fn(), + captureTaskRestarted: vi.fn(), + }, + }, +})) + +// Mock task persistence to avoid disk writes +vi.mock("../../task-persistence", () => ({ + readApiMessages: vi.fn().mockResolvedValue([]), + saveApiMessages: vi.fn().mockResolvedValue(undefined), + readTaskMessages: vi.fn().mockResolvedValue([]), + saveTaskMessages: vi.fn().mockResolvedValue(undefined), + taskMetadata: vi.fn().mockResolvedValue({ + historyItem: { + id: "test-task-id", + number: 1, + task: "Test task", + ts: Date.now(), + totalCost: 0.01, + tokensIn: 100, + tokensOut: 50, + }, + tokenUsage: { + totalTokensIn: 100, + totalTokensOut: 50, + totalCost: 0.01, + contextTokens: 150, + totalCacheWrites: 0, + totalCacheReads: 0, + }, + }), +})) + +describe("Task token usage throttling", () => { + let mockProvider: any + let mockApiConfiguration: ProviderSettings + let task: Task + + beforeEach(() => { + // Reset all mocks + vi.clearAllMocks() + vi.useFakeTimers() + + // Mock provider + mockProvider = { + context: { + globalStorageUri: { fsPath: "/test/path" }, + }, + getState: vi.fn().mockResolvedValue({ mode: "code" }), + log: vi.fn(), + postStateToWebview: vi.fn().mockResolvedValue(undefined), + updateTaskHistory: vi.fn().mockResolvedValue(undefined), + } + + // Mock API configuration + mockApiConfiguration = { + apiProvider: "anthropic", + apiKey: "test-key", + } as ProviderSettings + + // Create task instance without starting it + task = new Task({ + provider: mockProvider as ClineProvider, + apiConfiguration: 
mockApiConfiguration, + startTask: false, + }) + }) + + afterEach(() => { + vi.useRealTimers() + if (task && !task.abort) { + task.dispose() + } + }) + + test("should emit TaskTokenUsageUpdated immediately on first change", async () => { + const emitSpy = vi.spyOn(task, "emit") + + // Add a message to trigger saveClineMessages + await (task as any).addToClineMessages({ + ts: Date.now(), + type: "say", + say: "text", + text: "Test message", + }) + + // Should emit immediately on first change + expect(emitSpy).toHaveBeenCalledWith( + RooCodeEventName.TaskTokenUsageUpdated, + task.taskId, + expect.any(Object), + expect.any(Object), + ) + }) + + test("should throttle subsequent emissions within 2 seconds", async () => { + const { taskMetadata } = await import("../../task-persistence") + let callCount = 0 + + // Mock to return different token usage on each call + vi.mocked(taskMetadata).mockImplementation(async () => { + callCount++ + return { + historyItem: { + id: "test-task-id", + number: 1, + task: "Test task", + ts: Date.now(), + totalCost: 0.01 * callCount, + tokensIn: 100 * callCount, + tokensOut: 50 * callCount, + }, + tokenUsage: { + totalTokensIn: 100 * callCount, + totalTokensOut: 50 * callCount, + totalCost: 0.01 * callCount, + contextTokens: 150 * callCount, + totalCacheWrites: 0, + totalCacheReads: 0, + }, + } + }) + + const emitSpy = vi.spyOn(task, "emit") + + // First message - should emit + await (task as any).addToClineMessages({ + ts: Date.now(), + type: "say", + say: "text", + text: "Message 1", + }) + + const firstEmitCount = emitSpy.mock.calls.filter( + (call) => call[0] === RooCodeEventName.TaskTokenUsageUpdated, + ).length + + // Second message immediately after - should NOT emit due to throttle + vi.advanceTimersByTime(500) // Advance only 500ms + await (task as any).addToClineMessages({ + ts: Date.now(), + type: "say", + say: "text", + text: "Message 2", + }) + + const secondEmitCount = emitSpy.mock.calls.filter( + (call) => call[0] === 
RooCodeEventName.TaskTokenUsageUpdated, + ).length + + // Should still be the same count (throttled) + expect(secondEmitCount).toBe(firstEmitCount) + + // Third message after 2+ seconds - should emit + vi.advanceTimersByTime(1600) // Total time: 2100ms + await (task as any).addToClineMessages({ + ts: Date.now(), + type: "say", + say: "text", + text: "Message 3", + }) + + const thirdEmitCount = emitSpy.mock.calls.filter( + (call) => call[0] === RooCodeEventName.TaskTokenUsageUpdated, + ).length + + // Should have emitted again after throttle period + expect(thirdEmitCount).toBeGreaterThan(secondEmitCount) + }) + + test("should include toolUsage in emission payload", async () => { + const emitSpy = vi.spyOn(task, "emit") + + // Set some tool usage + task.toolUsage = { + read_file: { attempts: 5, failures: 1 }, + write_to_file: { attempts: 3, failures: 0 }, + } + + // Add a message to trigger emission + await (task as any).addToClineMessages({ + ts: Date.now(), + type: "say", + say: "text", + text: "Test message", + }) + + // Should emit with toolUsage as third parameter + expect(emitSpy).toHaveBeenCalledWith( + RooCodeEventName.TaskTokenUsageUpdated, + task.taskId, + expect.any(Object), // tokenUsage + task.toolUsage, // toolUsage + ) + }) + + test("should force final emission on task abort", async () => { + const emitSpy = vi.spyOn(task, "emit") + + // Set some tool usage + task.toolUsage = { + read_file: { attempts: 5, failures: 1 }, + } + + // Add a message first + await (task as any).addToClineMessages({ + ts: Date.now(), + type: "say", + say: "text", + text: "Message 1", + }) + + // Clear the spy to check for final emission + emitSpy.mockClear() + + // Abort task immediately (within throttle window) + vi.advanceTimersByTime(500) + await task.abortTask() + + // Should have emitted TaskTokenUsageUpdated before TaskAborted + const calls = emitSpy.mock.calls + const tokenUsageUpdateIndex = calls.findIndex((call) => call[0] === RooCodeEventName.TaskTokenUsageUpdated) 
+ const taskAbortedIndex = calls.findIndex((call) => call[0] === RooCodeEventName.TaskAborted) + + // Should have both events + expect(tokenUsageUpdateIndex).toBeGreaterThanOrEqual(0) + expect(taskAbortedIndex).toBeGreaterThanOrEqual(0) + + // TaskTokenUsageUpdated should come before TaskAborted + expect(tokenUsageUpdateIndex).toBeLessThan(taskAbortedIndex) + }) + + test("should update tokenUsageSnapshot when throttled emission occurs", async () => { + const { taskMetadata } = await import("../../task-persistence") + let callCount = 0 + + // Mock to return different token usage on each call + vi.mocked(taskMetadata).mockImplementation(async () => { + callCount++ + return { + historyItem: { + id: "test-task-id", + number: 1, + task: "Test task", + ts: Date.now(), + totalCost: 0.01 * callCount, + tokensIn: 100 * callCount, + tokensOut: 50 * callCount, + }, + tokenUsage: { + totalTokensIn: 100 * callCount, + totalTokensOut: 50 * callCount, + totalCost: 0.01 * callCount, + contextTokens: 150 * callCount, + totalCacheWrites: 0, + totalCacheReads: 0, + }, + } + }) + + // Add initial message + await (task as any).addToClineMessages({ + ts: Date.now(), + type: "say", + say: "text", + text: "Message 1", + }) + + // Get the initial snapshot + const initialSnapshot = (task as any).tokenUsageSnapshot + + // Add another message within throttle window + vi.advanceTimersByTime(500) + await (task as any).addToClineMessages({ + ts: Date.now(), + type: "say", + say: "text", + text: "Message 2", + }) + + // Snapshot should still be the same (throttled) + expect((task as any).tokenUsageSnapshot).toBe(initialSnapshot) + + // Add message after throttle window + vi.advanceTimersByTime(1600) // Total: 2100ms + await (task as any).addToClineMessages({ + ts: Date.now(), + type: "say", + say: "text", + text: "Message 3", + }) + + // Snapshot should be updated now (new object reference) + expect((task as any).tokenUsageSnapshot).not.toBe(initialSnapshot) + // Values should be different + 
expect((task as any).tokenUsageSnapshot.totalTokensIn).toBeGreaterThan(initialSnapshot.totalTokensIn) + }) + + test("should not emit if token usage has not changed even after throttle period", async () => { + const { taskMetadata } = await import("../../task-persistence") + + // Mock taskMetadata to return same token usage + const constantTokenUsage: TokenUsage = { + totalTokensIn: 100, + totalTokensOut: 50, + totalCost: 0.01, + contextTokens: 150, + totalCacheWrites: 0, + totalCacheReads: 0, + } + + vi.mocked(taskMetadata).mockResolvedValue({ + historyItem: { + id: "test-task-id", + number: 1, + task: "Test task", + ts: Date.now(), + totalCost: 0.01, + tokensIn: 100, + tokensOut: 50, + }, + tokenUsage: constantTokenUsage, + }) + + const emitSpy = vi.spyOn(task, "emit") + + // Add first message + await (task as any).addToClineMessages({ + ts: Date.now(), + type: "say", + say: "text", + text: "Message 1", + }) + + const firstEmitCount = emitSpy.mock.calls.filter( + (call) => call[0] === RooCodeEventName.TaskTokenUsageUpdated, + ).length + + // Wait for throttle period and add another message + vi.advanceTimersByTime(2100) + await (task as any).addToClineMessages({ + ts: Date.now(), + type: "say", + say: "text", + text: "Message 2", + }) + + const secondEmitCount = emitSpy.mock.calls.filter( + (call) => call[0] === RooCodeEventName.TaskTokenUsageUpdated, + ).length + + // Should not have emitted again since token usage didn't change + expect(secondEmitCount).toBe(firstEmitCount) + }) + + test("should emit when tool usage changes even if token usage is the same", async () => { + const { taskMetadata } = await import("../../task-persistence") + + // Mock taskMetadata to return same token usage + const constantTokenUsage: TokenUsage = { + totalTokensIn: 100, + totalTokensOut: 50, + totalCost: 0.01, + contextTokens: 150, + totalCacheWrites: 0, + totalCacheReads: 0, + } + + vi.mocked(taskMetadata).mockResolvedValue({ + historyItem: { + id: "test-task-id", + number: 1, + 
task: "Test task", + ts: Date.now(), + totalCost: 0.01, + tokensIn: 100, + tokensOut: 50, + }, + tokenUsage: constantTokenUsage, + }) + + const emitSpy = vi.spyOn(task, "emit") + + // Add first message - should emit + await (task as any).addToClineMessages({ + ts: Date.now(), + type: "say", + say: "text", + text: "Message 1", + }) + + const firstEmitCount = emitSpy.mock.calls.filter( + (call) => call[0] === RooCodeEventName.TaskTokenUsageUpdated, + ).length + + // Wait for throttle period + vi.advanceTimersByTime(2100) + + // Change tool usage (token usage stays the same) + task.toolUsage = { + read_file: { attempts: 5, failures: 1 }, + } + + // Add another message + await (task as any).addToClineMessages({ + ts: Date.now(), + type: "say", + say: "text", + text: "Message 2", + }) + + const secondEmitCount = emitSpy.mock.calls.filter( + (call) => call[0] === RooCodeEventName.TaskTokenUsageUpdated, + ).length + + // Should have emitted because tool usage changed even though token usage didn't + expect(secondEmitCount).toBeGreaterThan(firstEmitCount) + }) + + test("should update toolUsageSnapshot when emission occurs", async () => { + // Add initial message + await (task as any).addToClineMessages({ + ts: Date.now(), + type: "say", + say: "text", + text: "Message 1", + }) + + // Initially toolUsageSnapshot should be set to current toolUsage (empty object) + const initialSnapshot = (task as any).toolUsageSnapshot + expect(initialSnapshot).toBeDefined() + expect(Object.keys(initialSnapshot)).toHaveLength(0) + + // Wait for throttle period + vi.advanceTimersByTime(2100) + + // Update tool usage + task.toolUsage = { + read_file: { attempts: 3, failures: 0 }, + write_to_file: { attempts: 2, failures: 1 }, + } + + // Add another message + await (task as any).addToClineMessages({ + ts: Date.now(), + type: "say", + say: "text", + text: "Message 2", + }) + + // Snapshot should be updated to match the new toolUsage + const newSnapshot = (task as any).toolUsageSnapshot + 
expect(newSnapshot).not.toBe(initialSnapshot) + expect(newSnapshot.read_file).toEqual({ attempts: 3, failures: 0 }) + expect(newSnapshot.write_to_file).toEqual({ attempts: 2, failures: 1 }) + }) + + test("emitFinalTokenUsageUpdate should emit on tool usage change alone", async () => { + const emitSpy = vi.spyOn(task, "emit") + + // Set initial tool usage and simulate previous emission + ;(task as any).tokenUsageSnapshot = task.getTokenUsage() + ;(task as any).toolUsageSnapshot = {} + + // Change tool usage + task.toolUsage = { + execute_command: { attempts: 1, failures: 0 }, + } + + // Call emitFinalTokenUsageUpdate + task.emitFinalTokenUsageUpdate() + + // Should emit due to tool usage change + expect(emitSpy).toHaveBeenCalledWith( + RooCodeEventName.TaskTokenUsageUpdated, + task.taskId, + expect.any(Object), + task.toolUsage, + ) + }) +}) + +describe("hasToolUsageChanged", () => { + test("should return true when snapshot is undefined and current has data", () => { + const current: ToolUsage = { + read_file: { attempts: 1, failures: 0 }, + } + expect(hasToolUsageChanged(current, undefined)).toBe(true) + }) + + test("should return false when both are empty", () => { + expect(hasToolUsageChanged({}, {})).toBe(false) + }) + + test("should return false when snapshot is undefined and current is empty", () => { + expect(hasToolUsageChanged({}, undefined)).toBe(false) + }) + + test("should return true when a new tool is added", () => { + const current: ToolUsage = { + read_file: { attempts: 1, failures: 0 }, + write_to_file: { attempts: 1, failures: 0 }, + } + const snapshot: ToolUsage = { + read_file: { attempts: 1, failures: 0 }, + } + expect(hasToolUsageChanged(current, snapshot)).toBe(true) + }) + + test("should return true when attempts change", () => { + const current: ToolUsage = { + read_file: { attempts: 2, failures: 0 }, + } + const snapshot: ToolUsage = { + read_file: { attempts: 1, failures: 0 }, + } + expect(hasToolUsageChanged(current, snapshot)).toBe(true) 
+ }) + + test("should return true when failures change", () => { + const current: ToolUsage = { + read_file: { attempts: 1, failures: 1 }, + } + const snapshot: ToolUsage = { + read_file: { attempts: 1, failures: 0 }, + } + expect(hasToolUsageChanged(current, snapshot)).toBe(true) + }) + + test("should return false when nothing changed", () => { + const current: ToolUsage = { + read_file: { attempts: 3, failures: 1 }, + write_to_file: { attempts: 2, failures: 0 }, + } + const snapshot: ToolUsage = { + read_file: { attempts: 3, failures: 1 }, + write_to_file: { attempts: 2, failures: 0 }, + } + expect(hasToolUsageChanged(current, snapshot)).toBe(false) + }) +}) + +describe("hasTokenUsageChanged", () => { + test("should return true when snapshot is undefined", () => { + const current: TokenUsage = { + totalTokensIn: 100, + totalTokensOut: 50, + totalCost: 0.01, + contextTokens: 150, + } + expect(hasTokenUsageChanged(current, undefined)).toBe(true) + }) + + test("should return true when totalTokensIn changes", () => { + const current: TokenUsage = { + totalTokensIn: 200, + totalTokensOut: 50, + totalCost: 0.01, + contextTokens: 150, + } + const snapshot: TokenUsage = { + totalTokensIn: 100, + totalTokensOut: 50, + totalCost: 0.01, + contextTokens: 150, + } + expect(hasTokenUsageChanged(current, snapshot)).toBe(true) + }) + + test("should return false when nothing changed", () => { + const current: TokenUsage = { + totalTokensIn: 100, + totalTokensOut: 50, + totalCost: 0.01, + contextTokens: 150, + totalCacheWrites: 10, + totalCacheReads: 5, + } + const snapshot: TokenUsage = { + totalTokensIn: 100, + totalTokensOut: 50, + totalCost: 0.01, + contextTokens: 150, + totalCacheWrites: 10, + totalCacheReads: 5, + } + expect(hasTokenUsageChanged(current, snapshot)).toBe(false) + }) +}) diff --git a/src/core/tools/AttemptCompletionTool.ts b/src/core/tools/AttemptCompletionTool.ts index e21f3b71880..77677b731b5 100644 --- a/src/core/tools/AttemptCompletionTool.ts +++ 
b/src/core/tools/AttemptCompletionTool.ts @@ -85,6 +85,12 @@ export class AttemptCompletionTool extends BaseTool<"attempt_completion"> { task.consecutiveMistakeCount = 0 await task.say("completion_result", result, undefined, false) + + // Force final token usage update before emitting TaskCompleted + // This ensures the most recent stats are captured regardless of throttle timer + // and properly updates the snapshot to prevent redundant emissions + task.emitFinalTokenUsageUpdate() + TelemetryService.instance.captureTaskCompleted(task.taskId) task.emit(RooCodeEventName.TaskCompleted, task.taskId, task.getTokenUsage(), task.toolUsage) @@ -198,6 +204,9 @@ export class AttemptCompletionTool extends BaseTool<"attempt_completion"> { false, ) + // Force final token usage update before emitting TaskCompleted for consistency + task.emitFinalTokenUsageUpdate() + TelemetryService.instance.captureTaskCompleted(task.taskId) task.emit(RooCodeEventName.TaskCompleted, task.taskId, task.getTokenUsage(), task.toolUsage) diff --git a/src/core/webview/ClineProvider.ts b/src/core/webview/ClineProvider.ts index 3e054ce7d25..8aac3eccd65 100644 --- a/src/core/webview/ClineProvider.ts +++ b/src/core/webview/ClineProvider.ts @@ -33,6 +33,7 @@ import { type CloudOrganizationMembership, type CreateTaskOptions, type TokenUsage, + type ToolUsage, RooCodeEventName, requestyDefaultModelId, openRouterDefaultModelId, @@ -205,7 +206,7 @@ export class ClineProvider // Create named listener functions so we can remove them later. 
const onTaskStarted = () => this.emit(RooCodeEventName.TaskStarted, instance.taskId) - const onTaskCompleted = (taskId: string, tokenUsage: any, toolUsage: any) => + const onTaskCompleted = (taskId: string, tokenUsage: TokenUsage, toolUsage: ToolUsage) => this.emit(RooCodeEventName.TaskCompleted, taskId, tokenUsage, toolUsage) const onTaskAborted = async () => { this.emit(RooCodeEventName.TaskAborted, instance.taskId) @@ -246,8 +247,8 @@ export class ClineProvider const onTaskUnpaused = (taskId: string) => this.emit(RooCodeEventName.TaskUnpaused, taskId) const onTaskSpawned = (taskId: string) => this.emit(RooCodeEventName.TaskSpawned, taskId) const onTaskUserMessage = (taskId: string) => this.emit(RooCodeEventName.TaskUserMessage, taskId) - const onTaskTokenUsageUpdated = (taskId: string, tokenUsage: TokenUsage) => - this.emit(RooCodeEventName.TaskTokenUsageUpdated, taskId, tokenUsage) + const onTaskTokenUsageUpdated = (taskId: string, tokenUsage: TokenUsage, toolUsage: ToolUsage) => + this.emit(RooCodeEventName.TaskTokenUsageUpdated, taskId, tokenUsage, toolUsage) // Attach the listeners. instance.on(RooCodeEventName.TaskStarted, onTaskStarted) diff --git a/src/extension/api.ts b/src/extension/api.ts index a9e11b4dbe5..e9c35861c5a 100644 --- a/src/extension/api.ts +++ b/src/extension/api.ts @@ -307,8 +307,8 @@ export class API extends EventEmitter implements RooCodeAPI { this.emit(RooCodeEventName.TaskToolFailed, taskId, tool, error) }) - task.on(RooCodeEventName.TaskTokenUsageUpdated, (_, usage) => { - this.emit(RooCodeEventName.TaskTokenUsageUpdated, task.taskId, usage) + task.on(RooCodeEventName.TaskTokenUsageUpdated, (_, tokenUsage, toolUsage) => { + this.emit(RooCodeEventName.TaskTokenUsageUpdated, task.taskId, tokenUsage, toolUsage) }) // Let's go! 
/**
 * Check whether tool usage differs from a previous snapshot by comparing the
 * per-tool `attempts` and `failures` counters.
 *
 * Only tools that actually carry a stats object take part in the comparison.
 * A key whose value is `undefined` (or `null`) is treated the same as a key
 * that is absent, so `{ read_file: undefined }` and `{}` compare as equal
 * instead of being reported as a change on every call.
 *
 * @param current - Current tool usage data
 * @param snapshot - Previous snapshot to compare against (undefined treated as empty)
 * @returns true if a tool with stats was added or removed, or if any tool's
 * attempts/failures differ between current and snapshot; false otherwise
 */
export function hasToolUsageChanged(current: ToolUsage, snapshot?: ToolUsage): boolean {
	// Treat undefined snapshot as empty object for consistent comparison
	const effectiveSnapshot = snapshot ?? {}

	// Keys present on the object but holding no stats must not count as tools,
	// otherwise they inflate the length check and trip the missing-entry branch below.
	const recordedTools = (usage: ToolUsage): ToolName[] =>
		(Object.keys(usage) as ToolName[]).filter((key) => usage[key] != null)

	const currentKeys = recordedTools(current)
	const snapshotKeys = recordedTools(effectiveSnapshot)

	// A different number of recorded tools means one was added or removed
	if (currentKeys.length !== snapshotKeys.length) {
		return true
	}

	// Same count: every current tool must exist in the snapshot with identical stats.
	// With equal counts this also rules out tools that exist only in the snapshot.
	return currentKeys.some((key) => {
		const currentTool = current[key]
		const snapshotTool = effectiveSnapshot[key]

		if (!snapshotTool || !currentTool) {
			return true
		}

		return currentTool.attempts !== snapshotTool.attempts || currentTool.failures !== snapshotTool.failures
	})
}