From 84ec5a7e68cfdd3fe5625bdaa9c3d8b2eb96c118 Mon Sep 17 00:00:00 2001 From: shaun0927 Date: Thu, 16 Apr 2026 10:27:20 +0900 Subject: [PATCH 1/5] fix: isolate cursor telemetry samples per recording session MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, the main process kept two module-scope arrays — activeCursorSamples and pendingCursorSamples — and set-recording-state on a new recording wiped BOTH. When a user stopped recording and immediately started a new one before store-recorded-session fired, the previous recording's pending samples were discarded or later overwritten with the new session's data, producing empty or mismatched .cursor.json files. Replace the two arrays with a small FIFO buffer (createCursorTelemetryBuffer) that: - Keeps pending batches per completed recording, never wiping them on a new session start. - Yields batches in arrival order to storeRecordedSessionFiles. - Caps pending batches (default 8) so a never-stored sequence cannot leak unbounded memory. Unit-tested directly in src/lib/cursorTelemetryBuffer.test.ts, including the rapid-restart race that motivated the change. --- electron/ipc/handlers.ts | 33 +++----- src/lib/cursorTelemetryBuffer.test.ts | 113 ++++++++++++++++++++++++++ src/lib/cursorTelemetryBuffer.ts | 66 +++++++++++++++ 3 files changed, 192 insertions(+), 20 deletions(-) create mode 100644 src/lib/cursorTelemetryBuffer.test.ts create mode 100644 src/lib/cursorTelemetryBuffer.ts diff --git a/electron/ipc/handlers.ts b/electron/ipc/handlers.ts index 4cb487567..284a671a4 100644 --- a/electron/ipc/handlers.ts +++ b/electron/ipc/handlers.ts @@ -11,6 +11,10 @@ import { shell, systemPreferences, } from "electron"; +import { + type CursorTelemetryPoint, + createCursorTelemetryBuffer, +} from "../../src/lib/cursorTelemetryBuffer"; import { normalizeProjectMedia, normalizeRecordingSession, @@ -275,14 +279,14 @@ async function storeRecordedSessionFiles(payload: StoreRecordedSessionInput) { currentProjectPath = null; const telemetryPath = `${screenVideoPath}.cursor.json`; - if (pendingCursorSamples.length > 0) { + const pendingSamples: CursorTelemetryPoint[] = cursorTelemetryBuffer.takeNextBatch(); + if (pendingSamples.length > 0) { await fs.writeFile( telemetryPath, - JSON.stringify({ version: CURSOR_TELEMETRY_VERSION, samples: pendingCursorSamples }, null, 2), + JSON.stringify({ version: CURSOR_TELEMETRY_VERSION, samples: pendingSamples }, null, 2), "utf-8", ); } - pendingCursorSamples = []; const sessionManifestPath = path.join( RECORDINGS_DIR, @@ -302,16 +306,11 @@ const CURSOR_TELEMETRY_VERSION = 1; const CURSOR_SAMPLE_INTERVAL_MS = 100; const MAX_CURSOR_SAMPLES = 60 * 60 * 10; // 1 hour @ 10Hz -interface CursorTelemetryPoint { - timeMs: number; - cx: number; - cy: number; -} - let cursorCaptureInterval: NodeJS.Timeout | null = null; let cursorCaptureStartTimeMs = 0; -let activeCursorSamples: CursorTelemetryPoint[] = []; -let pendingCursorSamples: CursorTelemetryPoint[] = []; +const cursorTelemetryBuffer = createCursorTelemetryBuffer({ + maxActiveSamples: MAX_CURSOR_SAMPLES, +}); function clamp(value: number, min: number, max: number) { return Math.min(max, Math.max(min, value)); @@ -338,15 +337,11 @@ function sampleCursorPoint() { const cx = clamp((cursor.x - bounds.x) / width, 0, 1); const cy = clamp((cursor.y - bounds.y) / height, 0, 1); - activeCursorSamples.push({ + cursorTelemetryBuffer.push({ timeMs: Math.max(0, Date.now() - cursorCaptureStartTimeMs), cx, cy, }); - - if (activeCursorSamples.length > MAX_CURSOR_SAMPLES) { - activeCursorSamples.shift(); - } } export function registerIpcHandlers( @@ -534,15 +529,13 @@ export function registerIpcHandlers( ipcMain.handle("set-recording-state", (_, recording: boolean) => { if (recording) { stopCursorCapture(); - activeCursorSamples = []; - pendingCursorSamples = []; + cursorTelemetryBuffer.startSession(); cursorCaptureStartTimeMs = Date.now(); sampleCursorPoint(); cursorCaptureInterval = setInterval(sampleCursorPoint, CURSOR_SAMPLE_INTERVAL_MS); } else { stopCursorCapture(); - pendingCursorSamples = [...activeCursorSamples]; - activeCursorSamples = []; + cursorTelemetryBuffer.endSession(); } const source = selectedSource || { name: "Screen" }; diff --git a/src/lib/cursorTelemetryBuffer.test.ts b/src/lib/cursorTelemetryBuffer.test.ts new file mode 100644 index 000000000..a62639459 --- /dev/null +++ b/src/lib/cursorTelemetryBuffer.test.ts @@ -0,0 +1,113 @@ +import { describe, expect, it } from "vitest"; +import { type CursorTelemetryPoint, createCursorTelemetryBuffer } from "./cursorTelemetryBuffer"; + +function sample(tag: number): CursorTelemetryPoint { + return { timeMs: tag, cx: tag / 10, cy: tag / 10 }; +} + +describe("createCursorTelemetryBuffer", () => { + it("stores samples captured during an active session", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + buf.startSession(); + for (let i = 0; i < 3; i++) buf.push(sample(i)); + buf.endSession(); + + const batch = buf.takeNextBatch(); + expect(batch).toHaveLength(3); + expect(batch[0]?.timeMs).toBe(0); + }); + + it("trims active samples past maxActiveSamples (ring behaviour)", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 2 }); + buf.startSession(); + buf.push(sample(1)); + buf.push(sample(2)); + buf.push(sample(3)); + buf.endSession(); + + const batch = buf.takeNextBatch(); + expect(batch).toEqual([sample(2), sample(3)]); + }); + + it("preserves earlier pending batches when a new session starts before store", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + + // Recording 1 + buf.startSession(); + buf.push(sample(101)); + buf.push(sample(102)); + buf.endSession(); + + // Recording 2 starts before recording 1's batch has been consumed + buf.startSession(); + buf.push(sample(201)); + buf.endSession(); + + const batch1 = buf.takeNextBatch(); + const batch2 = buf.takeNextBatch(); + expect(batch1.map((s) => s.timeMs)).toEqual([101, 102]); + expect(batch2.map((s) => s.timeMs)).toEqual([201]); + }); + + it("returns an empty batch when nothing is pending", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + expect(buf.takeNextBatch()).toEqual([]); + }); + + it("drops empty sessions instead of queuing empty batches", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + buf.startSession(); + buf.endSession(); + expect(buf.pendingCount).toBe(0); + expect(buf.takeNextBatch()).toEqual([]); + }); + + it("caps the pending queue at maxPendingBatches to bound memory", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10, maxPendingBatches: 3 }); + + for (let round = 1; round <= 5; round++) { + buf.startSession(); + buf.push(sample(round)); + buf.endSession(); + } + + expect(buf.pendingCount).toBe(3); + // Oldest two batches (rounds 1 and 2) should have been dropped + expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([3]); + expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([4]); + expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([5]); + }); + + it("starting a new session clears in-progress samples but keeps pending batches", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + + buf.startSession(); + buf.push(sample(1)); + buf.endSession(); + + buf.startSession(); + buf.push(sample(99)); + // Simulate another startSession before endSession (e.g. rapid restart) + buf.startSession(); + expect(buf.activeCount).toBe(0); + expect(buf.pendingCount).toBe(1); + + const batch = buf.takeNextBatch(); + expect(batch.map((s) => s.timeMs)).toEqual([1]); + }); + + it("reset() clears both active and pending state", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + buf.startSession(); + buf.push(sample(1)); + buf.endSession(); + buf.startSession(); + buf.push(sample(2)); + + buf.reset(); + + expect(buf.activeCount).toBe(0); + expect(buf.pendingCount).toBe(0); + expect(buf.takeNextBatch()).toEqual([]); + }); +}); diff --git a/src/lib/cursorTelemetryBuffer.ts b/src/lib/cursorTelemetryBuffer.ts new file mode 100644 index 000000000..2b4ef0c0b --- /dev/null +++ b/src/lib/cursorTelemetryBuffer.ts @@ -0,0 +1,66 @@ +export interface CursorTelemetryPoint { + timeMs: number; + cx: number; + cy: number; +} + +export interface CursorTelemetryBuffer { + startSession(): void; + push(point: CursorTelemetryPoint): void; + endSession(): void; + takeNextBatch(): CursorTelemetryPoint[]; + reset(): void; + readonly activeCount: number; + readonly pendingCount: number; +} + +export interface CursorTelemetryBufferOptions { + maxActiveSamples: number; + maxPendingBatches?: number; +} + +const DEFAULT_MAX_PENDING_BATCHES = 8; + +export function createCursorTelemetryBuffer( + options: CursorTelemetryBufferOptions, +): CursorTelemetryBuffer { + const maxActive = options.maxActiveSamples; + const maxPending = options.maxPendingBatches ?? DEFAULT_MAX_PENDING_BATCHES; + + let active: CursorTelemetryPoint[] = []; + let pending: CursorTelemetryPoint[][] = []; + + return { + startSession() { + active = []; + }, + push(point) { + active.push(point); + if (active.length > maxActive) { + active.shift(); + } + }, + endSession() { + if (active.length > 0) { + pending.push(active); + while (pending.length > maxPending) { + pending.shift(); + } + } + active = []; + }, + takeNextBatch() { + return pending.shift() ?? []; + }, + reset() { + active = []; + pending = []; + }, + get activeCount() { + return active.length; + }, + get pendingCount() { + return pending.length; + }, + }; +} From fac0b405d30ea2b9b6bd76920a03dfbd96d4951c Mon Sep 17 00:00:00 2001 From: JunghwanNA <70629228+shaun0927@users.noreply.github.com> Date: Thu, 16 Apr 2026 11:58:16 +0900 Subject: [PATCH 2/5] fix: handle recording discard and write-failure in cursor telemetry buffer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address two issues raised during review: P1 – When a recording is cancelled or restarted, setRecordingState(false) enqueues its cursor batch but store-recorded-session is never called, leaving a stale batch that contaminates the next recording's telemetry. Add discardLatestPending() to the buffer and a discard-cursor-telemetry IPC handler; the renderer now calls it on the discard path. P2 – takeNextBatch() dequeued the batch before fs.writeFile, so a write failure would permanently lose the telemetry. Wrap the write in try/catch and re-insert the batch via prependBatch() on failure. Co-Authored-By: Claude Opus 4.6 (1M context) --- electron/electron-env.d.ts | 1 + electron/ipc/handlers.ts | 19 +++++++++--- electron/preload.ts | 3 ++ src/hooks/useScreenRecorder.ts | 1 + src/lib/cursorTelemetryBuffer.test.ts | 44 +++++++++++++++++++++++++++ src/lib/cursorTelemetryBuffer.ts | 10 ++++++ 6 files changed, 73 insertions(+), 5 deletions(-) diff --git a/electron/electron-env.d.ts b/electron/electron-env.d.ts index b2a37205b..ea364a1f6 100644 --- a/electron/electron-env.d.ts +++ b/electron/electron-env.d.ts @@ -64,6 +64,7 @@ interface Window { error?: string; }>; setRecordingState: (recording: boolean) => Promise; + discardCursorTelemetry: () => Promise; getCursorTelemetry: (videoPath?: string) => Promise<{ success: boolean; samples: CursorTelemetryPoint[]; diff --git a/electron/ipc/handlers.ts b/electron/ipc/handlers.ts index 284a671a4..fc5500619 100644 --- a/electron/ipc/handlers.ts +++ b/electron/ipc/handlers.ts @@ -281,11 +281,16 @@ async function storeRecordedSessionFiles(payload: StoreRecordedSessionInput) { const telemetryPath = `${screenVideoPath}.cursor.json`; const pendingSamples: CursorTelemetryPoint[] = cursorTelemetryBuffer.takeNextBatch(); if (pendingSamples.length > 0) { - await fs.writeFile( - telemetryPath, - JSON.stringify({ version: CURSOR_TELEMETRY_VERSION, samples: pendingSamples }, null, 2), - "utf-8", - ); + try { + await fs.writeFile( + telemetryPath, + JSON.stringify({ version: CURSOR_TELEMETRY_VERSION, samples: pendingSamples }, null, 2), + "utf-8", + ); + } catch (err) { + cursorTelemetryBuffer.prependBatch(pendingSamples); + throw err; + } } const sessionManifestPath = path.join( @@ -544,6 +549,10 @@ export function registerIpcHandlers( } }); + ipcMain.handle("discard-cursor-telemetry", () => { + cursorTelemetryBuffer.discardLatestPending(); + }); + ipcMain.handle("get-cursor-telemetry", async (_, videoPath?: string) => { const targetVideoPath = normalizeVideoSourcePath( videoPath ?? currentRecordingSession?.screenVideoPath, diff --git a/electron/preload.ts b/electron/preload.ts index eeca25cd4..93671223b 100644 --- a/electron/preload.ts +++ b/electron/preload.ts @@ -53,6 +53,9 @@ contextBridge.exposeInMainWorld("electronAPI", { getCursorTelemetry: (videoPath?: string) => { return ipcRenderer.invoke("get-cursor-telemetry", videoPath); }, + discardCursorTelemetry: () => { + return ipcRenderer.invoke("discard-cursor-telemetry"); + }, onStopRecordingFromTray: (callback: () => void) => { const listener = () => callback(); ipcRenderer.on("stop-recording-from-tray", listener); diff --git a/src/hooks/useScreenRecorder.ts b/src/hooks/useScreenRecorder.ts index 5cbc54a19..fd8a30708 100644 --- a/src/hooks/useScreenRecorder.ts +++ b/src/hooks/useScreenRecorder.ts @@ -225,6 +225,7 @@ export function useScreenRecorder(): UseScreenRecorderReturn { try { const screenBlob = await activeScreenRecorder.recordedBlobPromise; if (discardRecordingId.current === activeRecordingId) { + window.electronAPI?.discardCursorTelemetry(); return; } if (screenBlob.size === 0) { diff --git a/src/lib/cursorTelemetryBuffer.test.ts b/src/lib/cursorTelemetryBuffer.test.ts index a62639459..5ffbc7ab8 100644 --- a/src/lib/cursorTelemetryBuffer.test.ts +++ b/src/lib/cursorTelemetryBuffer.test.ts @@ -96,6 +96,50 @@ describe("createCursorTelemetryBuffer", () => { expect(batch.map((s) => s.timeMs)).toEqual([1]); }); + it("discardLatestPending() drops the most recently enqueued batch", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + + buf.startSession(); + buf.push(sample(1)); + buf.endSession(); + + buf.startSession(); + buf.push(sample(2)); + buf.endSession(); + + expect(buf.pendingCount).toBe(2); + buf.discardLatestPending(); + expect(buf.pendingCount).toBe(1); + expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([1]); + }); + + it("discardLatestPending() is safe to call on an empty queue", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + buf.discardLatestPending(); + expect(buf.pendingCount).toBe(0); + }); + + it("prependBatch() re-inserts a batch at the front of the queue", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + + buf.startSession(); + buf.push(sample(1)); + buf.endSession(); + + const batch = buf.takeNextBatch(); + expect(buf.pendingCount).toBe(0); + + buf.prependBatch(batch); + expect(buf.pendingCount).toBe(1); + expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([1]); + }); + + it("prependBatch() ignores empty batches", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + buf.prependBatch([]); + expect(buf.pendingCount).toBe(0); + }); + it("reset() clears both active and pending state", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); buf.startSession(); diff --git a/src/lib/cursorTelemetryBuffer.ts b/src/lib/cursorTelemetryBuffer.ts index 2b4ef0c0b..d812610e9 100644 --- a/src/lib/cursorTelemetryBuffer.ts +++ b/src/lib/cursorTelemetryBuffer.ts @@ -9,6 +9,8 @@ export interface CursorTelemetryBuffer { push(point: CursorTelemetryPoint): void; endSession(): void; takeNextBatch(): CursorTelemetryPoint[]; + prependBatch(batch: CursorTelemetryPoint[]): void; + discardLatestPending(): void; reset(): void; readonly activeCount: number; readonly pendingCount: number; @@ -52,6 +54,14 @@ export function createCursorTelemetryBuffer( takeNextBatch() { return pending.shift() ?? []; }, + prependBatch(batch) { + if (batch.length > 0) { + pending.unshift(batch); + } + }, + discardLatestPending() { + pending.pop(); + }, reset() { active = []; pending = []; From adc610544cb55e853823abca07fe5fc6b5ebffee Mon Sep 17 00:00:00 2001 From: shaun0927 <70629228+shaun0927@users.noreply.github.com> Date: Tue, 21 Apr 2026 17:07:19 +0900 Subject: [PATCH 3/5] docs: document cursor telemetry buffer API and surface drop events Add JSDoc to every public export in cursorTelemetryBuffer so the module meets the 80% docstring-coverage threshold, and make two silent-drop paths observable: - endSession() now returns the number of pending batches evicted by the maxPendingBatches cap and emits console.warn when any are dropped. - prependBatch() defensively trims and warns if an unusual retry pattern would push the queue past the cap (normal retry after takeNextBatch() stays a no-op). Tests cover both drop paths. --- src/lib/cursorTelemetryBuffer.test.ts | 52 +++++++++++++- src/lib/cursorTelemetryBuffer.ts | 100 +++++++++++++++++++++++++- 2 files changed, 148 insertions(+), 4 deletions(-) diff --git a/src/lib/cursorTelemetryBuffer.test.ts b/src/lib/cursorTelemetryBuffer.test.ts index 5ffbc7ab8..309df7e74 100644 --- a/src/lib/cursorTelemetryBuffer.test.ts +++ b/src/lib/cursorTelemetryBuffer.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it } from "vitest"; +import { describe, expect, it, vi } from "vitest"; import { type CursorTelemetryPoint, createCursorTelemetryBuffer } from "./cursorTelemetryBuffer"; function sample(tag: number): CursorTelemetryPoint { @@ -140,6 +140,56 @@ describe("createCursorTelemetryBuffer", () => { expect(buf.pendingCount).toBe(0); }); + it("endSession() returns the number of dropped batches and warns when the cap is exceeded", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10, maxPendingBatches: 2 }); + const warn = vi.spyOn(console, "warn").mockImplementation(() => undefined); + + for (let round = 1; round <= 2; round++) { + buf.startSession(); + buf.push(sample(round)); + expect(buf.endSession()).toBe(0); + } + expect(warn).not.toHaveBeenCalled(); + + buf.startSession(); + buf.push(sample(3)); + const dropped = buf.endSession(); + expect(dropped).toBe(1); + expect(warn).toHaveBeenCalledTimes(1); + expect(warn.mock.calls[0]?.[0]).toMatch(/dropped 1 pending batch/); + expect(buf.pendingCount).toBe(2); + + warn.mockRestore(); + }); + + it("prependBatch() defensively trims and warns when it would exceed the cap", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10, maxPendingBatches: 2 }); + const warn = vi.spyOn(console, "warn").mockImplementation(() => undefined); + + // Fill the queue to the cap without dropping anything. + for (let round = 1; round <= 2; round++) { + buf.startSession(); + buf.push(sample(round)); + buf.endSession(); + } + expect(buf.pendingCount).toBe(2); + expect(warn).not.toHaveBeenCalled(); + + // Simulate a misuse where a retry prepends without first draining: + // queue would grow to 3, so the oldest-trailing entry must be evicted. + buf.prependBatch([sample(99)]); + expect(buf.pendingCount).toBe(2); + expect(warn).toHaveBeenCalledTimes(1); + expect(warn.mock.calls[0]?.[0]).toMatch(/prependBatch trimmed 1 trailing batch/); + + // Front is the prepended batch; the preserved trailing batch is round 1. + expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([99]); + expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([1]); + expect(buf.pendingCount).toBe(0); + + warn.mockRestore(); + }); + it("reset() clears both active and pending state", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); buf.startSession(); diff --git a/src/lib/cursorTelemetryBuffer.ts b/src/lib/cursorTelemetryBuffer.ts index d812610e9..57db2edce 100644 --- a/src/lib/cursorTelemetryBuffer.ts +++ b/src/lib/cursorTelemetryBuffer.ts @@ -1,17 +1,89 @@ +/** + * A single cursor telemetry sample captured during a recording session. + * + * Coordinates (`cx`, `cy`) are device-pixel positions relative to the + * captured surface; `timeMs` is the offset from the recording's start. + */ export interface CursorTelemetryPoint { timeMs: number; cx: number; cy: number; } +/** + * Per-session cursor telemetry buffer with bounded memory. + * + * Flow: `startSession()` → `push(point)` N times → `endSession()` enqueues + * the collected samples as a completed batch. The main process later + * drains batches in FIFO order via `takeNextBatch()` to persist them to + * disk, and can `prependBatch()` on write failure to retry without losing + * order. + * + * Memory is bounded by `maxActiveSamples` (ring buffer on the in-progress + * batch) and `maxPendingBatches` (FIFO cap across completed batches). + */ export interface CursorTelemetryBuffer { + /** + * Begin a new recording session. Clears any in-progress active samples + * (without touching already-completed pending batches). Safe to call + * repeatedly — e.g. a rapid Stop → Record sequence. + */ startSession(): void; + + /** + * Append a telemetry sample to the current active session. When the + * active buffer exceeds `maxActiveSamples`, the oldest sample is + * dropped (ring behaviour). + */ push(point: CursorTelemetryPoint): void; - endSession(): void; + + /** + * Finalize the active session, moving its samples into the pending + * queue as a single batch. Empty sessions are dropped (no empty batch + * is enqueued). + * + * If the pending queue would exceed `maxPendingBatches`, the oldest + * batches are evicted to bound memory. A `console.warn` is emitted + * whenever at least one batch is dropped so that pathological rapid- + * restart scenarios are observable. + * + * @returns the number of pending batches dropped by this call (0 under + * normal operation). + */ + endSession(): number; + + /** + * Remove and return the oldest pending batch, or an empty array if + * the queue is empty. + */ takeNextBatch(): CursorTelemetryPoint[]; + + /** + * Re-insert a batch at the front of the queue, preserving FIFO order + * on retry paths (e.g. when persisting the batch failed and the + * caller wants the next `takeNextBatch()` to yield it again). + * + * Empty batches are ignored. The pending cap is enforced defensively + * — if prepending would push the queue past `maxPendingBatches`, the + * oldest entries are evicted and a `console.warn` is emitted. In + * normal retry usage this trim is a no-op because the caller has just + * removed the batch via `takeNextBatch()`. + */ prependBatch(batch: CursorTelemetryPoint[]): void; + + /** + * Drop the most recently enqueued pending batch. Used when a recording + * is discarded after `endSession()` but before it has been persisted. + * No-op on an empty queue. + */ discardLatestPending(): void; + + /** + * Clear both the active and pending state. Intended for tests and + * full teardown paths. + */ reset(): void; + readonly activeCount: number; readonly pendingCount: number; } @@ -23,6 +95,11 @@ export interface CursorTelemetryBufferOptions { const DEFAULT_MAX_PENDING_BATCHES = 8; +/** + * Create a cursor telemetry buffer. + * + * @see CursorTelemetryBuffer for the full lifecycle contract. + */ export function createCursorTelemetryBuffer( options: CursorTelemetryBufferOptions, ): CursorTelemetryBuffer { @@ -43,20 +120,37 @@ export function createCursorTelemetryBuffer( } }, endSession() { + let dropped = 0; if (active.length > 0) { pending.push(active); while (pending.length > maxPending) { pending.shift(); + dropped++; } } active = []; + if (dropped > 0) { + console.warn( + `[cursorTelemetryBuffer] dropped ${dropped} pending batch(es) to stay within maxPendingBatches=${maxPending}`, + ); + } + return dropped; }, takeNextBatch() { return pending.shift() ?? []; }, prependBatch(batch) { - if (batch.length > 0) { - pending.unshift(batch); + if (batch.length === 0) return; + pending.unshift(batch); + let dropped = 0; + while (pending.length > maxPending) { + pending.pop(); + dropped++; + } + if (dropped > 0) { + console.warn( + `[cursorTelemetryBuffer] prependBatch trimmed ${dropped} trailing batch(es) to stay within maxPendingBatches=${maxPending}`, + ); } }, discardLatestPending() { From 96765e483d6f39abd15c9cef89fd9c5a51795bef Mon Sep 17 00:00:00 2001 From: shaun0927 <70629228+shaun0927@users.noreply.github.com> Date: Tue, 21 Apr 2026 18:12:28 +0900 Subject: [PATCH 4/5] docs: correct cx/cy units and sanitize buffer option limits Two follow-up fixes for CodeRabbit feedback on the docs commit: - CursorTelemetryPoint JSDoc previously described cx/cy as 'device-pixel positions'. The producer sampleCursorPoint() in electron/ipc/handlers.ts clamps them to the [0, 1] range after dividing by the source display's width/height, so they are normalised ratios, not pixel values. Correct the doc comment accordingly. - createCursorTelemetryBuffer now sanitizes maxActiveSamples and maxPendingBatches: non-finite, zero, or negative values fall back to safe positive-integer defaults. Without this, a caller passing Infinity or NaN would hang the trim loops. New test covers the sanitisation path for both options. --- src/lib/cursorTelemetryBuffer.test.ts | 23 +++++++++++++++++++++++ src/lib/cursorTelemetryBuffer.ts | 22 ++++++++++++++++++---- 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/src/lib/cursorTelemetryBuffer.test.ts b/src/lib/cursorTelemetryBuffer.test.ts index 309df7e74..567a1ebc0 100644 --- a/src/lib/cursorTelemetryBuffer.test.ts +++ b/src/lib/cursorTelemetryBuffer.test.ts @@ -190,6 +190,29 @@ describe("createCursorTelemetryBuffer", () => { warn.mockRestore(); }); + it("sanitizes non-finite or non-positive option values to safe defaults", () => { + // Infinity / NaN / negative would otherwise turn the trim loops + // into infinite loops. The buffer must fall back to defaults. + const buf = createCursorTelemetryBuffer({ + maxActiveSamples: Number.POSITIVE_INFINITY, + maxPendingBatches: Number.NaN, + }); + + buf.startSession(); + buf.push(sample(1)); + expect(() => buf.endSession()).not.toThrow(); + expect(buf.pendingCount).toBe(1); + + const buf2 = createCursorTelemetryBuffer({ + maxActiveSamples: -5, + maxPendingBatches: 0, + }); + buf2.startSession(); + buf2.push(sample(2)); + expect(() => buf2.endSession()).not.toThrow(); + expect(buf2.pendingCount).toBe(1); + }); + it("reset() clears both active and pending state", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); buf.startSession(); diff --git a/src/lib/cursorTelemetryBuffer.ts b/src/lib/cursorTelemetryBuffer.ts index 57db2edce..e97bab817 100644 --- a/src/lib/cursorTelemetryBuffer.ts +++ b/src/lib/cursorTelemetryBuffer.ts @@ -1,8 +1,10 @@ /** * A single cursor telemetry sample captured during a recording session. * - * Coordinates (`cx`, `cy`) are device-pixel positions relative to the - * captured surface; `timeMs` is the offset from the recording's start. + * Coordinates (`cx`, `cy`) are clamped ratios in the `[0, 1]` range, + * normalised against the captured surface's width and height by the + * main-process `sampleCursorPoint()` before being pushed. `timeMs` is the + * offset (in milliseconds) from the recording's start. */ export interface CursorTelemetryPoint { timeMs: number; @@ -94,17 +96,29 @@ export interface CursorTelemetryBufferOptions { } const DEFAULT_MAX_PENDING_BATCHES = 8; +const DEFAULT_MAX_ACTIVE_SAMPLES = 10_000; + +/** Coerce a numeric option into a safe, finite, positive integer. */ +function sanitizeLimit(value: number | undefined, fallback: number): number { + if (typeof value !== "number" || !Number.isFinite(value)) return fallback; + const floored = Math.floor(value); + return floored >= 1 ? floored : fallback; +} /** * Create a cursor telemetry buffer. * + * Numeric options are sanitized: non-finite, negative, or zero values fall + * back to safe defaults so a bad caller cannot disable the memory bounds + * (which would turn the trim loops into infinite loops). + * * @see CursorTelemetryBuffer for the full lifecycle contract. */ export function createCursorTelemetryBuffer( options: CursorTelemetryBufferOptions, ): CursorTelemetryBuffer { - const maxActive = options.maxActiveSamples; - const maxPending = options.maxPendingBatches ?? DEFAULT_MAX_PENDING_BATCHES; + const maxActive = sanitizeLimit(options.maxActiveSamples, DEFAULT_MAX_ACTIVE_SAMPLES); + const maxPending = sanitizeLimit(options.maxPendingBatches, DEFAULT_MAX_PENDING_BATCHES); let active: CursorTelemetryPoint[] = []; let pending: CursorTelemetryPoint[][] = []; From 3b9b4192bf650fc55fd12dafffdccad1448904e8 Mon Sep 17 00:00:00 2001 From: shaun0927 <70629228+shaun0927@users.noreply.github.com> Date: Tue, 28 Apr 2026 18:27:14 +0900 Subject: [PATCH 5/5] fix: key cursor telemetry batches by recordingId for safe discard MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit discardLatestPending() popped whichever batch happened to be at the back of the queue. With a Stop → Record → Discard sequence, the pending queue can have recording B's batch sitting in front of A's by the time A's finalize callback resolves (because finalizeRecording awaits fixWebmDuration), so the discard targets the wrong recording. Tag each completed batch with the recording id supplied at startSession() time and replace discardLatestPending() with discardBatch(recordingId). takeNextBatch() now returns the full {recordingId, samples} shape so prependBatch() can re-queue it on write-failure without losing the id. The renderer already owns a stable recordingId (Date.now() in useScreenRecorder) and the IPC surface threads it through set-recording-state and discard-cursor-telemetry. Adds a regression test that mirrors FabLrc's scenario in PR #457: two recordings finalize, A is discarded after B has already been queued, and the buffer must drop A while keeping B intact. --- electron/electron-env.d.ts | 4 +- electron/ipc/handlers.ts | 25 +++-- electron/preload.ts | 8 +- src/hooks/useScreenRecorder.ts | 4 +- src/lib/cursorTelemetryBuffer.test.ts | 129 +++++++++++++++++--------- src/lib/cursorTelemetryBuffer.ts | 83 +++++++++++------ 6 files changed, 166 insertions(+), 87 deletions(-) diff --git a/electron/electron-env.d.ts b/electron/electron-env.d.ts index ea364a1f6..08c06c67d 100644 --- a/electron/electron-env.d.ts +++ b/electron/electron-env.d.ts @@ -63,8 +63,8 @@ interface Window { message?: string; error?: string; }>; - setRecordingState: (recording: boolean) => Promise; - discardCursorTelemetry: () => Promise; + setRecordingState: (recording: boolean, recordingId?: number) => Promise; + discardCursorTelemetry: (recordingId: number) => Promise; getCursorTelemetry: (videoPath?: string) => Promise<{ success: boolean; samples: CursorTelemetryPoint[]; diff --git a/electron/ipc/handlers.ts b/electron/ipc/handlers.ts index fc5500619..7fe6c52f7 100644 --- a/electron/ipc/handlers.ts +++ b/electron/ipc/handlers.ts @@ -279,16 +279,20 @@ async function storeRecordedSessionFiles(payload: StoreRecordedSessionInput) { currentProjectPath = null; const telemetryPath = `${screenVideoPath}.cursor.json`; - const pendingSamples: CursorTelemetryPoint[] = cursorTelemetryBuffer.takeNextBatch(); - if (pendingSamples.length > 0) { + const pendingBatch = cursorTelemetryBuffer.takeNextBatch(); + if (pendingBatch && pendingBatch.samples.length > 0) { try { await fs.writeFile( telemetryPath, - JSON.stringify({ version: CURSOR_TELEMETRY_VERSION, samples: pendingSamples }, null, 2), + JSON.stringify( + { version: CURSOR_TELEMETRY_VERSION, samples: pendingBatch.samples }, + null, + 2, + ), "utf-8", ); } catch (err) { - cursorTelemetryBuffer.prependBatch(pendingSamples); + cursorTelemetryBuffer.prependBatch(pendingBatch); throw err; } } @@ -531,10 +535,15 @@ export function registerIpcHandlers( } }); - ipcMain.handle("set-recording-state", (_, recording: boolean) => { + ipcMain.handle("set-recording-state", (_, recording: boolean, recordingId?: number) => { if (recording) { stopCursorCapture(); - cursorTelemetryBuffer.startSession(); + // The renderer is the source of truth for the recording id (it + // uses the same id as the saved fileName). Fall back to a + // timestamp only if the renderer didn't supply one, so the + // buffer always has a stable key per session. + const id = typeof recordingId === "number" ? recordingId : Date.now(); + cursorTelemetryBuffer.startSession(id); cursorCaptureStartTimeMs = Date.now(); sampleCursorPoint(); cursorCaptureInterval = setInterval(sampleCursorPoint, CURSOR_SAMPLE_INTERVAL_MS); @@ -549,8 +558,8 @@ export function registerIpcHandlers( } }); - ipcMain.handle("discard-cursor-telemetry", () => { - cursorTelemetryBuffer.discardLatestPending(); + ipcMain.handle("discard-cursor-telemetry", (_, recordingId: number) => { + cursorTelemetryBuffer.discardBatch(recordingId); }); ipcMain.handle("get-cursor-telemetry", async (_, videoPath?: string) => { diff --git a/electron/preload.ts b/electron/preload.ts index 93671223b..962e58281 100644 --- a/electron/preload.ts +++ b/electron/preload.ts @@ -47,14 +47,14 @@ contextBridge.exposeInMainWorld("electronAPI", { getRecordedVideoPath: () => { return ipcRenderer.invoke("get-recorded-video-path"); }, - setRecordingState: (recording: boolean) => { - return ipcRenderer.invoke("set-recording-state", recording); + setRecordingState: (recording: boolean, recordingId?: number) => { + return ipcRenderer.invoke("set-recording-state", recording, recordingId); }, getCursorTelemetry: (videoPath?: string) => { return ipcRenderer.invoke("get-cursor-telemetry", videoPath); }, - discardCursorTelemetry: () => { - return ipcRenderer.invoke("discard-cursor-telemetry"); + discardCursorTelemetry: (recordingId: number) => { + return ipcRenderer.invoke("discard-cursor-telemetry", recordingId); }, onStopRecordingFromTray: (callback: () => void) => { const listener = () => callback(); diff --git a/src/hooks/useScreenRecorder.ts b/src/hooks/useScreenRecorder.ts index fd8a30708..a95b672fc 100644 --- a/src/hooks/useScreenRecorder.ts +++ b/src/hooks/useScreenRecorder.ts @@ -225,7 +225,7 @@ export function useScreenRecorder(): UseScreenRecorderReturn { try { const screenBlob = await activeScreenRecorder.recordedBlobPromise; if (discardRecordingId.current === activeRecordingId) { - window.electronAPI?.discardCursorTelemetry(); + window.electronAPI?.discardCursorTelemetry(activeRecordingId); return; } if (screenBlob.size === 0) { @@ -554,7 +554,7 @@ export function useScreenRecorder(): UseScreenRecorderReturn { setRecording(true); setPaused(false); setElapsedSeconds(0); - window.electronAPI?.setRecordingState(true); + window.electronAPI?.setRecordingState(true, recordingId.current); const activeScreenRecorder = screenRecorder.current; const activeWebcamRecorder = webcamRecorder.current; diff --git a/src/lib/cursorTelemetryBuffer.test.ts b/src/lib/cursorTelemetryBuffer.test.ts index 567a1ebc0..17174accc 100644 --- a/src/lib/cursorTelemetryBuffer.test.ts +++ b/src/lib/cursorTelemetryBuffer.test.ts @@ -2,141 +2,182 @@ import { describe, expect, it, vi } from "vitest"; import { type CursorTelemetryPoint, createCursorTelemetryBuffer } from "./cursorTelemetryBuffer"; function sample(tag: number): CursorTelemetryPoint { - return { timeMs: tag, cx: tag / 10, cy: tag / 10 }; + // Decouple the timestamp tag from the coordinate fixture so cursor + // points stay inside the normalized [0, 1] range that real samples use. + const normalized = (tag % 100) / 100; + return { timeMs: tag, cx: normalized, cy: normalized }; } describe("createCursorTelemetryBuffer", () => { it("stores samples captured during an active session", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.startSession(); + buf.startSession(1); for (let i = 0; i < 3; i++) buf.push(sample(i)); buf.endSession(); const batch = buf.takeNextBatch(); - expect(batch).toHaveLength(3); - expect(batch[0]?.timeMs).toBe(0); + expect(batch?.recordingId).toBe(1); + expect(batch?.samples).toHaveLength(3); + expect(batch?.samples[0]?.timeMs).toBe(0); }); it("trims active samples past maxActiveSamples (ring behaviour)", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 2 }); - buf.startSession(); + buf.startSession(1); buf.push(sample(1)); buf.push(sample(2)); buf.push(sample(3)); buf.endSession(); const batch = buf.takeNextBatch(); - expect(batch).toEqual([sample(2), sample(3)]); + expect(batch?.samples).toEqual([sample(2), sample(3)]); }); it("preserves earlier pending batches when a new session starts before store", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); // Recording 1 - buf.startSession(); + buf.startSession(1); buf.push(sample(101)); buf.push(sample(102)); buf.endSession(); // Recording 2 starts before recording 1's batch has been consumed - buf.startSession(); + buf.startSession(2); buf.push(sample(201)); buf.endSession(); const batch1 = buf.takeNextBatch(); const batch2 = buf.takeNextBatch(); - expect(batch1.map((s) => s.timeMs)).toEqual([101, 102]); - expect(batch2.map((s) => s.timeMs)).toEqual([201]); + expect(batch1?.recordingId).toBe(1); + expect(batch1?.samples.map((s) => s.timeMs)).toEqual([101, 102]); + expect(batch2?.recordingId).toBe(2); + expect(batch2?.samples.map((s) => s.timeMs)).toEqual([201]); }); - it("returns an empty batch when nothing is pending", () => { + it("returns null when nothing is pending", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - expect(buf.takeNextBatch()).toEqual([]); + expect(buf.takeNextBatch()).toBeNull(); }); it("drops empty sessions instead of queuing empty batches", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.startSession(); + buf.startSession(1); buf.endSession(); expect(buf.pendingCount).toBe(0); - expect(buf.takeNextBatch()).toEqual([]); + expect(buf.takeNextBatch()).toBeNull(); }); it("caps the pending queue at maxPendingBatches to bound memory", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10, maxPendingBatches: 3 }); for (let round = 1; round <= 5; round++) { - buf.startSession(); + buf.startSession(round); buf.push(sample(round)); buf.endSession(); } expect(buf.pendingCount).toBe(3); // Oldest two batches (rounds 1 and 2) should have been dropped - expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([3]); - expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([4]); - expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([5]); + expect(buf.takeNextBatch()?.recordingId).toBe(3); + expect(buf.takeNextBatch()?.recordingId).toBe(4); + expect(buf.takeNextBatch()?.recordingId).toBe(5); }); it("starting a new session clears in-progress samples but keeps pending batches", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.startSession(); + buf.startSession(1); buf.push(sample(1)); buf.endSession(); - buf.startSession(); + buf.startSession(2); buf.push(sample(99)); // Simulate another startSession before endSession (e.g. rapid restart) - buf.startSession(); + buf.startSession(3); expect(buf.activeCount).toBe(0); expect(buf.pendingCount).toBe(1); const batch = buf.takeNextBatch(); - expect(batch.map((s) => s.timeMs)).toEqual([1]); + expect(batch?.recordingId).toBe(1); + expect(batch?.samples.map((s) => s.timeMs)).toEqual([1]); }); - it("discardLatestPending() drops the most recently enqueued batch", () => { + it("discardBatch(id) drops only the batch produced by that recording id", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.startSession(); + buf.startSession(1); buf.push(sample(1)); buf.endSession(); - buf.startSession(); + buf.startSession(2); buf.push(sample(2)); buf.endSession(); expect(buf.pendingCount).toBe(2); - buf.discardLatestPending(); + expect(buf.discardBatch(1)).toBe(true); expect(buf.pendingCount).toBe(1); - expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([1]); + expect(buf.takeNextBatch()?.recordingId).toBe(2); }); - it("discardLatestPending() is safe to call on an empty queue", () => { + it("discardBatch(id) targets the correct batch even when a later recording sits in front of it", () => { + // Regression test for the rapid Stop → Record → Discard sequence: + // recording A's finalize callback does async work (fixWebmDuration), + // recording B finishes in the meantime, then A's callback resolves + // with discard intent. The discard must drop A — not B, which + // happens to be the *latest* pending batch by the time discard runs. const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.discardLatestPending(); + + buf.startSession(1); + buf.push(sample(11)); + buf.endSession(); + + buf.startSession(2); + buf.push(sample(22)); + buf.endSession(); + + expect(buf.pendingCount).toBe(2); + expect(buf.discardBatch(1)).toBe(true); + + const remaining = buf.takeNextBatch(); + expect(remaining?.recordingId).toBe(2); + expect(remaining?.samples.map((s) => s.timeMs)).toEqual([22]); + expect(buf.takeNextBatch()).toBeNull(); + }); + + it("discardBatch(id) is a no-op (returns false) when the id is unknown or already drained", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + expect(buf.discardBatch(42)).toBe(false); + + buf.startSession(1); + buf.push(sample(1)); + buf.endSession(); + buf.takeNextBatch(); + expect(buf.discardBatch(1)).toBe(false); expect(buf.pendingCount).toBe(0); }); it("prependBatch() re-inserts a batch at the front of the queue", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.startSession(); + buf.startSession(1); buf.push(sample(1)); buf.endSession(); const batch = buf.takeNextBatch(); + expect(batch).not.toBeNull(); expect(buf.pendingCount).toBe(0); - buf.prependBatch(batch); + if (batch) buf.prependBatch(batch); expect(buf.pendingCount).toBe(1); - expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([1]); + const next = buf.takeNextBatch(); + expect(next?.recordingId).toBe(1); + expect(next?.samples.map((s) => s.timeMs)).toEqual([1]); }); it("prependBatch() ignores empty batches", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.prependBatch([]); + buf.prependBatch({ recordingId: 1, samples: [] }); expect(buf.pendingCount).toBe(0); }); @@ -145,13 +186,13 @@ describe("createCursorTelemetryBuffer", () => { const warn = vi.spyOn(console, "warn").mockImplementation(() => undefined); for (let round = 1; round <= 2; round++) { - buf.startSession(); + buf.startSession(round); buf.push(sample(round)); expect(buf.endSession()).toBe(0); } expect(warn).not.toHaveBeenCalled(); - buf.startSession(); + buf.startSession(3); buf.push(sample(3)); const dropped = buf.endSession(); expect(dropped).toBe(1); @@ -168,7 +209,7 @@ describe("createCursorTelemetryBuffer", () => { // Fill the queue to the cap without dropping anything. for (let round = 1; round <= 2; round++) { - buf.startSession(); + buf.startSession(round); buf.push(sample(round)); buf.endSession(); } @@ -177,14 +218,14 @@ describe("createCursorTelemetryBuffer", () => { // Simulate a misuse where a retry prepends without first draining: // queue would grow to 3, so the oldest-trailing entry must be evicted. - buf.prependBatch([sample(99)]); + buf.prependBatch({ recordingId: 99, samples: [sample(99)] }); expect(buf.pendingCount).toBe(2); expect(warn).toHaveBeenCalledTimes(1); expect(warn.mock.calls[0]?.[0]).toMatch(/prependBatch trimmed 1 trailing batch/); // Front is the prepended batch; the preserved trailing batch is round 1. - expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([99]); - expect(buf.takeNextBatch().map((s) => s.timeMs)).toEqual([1]); + expect(buf.takeNextBatch()?.recordingId).toBe(99); + expect(buf.takeNextBatch()?.recordingId).toBe(1); expect(buf.pendingCount).toBe(0); warn.mockRestore(); @@ -198,7 +239,7 @@ describe("createCursorTelemetryBuffer", () => { maxPendingBatches: Number.NaN, }); - buf.startSession(); + buf.startSession(1); buf.push(sample(1)); expect(() => buf.endSession()).not.toThrow(); expect(buf.pendingCount).toBe(1); @@ -207,7 +248,7 @@ describe("createCursorTelemetryBuffer", () => { maxActiveSamples: -5, maxPendingBatches: 0, }); - buf2.startSession(); + buf2.startSession(2); buf2.push(sample(2)); expect(() => buf2.endSession()).not.toThrow(); expect(buf2.pendingCount).toBe(1); @@ -215,16 +256,16 @@ describe("createCursorTelemetryBuffer", () => { it("reset() clears both active and pending state", () => { const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); - buf.startSession(); + buf.startSession(1); buf.push(sample(1)); buf.endSession(); - buf.startSession(); + buf.startSession(2); buf.push(sample(2)); buf.reset(); expect(buf.activeCount).toBe(0); expect(buf.pendingCount).toBe(0); - expect(buf.takeNextBatch()).toEqual([]); + expect(buf.takeNextBatch()).toBeNull(); }); }); diff --git a/src/lib/cursorTelemetryBuffer.ts b/src/lib/cursorTelemetryBuffer.ts index e97bab817..0c7e0e10e 100644 --- a/src/lib/cursorTelemetryBuffer.ts +++ b/src/lib/cursorTelemetryBuffer.ts @@ -12,25 +12,39 @@ export interface CursorTelemetryPoint { cy: number; } +/** + * A completed batch of cursor samples, tagged with the recording id that + * produced them. The id is supplied at `startSession()` time and travels + * with the batch through the pending queue, retries, and discards. + */ +export interface CursorTelemetryBatch { + recordingId: number; + samples: CursorTelemetryPoint[]; +} + /** * Per-session cursor telemetry buffer with bounded memory. * - * Flow: `startSession()` → `push(point)` N times → `endSession()` enqueues - * the collected samples as a completed batch. The main process later - * drains batches in FIFO order via `takeNextBatch()` to persist them to - * disk, and can `prependBatch()` on write failure to retry without losing - * order. + * Flow: `startSession(recordingId)` → `push(point)` N times → `endSession()` + * enqueues the collected samples as a completed batch tagged with that + * `recordingId`. The main process later drains batches in FIFO order via + * `takeNextBatch()` to persist them to disk, and can `prependBatch()` on + * write failure to retry without losing order. A discard request keys on + * the recording id so an asynchronous "discard recording A" decision that + * arrives after recording B has already enqueued its batch still drops + * the right one. * * Memory is bounded by `maxActiveSamples` (ring buffer on the in-progress * batch) and `maxPendingBatches` (FIFO cap across completed batches). */ export interface CursorTelemetryBuffer { /** - * Begin a new recording session. Clears any in-progress active samples - * (without touching already-completed pending batches). Safe to call - * repeatedly — e.g. a rapid Stop → Record sequence. + * Begin a new recording session under the given `recordingId`. Clears + * any in-progress active samples (without touching already-completed + * pending batches). Safe to call repeatedly — e.g. a rapid Stop → + * Record sequence — and the most recent id wins. */ - startSession(): void; + startSession(recordingId: number): void; /** * Append a telemetry sample to the current active session. When the @@ -41,8 +55,8 @@ export interface CursorTelemetryBuffer { /** * Finalize the active session, moving its samples into the pending - * queue as a single batch. Empty sessions are dropped (no empty batch - * is enqueued). + * queue as a single batch tagged with the current recording id. Empty + * sessions are dropped (no empty batch is enqueued). * * If the pending queue would exceed `maxPendingBatches`, the oldest * batches are evicted to bound memory. A `console.warn` is emitted @@ -55,10 +69,10 @@ export interface CursorTelemetryBuffer { endSession(): number; /** - * Remove and return the oldest pending batch, or an empty array if - * the queue is empty. + * Remove and return the oldest pending batch, or `null` if the queue + * is empty. */ - takeNextBatch(): CursorTelemetryPoint[]; + takeNextBatch(): CursorTelemetryBatch | null; /** * Re-insert a batch at the front of the queue, preserving FIFO order @@ -71,14 +85,22 @@ export interface CursorTelemetryBuffer { * normal retry usage this trim is a no-op because the caller has just * removed the batch via `takeNextBatch()`. */ - prependBatch(batch: CursorTelemetryPoint[]): void; + prependBatch(batch: CursorTelemetryBatch): void; /** - * Drop the most recently enqueued pending batch. Used when a recording - * is discarded after `endSession()` but before it has been persisted. - * No-op on an empty queue. + * Drop the pending batch produced by the given `recordingId`. Used + * when a recording is discarded after its `endSession()` has run but + * before it has been persisted. Returns `true` if a batch was + * removed, `false` otherwise (no matching id, or the batch was + * already drained). + * + * Keying on the recording id (rather than "the latest pending batch") + * avoids a real bug: when finalizing a recording does asynchronous + * work like `fixWebmDuration`, a quick Stop → Record → Discard + * sequence can interleave such that the latest pending batch belongs + * to a *later* recording than the one being discarded. */ - discardLatestPending(): void; + discardBatch(recordingId: number): boolean; /** * Clear both the active and pending state. Intended for tests and @@ -121,11 +143,13 @@ export function createCursorTelemetryBuffer( const maxPending = sanitizeLimit(options.maxPendingBatches, DEFAULT_MAX_PENDING_BATCHES); let active: CursorTelemetryPoint[] = []; - let pending: CursorTelemetryPoint[][] = []; + let activeRecordingId: number | null = null; + let pending: CursorTelemetryBatch[] = []; return { - startSession() { + startSession(recordingId) { active = []; + activeRecordingId = recordingId; }, push(point) { active.push(point); @@ -135,14 +159,15 @@ export function createCursorTelemetryBuffer( }, endSession() { let dropped = 0; - if (active.length > 0) { - pending.push(active); + if (active.length > 0 && activeRecordingId !== null) { + pending.push({ recordingId: activeRecordingId, samples: active }); while (pending.length > maxPending) { pending.shift(); dropped++; } } active = []; + activeRecordingId = null; if (dropped > 0) { console.warn( `[cursorTelemetryBuffer] dropped ${dropped} pending batch(es) to stay within maxPendingBatches=${maxPending}`, @@ -151,10 +176,10 @@ export function createCursorTelemetryBuffer( return dropped; }, takeNextBatch() { - return pending.shift() ?? []; + return pending.shift() ?? null; }, prependBatch(batch) { - if (batch.length === 0) return; + if (batch.samples.length === 0) return; pending.unshift(batch); let dropped = 0; while (pending.length > maxPending) { @@ -167,11 +192,15 @@ export function createCursorTelemetryBuffer( ); } }, - discardLatestPending() { - pending.pop(); + discardBatch(recordingId) { + const idx = pending.findIndex((b) => b.recordingId === recordingId); + if (idx === -1) return false; + pending.splice(idx, 1); + return true; }, reset() { active = []; + activeRecordingId = null; pending = []; }, get activeCount() {