diff --git a/backend/package.json b/backend/package.json index a12ee4b3..af996cd1 100644 --- a/backend/package.json +++ b/backend/package.json @@ -41,7 +41,7 @@ "prepack": "npm run build", "typecheck": "tsc --noEmit", "test": "jest", - "test:core": "jest --runInBand --forceExit src/agent/communication/__tests__/agentMessageBus.test.ts src/agent/core/executors/__tests__/strategyExecutor.test.ts src/agent/core/executors/__tests__/hypothesisExecutor.test.ts src/agent/context/__tests__/enhancedSessionContext.test.ts src/tests/adbTools.test.ts src/services/__tests__/sessionLogger.test.ts src/services/__tests__/traceAnalysisSkillConfig.test.ts src/agent/agents/domain/__tests__/registry.test.ts src/agentv3/__tests__/sqlIncludeInjector.test.ts src/agentv3/__tests__/analysisPatternMemory.test.ts src/agentv3/__tests__/claudeRuntimeRuntimeSnapshots.test.ts src/middleware/__tests__/auth.test.ts src/services/__tests__/rbac.test.ts src/routes/__tests__/agentRoutesRbac.test.ts src/routes/__tests__/ownerGuardRoutes.test.ts src/routes/__tests__/requestContextRouteCoverage.test.ts src/middleware/__tests__/legacyApiCompatibility.test.ts src/services/__tests__/enterpriseDb.test.ts src/services/__tests__/enterpriseSchema.test.ts src/services/__tests__/enterpriseRepository.test.ts src/services/__tests__/processRss.test.ts src/services/__tests__/traceProcessorLeaseStore.test.ts src/scripts/__tests__/benchmarkTraceProcessorRss.test.ts src/services/__tests__/enterpriseKnowledgeScope.test.ts src/services/__tests__/enterpriseMigration.test.ts src/services/__tests__/runtimeSnapshotStore.test.ts src/services/providerManager/__tests__/localSecretStore.test.ts src/services/providerManager/__tests__/enterpriseProviderStore.test.ts src/routes/__tests__/enterpriseTraceMetadataRoutes.test.ts src/routes/__tests__/enterpriseReportRoutes.test.ts src/routes/__tests__/enterpriseRestartPersistence.test.ts", + "test:core": "jest --runInBand --forceExit src/agent/communication/__tests__/agentMessageBus.test.ts src/agent/core/executors/__tests__/strategyExecutor.test.ts src/agent/core/executors/__tests__/hypothesisExecutor.test.ts src/agent/context/__tests__/enhancedSessionContext.test.ts src/tests/adbTools.test.ts src/services/__tests__/sessionLogger.test.ts src/services/__tests__/traceAnalysisSkillConfig.test.ts src/agent/agents/domain/__tests__/registry.test.ts src/agentv3/__tests__/sqlIncludeInjector.test.ts src/agentv3/__tests__/analysisPatternMemory.test.ts src/agentv3/__tests__/claudeRuntimeRuntimeSnapshots.test.ts src/middleware/__tests__/auth.test.ts src/services/__tests__/rbac.test.ts src/routes/__tests__/agentRoutesRbac.test.ts src/routes/__tests__/ownerGuardRoutes.test.ts src/routes/__tests__/requestContextRouteCoverage.test.ts src/middleware/__tests__/legacyApiCompatibility.test.ts src/services/__tests__/enterpriseDb.test.ts src/services/__tests__/enterpriseSchema.test.ts src/services/__tests__/enterpriseRepository.test.ts src/services/__tests__/processRss.test.ts src/services/__tests__/traceProcessorLeaseStore.test.ts src/scripts/__tests__/benchmarkTraceProcessorRss.test.ts src/services/__tests__/enterpriseKnowledgeScope.test.ts src/services/__tests__/enterpriseMigration.test.ts src/services/__tests__/runtimeSnapshotStore.test.ts src/services/providerManager/__tests__/localSecretStore.test.ts src/services/providerManager/__tests__/enterpriseProviderStore.test.ts src/routes/__tests__/enterpriseTraceMetadataRoutes.test.ts src/routes/__tests__/traceProcessorProxyRoutes.test.ts src/routes/__tests__/enterpriseReportRoutes.test.ts src/routes/__tests__/enterpriseRestartPersistence.test.ts", "test:watch": "jest --watch", "test:coverage": "jest --coverage", "test:unit": "jest --testPathPatterns=src/tests", diff --git a/backend/src/index.ts b/backend/src/index.ts index 51d883f6..ffb323ad 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -37,6 +37,7 @@ import caseRoutes from './routes/caseRoutes'; import ragAdminRoutes from './routes/ragAdminRoutes'; import enterpriseAuthRoutes from './routes/enterpriseAuthRoutes'; import enterpriseApiKeyRoutes from './routes/enterpriseApiKeyRoutes'; +import traceProcessorProxyRoutes, { handleTraceProcessorProxyUpgrade } from './routes/traceProcessorProxyRoutes'; import {authenticate} from './middleware/auth'; import { assertTraceAnalysisConfiguredForStartup, @@ -233,6 +234,7 @@ app.use('/api/flamegraph', flamegraphRoutes); app.use('/api/critical-path', criticalPathRoutes); app.use('/api/baselines', baselineRoutes); app.use('/api/ci', authenticate, ciGateRoutes); +app.use('/api/tp', traceProcessorProxyRoutes); app.use('/api/memory', memoryRoutes); app.use('/api/cases', caseRoutes); app.use('/api/rag', ragAdminRoutes); @@ -312,6 +314,11 @@ const server = app.listen(PORT, () => { console.log(`📈 Stats: http://localhost:${PORT}/api/traces/stats`); }); +server.on('upgrade', (req, socket, head) => { + if (handleTraceProcessorProxyUpgrade(req, socket, head)) return; + socket.destroy(); +}); + // Handle server close server.on('close', () => { console.log('🔒 Server closed'); diff --git a/backend/src/routes/__tests__/enterpriseTraceMetadataRoutes.test.ts b/backend/src/routes/__tests__/enterpriseTraceMetadataRoutes.test.ts index d520a698..ff72666f 100644 --- a/backend/src/routes/__tests__/enterpriseTraceMetadataRoutes.test.ts +++ b/backend/src/routes/__tests__/enterpriseTraceMetadataRoutes.test.ts @@ -157,6 +157,8 @@ describe('enterprise trace metadata routes', () => { expect(uploadRes.status).toBe(200); const traceId = uploadRes.body.trace.id as string; + expect(uploadRes.body.trace.leaseId).toEqual(expect.any(String)); + expect(uploadRes.body.trace.leaseState).toBe('active'); const expectedTracePath = path.join(dataDir, 'tenant-a', 'workspace-a', 'traces', `${traceId}.trace`); await expect(fs.access(expectedTracePath)).resolves.toBeUndefined(); await expect(fs.access(path.join(uploadDir, 'traces', `${traceId}.json`))).rejects.toThrow(); diff --git a/backend/src/routes/__tests__/traceProcessorProxyRoutes.test.ts b/backend/src/routes/__tests__/traceProcessorProxyRoutes.test.ts new file mode 100644 index 00000000..e57a4e24 --- /dev/null +++ b/backend/src/routes/__tests__/traceProcessorProxyRoutes.test.ts @@ -0,0 +1,328 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +// Copyright (C) 2024-2026 Gracker (Chris) +// This file is part of SmartPerfetto. See LICENSE for details. + +import { afterEach, beforeEach, describe, expect, it, jest } from '@jest/globals'; +import express from 'express'; +import fs from 'fs/promises'; +import http, { type Server } from 'http'; +import type { Socket as NetSocket } from 'net'; +import os from 'os'; +import path from 'path'; +import request from 'supertest'; +import { ENTERPRISE_FEATURE_FLAG_ENV } from '../../config'; +import { ENTERPRISE_DB_PATH_ENV, openEnterpriseDb } from '../../services/enterpriseDb'; +import type { EnterpriseRepositoryScope } from '../../services/enterpriseRepository'; +import { + getTraceProcessorLeaseStore, + setTraceProcessorLeaseStoreForTests, + type TraceProcessorLeaseRecord, +} from '../../services/traceProcessorLeaseStore'; +import { setTraceProcessorServiceForTests } from '../../services/traceProcessorService'; +import traceProcessorProxyRoutes, { + handleTraceProcessorProxyUpgrade, +} from '../traceProcessorProxyRoutes'; + +const originalEnv = { + enterprise: process.env[ENTERPRISE_FEATURE_FLAG_ENV], + trustedHeaders: process.env.SMARTPERFETTO_SSO_TRUSTED_HEADERS, + enterpriseDbPath: process.env[ENTERPRISE_DB_PATH_ENV], + apiKey: process.env.SMARTPERFETTO_API_KEY, +}; + +const scope: EnterpriseRepositoryScope = { + tenantId: 'tenant-a', + workspaceId: 'workspace-a', + userId: 'user-a', +}; + +let tmpDir: string; +let dbPath: string; +let upstreamServer: Server; +let upstreamSockets: Set; +let upstreamPort: number; +let lease: TraceProcessorLeaseRecord; + +function restoreEnvValue(key: string, value: string | undefined): void { + if (value === undefined) { + delete process.env[key]; + } else { + process.env[key] = value; + } +} + +function makeApp(): express.Express { + const app = express(); + app.use('/api/tp', traceProcessorProxyRoutes); + return app; +} + +function ssoHeaders(req: request.Test, workspaceId = 'workspace-a'): request.Test { + return req + .set('X-SmartPerfetto-SSO-User-Id', 'user-a') + .set('X-SmartPerfetto-SSO-Email', 'user-a@example.test') + .set('X-SmartPerfetto-SSO-Tenant-Id', 'tenant-a') + .set('X-SmartPerfetto-SSO-Workspace-Id', workspaceId) + .set('X-SmartPerfetto-SSO-Roles', 'analyst') + .set('X-SmartPerfetto-SSO-Scopes', 'trace:read,trace:write') + .set('X-Window-Id', 'window-a'); +} + +function binaryParser(res: request.Response, callback: (err: Error | null, body: Buffer) => void): void { + const chunks: Buffer[] = []; + res.on('data', chunk => chunks.push(Buffer.from(chunk))); + res.on('end', () => callback(null, Buffer.concat(chunks))); +} + +async function listen(server: Server): Promise { + await new Promise(resolve => server.listen(0, '127.0.0.1', resolve)); + const address = server.address(); + if (!address || typeof address === 'string') throw new Error('server did not bind to a TCP port'); + return address.port; +} + +async function closeServer(server?: Server): Promise { + if (!server || !server.listening) return; + server.closeAllConnections?.(); + await new Promise(resolve => server.close(() => resolve())); +} + +function seedEnterpriseGraph(): void { + const db = openEnterpriseDb(dbPath); + try { + const now = Date.now(); + db.prepare(` + INSERT INTO organizations (id, name, status, plan, created_at, updated_at) + VALUES ('tenant-a', 'Tenant A', 'active', 'enterprise', ?, ?) + `).run(now, now); + db.prepare(` + INSERT INTO workspaces (id, tenant_id, name, created_at, updated_at) + VALUES ('workspace-a', 'tenant-a', 'Workspace A', ?, ?) + `).run(now, now); + db.prepare(` + INSERT INTO users (id, tenant_id, email, display_name, idp_subject, created_at, updated_at) + VALUES ('user-a', 'tenant-a', 'user-a@example.test', 'User A', 'user-a', ?, ?) + `).run(now, now); + db.prepare(` + INSERT INTO memberships (tenant_id, workspace_id, user_id, role, created_at) + VALUES ('tenant-a', 'workspace-a', 'user-a', 'analyst', ?) + `).run(now); + db.prepare(` + INSERT INTO trace_assets + (id, tenant_id, workspace_id, owner_user_id, local_path, status, created_at) + VALUES + ('trace-a', 'tenant-a', 'workspace-a', 'user-a', ?, 'ready', ?) + `).run(path.join(tmpDir, 'trace-a.trace'), now); + } finally { + db.close(); + } +} + +function createReadyLease(): TraceProcessorLeaseRecord { + const store = getTraceProcessorLeaseStore(); + let next = store.acquireHolder(scope, 'trace-a', { + holderType: 'frontend_http_rpc', + holderRef: 'window-a', + windowId: 'window-a', + }); + next = store.markStarting(scope, next.id); + return store.markReady(scope, next.id); +} + +beforeEach(async () => { + tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'smartperfetto-tp-proxy-')); + dbPath = path.join(tmpDir, 'enterprise.sqlite'); + process.env[ENTERPRISE_FEATURE_FLAG_ENV] = 'true'; + process.env.SMARTPERFETTO_SSO_TRUSTED_HEADERS = 'true'; + process.env[ENTERPRISE_DB_PATH_ENV] = dbPath; + delete process.env.SMARTPERFETTO_API_KEY; + + upstreamServer = http.createServer((req, res) => { + const chunks: Buffer[] = []; + req.on('data', chunk => chunks.push(Buffer.from(chunk))); + req.on('end', () => { + if (req.url === '/status') { + res.writeHead(200, {'content-type': 'application/x-protobuf'}); + res.end(Buffer.from([1, 2, 3])); + return; + } + if (req.url === '/query') { + res.writeHead(200, {'content-type': 'application/x-protobuf'}); + res.end(Buffer.concat(chunks)); + return; + } + res.writeHead(404); + res.end(); + }); + }); + upstreamServer.on('upgrade', (req, socket) => { + expect(req.url).toBe('/websocket'); + socket.write( + 'HTTP/1.1 101 Switching Protocols\r\n' + + 'Upgrade: websocket\r\n' + + 'Connection: Upgrade\r\n' + + '\r\n', + ); + socket.on('data', chunk => socket.write(chunk)); + }); + upstreamSockets = new Set(); + upstreamServer.on('connection', socket => { + upstreamSockets.add(socket); + socket.on('close', () => upstreamSockets.delete(socket)); + }); + upstreamPort = await listen(upstreamServer); + + seedEnterpriseGraph(); + lease = createReadyLease(); + setTraceProcessorServiceForTests({ + getOrLoadTrace: jest.fn(async () => ({ + id: 'trace-a', + filename: 'trace-a.perfetto', + size: 16, + uploadTime: new Date(), + status: 'ready', + })), + getTraceWithPort: jest.fn(() => ({ + id: 'trace-a', + filename: 'trace-a.perfetto', + size: 16, + uploadTime: new Date(), + status: 'ready', + port: upstreamPort, + processor: {status: 'ready'}, + })), + } as any); +}); + +afterEach(async () => { + jest.restoreAllMocks(); + for (const socket of upstreamSockets ?? []) { + socket.destroy(); + } + await closeServer(upstreamServer); + setTraceProcessorServiceForTests(null); + setTraceProcessorLeaseStoreForTests(null); + restoreEnvValue(ENTERPRISE_FEATURE_FLAG_ENV, originalEnv.enterprise); + restoreEnvValue('SMARTPERFETTO_SSO_TRUSTED_HEADERS', originalEnv.trustedHeaders); + restoreEnvValue(ENTERPRISE_DB_PATH_ENV, originalEnv.enterpriseDbPath); + restoreEnvValue('SMARTPERFETTO_API_KEY', originalEnv.apiKey); + await fs.rm(tmpDir, { recursive: true, force: true }); +}); + +describe('trace processor lease proxy routes', () => { + it('proxies status and query bytes through the scoped lease', async () => { + const app = makeApp(); + + const statusRes = await ssoHeaders( + request(app) + .post(`/api/tp/${lease.id}/status`) + .buffer(true) + .parse(binaryParser), + ); + expect(statusRes.status).toBe(200); + expect(Buffer.from(statusRes.body)).toEqual(Buffer.from([1, 2, 3])); + + const queryBody = Buffer.from([9, 8, 7]); + const queryRes = await ssoHeaders( + request(app) + .post(`/api/tp/${lease.id}/query`) + .set('Content-Type', 'application/x-protobuf') + .send(queryBody) + .buffer(true) + .parse(binaryParser), + ); + expect(queryRes.status).toBe(200); + expect(Buffer.from(queryRes.body)).toEqual(queryBody); + }); + + it('hides leases from other workspaces', async () => { + const app = makeApp(); + + const res = await ssoHeaders( + request(app).post(`/api/tp/${lease.id}/status`), + 'workspace-b', + ); + + expect(res.status).toBe(404); + }); + + it('tunnels websocket upgrades to the leased trace processor port', async () => { + const app = makeApp(); + const proxyServer = http.createServer(app); + const proxySockets = new Set(); + proxyServer.on('connection', socket => { + proxySockets.add(socket); + socket.on('close', () => proxySockets.delete(socket)); + }); + proxyServer.on('upgrade', (req, socket, head) => { + if (handleTraceProcessorProxyUpgrade(req, socket, head)) return; + socket.destroy(); + }); + const proxyPort = await listen(proxyServer); + + try { + const echoed = await new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + reject(new Error('websocket tunnel timed out')); + }, 5000); + const finish = (value: string): void => { + clearTimeout(timeout); + resolve(value); + }; + const fail = (error: Error): void => { + clearTimeout(timeout); + reject(error); + }; + const req = http.request({ + host: '127.0.0.1', + port: proxyPort, + path: `/api/tp/${lease.id}/websocket`, + headers: { + Upgrade: 'websocket', + Connection: 'Upgrade', + 'Sec-WebSocket-Key': 'dGhlIHNhbXBsZSBub25jZQ==', + 'Sec-WebSocket-Version': '13', + 'X-SmartPerfetto-SSO-User-Id': 'user-a', + 'X-SmartPerfetto-SSO-Tenant-Id': 'tenant-a', + 'X-SmartPerfetto-SSO-Workspace-Id': 'workspace-a', + 'X-SmartPerfetto-SSO-Roles': 'analyst', + 'X-SmartPerfetto-SSO-Scopes': 'trace:read,trace:write', + 'X-Window-Id': 'window-a', + }, + }); + req.setTimeout(5000, () => { + req.destroy(new Error('websocket tunnel timed out')); + }); + req.on('response', res => { + fail(new Error(`expected upgrade, got HTTP ${res.statusCode}`)); + }); + req.on('upgrade', (res, socket, head) => { + let buffer = `HTTP/1.1 ${res.statusCode} ${res.statusMessage}\r\n`; + if (head.length > 0) buffer += head.toString('utf8'); + socket.setTimeout(5000, () => { + socket.destroy(new Error('websocket echo timed out')); + }); + socket.on('error', fail); + socket.on('data', chunk => { + buffer += chunk.toString('utf8'); + if (buffer.includes('ping-through-proxy')) { + socket.destroy(); + finish(buffer); + } + }); + socket.write('ping-through-proxy'); + }); + req.on('error', fail); + req.end(); + }); + + expect(echoed).toContain('101 Switching Protocols'); + expect(echoed).toContain('ping-through-proxy'); + } finally { + for (const socket of proxySockets) { + socket.destroy(); + } + await closeServer(proxyServer); + } + }); +}); diff --git a/backend/src/routes/simpleTraceRoutes.ts b/backend/src/routes/simpleTraceRoutes.ts index 0734e461..3be8af6f 100644 --- a/backend/src/routes/simpleTraceRoutes.ts +++ b/backend/src/routes/simpleTraceRoutes.ts @@ -55,6 +55,17 @@ class TraceUploadTooLargeError extends Error { } } +interface FinalizedTraceUploadInfo { + uploadTime?: Date; + status?: string; + port?: number; + leaseId?: string; + leaseState?: string; + processor?: { + status: string; + }; +} + function parsePositiveInteger(value: string | undefined): number | null { if (!value) return null; const parsed = Number.parseInt(value, 10); @@ -154,7 +165,7 @@ async function finalizeTraceUpload( size: number, finalPath: string, context: RequestContext, -) { +): Promise { const tps = getTraceProcessorService(); if (tps) { @@ -356,6 +367,8 @@ router.post( status: traceInfo?.status || 'ready', // Port for HTTP RPC mode - frontend can connect to trace_processor directly port: traceInfo?.port, + leaseId: traceInfo?.leaseId, + leaseState: traceInfo?.leaseState, processorStatus: traceInfo?.processor?.status, } }); @@ -449,6 +462,8 @@ router.post('/upload-url', async (req, res) => { uploadedAt: traceInfo?.uploadTime || new Date().toISOString(), status: traceInfo?.status || 'ready', port: traceInfo?.port, + leaseId: traceInfo?.leaseId, + leaseState: traceInfo?.leaseState, processorStatus: traceInfo?.processor?.status, } }); diff --git a/backend/src/routes/traceProcessorProxyRoutes.ts b/backend/src/routes/traceProcessorProxyRoutes.ts new file mode 100644 index 00000000..d4e97d6b --- /dev/null +++ b/backend/src/routes/traceProcessorProxyRoutes.ts @@ -0,0 +1,459 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +// Copyright (C) 2024-2026 Gracker (Chris) +// This file is part of SmartPerfetto. See LICENSE for details. + +import express, { Router, type Request, type Response } from 'express'; +import type { IncomingMessage } from 'http'; +import net, { type Socket } from 'net'; +import type { Duplex } from 'stream'; +import { resolveFeatureConfig, serverConfig } from '../config'; +import { + authenticate, + DEFAULT_DEV_USER_ID, + DEFAULT_TENANT_ID, + DEFAULT_WORKSPACE_ID, + requireRequestContext, + type RequestContext, + type RequestContextAuthType, +} from '../middleware/auth'; +import { getTraceProcessorService } from '../services/traceProcessorService'; +import { + getTraceProcessorLeaseStore, + type TraceProcessorHolderInput, + type TraceProcessorLeaseRecord, + type TraceProcessorLeaseState, +} from '../services/traceProcessorLeaseStore'; +import { hasRbacPermission, sendForbidden } from '../services/rbac'; +import { EnterpriseSsoService } from '../services/enterpriseSsoService'; +import { EnterpriseApiKeyService } from '../services/enterpriseApiKeyService'; + +const router = Router(); +const READY_STATES = new Set(['ready', 'idle', 'active']); +const CONFLICT_STATES = new Set(['draining', 'released', 'failed']); +const HOP_BY_HOP_HEADERS = new Set([ + 'connection', + 'host', + 'keep-alive', + 'proxy-authenticate', + 'proxy-authorization', + 'te', + 'trailer', + 'transfer-encoding', + 'upgrade', +]); + +class TraceProcessorProxyError extends Error { + constructor(readonly statusCode: number, message: string) { + super(message); + this.name = 'TraceProcessorProxyError'; + } +} + +interface RequestIdentity { + userId: string; + authType: RequestContextAuthType; + tenantId?: string; + workspaceId?: string; + roles?: string[]; + scopes?: string[]; +} + +interface ProxyTarget { + lease: TraceProcessorLeaseRecord; + port: number; +} + +function sanitizeContextId(value: unknown): string { + if (typeof value !== 'string') return ''; + return value.trim().replace(/[^a-zA-Z0-9._:-]/g, '').slice(0, 128); +} + +function getHeader(req: IncomingMessage, name: string): string { + const value = req.headers[name.toLowerCase()]; + if (Array.isArray(value)) return value[0] || ''; + return typeof value === 'string' ? value : ''; +} + +function getFirstHeader(req: IncomingMessage, names: string[]): string { + for (const name of names) { + const value = getHeader(req, name); + if (value.trim()) return value; + } + return ''; +} + +function parseHeaderList(req: IncomingMessage, names: string[], fallback: string[]): string[] { + const raw = getFirstHeader(req, names); + if (!raw.trim()) return fallback; + const parsed = raw + .split(',') + .map(value => sanitizeContextId(value)) + .filter(Boolean); + return parsed.length > 0 ? parsed : fallback; +} + +function trustedHeadersEnabled(): boolean { + const value = process.env.SMARTPERFETTO_SSO_TRUSTED_HEADERS; + return ['1', 'true', 'yes', 'on', 'enabled'].includes(String(value || '').trim().toLowerCase()); +} + +function defaultRolesForAuthType(authType: RequestContextAuthType): string[] { + return authType === 'dev' ? ['org_admin'] : ['analyst']; +} + +function defaultScopesForAuthType(authType: RequestContextAuthType): string[] { + return authType === 'dev' + ? ['*'] + : ['trace:read', 'trace:write', 'agent:run', 'report:read']; +} + +function resolveTrustedSsoIdentity(req: IncomingMessage): RequestIdentity | null { + if (!trustedHeadersEnabled()) return null; + const userId = sanitizeContextId(getFirstHeader(req, [ + 'x-smartperfetto-sso-user-id', + 'x-sso-user-id', + 'x-auth-request-user', + ])); + if (!userId) return null; + + return { + userId, + authType: 'sso', + tenantId: sanitizeContextId(getFirstHeader(req, [ + 'x-smartperfetto-sso-tenant-id', + 'x-sso-tenant-id', + 'x-tenant-id', + ])) || undefined, + workspaceId: sanitizeContextId(getFirstHeader(req, [ + 'x-smartperfetto-sso-workspace-id', + 'x-sso-workspace-id', + 'x-workspace-id', + ])) || undefined, + roles: parseHeaderList(req, [ + 'x-smartperfetto-sso-roles', + 'x-sso-roles', + ], defaultRolesForAuthType('sso')), + scopes: parseHeaderList(req, [ + 'x-smartperfetto-sso-scopes', + 'x-sso-scopes', + ], defaultScopesForAuthType('sso')), + }; +} + +function queryValue(req: IncomingMessage, key: string): string { + const url = new URL(req.url || '/', 'http://127.0.0.1'); + return sanitizeContextId(url.searchParams.get(key) || ''); +} + +function contextFromIdentity(req: IncomingMessage, identity: RequestIdentity): RequestContext { + const authType = identity.authType; + const requestId = + sanitizeContextId(getHeader(req, 'x-request-id')) || + `ws-${Date.now()}-${Math.random().toString(36).slice(2, 10)}`; + const windowId = + sanitizeContextId(getHeader(req, 'x-window-id')) || + queryValue(req, 'windowId') || + undefined; + + return { + tenantId: + identity.tenantId || + sanitizeContextId(getFirstHeader(req, ['x-tenant-id', 'x-sso-tenant-id'])) || + queryValue(req, 'tenantId') || + DEFAULT_TENANT_ID, + workspaceId: + identity.workspaceId || + sanitizeContextId(getFirstHeader(req, ['x-workspace-id', 'x-sso-workspace-id'])) || + queryValue(req, 'workspaceId') || + DEFAULT_WORKSPACE_ID, + userId: identity.userId, + authType, + roles: identity.roles ?? defaultRolesForAuthType(authType), + scopes: identity.scopes ?? defaultScopesForAuthType(authType), + requestId, + ...(windowId ? { windowId } : {}), + }; +} + +function resolveUpgradeRequestContext(req: IncomingMessage): RequestContext | null { + const trustedIdentity = resolveTrustedSsoIdentity(req); + if (trustedIdentity) return contextFromIdentity(req, trustedIdentity); + + try { + const ssoIdentity = EnterpriseSsoService.getInstance() + .resolveRequestIdentityFromRequest(req as Request); + if (ssoIdentity) return contextFromIdentity(req, ssoIdentity); + } catch { + // Fall through to API key or dev fallback. + } + + try { + const apiKeyIdentity = EnterpriseApiKeyService.getInstance() + .resolveRequestIdentityFromRequest(req as Request); + if (apiKeyIdentity) return contextFromIdentity(req, apiKeyIdentity); + } catch { + // Fall through to dev fallback. + } + + if (!resolveFeatureConfig().enterprise) { + return contextFromIdentity(req, { + userId: queryValue(req, 'userId') || DEFAULT_DEV_USER_ID, + authType: 'dev', + }); + } + + return null; +} + +function leaseScopeFromContext(context: RequestContext) { + return { + tenantId: context.tenantId, + workspaceId: context.workspaceId, + userId: context.userId, + }; +} + +function frontendHolderForContext(context: RequestContext): TraceProcessorHolderInput { + const holderRef = context.windowId || context.requestId || context.userId; + return { + holderType: 'frontend_http_rpc', + holderRef, + windowId: context.windowId, + metadata: { + requestId: context.requestId, + proxy: 'trace_processor', + }, + }; +} + +function ensureTraceRead(context: RequestContext): void { + if (!hasRbacPermission(context, 'trace:read')) { + throw new TraceProcessorProxyError(403, 'Trace processor proxy requires trace:read permission'); + } +} + +async function resolveProxyTargetForContext( + context: RequestContext, + leaseId: string, +): Promise { + ensureTraceRead(context); + + const store = getTraceProcessorLeaseStore(); + const scope = leaseScopeFromContext(context); + let lease = store.getLeaseById(scope, leaseId); + if (!lease) { + throw new TraceProcessorProxyError(404, 'Trace processor lease not found'); + } + if (CONFLICT_STATES.has(lease.state)) { + throw new TraceProcessorProxyError(409, `Trace processor lease is ${lease.state}`); + } + + lease = store.acquireHolderForLease(scope, lease.id, frontendHolderForContext(context)); + + if (!READY_STATES.has(lease.state)) { + throw new TraceProcessorProxyError(503, `Trace processor lease is not ready (${lease.state})`); + } + + const traceProcessorService = getTraceProcessorService(); + const trace = await traceProcessorService.getOrLoadTrace(lease.traceId); + if (!trace) { + throw new TraceProcessorProxyError(404, 'Trace not found for trace processor lease'); + } + + const traceWithPort = traceProcessorService.getTraceWithPort(lease.traceId); + if (!traceWithPort?.port) { + throw new TraceProcessorProxyError(503, 'Trace processor HTTP RPC port is not ready'); + } + + return { + lease, + port: traceWithPort.port, + }; +} + +async function resolveProxyTarget(req: Request, leaseId: string): Promise { + const context = requireRequestContext(req); + return resolveProxyTargetForContext(context, leaseId); +} + +function copyUpstreamResponseHeaders(upstream: globalThis.Response, res: Response): void { + for (const [name, value] of upstream.headers.entries()) { + if (HOP_BY_HOP_HEADERS.has(name.toLowerCase())) continue; + if (name.toLowerCase() === 'content-length') continue; + res.setHeader(name, value); + } +} + +function requestBody(req: Request): Buffer | undefined { + if (Buffer.isBuffer(req.body)) return req.body; + if (req.body instanceof Uint8Array) return Buffer.from(req.body); + return undefined; +} + +function upstreamRequestHeaders(req: Request, body: Buffer | undefined): Record { + const headers: Record = {}; + const contentType = req.get('content-type'); + if (contentType) headers['content-type'] = contentType; + const accept = req.get('accept'); + if (accept) headers.accept = accept; + if (body) headers['content-length'] = String(body.length); + return headers; +} + +async function forwardHttpRpc(req: Request, res: Response, upstreamPath: '/status' | '/query'): Promise { + const leaseId = sanitizeContextId(req.params.leaseId); + if (!leaseId) { + res.status(400).json({ success: false, error: 'leaseId is required' }); + return; + } + + const target = await resolveProxyTarget(req, leaseId); + const body = requestBody(req); + const upstream = await fetch(`http://127.0.0.1:${target.port}${upstreamPath}`, { + method: 'POST', + headers: upstreamRequestHeaders(req, body), + ...(body ? { body } : {}), + }); + const responseBody = Buffer.from(await upstream.arrayBuffer()); + copyUpstreamResponseHeaders(upstream, res); + res.status(upstream.status).send(responseBody); +} + +function sendProxyError(res: Response, error: unknown): void { + if (error instanceof TraceProcessorProxyError) { + if (error.statusCode === 403) { + sendForbidden(res, error.message); + return; + } + res.status(error.statusCode).json({ + success: false, + error: error.message, + }); + return; + } + const message = error instanceof Error ? error.message : String(error); + console.error('[TraceProcessorProxy] Proxy error:', message); + res.status(502).json({ + success: false, + error: 'Trace processor proxy failed', + details: message, + }); +} + +function writeUpgradeError(socket: Duplex, statusCode: number, message: string): void { + if (!socket.writable) return; + socket.write( + `HTTP/1.1 ${statusCode} ${message}\r\n` + + 'Connection: close\r\n' + + 'Content-Type: text/plain; charset=utf-8\r\n' + + `Content-Length: ${Buffer.byteLength(message)}\r\n` + + '\r\n' + + message, + ); + socket.end(); +} + +function websocketRequestHeaders(req: IncomingMessage, targetPort: number): string[] { + const headers = [ + `Host: 127.0.0.1:${targetPort}`, + 'Connection: Upgrade', + 'Upgrade: websocket', + ]; + + for (let i = 0; i < req.rawHeaders.length; i += 2) { + const name = req.rawHeaders[i]; + const value = req.rawHeaders[i + 1]; + if (!name || value === undefined) continue; + if (HOP_BY_HOP_HEADERS.has(name.toLowerCase())) continue; + headers.push(`${name}: ${value}`); + } + + return headers; +} + +async function proxyWebSocket( + req: IncomingMessage, + socket: Duplex, + head: Buffer, + leaseId: string, +): Promise { + const context = resolveUpgradeRequestContext(req); + if (!context) { + throw new TraceProcessorProxyError(401, 'Trace processor WebSocket requires authentication'); + } + + const target = await resolveProxyTargetForContext(context, leaseId); + const upstream = net.connect({ + host: '127.0.0.1', + port: target.port, + }); + + upstream.once('connect', () => { + const request = [ + 'GET /websocket HTTP/1.1', + ...websocketRequestHeaders(req, target.port), + '', + '', + ].join('\r\n'); + upstream.write(request); + if (head.length > 0) upstream.write(head); + socket.pipe(upstream); + upstream.pipe(socket); + }); + + upstream.once('error', (error) => { + if (!socket.destroyed) { + writeUpgradeError(socket, 502, `Trace processor WebSocket proxy failed: ${error.message}`); + } + }); + socket.once('error', () => upstream.destroy()); + socket.once('close', () => upstream.destroy()); + upstream.once('close', () => socket.destroy()); +} + +router.use(authenticate); + +router.post('/:leaseId/status', express.raw({ type: '*/*', limit: serverConfig.bodyLimit }), async (req, res) => { + try { + await forwardHttpRpc(req, res, '/status'); + } catch (error) { + sendProxyError(res, error); + } +}); + +router.post('/:leaseId/query', express.raw({ type: '*/*', limit: serverConfig.bodyLimit }), async (req, res) => { + try { + await forwardHttpRpc(req, res, '/query'); + } catch (error) { + sendProxyError(res, error); + } +}); + +export function handleTraceProcessorProxyUpgrade( + req: IncomingMessage, + socket: Duplex, + head: Buffer, +): boolean { + const url = new URL(req.url || '/', 'http://127.0.0.1'); + const match = url.pathname.match(/^\/api\/tp\/([^/]+)\/websocket$/); + if (!match) return false; + + const leaseId = sanitizeContextId(decodeURIComponent(match[1])); + if (!leaseId) { + writeUpgradeError(socket, 400, 'leaseId is required'); + return true; + } + + void proxyWebSocket(req, socket, head, leaseId).catch((error) => { + if (error instanceof TraceProcessorProxyError) { + writeUpgradeError(socket, error.statusCode, error.message); + return; + } + const message = error instanceof Error ? error.message : String(error); + console.error('[TraceProcessorProxy] WebSocket proxy error:', message); + writeUpgradeError(socket, 502, 'Trace processor WebSocket proxy failed'); + }); + return true; +} + +export default router; diff --git a/backend/src/services/traceProcessorLeaseStore.ts b/backend/src/services/traceProcessorLeaseStore.ts index df548a0b..acb61deb 100644 --- a/backend/src/services/traceProcessorLeaseStore.ts +++ b/backend/src/services/traceProcessorLeaseStore.ts @@ -251,6 +251,38 @@ export class TraceProcessorLeaseStore { })(); } + acquireHolderForLease( + scope: EnterpriseRepositoryScope, + leaseId: string, + holder: TraceProcessorHolderInput, + options: { now?: number } = {}, + ): TraceProcessorLeaseRecord { + assertNonEmpty(scope.tenantId, 'tenantId'); + assertNonEmpty(scope.workspaceId, 'workspaceId'); + assertNonEmpty(leaseId, 'leaseId'); + assertNonEmpty(holder.holderRef, 'holderRef'); + + return this.db.transaction(() => { + const now = options.now ?? Date.now(); + const lease = this.mustGetLease(scope, leaseId); + const state = lease.state as TraceProcessorLeaseState; + if (!ACQUIRABLE_STATES.has(state)) { + throw new Error(`Trace processor lease ${lease.id} is not acquirable (${state})`); + } + + this.upsertHolder(lease.id, holder, now); + const ttl = resolveHolderTtlPolicy(holder); + const expiresAt = Math.max(lease.expires_at ?? 0, now + ttl.idleTtlMs); + this.db.prepare(` + UPDATE trace_processor_leases + SET heartbeat_at = ?, expires_at = ? + WHERE id = ? + `).run(now, expiresAt, lease.id); + this.refreshLeaseActivityState(scope, lease.id); + return this.getLeaseById(scope, lease.id)!; + })(); + } + heartbeatHolder( scope: EnterpriseRepositoryScope, leaseId: string, diff --git a/docs/features/enterprise-multi-tenant/README.md b/docs/features/enterprise-multi-tenant/README.md index 5037f4f9..1c097251 100644 --- a/docs/features/enterprise-multi-tenant/README.md +++ b/docs/features/enterprise-multi-tenant/README.md @@ -61,11 +61,11 @@ - [x] 4.2 上传链路 stream 化(§11.1):URL 上传不再 `arrayBuffer()` 全量进内存;本地上传与 RAM/磁盘策略联动;temp file 用 traceId/uuid + 原子 rename - [ ] 4.3 RSS benchmark:scroll/startup/ANR/memory/heapprofd/vendor 大 trace × 100MB/500MB/1GB,记录启动 RSS、load peak、query headroom - [x] 4.4 `TraceProcessorLease` 4 类 holder(frontend_http_rpc / agent_run / report_generation / manual_register)+ 状态机(§11.4)+ 分级 TTL -- [ ] 4.5 Backend proxy `/api/tp/:leaseId/{status,websocket,query}`(§11.3) - - [ ] 4.5.1 同时支持 `/status` HTTP / `/websocket` 二进制双向 / `/query` HTTP - - [ ] 4.5.2 前端 `HttpRpcEngine` 抽象成 `HttpRpcTarget`(direct-port + backend-lease-proxy) - - [ ] 4.5.3 同一 frontend WebSocket holder 内严格 FIFO,不做乱序返回 - - [ ] 4.5.4 企业模式禁用浏览器直连裸 `127.0.0.1:9100-9900` +- [x] 4.5 Backend proxy `/api/tp/:leaseId/{status,websocket,query}`(§11.3) + - [x] 4.5.1 同时支持 `/status` HTTP / `/websocket` 二进制双向 / `/query` HTTP + - [x] 4.5.2 前端 `HttpRpcEngine` 抽象成 `HttpRpcTarget`(direct-port + backend-lease-proxy) + - [x] 4.5.3 同一 frontend WebSocket holder 内严格 FIFO,不做乱序返回 + - [x] 4.5.4 企业模式禁用浏览器直连裸 `127.0.0.1:9100-9900` - [ ] 4.6 SQL Worker(worker_thread / child process)+ non-preemptive priority queue P0/P1/P2(§11.7) - [ ] 4.7 RAM budget + 启动后实测 RSS + admission control(§11.5),暴露 stats endpoint - [ ] 4.8 Shared / isolated 自动判定(§11.6),UI 明确展示队列/共享/独立状态 diff --git a/frontend/index.html b/frontend/index.html index ae89c65b..e0b83724 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -9,7 +9,7 @@ - +