diff --git a/electron/electron-env.d.ts b/electron/electron-env.d.ts index b2a37205b..08c06c67d 100644 --- a/electron/electron-env.d.ts +++ b/electron/electron-env.d.ts @@ -63,7 +63,8 @@ interface Window { message?: string; error?: string; }>; - setRecordingState: (recording: boolean) => Promise; + setRecordingState: (recording: boolean, recordingId?: number) => Promise; + discardCursorTelemetry: (recordingId: number) => Promise; getCursorTelemetry: (videoPath?: string) => Promise<{ success: boolean; samples: CursorTelemetryPoint[]; diff --git a/electron/ipc/handlers.ts b/electron/ipc/handlers.ts index 4cb487567..7fe6c52f7 100644 --- a/electron/ipc/handlers.ts +++ b/electron/ipc/handlers.ts @@ -11,6 +11,10 @@ import { shell, systemPreferences, } from "electron"; +import { + type CursorTelemetryPoint, + createCursorTelemetryBuffer, +} from "../../src/lib/cursorTelemetryBuffer"; import { normalizeProjectMedia, normalizeRecordingSession, @@ -275,14 +279,23 @@ async function storeRecordedSessionFiles(payload: StoreRecordedSessionInput) { currentProjectPath = null; const telemetryPath = `${screenVideoPath}.cursor.json`; - if (pendingCursorSamples.length > 0) { - await fs.writeFile( - telemetryPath, - JSON.stringify({ version: CURSOR_TELEMETRY_VERSION, samples: pendingCursorSamples }, null, 2), - "utf-8", - ); + const pendingBatch = cursorTelemetryBuffer.takeNextBatch(); + if (pendingBatch && pendingBatch.samples.length > 0) { + try { + await fs.writeFile( + telemetryPath, + JSON.stringify( + { version: CURSOR_TELEMETRY_VERSION, samples: pendingBatch.samples }, + null, + 2, + ), + "utf-8", + ); + } catch (err) { + cursorTelemetryBuffer.prependBatch(pendingBatch); + throw err; + } } - pendingCursorSamples = []; const sessionManifestPath = path.join( RECORDINGS_DIR, @@ -302,16 +315,11 @@ const CURSOR_TELEMETRY_VERSION = 1; const CURSOR_SAMPLE_INTERVAL_MS = 100; const MAX_CURSOR_SAMPLES = 60 * 60 * 10; // 1 hour @ 10Hz -interface CursorTelemetryPoint { - timeMs: number; - cx: number; - cy: number; -} - let cursorCaptureInterval: NodeJS.Timeout | null = null; let cursorCaptureStartTimeMs = 0; -let activeCursorSamples: CursorTelemetryPoint[] = []; -let pendingCursorSamples: CursorTelemetryPoint[] = []; +const cursorTelemetryBuffer = createCursorTelemetryBuffer({ + maxActiveSamples: MAX_CURSOR_SAMPLES, +}); function clamp(value: number, min: number, max: number) { return Math.min(max, Math.max(min, value)); @@ -338,15 +346,11 @@ function sampleCursorPoint() { const cx = clamp((cursor.x - bounds.x) / width, 0, 1); const cy = clamp((cursor.y - bounds.y) / height, 0, 1); - activeCursorSamples.push({ + cursorTelemetryBuffer.push({ timeMs: Math.max(0, Date.now() - cursorCaptureStartTimeMs), cx, cy, }); - - if (activeCursorSamples.length > MAX_CURSOR_SAMPLES) { - activeCursorSamples.shift(); - } } export function registerIpcHandlers( @@ -531,18 +535,21 @@ export function registerIpcHandlers( } }); - ipcMain.handle("set-recording-state", (_, recording: boolean) => { + ipcMain.handle("set-recording-state", (_, recording: boolean, recordingId?: number) => { if (recording) { stopCursorCapture(); - activeCursorSamples = []; - pendingCursorSamples = []; + // The renderer is the source of truth for the recording id (it + // uses the same id as the saved fileName). Fall back to a + // timestamp only if the renderer didn't supply one, so the + // buffer always has a stable key per session. + const id = typeof recordingId === "number" ? recordingId : Date.now(); + cursorTelemetryBuffer.startSession(id); cursorCaptureStartTimeMs = Date.now(); sampleCursorPoint(); cursorCaptureInterval = setInterval(sampleCursorPoint, CURSOR_SAMPLE_INTERVAL_MS); } else { stopCursorCapture(); - pendingCursorSamples = [...activeCursorSamples]; - activeCursorSamples = []; + cursorTelemetryBuffer.endSession(); } const source = selectedSource || { name: "Screen" }; @@ -551,6 +558,10 @@ export function registerIpcHandlers( } }); + ipcMain.handle("discard-cursor-telemetry", (_, recordingId: number) => { + cursorTelemetryBuffer.discardBatch(recordingId); + }); + ipcMain.handle("get-cursor-telemetry", async (_, videoPath?: string) => { const targetVideoPath = normalizeVideoSourcePath( videoPath ?? currentRecordingSession?.screenVideoPath, diff --git a/electron/preload.ts b/electron/preload.ts index eeca25cd4..962e58281 100644 --- a/electron/preload.ts +++ b/electron/preload.ts @@ -47,12 +47,15 @@ contextBridge.exposeInMainWorld("electronAPI", { getRecordedVideoPath: () => { return ipcRenderer.invoke("get-recorded-video-path"); }, - setRecordingState: (recording: boolean) => { - return ipcRenderer.invoke("set-recording-state", recording); + setRecordingState: (recording: boolean, recordingId?: number) => { + return ipcRenderer.invoke("set-recording-state", recording, recordingId); }, getCursorTelemetry: (videoPath?: string) => { return ipcRenderer.invoke("get-cursor-telemetry", videoPath); }, + discardCursorTelemetry: (recordingId: number) => { + return ipcRenderer.invoke("discard-cursor-telemetry", recordingId); + }, onStopRecordingFromTray: (callback: () => void) => { const listener = () => callback(); ipcRenderer.on("stop-recording-from-tray", listener); diff --git a/src/hooks/useScreenRecorder.ts b/src/hooks/useScreenRecorder.ts index 5cbc54a19..a95b672fc 100644 --- a/src/hooks/useScreenRecorder.ts +++ b/src/hooks/useScreenRecorder.ts @@ -225,6 +225,7 @@ export function useScreenRecorder(): UseScreenRecorderReturn { try { const screenBlob = await activeScreenRecorder.recordedBlobPromise; if (discardRecordingId.current === activeRecordingId) { + window.electronAPI?.discardCursorTelemetry(activeRecordingId); return; } if (screenBlob.size === 0) { @@ -553,7 +554,7 @@ export function useScreenRecorder(): UseScreenRecorderReturn { setRecording(true); setPaused(false); setElapsedSeconds(0); - window.electronAPI?.setRecordingState(true); + window.electronAPI?.setRecordingState(true, recordingId.current); const activeScreenRecorder = screenRecorder.current; const activeWebcamRecorder = webcamRecorder.current; diff --git a/src/lib/cursorTelemetryBuffer.test.ts b/src/lib/cursorTelemetryBuffer.test.ts new file mode 100644 index 000000000..17174accc --- /dev/null +++ b/src/lib/cursorTelemetryBuffer.test.ts @@ -0,0 +1,271 @@ +import { describe, expect, it, vi } from "vitest"; +import { type CursorTelemetryPoint, createCursorTelemetryBuffer } from "./cursorTelemetryBuffer"; + +function sample(tag: number): CursorTelemetryPoint { + // Decouple the timestamp tag from the coordinate fixture so cursor + // points stay inside the normalized [0, 1] range that real samples use. + const normalized = (tag % 100) / 100; + return { timeMs: tag, cx: normalized, cy: normalized }; +} + +describe("createCursorTelemetryBuffer", () => { + it("stores samples captured during an active session", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + buf.startSession(1); + for (let i = 0; i < 3; i++) buf.push(sample(i)); + buf.endSession(); + + const batch = buf.takeNextBatch(); + expect(batch?.recordingId).toBe(1); + expect(batch?.samples).toHaveLength(3); + expect(batch?.samples[0]?.timeMs).toBe(0); + }); + + it("trims active samples past maxActiveSamples (ring behaviour)", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 2 }); + buf.startSession(1); + buf.push(sample(1)); + buf.push(sample(2)); + buf.push(sample(3)); + buf.endSession(); + + const batch = buf.takeNextBatch(); + expect(batch?.samples).toEqual([sample(2), sample(3)]); + }); + + it("preserves earlier pending batches when a new session starts before store", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + + // Recording 1 + buf.startSession(1); + buf.push(sample(101)); + buf.push(sample(102)); + buf.endSession(); + + // Recording 2 starts before recording 1's batch has been consumed + buf.startSession(2); + buf.push(sample(201)); + buf.endSession(); + + const batch1 = buf.takeNextBatch(); + const batch2 = buf.takeNextBatch(); + expect(batch1?.recordingId).toBe(1); + expect(batch1?.samples.map((s) => s.timeMs)).toEqual([101, 102]); + expect(batch2?.recordingId).toBe(2); + expect(batch2?.samples.map((s) => s.timeMs)).toEqual([201]); + }); + + it("returns null when nothing is pending", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + expect(buf.takeNextBatch()).toBeNull(); + }); + + it("drops empty sessions instead of queuing empty batches", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + buf.startSession(1); + buf.endSession(); + expect(buf.pendingCount).toBe(0); + expect(buf.takeNextBatch()).toBeNull(); + }); + + it("caps the pending queue at maxPendingBatches to bound memory", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10, maxPendingBatches: 3 }); + + for (let round = 1; round <= 5; round++) { + buf.startSession(round); + buf.push(sample(round)); + buf.endSession(); + } + + expect(buf.pendingCount).toBe(3); + // Oldest two batches (rounds 1 and 2) should have been dropped + expect(buf.takeNextBatch()?.recordingId).toBe(3); + expect(buf.takeNextBatch()?.recordingId).toBe(4); + expect(buf.takeNextBatch()?.recordingId).toBe(5); + }); + + it("starting a new session clears in-progress samples but keeps pending batches", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + + buf.startSession(1); + buf.push(sample(1)); + buf.endSession(); + + buf.startSession(2); + buf.push(sample(99)); + // Simulate another startSession before endSession (e.g. rapid restart) + buf.startSession(3); + expect(buf.activeCount).toBe(0); + expect(buf.pendingCount).toBe(1); + + const batch = buf.takeNextBatch(); + expect(batch?.recordingId).toBe(1); + expect(batch?.samples.map((s) => s.timeMs)).toEqual([1]); + }); + + it("discardBatch(id) drops only the batch produced by that recording id", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + + buf.startSession(1); + buf.push(sample(1)); + buf.endSession(); + + buf.startSession(2); + buf.push(sample(2)); + buf.endSession(); + + expect(buf.pendingCount).toBe(2); + expect(buf.discardBatch(1)).toBe(true); + expect(buf.pendingCount).toBe(1); + expect(buf.takeNextBatch()?.recordingId).toBe(2); + }); + + it("discardBatch(id) targets the correct batch even when a later recording sits in front of it", () => { + // Regression test for the rapid Stop → Record → Discard sequence: + // recording A's finalize callback does async work (fixWebmDuration), + // recording B finishes in the meantime, then A's callback resolves + // with discard intent. The discard must drop A — not B, which + // happens to be the *latest* pending batch by the time discard runs. + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + + buf.startSession(1); + buf.push(sample(11)); + buf.endSession(); + + buf.startSession(2); + buf.push(sample(22)); + buf.endSession(); + + expect(buf.pendingCount).toBe(2); + expect(buf.discardBatch(1)).toBe(true); + + const remaining = buf.takeNextBatch(); + expect(remaining?.recordingId).toBe(2); + expect(remaining?.samples.map((s) => s.timeMs)).toEqual([22]); + expect(buf.takeNextBatch()).toBeNull(); + }); + + it("discardBatch(id) is a no-op (returns false) when the id is unknown or already drained", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + expect(buf.discardBatch(42)).toBe(false); + + buf.startSession(1); + buf.push(sample(1)); + buf.endSession(); + buf.takeNextBatch(); + expect(buf.discardBatch(1)).toBe(false); + expect(buf.pendingCount).toBe(0); + }); + + it("prependBatch() re-inserts a batch at the front of the queue", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + + buf.startSession(1); + buf.push(sample(1)); + buf.endSession(); + + const batch = buf.takeNextBatch(); + expect(batch).not.toBeNull(); + expect(buf.pendingCount).toBe(0); + + if (batch) buf.prependBatch(batch); + expect(buf.pendingCount).toBe(1); + const next = buf.takeNextBatch(); + expect(next?.recordingId).toBe(1); + expect(next?.samples.map((s) => s.timeMs)).toEqual([1]); + }); + + it("prependBatch() ignores empty batches", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + buf.prependBatch({ recordingId: 1, samples: [] }); + expect(buf.pendingCount).toBe(0); + }); + + it("endSession() returns the number of dropped batches and warns when the cap is exceeded", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10, maxPendingBatches: 2 }); + const warn = vi.spyOn(console, "warn").mockImplementation(() => undefined); + + for (let round = 1; round <= 2; round++) { + buf.startSession(round); + buf.push(sample(round)); + expect(buf.endSession()).toBe(0); + } + expect(warn).not.toHaveBeenCalled(); + + buf.startSession(3); + buf.push(sample(3)); + const dropped = buf.endSession(); + expect(dropped).toBe(1); + expect(warn).toHaveBeenCalledTimes(1); + expect(warn.mock.calls[0]?.[0]).toMatch(/dropped 1 pending batch/); + expect(buf.pendingCount).toBe(2); + + warn.mockRestore(); + }); + + it("prependBatch() defensively trims and warns when it would exceed the cap", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10, maxPendingBatches: 2 }); + const warn = vi.spyOn(console, "warn").mockImplementation(() => undefined); + + // Fill the queue to the cap without dropping anything. + for (let round = 1; round <= 2; round++) { + buf.startSession(round); + buf.push(sample(round)); + buf.endSession(); + } + expect(buf.pendingCount).toBe(2); + expect(warn).not.toHaveBeenCalled(); + + // Simulate a misuse where a retry prepends without first draining: + // queue would grow to 3, so the oldest-trailing entry must be evicted. + buf.prependBatch({ recordingId: 99, samples: [sample(99)] }); + expect(buf.pendingCount).toBe(2); + expect(warn).toHaveBeenCalledTimes(1); + expect(warn.mock.calls[0]?.[0]).toMatch(/prependBatch trimmed 1 trailing batch/); + + // Front is the prepended batch; the preserved trailing batch is round 1. + expect(buf.takeNextBatch()?.recordingId).toBe(99); + expect(buf.takeNextBatch()?.recordingId).toBe(1); + expect(buf.pendingCount).toBe(0); + + warn.mockRestore(); + }); + + it("sanitizes non-finite or non-positive option values to safe defaults", () => { + // Infinity / NaN / negative would otherwise turn the trim loops + // into infinite loops. The buffer must fall back to defaults. + const buf = createCursorTelemetryBuffer({ + maxActiveSamples: Number.POSITIVE_INFINITY, + maxPendingBatches: Number.NaN, + }); + + buf.startSession(1); + buf.push(sample(1)); + expect(() => buf.endSession()).not.toThrow(); + expect(buf.pendingCount).toBe(1); + + const buf2 = createCursorTelemetryBuffer({ + maxActiveSamples: -5, + maxPendingBatches: 0, + }); + buf2.startSession(2); + buf2.push(sample(2)); + expect(() => buf2.endSession()).not.toThrow(); + expect(buf2.pendingCount).toBe(1); + }); + + it("reset() clears both active and pending state", () => { + const buf = createCursorTelemetryBuffer({ maxActiveSamples: 10 }); + buf.startSession(1); + buf.push(sample(1)); + buf.endSession(); + buf.startSession(2); + buf.push(sample(2)); + + buf.reset(); + + expect(buf.activeCount).toBe(0); + expect(buf.pendingCount).toBe(0); + expect(buf.takeNextBatch()).toBeNull(); + }); +}); diff --git a/src/lib/cursorTelemetryBuffer.ts b/src/lib/cursorTelemetryBuffer.ts new file mode 100644 index 000000000..0c7e0e10e --- /dev/null +++ b/src/lib/cursorTelemetryBuffer.ts @@ -0,0 +1,213 @@ +/** + * A single cursor telemetry sample captured during a recording session. + * + * Coordinates (`cx`, `cy`) are clamped ratios in the `[0, 1]` range, + * normalised against the captured surface's width and height by the + * main-process `sampleCursorPoint()` before being pushed. `timeMs` is the + * offset (in milliseconds) from the recording's start. + */ +export interface CursorTelemetryPoint { + timeMs: number; + cx: number; + cy: number; +} + +/** + * A completed batch of cursor samples, tagged with the recording id that + * produced them. The id is supplied at `startSession()` time and travels + * with the batch through the pending queue, retries, and discards. + */ +export interface CursorTelemetryBatch { + recordingId: number; + samples: CursorTelemetryPoint[]; +} + +/** + * Per-session cursor telemetry buffer with bounded memory. + * + * Flow: `startSession(recordingId)` → `push(point)` N times → `endSession()` + * enqueues the collected samples as a completed batch tagged with that + * `recordingId`. The main process later drains batches in FIFO order via + * `takeNextBatch()` to persist them to disk, and can `prependBatch()` on + * write failure to retry without losing order. A discard request keys on + * the recording id so an asynchronous "discard recording A" decision that + * arrives after recording B has already enqueued its batch still drops + * the right one. + * + * Memory is bounded by `maxActiveSamples` (ring buffer on the in-progress + * batch) and `maxPendingBatches` (FIFO cap across completed batches). + */ +export interface CursorTelemetryBuffer { + /** + * Begin a new recording session under the given `recordingId`. Clears + * any in-progress active samples (without touching already-completed + * pending batches). Safe to call repeatedly — e.g. a rapid Stop → + * Record sequence — and the most recent id wins. + */ + startSession(recordingId: number): void; + + /** + * Append a telemetry sample to the current active session. When the + * active buffer exceeds `maxActiveSamples`, the oldest sample is + * dropped (ring behaviour). + */ + push(point: CursorTelemetryPoint): void; + + /** + * Finalize the active session, moving its samples into the pending + * queue as a single batch tagged with the current recording id. Empty + * sessions are dropped (no empty batch is enqueued). + * + * If the pending queue would exceed `maxPendingBatches`, the oldest + * batches are evicted to bound memory. A `console.warn` is emitted + * whenever at least one batch is dropped so that pathological rapid- + * restart scenarios are observable. + * + * @returns the number of pending batches dropped by this call (0 under + * normal operation). + */ + endSession(): number; + + /** + * Remove and return the oldest pending batch, or `null` if the queue + * is empty. + */ + takeNextBatch(): CursorTelemetryBatch | null; + + /** + * Re-insert a batch at the front of the queue, preserving FIFO order + * on retry paths (e.g. when persisting the batch failed and the + * caller wants the next `takeNextBatch()` to yield it again). + * + * Empty batches are ignored. The pending cap is enforced defensively + * — if prepending would push the queue past `maxPendingBatches`, the + * oldest entries are evicted and a `console.warn` is emitted. In + * normal retry usage this trim is a no-op because the caller has just + * removed the batch via `takeNextBatch()`. + */ + prependBatch(batch: CursorTelemetryBatch): void; + + /** + * Drop the pending batch produced by the given `recordingId`. Used + * when a recording is discarded after its `endSession()` has run but + * before it has been persisted. Returns `true` if a batch was + * removed, `false` otherwise (no matching id, or the batch was + * already drained). + * + * Keying on the recording id (rather than "the latest pending batch") + * avoids a real bug: when finalizing a recording does asynchronous + * work like `fixWebmDuration`, a quick Stop → Record → Discard + * sequence can interleave such that the latest pending batch belongs + * to a *later* recording than the one being discarded. + */ + discardBatch(recordingId: number): boolean; + + /** + * Clear both the active and pending state. Intended for tests and + * full teardown paths. + */ + reset(): void; + + readonly activeCount: number; + readonly pendingCount: number; +} + +export interface CursorTelemetryBufferOptions { + maxActiveSamples: number; + maxPendingBatches?: number; +} + +const DEFAULT_MAX_PENDING_BATCHES = 8; +const DEFAULT_MAX_ACTIVE_SAMPLES = 10_000; + +/** Coerce a numeric option into a safe, finite, positive integer. */ +function sanitizeLimit(value: number | undefined, fallback: number): number { + if (typeof value !== "number" || !Number.isFinite(value)) return fallback; + const floored = Math.floor(value); + return floored >= 1 ? floored : fallback; +} + +/** + * Create a cursor telemetry buffer. + * + * Numeric options are sanitized: non-finite, negative, or zero values fall + * back to safe defaults so a bad caller cannot disable the memory bounds + * (which would turn the trim loops into infinite loops). + * + * @see CursorTelemetryBuffer for the full lifecycle contract. + */ +export function createCursorTelemetryBuffer( + options: CursorTelemetryBufferOptions, +): CursorTelemetryBuffer { + const maxActive = sanitizeLimit(options.maxActiveSamples, DEFAULT_MAX_ACTIVE_SAMPLES); + const maxPending = sanitizeLimit(options.maxPendingBatches, DEFAULT_MAX_PENDING_BATCHES); + + let active: CursorTelemetryPoint[] = []; + let activeRecordingId: number | null = null; + let pending: CursorTelemetryBatch[] = []; + + return { + startSession(recordingId) { + active = []; + activeRecordingId = recordingId; + }, + push(point) { + active.push(point); + if (active.length > maxActive) { + active.shift(); + } + }, + endSession() { + let dropped = 0; + if (active.length > 0 && activeRecordingId !== null) { + pending.push({ recordingId: activeRecordingId, samples: active }); + while (pending.length > maxPending) { + pending.shift(); + dropped++; + } + } + active = []; + activeRecordingId = null; + if (dropped > 0) { + console.warn( + `[cursorTelemetryBuffer] dropped ${dropped} pending batch(es) to stay within maxPendingBatches=${maxPending}`, + ); + } + return dropped; + }, + takeNextBatch() { + return pending.shift() ?? null; + }, + prependBatch(batch) { + if (batch.samples.length === 0) return; + pending.unshift(batch); + let dropped = 0; + while (pending.length > maxPending) { + pending.pop(); + dropped++; + } + if (dropped > 0) { + console.warn( + `[cursorTelemetryBuffer] prependBatch trimmed ${dropped} trailing batch(es) to stay within maxPendingBatches=${maxPending}`, + ); + } + }, + discardBatch(recordingId) { + const idx = pending.findIndex((b) => b.recordingId === recordingId); + if (idx === -1) return false; + pending.splice(idx, 1); + return true; + }, + reset() { + active = []; + activeRecordingId = null; + pending = []; + }, + get activeCount() { + return active.length; + }, + get pendingCount() { + return pending.length; + }, + }; +}