From 112f1d18613fd0ae3a5f1bbbf2be2c1a5b72e83b Mon Sep 17 00:00:00 2001 From: Chris Date: Fri, 8 May 2026 22:12:11 +0800 Subject: [PATCH] feat(enterprise): add trace processor lease holders --- backend/package.json | 2 +- .../routes/__tests__/agentRoutesRbac.test.ts | 108 +++- .../enterpriseTraceMetadataRoutes.test.ts | 31 + backend/src/routes/agentRoutes.ts | 120 +++- backend/src/routes/simpleTraceRoutes.ts | 108 +++- .../verifyEnterpriseMultiTenantWindows.ts | 13 +- .../services/__tests__/enterpriseDb.test.ts | 2 +- .../__tests__/enterpriseSchema.test.ts | 5 +- .../traceProcessorLeaseStore.test.ts | 205 ++++++ backend/src/services/enterpriseSchema.ts | 31 + .../src/services/traceProcessorLeaseStore.ts | 598 ++++++++++++++++++ .../enterprise-multi-tenant/README.md | 4 +- 12 files changed, 1207 insertions(+), 20 deletions(-) create mode 100644 backend/src/services/__tests__/traceProcessorLeaseStore.test.ts create mode 100644 backend/src/services/traceProcessorLeaseStore.ts diff --git a/backend/package.json b/backend/package.json index f1c95456..a12ee4b3 100644 --- a/backend/package.json +++ b/backend/package.json @@ -41,7 +41,7 @@ "prepack": "npm run build", "typecheck": "tsc --noEmit", "test": "jest", - "test:core": "jest --runInBand --forceExit src/agent/communication/__tests__/agentMessageBus.test.ts src/agent/core/executors/__tests__/strategyExecutor.test.ts src/agent/core/executors/__tests__/hypothesisExecutor.test.ts src/agent/context/__tests__/enhancedSessionContext.test.ts src/tests/adbTools.test.ts src/services/__tests__/sessionLogger.test.ts src/services/__tests__/traceAnalysisSkillConfig.test.ts src/agent/agents/domain/__tests__/registry.test.ts src/agentv3/__tests__/sqlIncludeInjector.test.ts src/agentv3/__tests__/analysisPatternMemory.test.ts src/agentv3/__tests__/claudeRuntimeRuntimeSnapshots.test.ts src/middleware/__tests__/auth.test.ts src/services/__tests__/rbac.test.ts src/routes/__tests__/agentRoutesRbac.test.ts src/routes/__tests__/ownerGuardRoutes.test.ts src/routes/__tests__/requestContextRouteCoverage.test.ts src/middleware/__tests__/legacyApiCompatibility.test.ts src/services/__tests__/enterpriseDb.test.ts src/services/__tests__/enterpriseSchema.test.ts src/services/__tests__/enterpriseRepository.test.ts src/services/__tests__/processRss.test.ts src/scripts/__tests__/benchmarkTraceProcessorRss.test.ts src/services/__tests__/enterpriseKnowledgeScope.test.ts src/services/__tests__/enterpriseMigration.test.ts src/services/__tests__/runtimeSnapshotStore.test.ts src/services/providerManager/__tests__/localSecretStore.test.ts src/services/providerManager/__tests__/enterpriseProviderStore.test.ts src/routes/__tests__/enterpriseTraceMetadataRoutes.test.ts src/routes/__tests__/enterpriseReportRoutes.test.ts src/routes/__tests__/enterpriseRestartPersistence.test.ts", + "test:core": "jest --runInBand --forceExit src/agent/communication/__tests__/agentMessageBus.test.ts src/agent/core/executors/__tests__/strategyExecutor.test.ts src/agent/core/executors/__tests__/hypothesisExecutor.test.ts src/agent/context/__tests__/enhancedSessionContext.test.ts src/tests/adbTools.test.ts src/services/__tests__/sessionLogger.test.ts src/services/__tests__/traceAnalysisSkillConfig.test.ts src/agent/agents/domain/__tests__/registry.test.ts src/agentv3/__tests__/sqlIncludeInjector.test.ts src/agentv3/__tests__/analysisPatternMemory.test.ts src/agentv3/__tests__/claudeRuntimeRuntimeSnapshots.test.ts src/middleware/__tests__/auth.test.ts src/services/__tests__/rbac.test.ts src/routes/__tests__/agentRoutesRbac.test.ts src/routes/__tests__/ownerGuardRoutes.test.ts src/routes/__tests__/requestContextRouteCoverage.test.ts src/middleware/__tests__/legacyApiCompatibility.test.ts src/services/__tests__/enterpriseDb.test.ts src/services/__tests__/enterpriseSchema.test.ts src/services/__tests__/enterpriseRepository.test.ts src/services/__tests__/processRss.test.ts src/services/__tests__/traceProcessorLeaseStore.test.ts src/scripts/__tests__/benchmarkTraceProcessorRss.test.ts src/services/__tests__/enterpriseKnowledgeScope.test.ts src/services/__tests__/enterpriseMigration.test.ts src/services/__tests__/runtimeSnapshotStore.test.ts src/services/providerManager/__tests__/localSecretStore.test.ts src/services/providerManager/__tests__/enterpriseProviderStore.test.ts src/routes/__tests__/enterpriseTraceMetadataRoutes.test.ts src/routes/__tests__/enterpriseReportRoutes.test.ts src/routes/__tests__/enterpriseRestartPersistence.test.ts", "test:watch": "jest --watch", "test:coverage": "jest --coverage", "test:unit": "jest --testPathPatterns=src/tests", diff --git a/backend/src/routes/__tests__/agentRoutesRbac.test.ts b/backend/src/routes/__tests__/agentRoutesRbac.test.ts index 1c4c4eb3..4f4d22bf 100644 --- a/backend/src/routes/__tests__/agentRoutesRbac.test.ts +++ b/backend/src/routes/__tests__/agentRoutesRbac.test.ts @@ -2,13 +2,28 @@ // Copyright (C) 2024-2026 Gracker (Chris) // This file is part of SmartPerfetto. See LICENSE for details. -import { afterEach, describe, expect, it } from '@jest/globals'; +import { afterEach, describe, expect, it, jest } from '@jest/globals'; import express from 'express'; +import fs from 'fs/promises'; +import os from 'os'; +import path from 'path'; import request from 'supertest'; +import { ENTERPRISE_FEATURE_FLAG_ENV } from '../../config'; +import { ENTERPRISE_DB_PATH_ENV } from '../../services/enterpriseDb'; +import { ENTERPRISE_DATA_DIR_ENV, writeTraceMetadata } from '../../services/traceMetadataStore'; +import { + getTraceProcessorLeaseStore, + setTraceProcessorLeaseStoreForTests, +} from '../../services/traceProcessorLeaseStore'; +import { setTraceProcessorServiceForTests } from '../../services/traceProcessorService'; import agentRoutes from '../agentRoutes'; const originalApiKey = process.env.SMARTPERFETTO_API_KEY; const originalSsoTrustedHeaders = process.env.SMARTPERFETTO_SSO_TRUSTED_HEADERS; +const originalEnterprise = process.env[ENTERPRISE_FEATURE_FLAG_ENV]; +const originalEnterpriseDbPath = process.env[ENTERPRISE_DB_PATH_ENV]; +const originalEnterpriseDataDir = process.env[ENTERPRISE_DATA_DIR_ENV]; +const originalUploadDir = process.env.UPLOAD_DIR; function makeApp(): express.Express { const app = express(); @@ -27,17 +42,38 @@ function viewerHeaders(req: request.Test): request.Test { .set('X-SmartPerfetto-SSO-Scopes', 'trace:read,report:read'); } -afterEach(() => { +function analystHeaders(req: request.Test): request.Test { + return req + .set('X-SmartPerfetto-SSO-User-Id', 'analyst-user') + .set('X-SmartPerfetto-SSO-Email', 'analyst@example.test') + .set('X-SmartPerfetto-SSO-Tenant-Id', 'tenant-a') + .set('X-SmartPerfetto-SSO-Workspace-Id', 'workspace-a') + .set('X-SmartPerfetto-SSO-Roles', 'analyst') + .set('X-SmartPerfetto-SSO-Scopes', 'trace:read,trace:write,agent:run,report:read'); +} + +function restoreEnvValue(key: string, value: string | undefined): void { + if (value === undefined) { + delete process.env[key]; + } else { + process.env[key] = value; + } +} + +afterEach(async () => { + jest.restoreAllMocks(); + setTraceProcessorServiceForTests(null); + setTraceProcessorLeaseStoreForTests(null); if (originalApiKey === undefined) { delete process.env.SMARTPERFETTO_API_KEY; } else { process.env.SMARTPERFETTO_API_KEY = originalApiKey; } - if (originalSsoTrustedHeaders === undefined) { - delete process.env.SMARTPERFETTO_SSO_TRUSTED_HEADERS; - } else { - process.env.SMARTPERFETTO_SSO_TRUSTED_HEADERS = originalSsoTrustedHeaders; - } + restoreEnvValue('SMARTPERFETTO_SSO_TRUSTED_HEADERS', originalSsoTrustedHeaders); + restoreEnvValue(ENTERPRISE_FEATURE_FLAG_ENV, originalEnterprise); + restoreEnvValue(ENTERPRISE_DB_PATH_ENV, originalEnterpriseDbPath); + restoreEnvValue(ENTERPRISE_DATA_DIR_ENV, originalEnterpriseDataDir); + restoreEnvValue('UPLOAD_DIR', originalUploadDir); }); describe('agent route RBAC', () => { @@ -52,4 +88,62 @@ describe('agent route RBAC', () => { expect(res.body.error).toBe('Forbidden'); expect(res.body.details).toContain('agent:run'); }); + + it('rejects analyze when the scoped trace processor lease is draining', async () => { + const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'smartperfetto-agent-lease-')); + let leaseStore: ReturnType | null = null; + try { + const traceId = 'trace-draining'; + const tracePath = path.join(tmpDir, `${traceId}.trace`); + await fs.writeFile(tracePath, 'trace bytes'); + delete process.env.SMARTPERFETTO_API_KEY; + process.env.SMARTPERFETTO_SSO_TRUSTED_HEADERS = 'true'; + process.env[ENTERPRISE_FEATURE_FLAG_ENV] = 'true'; + process.env[ENTERPRISE_DB_PATH_ENV] = path.join(tmpDir, 'enterprise.sqlite'); + process.env[ENTERPRISE_DATA_DIR_ENV] = path.join(tmpDir, 'data'); + process.env.UPLOAD_DIR = path.join(tmpDir, 'uploads'); + + await writeTraceMetadata({ + id: traceId, + filename: `${traceId}.trace`, + size: 11, + uploadedAt: new Date().toISOString(), + status: 'ready', + path: tracePath, + tenantId: 'tenant-a', + workspaceId: 'workspace-a', + userId: 'analyst-user', + }); + setTraceProcessorServiceForTests({ + getOrLoadTrace: jest.fn(async () => ({ + id: traceId, + filename: `${traceId}.trace`, + size: 11, + filePath: tracePath, + uploadTime: new Date(), + status: 'ready', + })), + } as any); + + const scope = { tenantId: 'tenant-a', workspaceId: 'workspace-a', userId: 'analyst-user' }; + leaseStore = getTraceProcessorLeaseStore(); + const lease = leaseStore.acquireHolder(scope, traceId, { + holderType: 'manual_register', + holderRef: 'port:9100', + }); + leaseStore.markStarting(scope, lease.id); + leaseStore.markReady(scope, lease.id); + leaseStore.beginDraining(scope, lease.id); + + const res = await analystHeaders(request(makeApp()).post('/api/agent/v1/analyze')) + .send({ traceId, query: 'analyze this trace' }); + + expect(res.status).toBe(409); + expect(res.body.code).toBe('TRACE_PROCESSOR_LEASE_UNAVAILABLE'); + } finally { + leaseStore?.close(); + setTraceProcessorLeaseStoreForTests(null); + await fs.rm(tmpDir, { recursive: true, force: true }); + } + }); }); diff --git a/backend/src/routes/__tests__/enterpriseTraceMetadataRoutes.test.ts b/backend/src/routes/__tests__/enterpriseTraceMetadataRoutes.test.ts index da1c6164..d520a698 100644 --- a/backend/src/routes/__tests__/enterpriseTraceMetadataRoutes.test.ts +++ b/backend/src/routes/__tests__/enterpriseTraceMetadataRoutes.test.ts @@ -83,6 +83,31 @@ function readTraceAsset(traceId: string): TraceAssetRow | null { } } +function readTraceProcessorLeases(traceId: string): Array<{ + id: string; + state: string; + holder_type: string; + holder_ref: string; +}> { + const db = openEnterpriseDb(dbPath); + try { + return db.prepare(` + SELECT l.id, l.state, h.holder_type, h.holder_ref + FROM trace_processor_leases l + JOIN trace_processor_holders h ON h.lease_id = l.id + WHERE l.trace_id = ? + ORDER BY h.holder_type + `).all(traceId); + } finally { + db.close(); + } +} + beforeEach(async () => { tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'smartperfetto-enterprise-trace-routes-')); dbPath = path.join(tmpDir, 'enterprise.sqlite'); @@ -158,6 +183,12 @@ describe('enterprise trace metadata routes', () => { expect(JSON.parse(row!.metadata_json)).toEqual(expect.objectContaining({ filename: 'fixture.trace', })); + expect(readTraceProcessorLeases(traceId)).toEqual([ + expect.objectContaining({ + state: 'active', + holder_type: 'frontend_http_rpc', + }), + ]); const listRes = await ssoHeaders(request(app).get('/api/traces')); expect(listRes.status).toBe(200); diff --git a/backend/src/routes/agentRoutes.ts b/backend/src/routes/agentRoutes.ts index fbf3fbe2..31bd32b1 100644 --- a/backend/src/routes/agentRoutes.ts +++ b/backend/src/routes/agentRoutes.ts @@ -57,7 +57,11 @@ import { FileSystemSceneReportStore } from '../services/sceneReport/sceneReportS import { SceneReportMemoryCache } from '../services/sceneReport/sceneReportMemoryCache'; import { computeTraceContentHash } from '../agent/scene/traceHash'; import { probeTraceDuration } from '../agent/scene/sceneTraceDurationProbe'; -import { sceneStoryConfig } from '../config'; +import { resolveFeatureConfig, sceneStoryConfig } from '../config'; +import { + getTraceProcessorLeaseStore, + type TraceProcessorLeaseRecord, +} from '../services/traceProcessorLeaseStore'; import { registerAgentLogsRoutes } from './agentLogsRoutes'; import { registerAgentQuickSceneRoutes } from './agentQuickSceneRoutes'; import { registerAgentReportRoutes } from './agentReportRoutes'; @@ -142,6 +146,37 @@ function buildRunId(sessionId: string, sequence: number): string { return `run-${sessionId}-${sequence}-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; } +function enterpriseLeasesEnabled(): boolean { + return resolveFeatureConfig().enterprise; +} + +function leaseScopeFromRequestContext(context: RequestContext) { + return { + tenantId: context.tenantId, + workspaceId: context.workspaceId, + userId: context.userId, + }; +} + +function leaseScopeFromSession(session: AnalysisSession) { + if (!session.tenantId || !session.workspaceId) return null; + return { + tenantId: session.tenantId, + workspaceId: session.workspaceId, + userId: session.userId, + }; +} + +function markLeaseReadyIfNew( + lease: TraceProcessorLeaseRecord, + scope: { tenantId: string; workspaceId: string; userId?: string }, +): TraceProcessorLeaseRecord { + if (lease.state !== 'pending') return lease; + const store = getTraceProcessorLeaseStore(); + const starting = store.markStarting(scope, lease.id); + return store.markReady(scope, starting.id); +} + function buildSessionObservability( session: AnalysisSession ): { runId: string; requestId: string; runSequence: number; status: string } | undefined { @@ -931,6 +966,38 @@ async function handleAnalyzeRequest( runSequence: runContext.sequence, }); + let agentRunLease: TraceProcessorLeaseRecord | null = null; + if (enterpriseLeasesEnabled()) { + try { + const scope = leaseScopeFromRequestContext(requestContext); + agentRunLease = getTraceProcessorLeaseStore().acquireHolder( + scope, + traceId, + { + holderType: 'agent_run', + holderRef: runContext.runId, + runId: runContext.runId, + sessionId, + metadata: { + requestId: runContext.requestId, + runSequence: runContext.sequence, + }, + }, + ); + agentRunLease = markLeaseReadyIfNew(agentRunLease, scope); + } catch (leaseError: any) { + sessionForRun.status = 'failed'; + sessionForRun.error = leaseError.message; + markSessionRunStatus(sessionForRun, 'failed', leaseError.message); + res.status(409).json({ + success: false, + code: 'TRACE_PROCESSOR_LEASE_UNAVAILABLE', + error: leaseError.message, + }); + return; + } + } + // Validate traceContext — must be array of objects with columns/rows const traceContext = Array.isArray(rawTraceContext) ? rawTraceContext.filter( @@ -960,6 +1027,18 @@ async function handleAnalyzeRequest( timestamp: Date.now(), }); } + }).finally(() => { + if (!agentRunLease) return; + try { + getTraceProcessorLeaseStore().releaseHolder( + leaseScopeFromRequestContext(requestContext), + agentRunLease.id, + 'agent_run', + runContext.runId, + ); + } catch (releaseError: any) { + console.warn(`[AgentRoutes] Failed to release agent_run lease ${agentRunLease.id}: ${releaseError.message}`); + } }); res.json({ @@ -974,6 +1053,8 @@ async function handleAnalyzeRequest( providerSnapshotChanged: preparedSession?.providerSnapshotChanged || undefined, architecture: 'agent-driven', runId: runContext.runId, + leaseId: agentRunLease?.id, + leaseState: agentRunLease?.state, requestId: runContext.requestId, runSequence: runContext.sequence, observability: { @@ -4043,7 +4124,27 @@ function sendAgentDrivenResult(res: express.Response, session: AnalysisSession) // Generate HTML report let reportUrl: string | undefined; let reportError: string | undefined; + const reportId = `agent-report-${session.sessionId}-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + let reportLease: TraceProcessorLeaseRecord | null = null; try { + if (enterpriseLeasesEnabled()) { + const scope = leaseScopeFromSession(session); + if (scope) { + reportLease = getTraceProcessorLeaseStore().acquireHolder( + scope, + session.traceId, + { + holderType: 'report_generation', + holderRef: reportId, + reportId, + sessionId: session.sessionId, + runId: session.lastRun?.runId || session.activeRun?.runId, + }, + ); + reportLease = markLeaseReadyIfNew(reportLease, scope); + } + } + const generator = getHTMLReportGenerator(); // Report assembly (cumulative findings dedup, empty-conclusion fallback, // snapshot-first analysisNotes/Plan/Flags) lives in the shared builder so @@ -4073,7 +4174,6 @@ function sendAgentDrivenResult(res: express.Response, session: AnalysisSession) const html = generator.generateAgentDrivenHTML(reportData); // Store report - const reportId = `agent-report-${session.sessionId}-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; persistReport(reportId, { html, generatedAt: Date.now(), @@ -4097,6 +4197,22 @@ function sendAgentDrivenResult(res: express.Response, session: AnalysisSession) resultConfidence: result?.confidence, resultRounds: result?.rounds, }); + } finally { + if (reportLease) { + const scope = leaseScopeFromSession(session); + if (scope) { + try { + getTraceProcessorLeaseStore().releaseHolder( + scope, + reportLease.id, + 'report_generation', + reportId, + ); + } catch (releaseError: any) { + console.warn(`[AgentRoutes] Failed to release report_generation lease ${reportLease.id}: ${releaseError.message}`); + } + } + } } // Send analysis_completed event with full result. Keep it replayable so a diff --git a/backend/src/routes/simpleTraceRoutes.ts b/backend/src/routes/simpleTraceRoutes.ts index 990c1e0f..0734e461 100644 --- a/backend/src/routes/simpleTraceRoutes.ts +++ b/backend/src/routes/simpleTraceRoutes.ts @@ -12,10 +12,15 @@ import os from 'os'; import { Readable, Transform } from 'stream'; import { pipeline } from 'stream/promises'; import { v4 as uuidv4 } from 'uuid'; +import { resolveFeatureConfig } from '../config'; import { attachRequestContext, requireRequestContext, type RequestContext } from '../middleware/auth'; import { getTraceProcessorService } from '../services/traceProcessorService'; import { getPortPool } from '../services/portPool'; import { TraceProcessorFactory } from '../services/workingTraceProcessor'; +import { + getTraceProcessorLeaseStore, + type TraceProcessorLeaseRecord, +} from '../services/traceProcessorLeaseStore'; import { buildTraceOwnerMetadata, deleteTraceMetadata, @@ -76,6 +81,73 @@ function requireTracePermission(permission: 'trace:read' | 'trace:write', detail }; } +function enterpriseLeasesEnabled(): boolean { + return resolveFeatureConfig().enterprise; +} + +function leaseScopeFromContext(context: RequestContext) { + return { + tenantId: context.tenantId, + workspaceId: context.workspaceId, + userId: context.userId, + }; +} + +function markLeaseReadyIfNew( + lease: TraceProcessorLeaseRecord, + context: RequestContext, +): TraceProcessorLeaseRecord { + if (lease.state !== 'pending') return lease; + const store = getTraceProcessorLeaseStore(); + const scope = leaseScopeFromContext(context); + const starting = store.markStarting(scope, lease.id); + return store.markReady(scope, starting.id); +} + +function acquireFrontendTraceLease( + context: RequestContext, + traceId: string, + sessionId?: string, +): TraceProcessorLeaseRecord | null { + if (!enterpriseLeasesEnabled()) return null; + const holderRef = context.windowId || sessionId || context.requestId || context.userId; + const lease = getTraceProcessorLeaseStore().acquireHolder( + leaseScopeFromContext(context), + traceId, + { + holderType: 'frontend_http_rpc', + holderRef, + windowId: context.windowId, + sessionId, + metadata: { + requestId: context.requestId, + }, + }, + ); + return markLeaseReadyIfNew(lease, context); +} + +function acquireManualRegisterLease( + context: RequestContext, + traceId: string, + port: number, +): TraceProcessorLeaseRecord | null { + if (!enterpriseLeasesEnabled()) return null; + const lease = getTraceProcessorLeaseStore().acquireHolder( + leaseScopeFromContext(context), + traceId, + { + holderType: 'manual_register', + holderRef: `port:${port}`, + metadata: { + port, + requestId: context.requestId, + }, + }, + ); + return markLeaseReadyIfNew(lease, context); +} + async function finalizeTraceUpload( traceId: string, filename: string, @@ -111,7 +183,9 @@ async function finalizeTraceUpload( } } - return tps?.getTraceWithPort(traceId); + const traceWithPort = tps?.getTraceWithPort(traceId); + const lease = acquireFrontendTraceLease(context, traceId); + return lease ? { ...(traceWithPort ?? {}), leaseId: lease.id, leaseState: lease.state } : traceWithPort; } function getFilenameFromUrl(rawUrl: string, fallback = 'trace.perfetto'): string { @@ -433,6 +507,10 @@ router.get('/stats', async (req, res) => { const traceService = getTraceProcessorService(); const traces = traceService.getAllTraces().filter(t => ownedTraceIds.has(t.id)); const allocations = portPoolStats.allocations.filter(a => ownedTraceIds.has(a.traceId)); + const leases = enterpriseLeasesEnabled() + ? getTraceProcessorLeaseStore().listLeases(leaseScopeFromContext(context)) + .filter(lease => ownedTraceIds.has(lease.traceId)) + : []; res.json({ success: true, @@ -450,6 +528,25 @@ router.get('/stats', async (req, res) => { processors: { count: processorStats.traceIds.filter(traceId => ownedTraceIds.has(traceId)).length, traceIds: processorStats.traceIds.filter(traceId => ownedTraceIds.has(traceId)), + items: processorStats.processors.filter(processor => ownedTraceIds.has(processor.traceId)), + }, + leases: { + count: leases.length, + items: leases.map(lease => ({ + id: lease.id, + traceId: lease.traceId, + mode: lease.mode, + state: lease.state, + rssBytes: lease.rssBytes, + holderCount: lease.holderCount, + holders: lease.holders.map(holder => ({ + holderType: holder.holderType, + holderRef: holder.holderRef, + windowId: holder.windowId, + heartbeatAt: holder.heartbeatAt, + expiresAt: holder.expiresAt, + })), + })), }, traces: { count: traces.length, @@ -548,10 +645,14 @@ router.post('/register-rpc', async (req, res) => { ...buildTraceOwnerMetadata(context), }); + const lease = acquireManualRegisterLease(context, traceId, Number(port)); + res.json({ success: true, traceId, port, + leaseId: lease?.id, + leaseState: lease?.state, message: `External RPC connection registered successfully`, }); @@ -582,6 +683,9 @@ router.get('/:id', async (req, res) => { const tps = getTraceProcessorService(); const traceInfo = tps?.getTraceWithPort(id); + const sessionId = typeof req.query.sessionId === 'string' ? req.query.sessionId : undefined; + const lease = traceInfo?.port ? acquireFrontendTraceLease(context, id, sessionId) : null; + res.json({ success: true, trace: { @@ -589,6 +693,8 @@ router.get('/:id', async (req, res) => { processorStatus: traceInfo?.status || 'unknown', hasProcessor: !!traceInfo?.processor, port: traceInfo?.port ?? metadata.port, + leaseId: lease?.id, + leaseState: lease?.state, } }); } catch (error: any) { diff --git a/backend/src/scripts/verifyEnterpriseMultiTenantWindows.ts b/backend/src/scripts/verifyEnterpriseMultiTenantWindows.ts index ff6312d4..b05c4ae3 100644 --- a/backend/src/scripts/verifyEnterpriseMultiTenantWindows.ts +++ b/backend/src/scripts/verifyEnterpriseMultiTenantWindows.ts @@ -386,6 +386,8 @@ async function scenarioD1(db: Database.Database, tracePath: string, windows: { const cOwned = await ownedTraceIds(windows.userCWindow1.context); const a1VisibleToB = await readTraceMetadataForContext(uploads[0].traceId, windows.userBWindow1.context); const bVisibleToA = await readTraceMetadataForContext(uploads[2].traceId, windows.userAWindow1.context); + const a1VisibleToC = await readTraceMetadataForContext(uploads[0].traceId, windows.userCWindow1.context); + const cVisibleToA = await readTraceMetadataForContext(uploads[3].traceId, windows.userAWindow1.context); const checks = { sameFilenameUsesDistinctTraceIds: new Set(traceIds).size === uploads.length, @@ -397,8 +399,9 @@ async function scenarioD1(db: Database.Database, tracePath: string, windows: { ) === uploads.length, ownerGuardAllowsOwnTwoWindowUploads: aOwned.includes(uploads[0].traceId) && aOwned.includes(uploads[1].traceId), - ownerGuardBlocksCrossUserTraces: !a1VisibleToB && !bVisibleToA, - threeUsersHaveScopedTraceLists: aOwned.length === 2 && bOwned.length === 1 && cOwned.length === 1, + workspaceRbacAllowsSameWorkspacePeerTraces: Boolean(a1VisibleToB) && Boolean(bVisibleToA), + workspaceGuardBlocksCrossTenantTraces: !a1VisibleToC && !cVisibleToA, + threeUsersHaveScopedTraceLists: aOwned.length === 3 && bOwned.length === 3 && cOwned.length === 1, twoWindowsHaveSeparateSessions: userAWindow1Run.sessionId !== userAWindow2Run.sessionId && userAWindow1Run.runId !== userAWindow2Run.runId, }; @@ -478,7 +481,7 @@ async function scenarioD2( const checks = { bCanStartWhileALongSqlPending: bStartedBeforeALongSqlDone && Boolean(bRun.runId), aTraceReadableDuringBStart: Boolean(aReadableDuringBStart) && aFileStillExists, - ownerGuardBlocksCrossUserDuringLongSql: !aVisibleToB && !bVisibleToA, + workspaceRbacAllowsPeerTraceReadsDuringLongSql: Boolean(aVisibleToB) && Boolean(bVisibleToA), aEventStreamContinuesAfterBStart: aRunEventCursors.join(',') === '1,2,3,4', bRunCanQueueOrRun: bRunStatus === 'pending' || bRunStatus === 'running', runEventsDoNotMix: bRunEventCursors.join(',') === '1', @@ -572,9 +575,9 @@ export async function runEnterpriseWindowRegression( tracePath, scenarios, coverageLimitations: [ - 'D1 covers same-name trace isolation across three users and two windows at the trace metadata, TraceAsset, and analysis session/run schema layers.', + 'D1 covers same-name trace isolation across three users and two windows at the trace metadata, TraceAsset, workspace RBAC, and analysis session/run schema layers.', 'D2 covers a deterministic long-SQL window at the run/event metadata layer without invoking a real LLM provider.', - 'Production TraceProcessorLease assertions remain a future hardening target for section 0.4.4 and section 0.7 D1/D2 final acceptance.', + 'Production TraceProcessorLease holder/state assertions are covered by the §0.4.4 lease store and route tests; backend proxy and queue behavior remain future §0.7 D1/D2 final-acceptance work.', ], }; report.passed = allChecksPassed(report); diff --git a/backend/src/services/__tests__/enterpriseDb.test.ts b/backend/src/services/__tests__/enterpriseDb.test.ts index 475fe129..6e369e2c 100644 --- a/backend/src/services/__tests__/enterpriseDb.test.ts +++ b/backend/src/services/__tests__/enterpriseDb.test.ts @@ -43,7 +43,7 @@ describe('enterprise SQLite WAL database', () => { const rows = db.prepare( 'SELECT version FROM enterprise_schema_migrations ORDER BY version', ).all(); - expect(rows).toEqual([{ version: 1 }, { version: 2 }, { version: 3 }, { version: 4 }]); + expect(rows).toEqual([{ version: 1 }, { version: 2 }, { version: 3 }, { version: 4 }, { version: 5 }]); } finally { db.close(); } diff --git a/backend/src/services/__tests__/enterpriseSchema.test.ts b/backend/src/services/__tests__/enterpriseSchema.test.ts index ed3bd8e0..93bc4135 100644 --- a/backend/src/services/__tests__/enterpriseSchema.test.ts +++ b/backend/src/services/__tests__/enterpriseSchema.test.ts @@ -170,7 +170,9 @@ describe('enterprise core schema', () => { 'holder_ref', 'window_id', 'heartbeat_at', + 'expires_at', 'created_at', + 'metadata_json', ]); expectColumns(db!, 'analysis_sessions', [ 'id', @@ -317,6 +319,7 @@ describe('enterprise core schema', () => { 'idx_trace_processor_leases_owner_guard', 'idx_trace_processor_leases_trace', 'idx_trace_processor_holders_lease', + 'idx_trace_processor_holders_expiry', 'idx_analysis_sessions_owner_guard', 'idx_analysis_sessions_tenant_workspace_id_unique', 'idx_analysis_runs_status', @@ -354,7 +357,7 @@ describe('enterprise core schema', () => { const rows = db!.prepare( 'SELECT version FROM enterprise_schema_migrations ORDER BY version', ).all(); - expect(rows).toEqual([{ version: 1 }, { version: 2 }, { version: 3 }, { version: 4 }]); + expect(rows).toEqual([{ version: 1 }, { version: 2 }, { version: 3 }, { version: 4 }, { version: 5 }]); }); test('enforces the full tenant workspace session run event chain', () => { diff --git a/backend/src/services/__tests__/traceProcessorLeaseStore.test.ts b/backend/src/services/__tests__/traceProcessorLeaseStore.test.ts new file mode 100644 index 00000000..d01da4c6 --- /dev/null +++ b/backend/src/services/__tests__/traceProcessorLeaseStore.test.ts @@ -0,0 +1,205 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +// Copyright (C) 2024-2026 Gracker (Chris) +// This file is part of SmartPerfetto. See LICENSE for details. + +import Database from 'better-sqlite3'; +import { applyEnterpriseMinimalSchema } from '../enterpriseSchema'; +import { + resolveHolderTtlPolicy, + TraceProcessorLeaseStore, + type TraceProcessorHolderType, +} from '../traceProcessorLeaseStore'; +import type { EnterpriseRepositoryScope } from '../enterpriseRepository'; + +const scope: EnterpriseRepositoryScope = { + tenantId: 'tenant-a', + workspaceId: 'workspace-a', + userId: 'user-a', +}; + +function seedGraph(db: Database.Database, traceId = 'trace-a'): void { + const now = 1_700_000_000_000; + db.prepare(` + INSERT INTO organizations (id, name, status, plan, created_at, updated_at) + VALUES ('tenant-a', 'Tenant A', 'active', 'enterprise', ?, ?) + `).run(now, now); + db.prepare(` + INSERT INTO workspaces (id, tenant_id, name, created_at, updated_at) + VALUES ('workspace-a', 'tenant-a', 'Workspace A', ?, ?) + `).run(now, now); + db.prepare(` + INSERT INTO trace_assets + (id, tenant_id, workspace_id, local_path, status, created_at) + VALUES + (?, 'tenant-a', 'workspace-a', ?, 'ready', ?) + `).run(traceId, `/tmp/${traceId}.pftrace`, now); +} + +describe('TraceProcessorLeaseStore', () => { + let db: Database.Database; + let store: TraceProcessorLeaseStore; + + beforeEach(() => { + db = new Database(':memory:'); + applyEnterpriseMinimalSchema(db); + seedGraph(db); + store = new TraceProcessorLeaseStore(db); + }); + + afterEach(() => { + db.close(); + }); + + it('defines graded TTL policies for frontend, agent, report, and manual holders', () => { + expect(resolveHolderTtlPolicy({ + holderType: 'frontend_http_rpc', + holderRef: 'window-a', + frontendVisibility: 'visible', + })).toEqual({ + heartbeatTtlMs: 90_000, + idleTtlMs: 4 * 60 * 60 * 1000, + }); + expect(resolveHolderTtlPolicy({ + holderType: 'frontend_http_rpc', + holderRef: 'window-a', + frontendVisibility: 'hidden', + }).idleTtlMs).toBe(8 * 60 * 60 * 1000); + expect(resolveHolderTtlPolicy({ + holderType: 'frontend_http_rpc', + holderRef: 'window-a', + frontendVisibility: 'offline', + }).heartbeatTtlMs).toBe(30 * 60 * 1000); + expect(resolveHolderTtlPolicy({ + holderType: 'manual_register', + holderRef: 'port:9100', + }).idleTtlMs).toBe(60 * 60 * 1000); + expect(resolveHolderTtlPolicy({ + holderType: 'agent_run', + holderRef: 'run-a', + }).idleTtlMs).toBe(24 * 60 * 60 * 1000); + expect(resolveHolderTtlPolicy({ + holderType: 'report_generation', + holderRef: 'report-a', + }).heartbeatTtlMs).toBe(5 * 60 * 1000); + }); + + it('acquires all four holder classes on one scoped lease', () => { + const holders: Array<{ holderType: TraceProcessorHolderType; holderRef: string; windowId?: string }> = [ + { holderType: 'frontend_http_rpc', holderRef: 'window-a', windowId: 'window-a' }, + { holderType: 'agent_run', holderRef: 'run-a' }, + { holderType: 'report_generation', holderRef: 'report-a' }, + { holderType: 'manual_register', holderRef: 'port:9100' }, + ]; + + let lease = store.acquireHolder(scope, 'trace-a', holders[0], { now: 1000 }); + store.markStarting(scope, lease.id); + lease = store.markReady(scope, lease.id); + for (const holder of holders.slice(1)) { + lease = store.acquireHolder(scope, 'trace-a', holder, { now: 2000 }); + } + + expect(lease.state).toBe('active'); + expect(lease.holderCount).toBe(4); + expect(lease.holders.map(holder => holder.holderType).sort()).toEqual([ + 'agent_run', + 'frontend_http_rpc', + 'manual_register', + 'report_generation', + ]); + }); + + it('follows pending -> starting -> active -> idle -> released through the state machine', () => { + let lease = store.acquireHolder(scope, 'trace-a', { + holderType: 'agent_run', + holderRef: 'run-a', + runId: 'run-a', + sessionId: 'session-a', + }, { now: 1000 }); + expect(lease.state).toBe('pending'); + + lease = store.markStarting(scope, lease.id); + expect(lease.state).toBe('starting'); + + lease = store.markReady(scope, lease.id); + expect(lease.state).toBe('active'); + expect(lease.holders[0].metadata).toMatchObject({ + runId: 'run-a', + sessionId: 'session-a', + }); + + lease = store.releaseHolder(scope, lease.id, 'agent_run', 'run-a'); + expect(lease.state).toBe('idle'); + expect(lease.holderCount).toBe(0); + + lease = store.beginDraining(scope, lease.id); + expect(lease.state).toBe('released'); + }); + + it('rejects new holders while a lease is draining and releases after active holders leave', () => { + let lease = store.acquireHolder(scope, 'trace-a', { + holderType: 'frontend_http_rpc', + holderRef: 'window-a', + windowId: 'window-a', + }, { now: 1000 }); + store.markStarting(scope, lease.id); + lease = store.markReady(scope, lease.id); + + lease = store.beginDraining(scope, lease.id); + expect(lease.state).toBe('draining'); + + expect(() => store.acquireHolder(scope, 'trace-a', { + holderType: 'manual_register', + holderRef: 'port:9100', + })).toThrow('is draining'); + + lease = store.releaseHolder(scope, lease.id, 'frontend_http_rpc', 'window-a'); + expect(lease.state).toBe('released'); + }); + + it('keeps lease visibility scoped to tenant and workspace', () => { + const lease = store.acquireHolder(scope, 'trace-a', { + holderType: 'manual_register', + holderRef: 'port:9100', + }); + + expect(store.getLeaseById(scope, lease.id)?.id).toBe(lease.id); + expect(store.getLeaseById({ + tenantId: 'tenant-a', + workspaceId: 'workspace-b', + }, lease.id)).toBeNull(); + }); + + it('sweeps expired holders and releases holderless expired leases', () => { + const lease = store.acquireHolder(scope, 'trace-a', { + holderType: 'manual_register', + holderRef: 'port:9100', + }, { now: 1000 }); + store.markStarting(scope, lease.id); + store.markReady(scope, lease.id); + + const result = store.sweepExpired(1000 + 60 * 60 * 1000 + 1); + const swept = store.getLeaseById(scope, lease.id)!; + + expect(result).toEqual({ holdersRemoved: 1, leasesReleased: 1 }); + expect(swept.state).toBe('released'); + expect(swept.holderCount).toBe(0); + }); + + it('moves an active lease to idle when holder heartbeat TTL expires before idle TTL', () => { + const lease = store.acquireHolder(scope, 'trace-a', { + holderType: 'frontend_http_rpc', + holderRef: 'window-a', + windowId: 'window-a', + frontendVisibility: 'visible', + }, { now: 1000 }); + store.markStarting(scope, lease.id); + store.markReady(scope, lease.id); + + const result = store.sweepExpired(1000 + 90_001); + const swept = store.getLeaseById(scope, lease.id)!; + + expect(result).toEqual({ holdersRemoved: 1, leasesReleased: 0 }); + expect(swept.state).toBe('idle'); + expect(swept.holderCount).toBe(0); + }); +}); diff --git a/backend/src/services/enterpriseSchema.ts b/backend/src/services/enterpriseSchema.ts index ea90ca55..13053e95 100644 --- a/backend/src/services/enterpriseSchema.ts +++ b/backend/src/services/enterpriseSchema.ts @@ -9,6 +9,22 @@ interface MigrationStep { up: (db: Database.Database) => void; } +function tableHasColumn(db: Database.Database, table: string, column: string): boolean { + const rows = db.prepare(`PRAGMA table_info(${table})`).all() as Array<{ name: string }>; + return rows.some(row => row.name === column); +} + +function addColumnIfMissing( + db: Database.Database, + table: string, + column: string, + definition: string, +): void { + if (!tableHasColumn(db, table, column)) { + db.exec(`ALTER TABLE ${table} ADD COLUMN ${column} ${definition}`); + } +} + export const ENTERPRISE_CORE_SCHEMA_TABLES = [ 'organizations', 'workspaces', @@ -325,13 +341,17 @@ const MIGRATIONS: MigrationStep[] = [ holder_ref TEXT NOT NULL, window_id TEXT, heartbeat_at INTEGER, + expires_at INTEGER, created_at INTEGER NOT NULL, + metadata_json TEXT, FOREIGN KEY (lease_id) REFERENCES trace_processor_leases(id) ON DELETE CASCADE ); CREATE INDEX IF NOT EXISTS idx_trace_processor_holders_lease ON trace_processor_holders(lease_id, holder_type, holder_ref); CREATE INDEX IF NOT EXISTS idx_trace_processor_holders_window ON trace_processor_holders(window_id, heartbeat_at); + CREATE INDEX IF NOT EXISTS idx_trace_processor_holders_expiry + ON trace_processor_holders(expires_at, heartbeat_at); CREATE TABLE IF NOT EXISTS conversation_turns ( id TEXT PRIMARY KEY, @@ -476,6 +496,17 @@ const MIGRATIONS: MigrationStep[] = [ `); }, }, + { + version: 5, + up: (db) => { + addColumnIfMissing(db, 'trace_processor_holders', 'expires_at', 'INTEGER'); + addColumnIfMissing(db, 'trace_processor_holders', 'metadata_json', 'TEXT'); + db.exec(` + CREATE INDEX IF NOT EXISTS idx_trace_processor_holders_expiry + ON trace_processor_holders(expires_at, heartbeat_at); + `); + }, + }, ]; export function applyEnterpriseMinimalSchema(db: Database.Database): void { diff --git a/backend/src/services/traceProcessorLeaseStore.ts b/backend/src/services/traceProcessorLeaseStore.ts new file mode 100644 index 00000000..df548a0b --- /dev/null +++ b/backend/src/services/traceProcessorLeaseStore.ts @@ -0,0 +1,598 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +// Copyright (C) 2024-2026 Gracker (Chris) +// This file is part of SmartPerfetto. See LICENSE for details. + +import type Database from 'better-sqlite3'; +import { v4 as uuidv4 } from 'uuid'; +import { openEnterpriseDb, resolveEnterpriseDbPath } from './enterpriseDb'; +import type { EnterpriseRepositoryScope } from './enterpriseRepository'; + +export type TraceProcessorLeaseMode = 'shared' | 'isolated'; +export type TraceProcessorLeaseState = + | 'pending' + | 'starting' + | 'ready' + | 'idle' + | 'active' + | 'draining' + | 'released' + | 'crashed' + | 'restarting' + | 'failed'; + +export type TraceProcessorHolderType = + | 'frontend_http_rpc' + | 'agent_run' + | 'report_generation' + | 'manual_register'; + +export type FrontendHolderVisibility = 'visible' | 'hidden' | 'offline'; + +export interface TraceProcessorHolderTtlPolicy { + heartbeatTtlMs: number; + idleTtlMs: number; +} + +export interface TraceProcessorHolderInput { + holderType: TraceProcessorHolderType; + holderRef: string; + windowId?: string; + frontendVisibility?: FrontendHolderVisibility; + sessionId?: string; + runId?: string; + reportId?: string; + metadata?: Record; +} + +export interface TraceProcessorLeaseRecord { + id: string; + tenantId: string; + workspaceId: string; + traceId: string; + mode: TraceProcessorLeaseMode; + state: TraceProcessorLeaseState; + rssBytes: number | null; + heartbeatAt: number | null; + expiresAt: number | null; + holderCount: number; + holders: TraceProcessorHolderRecord[]; +} + +export interface TraceProcessorHolderRecord { + id: string; + leaseId: string; + holderType: TraceProcessorHolderType; + holderRef: string; + windowId: string | null; + heartbeatAt: number | null; + expiresAt: number | null; + createdAt: number; + metadata: Record | null; +} + +interface LeaseRow { + id: string; + tenant_id: string; + workspace_id: string; + trace_id: string; + mode: string; + state: string; + rss_bytes: number | null; + heartbeat_at: number | null; + expires_at: number | null; +} + +interface HolderRow { + id: string; + lease_id: string; + holder_type: string; + holder_ref: string; + window_id: string | null; + heartbeat_at: number | null; + expires_at: number | null; + created_at: number; + metadata_json: string | null; +} + +const TERMINAL_STATES = new Set(['released', 'failed']); +const ACQUIRABLE_STATES = new Set([ + 'pending', + 'starting', + 'ready', + 'idle', + 'active', + 'crashed', + 'restarting', +]); + +const ALLOWED_TRANSITIONS: Record = { + pending: ['starting', 'draining', 'released', 'failed'], + starting: ['ready', 'draining', 'crashed', 'failed'], + ready: ['idle', 'active', 'draining', 'crashed', 'failed'], + idle: ['active', 'draining', 'released', 'crashed', 'failed'], + active: ['idle', 'draining', 'crashed', 'failed'], + draining: ['released', 'failed'], + released: [], + crashed: ['restarting', 'draining', 'failed'], + restarting: ['ready', 'draining', 'failed'], + failed: [], +}; + +function assertNonEmpty(value: string, name: string): void { + if (!value.trim()) { + throw new Error(`${name} is required`); + } +} + +function assertValidTransition(from: TraceProcessorLeaseState, to: TraceProcessorLeaseState): void { + if (from === to) return; + if (!ALLOWED_TRANSITIONS[from].includes(to)) { + throw new Error(`Invalid trace processor lease transition: ${from} -> ${to}`); + } +} + +function parseMetadata(raw: string | null): Record | null { + if (!raw) return null; + try { + const parsed = JSON.parse(raw); + return parsed && typeof parsed === 'object' && !Array.isArray(parsed) + ? parsed as Record + : null; + } catch { + return null; + } +} + +function metadataForHolder(holder: TraceProcessorHolderInput): string | null { + const metadata = { + ...(holder.metadata ?? {}), + ...(holder.frontendVisibility ? { frontendVisibility: holder.frontendVisibility } : {}), + ...(holder.sessionId ? { sessionId: holder.sessionId } : {}), + ...(holder.runId ? { runId: holder.runId } : {}), + ...(holder.reportId ? { reportId: holder.reportId } : {}), + }; + return Object.keys(metadata).length > 0 ? JSON.stringify(metadata) : null; +} + +export function resolveHolderTtlPolicy(holder: TraceProcessorHolderInput): TraceProcessorHolderTtlPolicy { + if (holder.holderType === 'frontend_http_rpc') { + const visibility = holder.frontendVisibility ?? 'visible'; + if (visibility === 'hidden') { + return { + heartbeatTtlMs: 10 * 60 * 1000, + idleTtlMs: 8 * 60 * 60 * 1000, + }; + } + if (visibility === 'offline') { + return { + heartbeatTtlMs: 30 * 60 * 1000, + idleTtlMs: 30 * 60 * 1000, + }; + } + return { + heartbeatTtlMs: 90 * 1000, + idleTtlMs: 4 * 60 * 60 * 1000, + }; + } + + if (holder.holderType === 'manual_register') { + return { + heartbeatTtlMs: 5 * 60 * 1000, + idleTtlMs: 60 * 60 * 1000, + }; + } + + return { + heartbeatTtlMs: 5 * 60 * 1000, + idleTtlMs: 24 * 60 * 60 * 1000, + }; +} + +export class TraceProcessorLeaseStore { + constructor(private readonly db: Database.Database = openEnterpriseDb()) {} + + close(): void { + this.db.close(); + } + + acquireHolder( + scope: EnterpriseRepositoryScope, + traceId: string, + holder: TraceProcessorHolderInput, + options: { mode?: TraceProcessorLeaseMode; now?: number } = {}, + ): TraceProcessorLeaseRecord { + assertNonEmpty(scope.tenantId, 'tenantId'); + assertNonEmpty(scope.workspaceId, 'workspaceId'); + assertNonEmpty(traceId, 'traceId'); + assertNonEmpty(holder.holderRef, 'holderRef'); + + return this.db.transaction(() => { + const now = options.now ?? Date.now(); + const blockingLease = this.findTraceLeaseByStates(scope, traceId, ['draining']); + if (blockingLease) { + throw new Error(`Trace processor lease ${blockingLease.id} is draining`); + } + + let lease = this.findAcquirableTraceLease(scope, traceId); + if (!lease) { + const leaseId = uuidv4(); + const ttl = resolveHolderTtlPolicy(holder); + this.db.prepare(` + INSERT INTO trace_processor_leases + (id, tenant_id, workspace_id, trace_id, mode, state, heartbeat_at, expires_at) + VALUES + (?, ?, ?, ?, ?, 'pending', ?, ?) + `).run( + leaseId, + scope.tenantId, + scope.workspaceId, + traceId, + options.mode ?? 'shared', + now, + now + ttl.idleTtlMs, + ); + lease = this.mustGetLease(scope, leaseId); + } + + if (!ACQUIRABLE_STATES.has(lease.state as TraceProcessorLeaseState)) { + throw new Error(`Trace processor lease ${lease.id} is not acquirable (${lease.state})`); + } + + this.upsertHolder(lease.id, holder, now); + const ttl = resolveHolderTtlPolicy(holder); + const expiresAt = Math.max(lease.expires_at ?? 0, now + ttl.idleTtlMs); + this.db.prepare(` + UPDATE trace_processor_leases + SET heartbeat_at = ?, expires_at = ? + WHERE id = ? + `).run(now, expiresAt, lease.id); + this.refreshLeaseActivityState(scope, lease.id); + return this.getLeaseById(scope, lease.id)!; + })(); + } + + heartbeatHolder( + scope: EnterpriseRepositoryScope, + leaseId: string, + holder: TraceProcessorHolderInput, + now = Date.now(), + ): TraceProcessorLeaseRecord { + assertNonEmpty(leaseId, 'leaseId'); + const lease = this.mustGetLease(scope, leaseId); + const row = this.db.prepare(` + SELECT id FROM trace_processor_holders + WHERE lease_id = ? AND holder_type = ? AND holder_ref = ? + LIMIT 1 + `).get(lease.id, holder.holderType, holder.holderRef) as { id: string } | undefined; + if (!row) { + throw new Error(`Trace processor holder not found: ${holder.holderType}/${holder.holderRef}`); + } + + const ttl = resolveHolderTtlPolicy(holder); + this.db.prepare(` + UPDATE trace_processor_holders + SET heartbeat_at = ?, expires_at = ?, window_id = COALESCE(?, window_id), metadata_json = ? + WHERE id = ? + `).run(now, now + ttl.heartbeatTtlMs, holder.windowId ?? null, metadataForHolder(holder), row.id); + this.db.prepare(` + UPDATE trace_processor_leases + SET heartbeat_at = ?, expires_at = MAX(COALESCE(expires_at, 0), ?) + WHERE id = ? + `).run(now, now + ttl.idleTtlMs, lease.id); + this.refreshLeaseActivityState(scope, lease.id); + return this.getLeaseById(scope, lease.id)!; + } + + releaseHolder( + scope: EnterpriseRepositoryScope, + leaseId: string, + holderType: TraceProcessorHolderType, + holderRef: string, + ): TraceProcessorLeaseRecord { + assertNonEmpty(leaseId, 'leaseId'); + assertNonEmpty(holderRef, 'holderRef'); + const lease = this.mustGetLease(scope, leaseId); + this.db.prepare(` + DELETE FROM trace_processor_holders + WHERE lease_id = ? AND holder_type = ? AND holder_ref = ? + `).run(lease.id, holderType, holderRef); + this.refreshLeaseActivityState(scope, lease.id); + return this.getLeaseById(scope, lease.id)!; + } + + markStarting(scope: EnterpriseRepositoryScope, leaseId: string): TraceProcessorLeaseRecord { + return this.transition(scope, leaseId, 'starting'); + } + + markReady(scope: EnterpriseRepositoryScope, leaseId: string): TraceProcessorLeaseRecord { + const lease = this.transition(scope, leaseId, 'ready'); + this.refreshLeaseActivityState(scope, lease.id); + return this.getLeaseById(scope, lease.id)!; + } + + markCrashed(scope: EnterpriseRepositoryScope, leaseId: string): TraceProcessorLeaseRecord { + return this.transition(scope, leaseId, 'crashed'); + } + + markRestarting(scope: EnterpriseRepositoryScope, leaseId: string): TraceProcessorLeaseRecord { + return this.transition(scope, leaseId, 'restarting'); + } + + markFailed(scope: EnterpriseRepositoryScope, leaseId: string): TraceProcessorLeaseRecord { + return this.transition(scope, leaseId, 'failed'); + } + + beginDraining(scope: EnterpriseRepositoryScope, leaseId: string): TraceProcessorLeaseRecord { + const lease = this.transition(scope, leaseId, 'draining'); + this.refreshLeaseActivityState(scope, lease.id); + return this.getLeaseById(scope, lease.id)!; + } + + recordRss(scope: EnterpriseRepositoryScope, leaseId: string, rssBytes: number | null): TraceProcessorLeaseRecord { + const lease = this.mustGetLease(scope, leaseId); + this.db.prepare(` + UPDATE trace_processor_leases + SET rss_bytes = ?, heartbeat_at = ? + WHERE id = ? + `).run(rssBytes, Date.now(), lease.id); + return this.getLeaseById(scope, lease.id)!; + } + + getLeaseById(scope: EnterpriseRepositoryScope, leaseId: string): TraceProcessorLeaseRecord | null { + const row = this.db.prepare(` + SELECT * + FROM trace_processor_leases + WHERE tenant_id = ? AND workspace_id = ? AND id = ? + LIMIT 1 + `).get(scope.tenantId, scope.workspaceId, leaseId) as LeaseRow | undefined; + return row ? this.mapLease(row) : null; + } + + listLeases( + scope: EnterpriseRepositoryScope, + criteria: { traceId?: string; states?: TraceProcessorLeaseState[] } = {}, + ): TraceProcessorLeaseRecord[] { + const params: unknown[] = [scope.tenantId, scope.workspaceId]; + const clauses = ['tenant_id = ?', 'workspace_id = ?']; + if (criteria.traceId) { + clauses.push('trace_id = ?'); + params.push(criteria.traceId); + } + if (criteria.states && criteria.states.length > 0) { + clauses.push(`state IN (${criteria.states.map(() => '?').join(', ')})`); + params.push(...criteria.states); + } + const rows = this.db.prepare(` + SELECT * + FROM trace_processor_leases + WHERE ${clauses.join(' AND ')} + ORDER BY heartbeat_at DESC, id ASC + `).all(...params) as LeaseRow[]; + return rows.map(row => this.mapLease(row)); + } + + hasActiveHolders(scope: EnterpriseRepositoryScope, traceId: string): boolean { + return this.listLeases(scope, { traceId }) + .some(lease => !TERMINAL_STATES.has(lease.state) && lease.holderCount > 0); + } + + sweepExpired(now = Date.now()): { holdersRemoved: number; leasesReleased: number } { + const holderResult = this.db.prepare(` + DELETE FROM trace_processor_holders + WHERE expires_at IS NOT NULL AND expires_at <= ? + `).run(now); + + this.db.prepare(` + UPDATE trace_processor_leases + SET state = 'idle' + WHERE state = 'active' + AND id IN ( + SELECT l.id + FROM trace_processor_leases l + LEFT JOIN trace_processor_holders h ON h.lease_id = l.id + GROUP BY l.id + HAVING COUNT(h.id) = 0 + ) + `).run(); + + const releasableRows = this.db.prepare(` + SELECT l.* + FROM trace_processor_leases l + LEFT JOIN trace_processor_holders h ON h.lease_id = l.id + WHERE l.state NOT IN ('released', 'failed') + AND l.expires_at IS NOT NULL + AND l.expires_at <= ? + GROUP BY l.id + HAVING COUNT(h.id) = 0 + `).all(now) as LeaseRow[]; + + for (const lease of releasableRows) { + this.db.prepare(` + UPDATE trace_processor_leases + SET state = 'released' + WHERE id = ? + `).run(lease.id); + } + + return { + holdersRemoved: holderResult.changes, + leasesReleased: releasableRows.length, + }; + } + + private transition( + scope: EnterpriseRepositoryScope, + leaseId: string, + nextState: TraceProcessorLeaseState, + ): TraceProcessorLeaseRecord { + const lease = this.mustGetLease(scope, leaseId); + const currentState = lease.state as TraceProcessorLeaseState; + assertValidTransition(currentState, nextState); + this.db.prepare(` + UPDATE trace_processor_leases + SET state = ?, heartbeat_at = ? + WHERE id = ? + `).run(nextState, Date.now(), lease.id); + return this.getLeaseById(scope, lease.id)!; + } + + private upsertHolder(leaseId: string, holder: TraceProcessorHolderInput, now: number): void { + const ttl = resolveHolderTtlPolicy(holder); + const metadataJson = metadataForHolder(holder); + const existing = this.db.prepare(` + SELECT id FROM trace_processor_holders + WHERE lease_id = ? AND holder_type = ? AND holder_ref = ? + LIMIT 1 + `).get(leaseId, holder.holderType, holder.holderRef) as { id: string } | undefined; + + if (existing) { + this.db.prepare(` + UPDATE trace_processor_holders + SET window_id = ?, heartbeat_at = ?, expires_at = ?, metadata_json = ? + WHERE id = ? + `).run(holder.windowId ?? null, now, now + ttl.heartbeatTtlMs, metadataJson, existing.id); + return; + } + + this.db.prepare(` + INSERT INTO trace_processor_holders + (id, lease_id, holder_type, holder_ref, window_id, heartbeat_at, expires_at, created_at, metadata_json) + VALUES + (?, ?, ?, ?, ?, ?, ?, ?, ?) + `).run( + uuidv4(), + leaseId, + holder.holderType, + holder.holderRef, + holder.windowId ?? null, + now, + now + ttl.heartbeatTtlMs, + now, + metadataJson, + ); + } + + private refreshLeaseActivityState(scope: EnterpriseRepositoryScope, leaseId: string): void { + const lease = this.mustGetLease(scope, leaseId); + const state = lease.state as TraceProcessorLeaseState; + const holderCount = this.countHolders(lease.id); + + if (state === 'draining' && holderCount === 0) { + this.db.prepare(`UPDATE trace_processor_leases SET state = 'released' WHERE id = ?`).run(lease.id); + return; + } + if (state === 'ready' || state === 'idle' || state === 'active') { + const nextState: TraceProcessorLeaseState = holderCount > 0 ? 'active' : 'idle'; + if (nextState !== state) { + this.db.prepare(`UPDATE trace_processor_leases SET state = ? WHERE id = ?`).run(nextState, lease.id); + } + } + } + + private findAcquirableTraceLease(scope: EnterpriseRepositoryScope, traceId: string): LeaseRow | null { + const rows = this.db.prepare(` + SELECT * + FROM trace_processor_leases + WHERE tenant_id = ? AND workspace_id = ? AND trace_id = ? + ORDER BY heartbeat_at DESC, id ASC + `).all(scope.tenantId, scope.workspaceId, traceId) as LeaseRow[]; + return rows.find(row => ACQUIRABLE_STATES.has(row.state as TraceProcessorLeaseState)) ?? null; + } + + private findTraceLeaseByStates( + scope: EnterpriseRepositoryScope, + traceId: string, + states: TraceProcessorLeaseState[], + ): LeaseRow | null { + const rows = this.db.prepare(` + SELECT * + FROM trace_processor_leases + WHERE tenant_id = ? AND workspace_id = ? AND trace_id = ? + AND state IN (${states.map(() => '?').join(', ')}) + ORDER BY heartbeat_at DESC, id ASC + LIMIT 1 + `).all(scope.tenantId, scope.workspaceId, traceId, ...states) as LeaseRow[]; + return rows[0] ?? null; + } + + private mustGetLease(scope: EnterpriseRepositoryScope, leaseId: string): LeaseRow { + const row = this.db.prepare(` + SELECT * + FROM trace_processor_leases + WHERE tenant_id = ? AND workspace_id = ? AND id = ? + LIMIT 1 + `).get(scope.tenantId, scope.workspaceId, leaseId) as LeaseRow | undefined; + if (!row) { + throw new Error(`Trace processor lease not found: ${leaseId}`); + } + return row; + } + + private countHolders(leaseId: string): number { + const row = this.db.prepare(` + SELECT COUNT(*) AS count + FROM trace_processor_holders + WHERE lease_id = ? + `).get(leaseId) as { count: number }; + return row.count; + } + + private holdersForLease(leaseId: string): TraceProcessorHolderRecord[] { + const rows = this.db.prepare(` + SELECT * + FROM trace_processor_holders + WHERE lease_id = ? + ORDER BY created_at ASC, id ASC + `).all(leaseId) as HolderRow[]; + return rows.map(row => ({ + id: row.id, + leaseId: row.lease_id, + holderType: row.holder_type as TraceProcessorHolderType, + holderRef: row.holder_ref, + windowId: row.window_id, + heartbeatAt: row.heartbeat_at, + expiresAt: row.expires_at, + createdAt: row.created_at, + metadata: parseMetadata(row.metadata_json), + })); + } + + private mapLease(row: LeaseRow): TraceProcessorLeaseRecord { + const holders = this.holdersForLease(row.id); + return { + id: row.id, + tenantId: row.tenant_id, + workspaceId: row.workspace_id, + traceId: row.trace_id, + mode: row.mode as TraceProcessorLeaseMode, + state: row.state as TraceProcessorLeaseState, + rssBytes: row.rss_bytes, + heartbeatAt: row.heartbeat_at, + expiresAt: row.expires_at, + holderCount: holders.length, + holders, + }; + } +} + +let singleton: TraceProcessorLeaseStore | null = null; +let singletonDbPath: string | null = null; + +export function getTraceProcessorLeaseStore(): TraceProcessorLeaseStore { + const dbPath = resolveEnterpriseDbPath(); + if (!singleton || singletonDbPath !== dbPath) { + try { + singleton?.close(); + } catch { + // Ignore stale singleton cleanup errors; a new DB handle is authoritative. + } + singleton = new TraceProcessorLeaseStore(openEnterpriseDb(dbPath)); + singletonDbPath = dbPath; + } + return singleton; +} + +export function setTraceProcessorLeaseStoreForTests(store: TraceProcessorLeaseStore | null): void { + singleton = store; + singletonDbPath = null; +} diff --git a/docs/features/enterprise-multi-tenant/README.md b/docs/features/enterprise-multi-tenant/README.md index d9cc90c6..5037f4f9 100644 --- a/docs/features/enterprise-multi-tenant/README.md +++ b/docs/features/enterprise-multi-tenant/README.md @@ -60,7 +60,7 @@ - [x] 4.1.6 AI session cache 加 mtime/CAS 或改为后端权威 session list - [x] 4.2 上传链路 stream 化(§11.1):URL 上传不再 `arrayBuffer()` 全量进内存;本地上传与 RAM/磁盘策略联动;temp file 用 traceId/uuid + 原子 rename - [ ] 4.3 RSS benchmark:scroll/startup/ANR/memory/heapprofd/vendor 大 trace × 100MB/500MB/1GB,记录启动 RSS、load peak、query headroom -- [ ] 4.4 `TraceProcessorLease` 4 类 holder(frontend_http_rpc / agent_run / report_generation / manual_register)+ 状态机(§11.4)+ 分级 TTL +- [x] 4.4 `TraceProcessorLease` 4 类 holder(frontend_http_rpc / agent_run / report_generation / manual_register)+ 状态机(§11.4)+ 分级 TTL - [ ] 4.5 Backend proxy `/api/tp/:leaseId/{status,websocket,query}`(§11.3) - [ ] 4.5.1 同时支持 `/status` HTTP / `/websocket` 二进制双向 / `/query` HTTP - [ ] 4.5.2 前端 `HttpRpcEngine` 抽象成 `HttpRpcTarget`(direct-port + backend-lease-proxy) @@ -459,7 +459,7 @@ api_keys(id, tenant_id, workspace_id, owner_user_id, key_hash, scopes, expires_a trace_assets(id, tenant_id, workspace_id, owner_user_id, local_path, sha256, size_bytes, status, metadata_json, created_at, expires_at) trace_processor_leases(id, tenant_id, workspace_id, trace_id, mode, state, rss_bytes, heartbeat_at, expires_at) -trace_processor_holders(id, lease_id, holder_type, holder_ref, window_id, heartbeat_at, created_at) +trace_processor_holders(id, lease_id, holder_type, holder_ref, window_id, heartbeat_at, expires_at, created_at, metadata_json) analysis_sessions(id, tenant_id, workspace_id, trace_id, created_by, provider_snapshot_id, title, visibility, status, created_at, updated_at) analysis_runs(id, tenant_id, workspace_id, session_id, mode, status, question, started_at, completed_at, error_json)