From 8b338dc9c5460be5b5fb60bf32c258f14aa41862 Mon Sep 17 00:00:00 2001 From: Chris Date: Sat, 9 May 2026 00:33:51 +0800 Subject: [PATCH 1/2] feat(enterprise): persist replayable agent events --- .../routes/__tests__/agentRoutesRbac.test.ts | 99 +++++++++ backend/src/routes/agentRoutes.ts | 122 ++++++++++- .../__tests__/agentEventStore.test.ts | 124 +++++++++++ backend/src/services/agentEventStore.ts | 206 ++++++++++++++++++ 4 files changed, 547 insertions(+), 4 deletions(-) create mode 100644 backend/src/services/__tests__/agentEventStore.test.ts create mode 100644 backend/src/services/agentEventStore.ts diff --git a/backend/src/routes/__tests__/agentRoutesRbac.test.ts b/backend/src/routes/__tests__/agentRoutesRbac.test.ts index bd7466bc..ca7adccd 100644 --- a/backend/src/routes/__tests__/agentRoutesRbac.test.ts +++ b/backend/src/routes/__tests__/agentRoutesRbac.test.ts @@ -11,6 +11,10 @@ import request from 'supertest'; import { ENTERPRISE_FEATURE_FLAG_ENV } from '../../config'; import { ENTERPRISE_DB_PATH_ENV } from '../../services/enterpriseDb'; import { ENTERPRISE_DATA_DIR_ENV, writeTraceMetadata } from '../../services/traceMetadataStore'; +import { + persistSerializedAgentEvent, + resetAgentEventStoreForTests, +} from '../../services/agentEventStore'; import { getTraceProcessorLeaseStore, setTraceProcessorLeaseStoreForTests, @@ -64,6 +68,7 @@ afterEach(async () => { jest.restoreAllMocks(); setTraceProcessorServiceForTests(null); setTraceProcessorLeaseStoreForTests(null); + resetAgentEventStoreForTests(); if (originalApiKey === undefined) { delete process.env.SMARTPERFETTO_API_KEY; } else { @@ -222,4 +227,98 @@ describe('agent route RBAC', () => { await fs.rm(tmpDir, { recursive: true, force: true }); } }); + + it('replays persisted terminal SSE events before falling back to the in-memory buffer', async () => { + const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'smartperfetto-agent-event-replay-')); + let leaseStore: ReturnType | null = null; + try { + 
const traceId = 'trace-agent-event-replay'; + const tracePath = path.join(tmpDir, `${traceId}.trace`); + await fs.writeFile(tracePath, 'trace bytes'); + delete process.env.SMARTPERFETTO_API_KEY; + process.env.SMARTPERFETTO_SSO_TRUSTED_HEADERS = 'true'; + process.env[ENTERPRISE_FEATURE_FLAG_ENV] = 'true'; + process.env[ENTERPRISE_DB_PATH_ENV] = path.join(tmpDir, 'enterprise.sqlite'); + process.env[ENTERPRISE_DATA_DIR_ENV] = path.join(tmpDir, 'data'); + process.env.UPLOAD_DIR = path.join(tmpDir, 'uploads'); + + await writeTraceMetadata({ + id: traceId, + filename: `${traceId}.trace`, + size: 11, + uploadedAt: new Date().toISOString(), + status: 'ready', + path: tracePath, + tenantId: 'tenant-a', + workspaceId: 'workspace-a', + userId: 'analyst-user', + }); + setTraceProcessorServiceForTests({ + getOrLoadTrace: jest.fn(async () => ({ + id: traceId, + filename: `${traceId}.trace`, + size: 11, + filePath: tracePath, + uploadTime: new Date(), + status: 'ready', + })), + getTrace: jest.fn(() => ({ + id: traceId, + filename: `${traceId}.trace`, + size: 11, + filePath: tracePath, + uploadTime: new Date(), + status: 'ready', + })), + ensureProcessorForLease: jest.fn(async () => undefined), + runWithLease: jest.fn(() => new Promise(() => undefined)), + query: jest.fn(async () => ({ columns: [], rows: [], durationMs: 1 })), + } as any); + + const analyzeRes = await analystHeaders(request(makeApp()).post('/api/agent/v1/analyze')) + .send({ traceId, query: 'analyze this trace' }); + + expect(analyzeRes.status).toBe(200); + const { sessionId, runId } = analyzeRes.body; + persistSerializedAgentEvent({ + tenantId: 'tenant-a', + workspaceId: 'workspace-a', + userId: 'analyst-user', + sessionId, + runId, + traceId, + query: 'analyze this trace', + }, { + cursor: 99, + eventType: 'analysis_completed', + eventData: JSON.stringify({ + type: 'analysis_completed', + data: { reportUrl: '/api/reports/report-from-db' }, + }), + createdAt: 1_777_000_002_000, + }); + + const streamRes = await 
analystHeaders( + request(makeApp()) + .get(`/api/agent/v1/${sessionId}/stream`) + .set('Last-Event-ID', '98') + .set('Accept', 'text/event-stream'), + ); + + expect(streamRes.status).toBe(200); + expect(streamRes.text).toContain('id: 99'); + expect(streamRes.text).toContain('event: analysis_completed'); + expect(streamRes.text).toContain('/api/reports/report-from-db'); + leaseStore = getTraceProcessorLeaseStore(); + expect(leaseStore.listLeases({ + tenantId: 'tenant-a', + workspaceId: 'workspace-a', + userId: 'analyst-user', + }, { traceId })).toHaveLength(1); + } finally { + leaseStore?.close(); + setTraceProcessorLeaseStoreForTests(null); + await fs.rm(tmpDir, { recursive: true, force: true }); + } + }); }); diff --git a/backend/src/routes/agentRoutes.ts b/backend/src/routes/agentRoutes.ts index c079c15c..79ff32c3 100644 --- a/backend/src/routes/agentRoutes.ts +++ b/backend/src/routes/agentRoutes.ts @@ -81,7 +81,14 @@ import { appendReplayableSseEvent, hasTerminalReplayAfter, parseLastEventId, + TERMINAL_SSE_EVENT_TYPES, } from '../assistant/stream/sessionSseReplay'; +import { + listSerializedAgentEventsAfter, + persistSerializedAgentEvent, + type AgentEventPersistenceScope, + type SerializedAgentEvent, +} from '../services/agentEventStore'; import { AgentAnalyzeSessionService, AnalyzeSessionPreparationError, @@ -378,6 +385,85 @@ interface AnalysisSession { const assistantAppService = new AssistantApplicationService(); const streamProjector = new StreamProjector(); +function agentEventScopeFromSession(session: AnalysisSession): AgentEventPersistenceScope | null { + const run = session.activeRun || session.lastRun; + if ( + !resolveFeatureConfig().enterprise || + !session.tenantId || + !session.workspaceId || + !run?.runId + ) { + return null; + } + return { + tenantId: session.tenantId, + workspaceId: session.workspaceId, + userId: session.userId, + sessionId: session.sessionId, + runId: run.runId, + traceId: session.traceId, + query: run.query || session.query, 
+ }; +} + +function persistBufferedAgentEvent(session: AnalysisSession, event: SerializedAgentEvent): void { + const scope = agentEventScopeFromSession(session); + if (!scope) return; + try { + persistSerializedAgentEvent(scope, event); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + session.logger.warn('AgentEvents', 'Failed to persist SSE event', { + sessionId: session.sessionId, + runId: scope.runId, + eventType: event.eventType, + cursor: event.cursor, + error: message, + }); + } +} + +function replayPersistedAgentEvents( + session: AnalysisSession, + res: express.Response, + lastEventId: number, +): { replayed: number; includesTerminal: boolean; lastCursor: number } { + const scope = agentEventScopeFromSession(session); + if (!scope) return { replayed: 0, includesTerminal: false, lastCursor: lastEventId }; + let events: SerializedAgentEvent[] = []; + try { + events = listSerializedAgentEventsAfter(scope, scope.runId, lastEventId); + } catch (error) { + const message = error instanceof Error ? 
error.message : String(error); + session.logger.warn('AgentEvents', 'Failed to load persisted SSE replay events', { + sessionId: session.sessionId, + runId: scope.runId, + lastEventId, + error: message, + }); + return { replayed: 0, includesTerminal: false, lastCursor: lastEventId }; + } + + let replayed = 0; + let includesTerminal = false; + let lastCursor = lastEventId; + for (const event of events) { + try { + res.write(`id: ${event.cursor}\n`); + res.write(`event: ${event.eventType}\n`); + res.write(`data: ${event.eventData}\n\n`); + replayed++; + lastCursor = event.cursor; + if (TERMINAL_SSE_EVENT_TYPES.has(event.eventType)) { + includesTerminal = true; + } + } catch { + break; + } + } + return { replayed, includesTerminal, lastCursor }; +} + function sendReplayableSessionEvent( session: AnalysisSession, res: express.Response, @@ -385,6 +471,12 @@ function sendReplayableSessionEvent( payload: unknown ): number { const event = appendReplayableSseEvent(session, eventType, payload); + persistBufferedAgentEvent(session, { + cursor: event.seqId, + eventType: event.eventType, + eventData: event.eventData, + createdAt: Date.now(), + }); streamProjector.sendEvent(res, eventType, payload, event.seqId); return event.seqId; } @@ -1189,12 +1281,28 @@ function handleSessionStream(req: express.Request, res: express.Response, sessio ...buildStreamObservability(session), }); + let ringReplayAfter = lastEventId; + if (lastEventId !== null) { + const persistedReplay = replayPersistedAgentEvents(session, res, lastEventId); + ringReplayAfter = persistedReplay.lastCursor; + if (persistedReplay.replayed > 0) { + console.log( + `[AgentRoutes] Replayed ${persistedReplay.replayed} persisted SSE events for ${sessionId} ` + + `(after seqId ${lastEventId})` + ); + } + if (persistedReplay.includesTerminal) { + res.end(); + return; + } + } + // Replay missed events from the ring buffer if reconnecting. 
- if (lastEventId !== null && session.sseEventBuffer.length > 0) { - const replayIncludesTerminal = hasTerminalReplayAfter(session, lastEventId); - const replayed = streamProjector.replayBufferedEvents(res, session.sseEventBuffer, lastEventId); + if (ringReplayAfter !== null && session.sseEventBuffer.length > 0) { + const replayIncludesTerminal = hasTerminalReplayAfter(session, ringReplayAfter); + const replayed = streamProjector.replayBufferedEvents(res, session.sseEventBuffer, ringReplayAfter); if (replayed > 0) { - console.log(`[AgentRoutes] Replayed ${replayed} missed SSE events for ${sessionId} (after seqId ${lastEventId})`); + console.log(`[AgentRoutes] Replayed ${replayed} missed SSE events for ${sessionId} (after seqId ${ringReplayAfter})`); } if (replayIncludesTerminal) { res.end(); @@ -3070,6 +3178,12 @@ function broadcastToAgentDrivenClients(sessionId: string, update: StreamingUpdat seqId, onBufferedEvent: (event) => { session.sseEventBuffer.push(event); + persistBufferedAgentEvent(session, { + cursor: event.seqId, + eventType: event.eventType, + eventData: event.eventData, + createdAt: Date.now(), + }); // Trim ring buffer to cap if (session.sseEventBuffer.length > SSE_RING_BUFFER_SIZE) { session.sseEventBuffer.splice(0, session.sseEventBuffer.length - SSE_RING_BUFFER_SIZE); diff --git a/backend/src/services/__tests__/agentEventStore.test.ts b/backend/src/services/__tests__/agentEventStore.test.ts new file mode 100644 index 00000000..0c54598c --- /dev/null +++ b/backend/src/services/__tests__/agentEventStore.test.ts @@ -0,0 +1,124 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +// Copyright (C) 2024-2026 Gracker (Chris) +// This file is part of SmartPerfetto. See LICENSE for details. 
+ +import { afterEach, beforeEach, describe, expect, it } from '@jest/globals'; +import fs from 'fs/promises'; +import os from 'os'; +import path from 'path'; +import { + ENTERPRISE_DB_PATH_ENV, + openEnterpriseDb, +} from '../enterpriseDb'; +import { + listSerializedAgentEventsAfter, + persistSerializedAgentEvent, + resetAgentEventStoreForTests, + type AgentEventPersistenceScope, +} from '../agentEventStore'; + +const originalDbPath = process.env[ENTERPRISE_DB_PATH_ENV]; + +let tmpDir: string; + +function restoreEnvValue(key: string, value: string | undefined): void { + if (value === undefined) { + delete process.env[key]; + } else { + process.env[key] = value; + } +} + +function scope(overrides: Partial = {}): AgentEventPersistenceScope { + return { + tenantId: 'tenant-a', + workspaceId: 'workspace-a', + userId: 'user-a', + sessionId: 'session-a', + runId: 'run-a', + traceId: 'trace-a', + query: 'why is this trace slow?', + ...overrides, + }; +} + +beforeEach(async () => { + tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'smartperfetto-agent-events-')); + process.env[ENTERPRISE_DB_PATH_ENV] = path.join(tmpDir, 'enterprise.sqlite'); +}); + +afterEach(async () => { + resetAgentEventStoreForTests(); + restoreEnvValue(ENTERPRISE_DB_PATH_ENV, originalDbPath); + await fs.rm(tmpDir, { recursive: true, force: true }); +}); + +describe('agent event store', () => { + it('persists replayable SSE events with workspace scope and terminal run status', () => { + const eventScope = scope(); + + persistSerializedAgentEvent(eventScope, { + cursor: 1, + eventType: 'progress', + eventData: JSON.stringify({ type: 'progress', data: { phase: 'start' } }), + createdAt: 1_777_000_000_000, + }); + persistSerializedAgentEvent(eventScope, { + cursor: 2, + eventType: 'analysis_completed', + eventData: JSON.stringify({ type: 'analysis_completed', data: { reportUrl: '/api/reports/report-a' } }), + createdAt: 1_777_000_000_100, + }); + persistSerializedAgentEvent(eventScope, { + cursor: 2, + 
eventType: 'analysis_completed', + eventData: JSON.stringify({ duplicate: true }), + createdAt: 1_777_000_000_200, + }); + + expect(listSerializedAgentEventsAfter(eventScope, 'run-a', 0)).toEqual([ + expect.objectContaining({ cursor: 1, eventType: 'progress' }), + expect.objectContaining({ cursor: 2, eventType: 'analysis_completed' }), + ]); + expect(listSerializedAgentEventsAfter(eventScope, 'run-a', 1)).toEqual([ + expect.objectContaining({ + cursor: 2, + eventData: expect.stringContaining('/api/reports/report-a'), + }), + ]); + expect(listSerializedAgentEventsAfter(scope({ workspaceId: 'workspace-b' }), 'run-a', 0)).toEqual([]); + + const db = openEnterpriseDb(); + try { + expect(db.prepare('SELECT COUNT(*) AS count FROM agent_events WHERE run_id = ?').get('run-a')).toEqual({ count: 2 }); + expect(db.prepare('SELECT status, completed_at FROM analysis_runs WHERE id = ?').get('run-a')).toEqual({ + status: 'completed', + completed_at: 1_777_000_000_100, + }); + expect(db.prepare('SELECT status FROM analysis_sessions WHERE id = ?').get('session-a')).toEqual({ + status: 'completed', + }); + } finally { + db.close(); + } + }); + + it('marks the run failed when an error event is persisted', () => { + persistSerializedAgentEvent(scope({ runId: 'run-failed' }), { + cursor: 1, + eventType: 'error', + eventData: JSON.stringify({ error: 'cancelled' }), + createdAt: 1_777_000_001_000, + }); + + const db = openEnterpriseDb(); + try { + expect(db.prepare('SELECT status, completed_at FROM analysis_runs WHERE id = ?').get('run-failed')).toEqual({ + status: 'failed', + completed_at: 1_777_000_001_000, + }); + } finally { + db.close(); + } + }); +}); diff --git a/backend/src/services/agentEventStore.ts b/backend/src/services/agentEventStore.ts new file mode 100644 index 00000000..f64b4b1a --- /dev/null +++ b/backend/src/services/agentEventStore.ts @@ -0,0 +1,206 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +// Copyright (C) 2024-2026 Gracker (Chris) +// This file is part of 
SmartPerfetto. See LICENSE for details. + +import crypto from 'crypto'; +import type Database from 'better-sqlite3'; +import { openEnterpriseDb, resolveEnterpriseDbPath } from './enterpriseDb'; +import type { EnterpriseRepositoryScope } from './enterpriseRepository'; + +export interface AgentEventPersistenceScope extends EnterpriseRepositoryScope { + sessionId: string; + runId: string; + traceId: string; + query?: string; +} + +export interface SerializedAgentEvent { + cursor: number; + eventType: string; + eventData: string; + createdAt: number; +} + +interface AgentEventRow extends Record { + cursor: number; + event_type: string; + payload_json: string; + created_at: number; +} + +let singletonDb: Database.Database | null = null; +let singletonDbPath: string | null = null; + +function getAgentEventDb(): Database.Database { + const dbPath = resolveEnterpriseDbPath(); + if (!singletonDb || singletonDbPath !== dbPath) { + singletonDb?.close(); + singletonDb = openEnterpriseDb(dbPath); + singletonDbPath = dbPath; + } + return singletonDb; +} + +export function resetAgentEventStoreForTests(): void { + singletonDb?.close(); + singletonDb = null; + singletonDbPath = null; +} + +function ensureAgentEventGraph( + db: Database.Database, + scope: AgentEventPersistenceScope, + now: number, +): void { + db.prepare(` + INSERT OR IGNORE INTO organizations (id, name, status, plan, created_at, updated_at) + VALUES (?, ?, 'active', 'enterprise', ?, ?) + `).run(scope.tenantId, scope.tenantId, now, now); + + db.prepare(` + INSERT OR IGNORE INTO workspaces (id, tenant_id, name, created_at, updated_at) + VALUES (?, ?, ?, ?, ?) + `).run(scope.workspaceId, scope.tenantId, scope.workspaceId, now, now); + + if (scope.userId) { + db.prepare(` + INSERT INTO users (id, tenant_id, email, display_name, idp_subject, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?) 
+ ON CONFLICT(id) DO UPDATE SET + email = excluded.email, + display_name = excluded.display_name, + updated_at = excluded.updated_at + `).run( + scope.userId, + scope.tenantId, + `${scope.userId}@agent-event.local`, + scope.userId, + `agent-event:${scope.userId}`, + now, + now, + ); + } + + db.prepare(` + INSERT OR IGNORE INTO trace_assets + (id, tenant_id, workspace_id, owner_user_id, local_path, size_bytes, status, metadata_json, created_at) + VALUES + (?, ?, ?, ?, ?, 0, 'metadata_only', ?, ?) + `).run( + scope.traceId, + scope.tenantId, + scope.workspaceId, + scope.userId ?? null, + `metadata-only:${scope.traceId}`, + JSON.stringify({ source: 'agent_event', sessionId: scope.sessionId, runId: scope.runId }), + now, + ); + + db.prepare(` + INSERT OR IGNORE INTO analysis_sessions + (id, tenant_id, workspace_id, trace_id, created_by, title, visibility, status, created_at, updated_at) + VALUES + (?, ?, ?, ?, ?, ?, 'private', 'running', ?, ?) + `).run( + scope.sessionId, + scope.tenantId, + scope.workspaceId, + scope.traceId, + scope.userId ?? null, + `Agent session ${scope.sessionId}`, + now, + now, + ); + + db.prepare(` + INSERT OR IGNORE INTO analysis_runs + (id, tenant_id, workspace_id, session_id, mode, status, question, started_at, completed_at) + VALUES + (?, ?, ?, ?, 'agent', 'running', ?, ?, NULL) + `).run( + scope.runId, + scope.tenantId, + scope.workspaceId, + scope.sessionId, + scope.query ?? 
'', + now, + ); +} + +function terminalStatusForEvent(eventType: string): 'completed' | 'failed' | null { + if (eventType === 'analysis_completed') return 'completed'; + if (eventType === 'error') return 'failed'; + return null; +} + +export function persistSerializedAgentEvent( + scope: AgentEventPersistenceScope, + event: SerializedAgentEvent, +): void { + const db = getAgentEventDb(); + const write = db.transaction(() => { + ensureAgentEventGraph(db, scope, event.createdAt); + db.prepare(` + INSERT OR IGNORE INTO agent_events + (id, tenant_id, workspace_id, run_id, cursor, event_type, payload_json, created_at) + VALUES + (?, ?, ?, ?, ?, ?, ?, ?) + `).run( + crypto.randomUUID(), + scope.tenantId, + scope.workspaceId, + scope.runId, + event.cursor, + event.eventType, + event.eventData, + event.createdAt, + ); + + const terminalStatus = terminalStatusForEvent(event.eventType); + if (terminalStatus) { + db.prepare(` + UPDATE analysis_runs + SET status = ?, completed_at = COALESCE(completed_at, ?) + WHERE tenant_id = ? AND workspace_id = ? AND id = ? + `).run(terminalStatus, event.createdAt, scope.tenantId, scope.workspaceId, scope.runId); + db.prepare(` + UPDATE analysis_sessions + SET status = ?, updated_at = ? + WHERE tenant_id = ? AND workspace_id = ? AND id = ? + `).run(terminalStatus, event.createdAt, scope.tenantId, scope.workspaceId, scope.sessionId); + } else { + db.prepare(` + UPDATE analysis_sessions + SET updated_at = ? + WHERE tenant_id = ? AND workspace_id = ? AND id = ? 
+ `).run(event.createdAt, scope.tenantId, scope.workspaceId, scope.sessionId); + } + }); + write(); +} + +export function listSerializedAgentEventsAfter( + scope: EnterpriseRepositoryScope, + runId: string, + cursor: number, + limit = 1000, +): SerializedAgentEvent[] { + const db = getAgentEventDb(); + const boundedLimit = Math.max(1, Math.min(1000, Math.floor(limit))); + const rows = db.prepare(` + SELECT cursor, event_type, payload_json, created_at + FROM agent_events + WHERE tenant_id = ? + AND workspace_id = ? + AND run_id = ? + AND cursor > ? + ORDER BY cursor ASC + LIMIT ? + `).all(scope.tenantId, scope.workspaceId, runId, cursor, boundedLimit); + return rows.map(row => ({ + cursor: row.cursor, + eventType: row.event_type, + eventData: row.payload_json, + createdAt: row.created_at, + })); +} From 8dea5bfa69207b202fff62779f8b4d0c4fa231ef Mon Sep 17 00:00:00 2001 From: Chris Date: Sat, 9 May 2026 00:48:50 +0800 Subject: [PATCH 2/2] feat(enterprise): persist run heartbeats --- backend/package.json | 2 +- .../assistantApplicationService.test.ts | 60 ++++ .../assistantApplicationService.ts | 18 ++ .../routes/__tests__/agentRoutesRbac.test.ts | 15 + backend/src/routes/agentRoutes.ts | 92 ++++++ .../__tests__/analysisRunStore.test.ts | 104 +++++++ .../services/__tests__/enterpriseDb.test.ts | 9 +- .../__tests__/enterpriseSchema.test.ts | 12 +- backend/src/services/agentEventStore.ts | 23 +- backend/src/services/analysisRunStore.ts | 263 ++++++++++++++++++ backend/src/services/enterpriseSchema.ts | 11 + .../enterprise-multi-tenant/baseline.md | 39 +++ .../enterprise-multi-tenant/rss-benchmark.md | 7 + 13 files changed, 650 insertions(+), 5 deletions(-) create mode 100644 backend/src/assistant/application/__tests__/assistantApplicationService.test.ts create mode 100644 backend/src/services/__tests__/analysisRunStore.test.ts create mode 100644 backend/src/services/analysisRunStore.ts diff --git a/backend/package.json b/backend/package.json index 6676781f..6cbcacc3 
100644 --- a/backend/package.json +++ b/backend/package.json @@ -41,7 +41,7 @@ "prepack": "npm run build", "typecheck": "tsc --noEmit", "test": "jest", - "test:core": "jest --runInBand --forceExit src/agent/communication/__tests__/agentMessageBus.test.ts src/agent/core/executors/__tests__/strategyExecutor.test.ts src/agent/core/executors/__tests__/hypothesisExecutor.test.ts src/agent/context/__tests__/enhancedSessionContext.test.ts src/tests/adbTools.test.ts src/services/__tests__/sessionLogger.test.ts src/services/__tests__/traceAnalysisSkillConfig.test.ts src/agent/agents/domain/__tests__/registry.test.ts src/agentv3/__tests__/sqlIncludeInjector.test.ts src/agentv3/__tests__/analysisPatternMemory.test.ts src/agentv3/__tests__/claudeRuntimeRuntimeSnapshots.test.ts src/middleware/__tests__/auth.test.ts src/services/__tests__/rbac.test.ts src/routes/__tests__/agentRoutesRbac.test.ts src/routes/__tests__/ownerGuardRoutes.test.ts src/routes/__tests__/requestContextRouteCoverage.test.ts src/middleware/__tests__/legacyApiCompatibility.test.ts src/services/__tests__/enterpriseDb.test.ts src/services/__tests__/enterpriseSchema.test.ts src/services/__tests__/enterpriseRepository.test.ts src/services/__tests__/processRss.test.ts src/services/__tests__/workingTraceProcessor.enterpriseIsolation.test.ts src/services/__tests__/traceProcessorLeaseStore.test.ts src/services/__tests__/traceProcessorLeaseModeDecision.test.ts src/services/__tests__/traceProcessorLeaseProcessorRouting.test.ts src/services/__tests__/traceProcessorSqlWorker.test.ts src/services/__tests__/traceProcessorRamBudget.test.ts src/scripts/__tests__/benchmarkTraceProcessorRss.test.ts src/services/__tests__/enterpriseKnowledgeScope.test.ts src/services/__tests__/enterpriseMigration.test.ts src/services/__tests__/runtimeSnapshotStore.test.ts src/services/providerManager/__tests__/localSecretStore.test.ts src/services/providerManager/__tests__/enterpriseProviderStore.test.ts 
src/routes/__tests__/enterpriseTraceMetadataRoutes.test.ts src/routes/__tests__/traceProcessorProxyRoutes.test.ts src/routes/__tests__/enterpriseReportRoutes.test.ts src/routes/__tests__/enterpriseRestartPersistence.test.ts", + "test:core": "jest --runInBand --forceExit src/agent/communication/__tests__/agentMessageBus.test.ts src/agent/core/executors/__tests__/strategyExecutor.test.ts src/agent/core/executors/__tests__/hypothesisExecutor.test.ts src/agent/context/__tests__/enhancedSessionContext.test.ts src/tests/adbTools.test.ts src/services/__tests__/sessionLogger.test.ts src/services/__tests__/traceAnalysisSkillConfig.test.ts src/agent/agents/domain/__tests__/registry.test.ts src/agentv3/__tests__/sqlIncludeInjector.test.ts src/agentv3/__tests__/analysisPatternMemory.test.ts src/agentv3/__tests__/claudeRuntimeRuntimeSnapshots.test.ts src/middleware/__tests__/auth.test.ts src/assistant/application/__tests__/assistantApplicationService.test.ts src/services/__tests__/rbac.test.ts src/routes/__tests__/agentRoutesRbac.test.ts src/routes/__tests__/ownerGuardRoutes.test.ts src/routes/__tests__/requestContextRouteCoverage.test.ts src/middleware/__tests__/legacyApiCompatibility.test.ts src/services/__tests__/enterpriseDb.test.ts src/services/__tests__/enterpriseSchema.test.ts src/services/__tests__/enterpriseRepository.test.ts src/services/__tests__/processRss.test.ts src/services/__tests__/workingTraceProcessor.enterpriseIsolation.test.ts src/services/__tests__/traceProcessorLeaseStore.test.ts src/services/__tests__/traceProcessorLeaseModeDecision.test.ts src/services/__tests__/traceProcessorLeaseProcessorRouting.test.ts src/services/__tests__/traceProcessorSqlWorker.test.ts src/services/__tests__/traceProcessorRamBudget.test.ts src/scripts/__tests__/benchmarkTraceProcessorRss.test.ts src/services/__tests__/analysisRunStore.test.ts src/services/__tests__/agentEventStore.test.ts src/services/__tests__/enterpriseKnowledgeScope.test.ts 
src/services/__tests__/enterpriseMigration.test.ts src/services/__tests__/runtimeSnapshotStore.test.ts src/services/providerManager/__tests__/localSecretStore.test.ts src/services/providerManager/__tests__/enterpriseProviderStore.test.ts src/routes/__tests__/enterpriseTraceMetadataRoutes.test.ts src/routes/__tests__/traceProcessorProxyRoutes.test.ts src/routes/__tests__/enterpriseReportRoutes.test.ts src/routes/__tests__/enterpriseRestartPersistence.test.ts", "test:watch": "jest --watch", "test:coverage": "jest --coverage", "test:unit": "jest --testPathPatterns=src/tests", diff --git a/backend/src/assistant/application/__tests__/assistantApplicationService.test.ts b/backend/src/assistant/application/__tests__/assistantApplicationService.test.ts new file mode 100644 index 00000000..cf920772 --- /dev/null +++ b/backend/src/assistant/application/__tests__/assistantApplicationService.test.ts @@ -0,0 +1,60 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +// Copyright (C) 2024-2026 Gracker (Chris) +// This file is part of SmartPerfetto. See LICENSE for details. 
+ +import { describe, expect, it, jest } from '@jest/globals'; +import { + AssistantApplicationService, + type ManagedAssistantSession, +} from '../assistantApplicationService'; + +function session(overrides: Partial = {}): ManagedAssistantSession { + return { + sessionId: 'session-a', + status: 'running', + createdAt: 1_777_000_000_000, + lastActivityAt: 1_777_000_000_000, + sseClients: [], + ...overrides, + }; +} + +describe('AssistantApplicationService cleanup', () => { + it('lets callers keep abandoned non-terminal sessions when an external run heartbeat is fresh', () => { + const service = new AssistantApplicationService(); + const managed = session(); + const onCleanup = jest.fn(); + service.setSession(managed.sessionId, managed); + + const removed = service.cleanupIdleSessions({ + now: 1_777_000_010_000, + terminalMaxIdleMs: 1_000, + nonTerminalMaxIdleMs: 1_000, + shouldCleanup: (_sessionId, _session, context) => { + expect(context.isAbandonedNonTerminal).toBe(true); + return false; + }, + onCleanup, + }); + + expect(removed).toEqual([]); + expect(onCleanup).not.toHaveBeenCalled(); + expect(service.getSession(managed.sessionId)).toBe(managed); + }); + + it('still removes stale abandoned non-terminal sessions when the cleanup predicate allows it', () => { + const service = new AssistantApplicationService(); + const managed = session(); + service.setSession(managed.sessionId, managed); + + const removed = service.cleanupIdleSessions({ + now: 1_777_000_010_000, + terminalMaxIdleMs: 1_000, + nonTerminalMaxIdleMs: 1_000, + shouldCleanup: () => true, + }); + + expect(removed).toEqual([managed.sessionId]); + expect(service.getSession(managed.sessionId)).toBeUndefined(); + }); +}); diff --git a/backend/src/assistant/application/assistantApplicationService.ts b/backend/src/assistant/application/assistantApplicationService.ts index 3e59ec9a..e6ddbe8e 100644 --- a/backend/src/assistant/application/assistantApplicationService.ts +++ 
b/backend/src/assistant/application/assistantApplicationService.ts @@ -24,6 +24,16 @@ export interface SessionCleanupOptions { terminalMaxIdleMs: number; nonTerminalMaxIdleMs: number; now?: number; + shouldCleanup?: ( + sessionId: string, + session: T, + context: { + now: number; + idleMs: number; + isTerminal: boolean; + isAbandonedNonTerminal: boolean; + }, + ) => boolean; onCleanup?: (sessionId: string, session: T) => void; } @@ -99,6 +109,14 @@ export class AssistantApplicationService { (isTerminal && idle > options.terminalMaxIdleMs) || (isAbandonedNonTerminal && idle > options.nonTerminalMaxIdleMs) ) { + if (options.shouldCleanup?.(sessionId, session, { + now, + idleMs: idle, + isTerminal, + isAbandonedNonTerminal, + }) === false) { + continue; + } options.onCleanup?.(sessionId, session); this.sessions.delete(sessionId); removed.push(sessionId); diff --git a/backend/src/routes/__tests__/agentRoutesRbac.test.ts b/backend/src/routes/__tests__/agentRoutesRbac.test.ts index ca7adccd..2e0c0155 100644 --- a/backend/src/routes/__tests__/agentRoutesRbac.test.ts +++ b/backend/src/routes/__tests__/agentRoutesRbac.test.ts @@ -15,6 +15,10 @@ import { persistSerializedAgentEvent, resetAgentEventStoreForTests, } from '../../services/agentEventStore'; +import { + getAnalysisRunLifecycle, + resetAnalysisRunStoreForTests, +} from '../../services/analysisRunStore'; import { getTraceProcessorLeaseStore, setTraceProcessorLeaseStoreForTests, @@ -69,6 +73,7 @@ afterEach(async () => { setTraceProcessorServiceForTests(null); setTraceProcessorLeaseStoreForTests(null); resetAgentEventStoreForTests(); + resetAnalysisRunStoreForTests(); if (originalApiKey === undefined) { delete process.env.SMARTPERFETTO_API_KEY; } else { @@ -280,6 +285,16 @@ describe('agent route RBAC', () => { expect(analyzeRes.status).toBe(200); const { sessionId, runId } = analyzeRes.body; + const persistedRun = getAnalysisRunLifecycle({ + tenantId: 'tenant-a', + workspaceId: 'workspace-a', + userId: 'analyst-user', 
+ }, runId); + expect(persistedRun).toEqual(expect.objectContaining({ + id: runId, + status: 'running', + })); + expect(persistedRun?.heartbeatAt).toEqual(expect.any(Number)); persistSerializedAgentEvent({ tenantId: 'tenant-a', workspaceId: 'workspace-a', diff --git a/backend/src/routes/agentRoutes.ts b/backend/src/routes/agentRoutes.ts index 79ff32c3..637c47f7 100644 --- a/backend/src/routes/agentRoutes.ts +++ b/backend/src/routes/agentRoutes.ts @@ -89,6 +89,13 @@ import { type AgentEventPersistenceScope, type SerializedAgentEvent, } from '../services/agentEventStore'; +import { + heartbeatAnalysisRun, + isAnalysisRunHeartbeatFresh, + persistAnalysisRunState, + type AnalysisRunPersistenceScope, + type PersistedAnalysisRunStatus, +} from '../services/analysisRunStore'; import { AgentAnalyzeSessionService, AnalyzeSessionPreparationError, @@ -284,6 +291,7 @@ function startSessionRun( }); } + persistSessionRunState(session, 'pending'); return run; } @@ -299,6 +307,7 @@ function markSessionRunStatus( } session.activeRun.error = error; session.lastRun = { ...session.activeRun }; + persistSessionRunState(session, status, error); } // Attach/echo requestId for all agent endpoints. @@ -406,6 +415,72 @@ function agentEventScopeFromSession(session: AnalysisSession): AgentEventPersist }; } +function analysisRunScopeFromSession(session: AnalysisSession): AnalysisRunPersistenceScope | null { + return agentEventScopeFromSession(session); +} + +function persistSessionRunState( + session: AnalysisSession, + status: PersistedAnalysisRunStatus, + error?: string, +): void { + const scope = analysisRunScopeFromSession(session); + if (!scope) return; + try { + persistAnalysisRunState(scope, status, { error }); + } catch (persistError) { + const message = persistError instanceof Error ? 
persistError.message : String(persistError); + session.logger.warn('AnalysisRun', 'Failed to persist run state', { + sessionId: session.sessionId, + runId: scope.runId, + status, + error: message, + }); + } +} + +function heartbeatSessionRun(session: AnalysisSession): void { + const scope = analysisRunScopeFromSession(session); + if (!scope) return; + try { + heartbeatAnalysisRun(scope); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + session.logger.warn('AnalysisRun', 'Failed to persist run heartbeat', { + sessionId: session.sessionId, + runId: scope.runId, + error: message, + }); + } +} + +function startSessionRunHeartbeat(session: AnalysisSession): NodeJS.Timeout | undefined { + if (!analysisRunScopeFromSession(session)) return undefined; + heartbeatSessionRun(session); + return setInterval(() => heartbeatSessionRun(session), AGENT_RUN_HEARTBEAT_INTERVAL_MS); +} + +function isPersistedSessionRunFresh(session: AnalysisSession, now: number): boolean { + const scope = analysisRunScopeFromSession(session); + if (!scope) return false; + try { + return isAnalysisRunHeartbeatFresh( + scope, + scope.runId, + now, + AGENT_RUN_HEARTBEAT_MAX_STALE_MS, + ); + } catch (error) { + const message = error instanceof Error ? 
error.message : String(error); + session.logger.warn('AnalysisRun', 'Failed to inspect persisted run heartbeat', { + sessionId: session.sessionId, + runId: scope.runId, + error: message, + }); + return true; + } +} + function persistBufferedAgentEvent(session: AnalysisSession, event: SerializedAgentEvent): void { const scope = agentEventScopeFromSession(session); if (!scope) return; @@ -863,6 +938,8 @@ const MAX_SESSION_AGENT_DIALOGUE = 800; const MAX_SESSION_AGENT_RESPONSES = 400; const TERMINAL_SESSION_MAX_IDLE_MS = 30 * 60 * 1000; const NON_TERMINAL_SESSION_MAX_IDLE_MS = 2 * 60 * 60 * 1000; +const AGENT_RUN_HEARTBEAT_INTERVAL_MS = 30 * 1000; +const AGENT_RUN_HEARTBEAT_MAX_STALE_MS = NON_TERMINAL_SESSION_MAX_IDLE_MS; function trimSessionArray(items: T[], maxEntries: number): void { if (items.length > maxEntries) { @@ -2564,6 +2641,8 @@ async function runAgentDrivenAnalysis( const { logger } = session; session.status = 'running'; session.lastActivityAt = Date.now(); + persistSessionRunState(session, 'running'); + const runHeartbeatInterval = startSessionRunHeartbeat(session); logger.info('AgentDrivenAnalysis', 'Starting agent-driven analysis', { query, traceId, @@ -2829,6 +2908,9 @@ async function runAgentDrivenAnalysis( logger.close(); throw error; } finally { + if (runHeartbeatInterval) { + clearInterval(runHeartbeatInterval); + } // Prevent listener accumulation across multi-turn requests in the same session. 
if (session.orchestratorUpdateHandler) { session.orchestrator.off('update', session.orchestratorUpdateHandler); @@ -4500,6 +4582,16 @@ const sessionCleanupInterval = setInterval(() => { assistantAppService.cleanupIdleSessions({ terminalMaxIdleMs: TERMINAL_SESSION_MAX_IDLE_MS, nonTerminalMaxIdleMs: NON_TERMINAL_SESSION_MAX_IDLE_MS, + shouldCleanup: (_sessionId, session, context) => { + if (!resolveFeatureConfig().enterprise || !context.isAbandonedNonTerminal) { + return true; + } + if (isPersistedSessionRunFresh(session, context.now)) { + session.lastActivityAt = context.now; + return false; + } + return true; + }, onCleanup: (sessionId, session) => { console.log(`[AgentRoutes] Cleaning up stale session: ${sessionId}`); session.sseClients.forEach((client) => { diff --git a/backend/src/services/__tests__/analysisRunStore.test.ts b/backend/src/services/__tests__/analysisRunStore.test.ts new file mode 100644 index 00000000..58429a6c --- /dev/null +++ b/backend/src/services/__tests__/analysisRunStore.test.ts @@ -0,0 +1,104 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +// Copyright (C) 2024-2026 Gracker (Chris) +// This file is part of SmartPerfetto. See LICENSE for details. 
+ +import { afterEach, beforeEach, describe, expect, it } from '@jest/globals'; +import fs from 'fs/promises'; +import os from 'os'; +import path from 'path'; +import { + ENTERPRISE_DB_PATH_ENV, + openEnterpriseDb, +} from '../enterpriseDb'; +import { + getAnalysisRunLifecycle, + heartbeatAnalysisRun, + isAnalysisRunHeartbeatFresh, + persistAnalysisRunState, + resetAnalysisRunStoreForTests, + type AnalysisRunPersistenceScope, +} from '../analysisRunStore'; + +const originalDbPath = process.env[ENTERPRISE_DB_PATH_ENV]; +let tmpDir: string; + +function restoreEnvValue(key: string, value: string | undefined): void { + if (value === undefined) { + delete process.env[key]; + } else { + process.env[key] = value; + } +} + +function scope(overrides: Partial = {}): AnalysisRunPersistenceScope { + return { + tenantId: 'tenant-a', + workspaceId: 'workspace-a', + userId: 'user-a', + sessionId: 'session-a', + runId: 'run-a', + traceId: 'trace-a', + query: 'why is this trace slow?', + ...overrides, + }; +} + +beforeEach(async () => { + tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'smartperfetto-analysis-runs-')); + process.env[ENTERPRISE_DB_PATH_ENV] = path.join(tmpDir, 'enterprise.sqlite'); +}); + +afterEach(async () => { + resetAnalysisRunStoreForTests(); + restoreEnvValue(ENTERPRISE_DB_PATH_ENV, originalDbPath); + await fs.rm(tmpDir, { recursive: true, force: true }); +}); + +describe('analysis run store', () => { + it('persists run lifecycle and heartbeat with workspace scope', () => { + const runScope = scope(); + + persistAnalysisRunState(runScope, 'running', { now: 1_777_000_000_000 }); + heartbeatAnalysisRun(runScope, 1_777_000_030_000); + + expect(getAnalysisRunLifecycle(runScope, 'run-a')).toEqual(expect.objectContaining({ + id: 'run-a', + status: 'running', + heartbeatAt: 1_777_000_030_000, + updatedAt: 1_777_000_030_000, + completedAt: null, + })); + expect(isAnalysisRunHeartbeatFresh(runScope, 'run-a', 1_777_000_040_000, 60_000)).toBe(true); + 
expect(isAnalysisRunHeartbeatFresh(runScope, 'run-a', 1_777_000_200_000, 60_000)).toBe(false); + expect(getAnalysisRunLifecycle(scope({ workspaceId: 'workspace-b' }), 'run-a')).toBeNull(); + + const db = openEnterpriseDb(); + try { + expect(db.prepare('SELECT status, heartbeat_at, updated_at FROM analysis_runs WHERE id = ?').get('run-a')).toEqual({ + status: 'running', + heartbeat_at: 1_777_000_030_000, + updated_at: 1_777_000_030_000, + }); + } finally { + db.close(); + } + }); + + it('marks terminal runs stale for cleanup decisions', () => { + const runScope = scope({ runId: 'run-failed' }); + + persistAnalysisRunState(runScope, 'running', { now: 1_777_000_000_000 }); + persistAnalysisRunState(runScope, 'failed', { + now: 1_777_000_010_000, + error: 'cancelled by user', + }); + + expect(getAnalysisRunLifecycle(runScope, 'run-failed')).toEqual(expect.objectContaining({ + status: 'failed', + completedAt: 1_777_000_010_000, + heartbeatAt: 1_777_000_010_000, + errorJson: JSON.stringify({ message: 'cancelled by user' }), + })); + expect(isAnalysisRunHeartbeatFresh(runScope, 'run-failed', 1_777_000_011_000, 60_000)).toBe(false); + }); +}); diff --git a/backend/src/services/__tests__/enterpriseDb.test.ts b/backend/src/services/__tests__/enterpriseDb.test.ts index 6e369e2c..38c3ac8b 100644 --- a/backend/src/services/__tests__/enterpriseDb.test.ts +++ b/backend/src/services/__tests__/enterpriseDb.test.ts @@ -43,7 +43,14 @@ describe('enterprise SQLite WAL database', () => { const rows = db.prepare( 'SELECT version FROM enterprise_schema_migrations ORDER BY version', ).all(); - expect(rows).toEqual([{ version: 1 }, { version: 2 }, { version: 3 }, { version: 4 }, { version: 5 }]); + expect(rows).toEqual([ + { version: 1 }, + { version: 2 }, + { version: 3 }, + { version: 4 }, + { version: 5 }, + { version: 6 }, + ]); } finally { db.close(); } diff --git a/backend/src/services/__tests__/enterpriseSchema.test.ts b/backend/src/services/__tests__/enterpriseSchema.test.ts index 
93bc4135..2f2acf1e 100644 --- a/backend/src/services/__tests__/enterpriseSchema.test.ts +++ b/backend/src/services/__tests__/enterpriseSchema.test.ts @@ -197,6 +197,8 @@ describe('enterprise core schema', () => { 'question', 'started_at', 'completed_at', + 'heartbeat_at', + 'updated_at', 'error_json', ]); expectColumns(db!, 'conversation_turns', [ @@ -323,6 +325,7 @@ describe('enterprise core schema', () => { 'idx_analysis_sessions_owner_guard', 'idx_analysis_sessions_tenant_workspace_id_unique', 'idx_analysis_runs_status', + 'idx_analysis_runs_heartbeat', 'idx_analysis_runs_tenant_workspace_id_unique', 'idx_conversation_turns_session', 'idx_conversation_turns_run', @@ -357,7 +360,14 @@ describe('enterprise core schema', () => { const rows = db!.prepare( 'SELECT version FROM enterprise_schema_migrations ORDER BY version', ).all(); - expect(rows).toEqual([{ version: 1 }, { version: 2 }, { version: 3 }, { version: 4 }, { version: 5 }]); + expect(rows).toEqual([ + { version: 1 }, + { version: 2 }, + { version: 3 }, + { version: 4 }, + { version: 5 }, + { version: 6 }, + ]); }); test('enforces the full tenant workspace session run event chain', () => { diff --git a/backend/src/services/agentEventStore.ts b/backend/src/services/agentEventStore.ts index f64b4b1a..5cf6e5a3 100644 --- a/backend/src/services/agentEventStore.ts +++ b/backend/src/services/agentEventStore.ts @@ -160,15 +160,34 @@ export function persistSerializedAgentEvent( if (terminalStatus) { db.prepare(` UPDATE analysis_runs - SET status = ?, completed_at = COALESCE(completed_at, ?) + SET status = ?, + completed_at = COALESCE(completed_at, ?), + heartbeat_at = ?, + updated_at = ? WHERE tenant_id = ? AND workspace_id = ? AND id = ? 
- `).run(terminalStatus, event.createdAt, scope.tenantId, scope.workspaceId, scope.runId); + `).run( + terminalStatus, + event.createdAt, + event.createdAt, + event.createdAt, + scope.tenantId, + scope.workspaceId, + scope.runId, + ); db.prepare(` UPDATE analysis_sessions SET status = ?, updated_at = ? WHERE tenant_id = ? AND workspace_id = ? AND id = ? `).run(terminalStatus, event.createdAt, scope.tenantId, scope.workspaceId, scope.sessionId); } else { + db.prepare(` + UPDATE analysis_runs + SET heartbeat_at = ?, updated_at = ? + WHERE tenant_id = ? + AND workspace_id = ? + AND id = ? + AND status IN ('pending', 'running', 'awaiting_user') + `).run(event.createdAt, event.createdAt, scope.tenantId, scope.workspaceId, scope.runId); db.prepare(` UPDATE analysis_sessions SET updated_at = ? diff --git a/backend/src/services/analysisRunStore.ts b/backend/src/services/analysisRunStore.ts new file mode 100644 index 00000000..0397e013 --- /dev/null +++ b/backend/src/services/analysisRunStore.ts @@ -0,0 +1,263 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +// Copyright (C) 2024-2026 Gracker (Chris) +// This file is part of SmartPerfetto. See LICENSE for details. 
+ +import type Database from 'better-sqlite3'; +import { openEnterpriseDb, resolveEnterpriseDbPath } from './enterpriseDb'; +import type { EnterpriseRepositoryScope } from './enterpriseRepository'; + +export type PersistedAnalysisRunStatus = + | 'pending' + | 'running' + | 'awaiting_user' + | 'completed' + | 'failed' + | 'cancelled'; + +export interface AnalysisRunPersistenceScope extends EnterpriseRepositoryScope { + sessionId: string; + runId: string; + traceId: string; + query?: string; + mode?: string; +} + +export interface AnalysisRunLifecycle { + id: string; + status: PersistedAnalysisRunStatus | string; + startedAt: number; + completedAt: number | null; + heartbeatAt: number | null; + updatedAt: number | null; + errorJson: string | null; +} + +interface AnalysisRunRow extends Record { + id: string; + status: string; + started_at: number; + completed_at: number | null; + heartbeat_at: number | null; + updated_at: number | null; + error_json: string | null; +} + +let singletonDb: Database.Database | null = null; +let singletonDbPath: string | null = null; + +function getAnalysisRunDb(): Database.Database { + const dbPath = resolveEnterpriseDbPath(); + if (!singletonDb || singletonDbPath !== dbPath) { + singletonDb?.close(); + singletonDb = openEnterpriseDb(dbPath); + singletonDbPath = dbPath; + } + return singletonDb; +} + +export function resetAnalysisRunStoreForTests(): void { + singletonDb?.close(); + singletonDb = null; + singletonDbPath = null; +} + +function isTerminalStatus(status: string): boolean { + return status === 'completed' || status === 'failed' || status === 'cancelled'; +} + +function ensureAnalysisRunGraph( + db: Database.Database, + scope: AnalysisRunPersistenceScope, + now: number, +): void { + db.prepare(` + INSERT OR IGNORE INTO organizations (id, name, status, plan, created_at, updated_at) + VALUES (?, ?, 'active', 'enterprise', ?, ?) 
+ `).run(scope.tenantId, scope.tenantId, now, now); + + db.prepare(` + INSERT OR IGNORE INTO workspaces (id, tenant_id, name, created_at, updated_at) + VALUES (?, ?, ?, ?, ?) + `).run(scope.workspaceId, scope.tenantId, scope.workspaceId, now, now); + + if (scope.userId) { + db.prepare(` + INSERT INTO users (id, tenant_id, email, display_name, idp_subject, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + email = excluded.email, + display_name = excluded.display_name, + updated_at = excluded.updated_at + `).run( + scope.userId, + scope.tenantId, + `${scope.userId}@analysis-run.local`, + scope.userId, + `analysis-run:${scope.userId}`, + now, + now, + ); + } + + db.prepare(` + INSERT OR IGNORE INTO trace_assets + (id, tenant_id, workspace_id, owner_user_id, local_path, size_bytes, status, metadata_json, created_at) + VALUES + (?, ?, ?, ?, ?, 0, 'metadata_only', ?, ?) + `).run( + scope.traceId, + scope.tenantId, + scope.workspaceId, + scope.userId ?? null, + `metadata-only:${scope.traceId}`, + JSON.stringify({ source: 'analysis_run', sessionId: scope.sessionId, runId: scope.runId }), + now, + ); + + db.prepare(` + INSERT OR IGNORE INTO analysis_sessions + (id, tenant_id, workspace_id, trace_id, created_by, title, visibility, status, created_at, updated_at) + VALUES + (?, ?, ?, ?, ?, ?, 'private', 'running', ?, ?) + `).run( + scope.sessionId, + scope.tenantId, + scope.workspaceId, + scope.traceId, + scope.userId ?? null, + `Agent session ${scope.sessionId}`, + now, + now, + ); + + db.prepare(` + INSERT OR IGNORE INTO analysis_runs + (id, tenant_id, workspace_id, session_id, mode, status, question, started_at, completed_at, heartbeat_at, updated_at) + VALUES + (?, ?, ?, ?, ?, 'running', ?, ?, NULL, ?, ?) + `).run( + scope.runId, + scope.tenantId, + scope.workspaceId, + scope.sessionId, + scope.mode ?? 'agent', + scope.query ?? 
'', + now, + now, + now, + ); +} + +export function persistAnalysisRunState( + scope: AnalysisRunPersistenceScope, + status: PersistedAnalysisRunStatus, + options: { now?: number; error?: string } = {}, +): void { + const now = options.now ?? Date.now(); + const terminal = isTerminalStatus(status); + const db = getAnalysisRunDb(); + const write = db.transaction(() => { + ensureAnalysisRunGraph(db, scope, now); + db.prepare(` + UPDATE analysis_runs + SET status = ?, + question = CASE WHEN ? <> '' THEN ? ELSE question END, + heartbeat_at = ?, + updated_at = ?, + completed_at = CASE WHEN ? THEN COALESCE(completed_at, ?) ELSE completed_at END, + error_json = ? + WHERE tenant_id = ? + AND workspace_id = ? + AND id = ? + `).run( + status, + scope.query ?? '', + scope.query ?? '', + now, + now, + terminal ? 1 : 0, + now, + options.error ? JSON.stringify({ message: options.error }) : null, + scope.tenantId, + scope.workspaceId, + scope.runId, + ); + db.prepare(` + UPDATE analysis_sessions + SET status = ?, updated_at = ? + WHERE tenant_id = ? + AND workspace_id = ? + AND id = ? + `).run( + terminal ? status : 'running', + now, + scope.tenantId, + scope.workspaceId, + scope.sessionId, + ); + }); + write(); +} + +export function heartbeatAnalysisRun( + scope: AnalysisRunPersistenceScope, + now = Date.now(), +): void { + const db = getAnalysisRunDb(); + const write = db.transaction(() => { + ensureAnalysisRunGraph(db, scope, now); + db.prepare(` + UPDATE analysis_runs + SET heartbeat_at = ?, updated_at = ? + WHERE tenant_id = ? + AND workspace_id = ? + AND id = ? + AND status IN ('pending', 'running', 'awaiting_user') + `).run(now, now, scope.tenantId, scope.workspaceId, scope.runId); + db.prepare(` + UPDATE analysis_sessions + SET updated_at = ? + WHERE tenant_id = ? + AND workspace_id = ? + AND id = ? 
+ `).run(now, scope.tenantId, scope.workspaceId, scope.sessionId); + }); + write(); +} + +export function getAnalysisRunLifecycle( + scope: EnterpriseRepositoryScope, + runId: string, +): AnalysisRunLifecycle | null { + const db = getAnalysisRunDb(); + const row = db.prepare(` + SELECT id, status, started_at, completed_at, heartbeat_at, updated_at, error_json + FROM analysis_runs + WHERE tenant_id = ? + AND workspace_id = ? + AND id = ? + LIMIT 1 + `).get(scope.tenantId, scope.workspaceId, runId); + if (!row) return null; + return { + id: row.id, + status: row.status, + startedAt: row.started_at, + completedAt: row.completed_at, + heartbeatAt: row.heartbeat_at, + updatedAt: row.updated_at, + errorJson: row.error_json, + }; +} + +export function isAnalysisRunHeartbeatFresh( + scope: EnterpriseRepositoryScope, + runId: string, + now: number, + maxStaleMs: number, +): boolean { + const lifecycle = getAnalysisRunLifecycle(scope, runId); + if (!lifecycle || isTerminalStatus(lifecycle.status)) return false; + const heartbeatAt = lifecycle.heartbeatAt ?? lifecycle.updatedAt ?? 
lifecycle.startedAt; + return now - heartbeatAt <= maxStaleMs; +} diff --git a/backend/src/services/enterpriseSchema.ts b/backend/src/services/enterpriseSchema.ts index 13053e95..5de93e3e 100644 --- a/backend/src/services/enterpriseSchema.ts +++ b/backend/src/services/enterpriseSchema.ts @@ -507,6 +507,17 @@ const MIGRATIONS: MigrationStep[] = [ `); }, }, + { + version: 6, + up: (db) => { + addColumnIfMissing(db, 'analysis_runs', 'heartbeat_at', 'INTEGER'); + addColumnIfMissing(db, 'analysis_runs', 'updated_at', 'INTEGER'); + db.exec(` + CREATE INDEX IF NOT EXISTS idx_analysis_runs_heartbeat + ON analysis_runs(tenant_id, workspace_id, status, heartbeat_at); + `); + }, + }, ]; export function applyEnterpriseMinimalSchema(db: Database.Database): void { diff --git a/docs/features/enterprise-multi-tenant/baseline.md b/docs/features/enterprise-multi-tenant/baseline.md index 22689a8d..b29c6b1c 100644 --- a/docs/features/enterprise-multi-tenant/baseline.md +++ b/docs/features/enterprise-multi-tenant/baseline.md @@ -62,3 +62,42 @@ Missing §0.4.3 required matrix cells: - memory: 100MB, 500MB, 1GB - heapprofd: 100MB, 500MB, 1GB - vendor: 100MB, 500MB, 1GB + +## 2026-05-09 RSS Benchmark Local Startup Trace Pass + +Branch: `feature/enterprise-multi-tenant-agent-events` + +Command: + +```bash +PATH="$HOME/.nvm/versions/node/v24.15.0/bin:$PATH" \ + TP_PORT_MIN=9840 TP_PORT_MAX=9859 \ + npm run benchmark:trace-rss -- \ + --trace startup=/Users/chris/Code/SmartPerfetto/Trace/StartUp_com.snapchat.android_2026-03-26_02_01_37_652_3347ms_sn140292552S000618_7.trace \ + --trace startup=/Users/chris/Code/SmartPerfetto/Trace/StartUp_com.snapchat.android_2026-03-26_03_48_03_288_982ms_sn140292552S000186_8.trace \ + --trace startup=/Users/chris/Code/SmartPerfetto/Trace/StartUp_com.android.chrome_2026-03-26_01_41_05_555_1353ms_sn140292552S000186_9.trace \ + --output test-output/trace-processor-rss-benchmark-startup-local.json \ + --markdown 
test-output/trace-processor-rss-benchmark-startup-local.md +``` + +Result: PASS for 3 real local startup traces in the 100MB bucket. §0.4.3 +remains incomplete because the required matrix still misses 17 of 18 cells. + +| Trace | Scene | Size bucket | File size | Init | Load peak | Query peak | Query delta | Status | +| --- | --- | --- | ---: | ---: | ---: | ---: | ---: | --- | +| `StartUp_com.snapchat.android_2026-03-26_02_01_37_652_3347ms_sn140292552S000618_7.trace` | startup | 100MB | 198.5 MiB | 5571ms | 3038.0 MiB | 3341.9 MiB | 303.8 MiB | PASS | +| `StartUp_com.snapchat.android_2026-03-26_03_48_03_288_982ms_sn140292552S000186_8.trace` | startup | 100MB | 198.3 MiB | 5256ms | 2992.5 MiB | 3279.8 MiB | 287.2 MiB | PASS | +| `StartUp_com.android.chrome_2026-03-26_01_41_05_555_1353ms_sn140292552S000186_9.trace` | startup | 100MB | 183.7 MiB | 3191ms | 1500.5 MiB | 1521.2 MiB | 20.7 MiB | PASS | + +Observed §0.4.3 matrix cells: + +- startup: 100MB + +Missing §0.4.3 required matrix cells: + +- scroll: 100MB, 500MB, 1GB +- startup: 500MB, 1GB +- ANR: 100MB, 500MB, 1GB +- memory: 100MB, 500MB, 1GB +- heapprofd: 100MB, 500MB, 1GB +- vendor: 100MB, 500MB, 1GB diff --git a/docs/features/enterprise-multi-tenant/rss-benchmark.md b/docs/features/enterprise-multi-tenant/rss-benchmark.md index 341ad077..a6690c82 100644 --- a/docs/features/enterprise-multi-tenant/rss-benchmark.md +++ b/docs/features/enterprise-multi-tenant/rss-benchmark.md @@ -85,3 +85,10 @@ As of 2026-05-08, the repository checkout only has small local fixtures: These smoke traces can validate the benchmark harness, but they cannot complete §0.4.3. Do not mark README §0.4.3 complete until the required 18 scene/size cells above are covered by real benchmark output. + +Additional local audit on 2026-05-09 found three real startup traces under +`/Users/chris/Code/SmartPerfetto/Trace/`, each in the 100MB bucket. They were +benchmarked and recorded in `baseline.md`, covering only `startup:100MB`. 
+The required matrix still lacks the remaining 17 cells, so §0.4.3 remains +blocked on collecting representative scroll, ANR, memory, heapprofd, and vendor +traces at every size, plus 500MB and 1GB startup traces.