diff --git a/backend/src/routes/__tests__/traceProcessorProxyRoutes.test.ts b/backend/src/routes/__tests__/traceProcessorProxyRoutes.test.ts index 44059539..6c68cc41 100644 --- a/backend/src/routes/__tests__/traceProcessorProxyRoutes.test.ts +++ b/backend/src/routes/__tests__/traceProcessorProxyRoutes.test.ts @@ -43,6 +43,12 @@ let upstreamSockets: Set; let upstreamPort: number; let lease: TraceProcessorLeaseRecord; let queryRawMock: jest.MockedFunction<(traceId: string, body: Buffer, options?: any) => Promise>; +let restartLeaseMock: jest.MockedFunction<( + traceId: string, + leaseId: string, + mode: string, + scope: EnterpriseRepositoryScope, +) => Promise>; function restoreEnvValue(key: string, value: string | undefined): void { if (value === undefined) { @@ -69,6 +75,17 @@ function ssoHeaders(req: request.Test, workspaceId = 'workspace-a'): request.Tes .set('X-Window-Id', 'window-a'); } +function adminHeaders(req: request.Test, workspaceId = 'workspace-a'): request.Test { + return req + .set('X-SmartPerfetto-SSO-User-Id', 'admin-a') + .set('X-SmartPerfetto-SSO-Email', 'admin-a@example.test') + .set('X-SmartPerfetto-SSO-Tenant-Id', 'tenant-a') + .set('X-SmartPerfetto-SSO-Workspace-Id', workspaceId) + .set('X-SmartPerfetto-SSO-Roles', 'workspace_admin') + .set('X-SmartPerfetto-SSO-Scopes', 'trace:read,trace:write,runtime:manage') + .set('X-Window-Id', 'admin-window'); +} + function binaryParser(res: request.Response, callback: (err: Error | null, body: Buffer) => void): void { const chunks: Buffer[] = []; res.on('data', chunk => chunks.push(Buffer.from(chunk))); @@ -176,6 +193,13 @@ beforeEach(async () => { seedEnterpriseGraph(); lease = createReadyLease(); queryRawMock = jest.fn(async (_traceId: string, body: Buffer) => body); + restartLeaseMock = jest.fn(async (traceId, leaseId, _mode, restartScope) => { + const store = getTraceProcessorLeaseStore(); + store.markCrashed(restartScope, leaseId); + store.markRestarting(restartScope, leaseId); + store.markReady(restartScope, leaseId); + return { id: 'restarted-processor', traceId }; + }); setTraceProcessorServiceForTests({ getOrLoadTrace: jest.fn(async () => ({ id: 'trace-a', @@ -204,6 +228,7 @@ beforeEach(async () => { processor: {status: 'ready'}, })), queryRaw: queryRawMock, + restartLease: restartLeaseMock, } as any); }); @@ -273,6 +298,78 @@ describe('trace processor lease proxy routes', () => { expect(res.status).toBe(404); }); + it('requires runtime manage permission for lease admin actions', async () => { + const app = makeApp(); + + const res = await ssoHeaders( + request(app) + .post(`/api/tp/${lease.id}/restart`) + .send({ reason: 'hung query' }), + ); + + expect(res.status).toBe(403); + expect(restartLeaseMock).not.toHaveBeenCalled(); + }); + + it('lets workspace admins drain a scoped lease and block new proxy work', async () => { + const app = makeApp(); + + const drainRes = await adminHeaders( + request(app) + .post(`/api/tp/${lease.id}/drain`) + .send({ reason: 'hung query' }), + ); + + expect(drainRes.status).toBe(200); + expect(drainRes.body).toMatchObject({ + success: true, + action: 'drain', + reason: 'hung query', + lease: { + id: lease.id, + state: 'draining', + }, + }); + + const blockedRes = await adminHeaders( + request(app).post(`/api/tp/${lease.id}/status`), + ); + expect(blockedRes.status).toBe(409); + expect(blockedRes.body.error).toBe('Trace processor lease is draining'); + }); + + it('lets workspace admins restart a scoped lease without changing the lease id', async () => { + const app = makeApp(); + + const restartRes = await adminHeaders( + request(app) + .post(`/api/tp/${lease.id}/restart`) + .send({ reason: 'operator restart after hung query' }), + ); + + expect(restartRes.status).toBe(200); + expect(restartRes.body).toMatchObject({ + success: true, + action: 'restart', + reason: 'operator restart after hung query', + lease: { + id: lease.id, + traceId: 'trace-a', + state: 'active', + }, + }); + expect(restartLeaseMock).toHaveBeenCalledWith( + 'trace-a', + lease.id, + 'shared', + { + tenantId: 'tenant-a', + workspaceId: 'workspace-a', + userId: 'admin-a', + }, + ); + }); + it('tunnels websocket upgrades to the leased trace processor port', async () => { const app = makeApp(); const proxyServer = http.createServer(app); diff --git a/backend/src/routes/traceProcessorProxyRoutes.ts b/backend/src/routes/traceProcessorProxyRoutes.ts index 282156b7..23dd826f 100644 --- a/backend/src/routes/traceProcessorProxyRoutes.ts +++ b/backend/src/routes/traceProcessorProxyRoutes.ts @@ -239,6 +239,17 @@ function ensureTraceRead(context: RequestContext): void { } } +function ensureRuntimeManage(context: RequestContext): void { + if (!hasRbacPermission(context, 'runtime:manage')) { + throw new TraceProcessorProxyError(403, 'Trace processor lease admin requires runtime:manage permission'); + } +} + +function leaseAdminReason(req: Request): string | undefined { + const reason = typeof req.body?.reason === 'string' ? req.body.reason.trim() : ''; + return reason ? reason.slice(0, 500) : undefined; +} + async function resolveProxyTargetForContext( context: RequestContext, leaseId: string, @@ -360,6 +371,69 @@ async function forwardQueryRpc(req: Request, res: Response): Promise { res.status(200).send(responseBody); } +async function drainLease(req: Request, res: Response): Promise { + const leaseId = sanitizeContextId(req.params.leaseId); + if (!leaseId) { + res.status(400).json({ success: false, error: 'leaseId is required' }); + return; + } + + const context = requireRequestContext(req); + ensureRuntimeManage(context); + const scope = leaseScopeFromContext(context); + const store = getTraceProcessorLeaseStore(); + const lease = store.getLeaseById(scope, leaseId); + if (!lease) { + throw new TraceProcessorProxyError(404, 'Trace processor lease not found'); + } + if (lease.state === 'released' || lease.state === 'failed') { + throw new TraceProcessorProxyError(409, `Trace processor lease is ${lease.state}`); + } + + const drained = store.beginDraining(scope, lease.id); + res.json({ + success: true, + action: 'drain', + reason: leaseAdminReason(req), + lease: drained, + }); +} + +async function restartLease(req: Request, res: Response): Promise { + const leaseId = sanitizeContextId(req.params.leaseId); + if (!leaseId) { + res.status(400).json({ success: false, error: 'leaseId is required' }); + return; + } + + const context = requireRequestContext(req); + ensureRuntimeManage(context); + const scope = leaseScopeFromContext(context); + const store = getTraceProcessorLeaseStore(); + const lease = store.getLeaseById(scope, leaseId); + if (!lease) { + throw new TraceProcessorProxyError(404, 'Trace processor lease not found'); + } + if (CONFLICT_STATES.has(lease.state)) { + throw new TraceProcessorProxyError(409, `Trace processor lease is ${lease.state}`); + } + + const traceProcessorService = getTraceProcessorService(); + const trace = await traceProcessorService.getOrLoadTrace(lease.traceId); + if (!trace) { + throw new TraceProcessorProxyError(404, 'Trace not found for trace processor lease'); + } + + await traceProcessorService.restartLease(lease.traceId, lease.id, lease.mode, scope); + const restarted = store.getLeaseById(scope, lease.id); + res.json({ + success: true, + action: 'restart', + reason: leaseAdminReason(req), + lease: restarted, + }); +} + function sendProxyError(res: Response, error: unknown): void { if (error instanceof TraceProcessorProxyError) { if (error.statusCode === 403) { @@ -472,6 +546,22 @@ router.post('/:leaseId/query', express.raw({ type: '*/*', limit: serverConfig.bo } }); +router.post('/:leaseId/drain', express.json({ limit: '32kb' }), async (req, res) => { + try { + await drainLease(req, res); + } catch (error) { + sendProxyError(res, error); + } +}); + +router.post('/:leaseId/restart', express.json({ limit: '32kb' }), async (req, res) => { + try { + await restartLease(req, res); + } catch (error) { + sendProxyError(res, error); + } +}); + export function handleTraceProcessorProxyUpgrade( req: IncomingMessage, socket: Duplex, diff --git a/backend/src/services/__tests__/rbac.test.ts b/backend/src/services/__tests__/rbac.test.ts index 20c34b60..9b2d0d58 100644 --- a/backend/src/services/__tests__/rbac.test.ts +++ b/backend/src/services/__tests__/rbac.test.ts @@ -35,8 +35,10 @@ describe('enterprise RBAC matrix', () => { expect(hasRbacPermission(context('workspace_admin'), 'trace:delete_any')).toBe(true); expect(hasRbacPermission(context('workspace_admin'), 'provider:manage_workspace')).toBe(true); expect(hasRbacPermission(context('workspace_admin'), 'provider:manage_org')).toBe(false); + expect(hasRbacPermission(context('workspace_admin'), 'runtime:manage')).toBe(true); expect(hasRbacPermission(context('org_admin'), 'provider:manage_org')).toBe(true); + expect(hasRbacPermission(context('org_admin'), 'runtime:manage')).toBe(true); }); test('lets explicit scopes authorize API key contexts without granting unrelated permissions', () => { diff --git a/backend/src/services/__tests__/traceProcessorLeaseProcessorRouting.test.ts b/backend/src/services/__tests__/traceProcessorLeaseProcessorRouting.test.ts index 098c50dd..9215b943 100644 --- a/backend/src/services/__tests__/traceProcessorLeaseProcessorRouting.test.ts +++ b/backend/src/services/__tests__/traceProcessorLeaseProcessorRouting.test.ts @@ -224,6 +224,53 @@ describe('TraceProcessorService lease restart supervisor', () => { } }); + it('allows an explicit admin restart of a ready lease processor', async () => { + const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'smartperfetto-lease-admin-restart-')); + try { + const traceId = 'trace-admin-restart'; + const tracePath = path.join(tmpDir, `${traceId}.trace`); + await fs.writeFile(tracePath, 'trace bytes'); + + db = new Database(':memory:'); + applyEnterpriseMinimalSchema(db); + seedEnterpriseGraph(db, traceId); + const store = new TraceProcessorLeaseStore(db); + setTraceProcessorLeaseStoreForTests(store); + const leaseId = createActiveLease(store, traceId); + + const service = new TraceProcessorService(tmpDir, { + backoffMs: [0], + jitterMs: 0, + }); + await service.initializeUploadWithId(traceId, 'trace-admin-restart.trace', 11, tracePath); + + const current = fakeProcessor('current-processor', traceId); + (service as any).processors.set(`${traceId}:lease:${leaseId}`, current); + + const restarted = fakeProcessor('restarted-processor', traceId); + const createSpy = jest + .spyOn(TraceProcessorFactory, 'create') + .mockResolvedValue(restarted as any); + + await expect(service.restartLease(traceId, leaseId, 'isolated', scope)) + .resolves.toBe(restarted); + + expect(createSpy).toHaveBeenCalledTimes(1); + expect(createSpy).toHaveBeenCalledWith(traceId, tracePath, expect.objectContaining({ + processorKey: `${traceId}:lease:${leaseId}`, + leaseId, + leaseMode: 'isolated', + })); + expect(current.destroy).toHaveBeenCalledTimes(1); + expect(store.getLeaseById(scope, leaseId)).toMatchObject({ + id: leaseId, + state: 'active', + }); + } finally { + await fs.rm(tmpDir, { recursive: true, force: true }); + } + }); + it('marks the lease failed after the 1s/5s/15s backoff restart attempts all fail', async () => { const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'smartperfetto-lease-restart-fail-')); try { diff --git a/backend/src/services/rbac.ts b/backend/src/services/rbac.ts index 8294e691..333c290b 100644 --- a/backend/src/services/rbac.ts +++ b/backend/src/services/rbac.ts @@ -21,7 +21,8 @@ export type RbacPermission = | 'report:delete' | 'provider:manage_workspace' | 'provider:manage_org' - | 'audit:read'; + | 'audit:read' + | 'runtime:manage'; const ROLE_PERMISSIONS: Record = { viewer: ['trace:read', 'report:read'], @@ -44,6 +45,7 @@ const ROLE_PERMISSIONS: Record = { 'report:delete', 'provider:manage_workspace', 'audit:read', + 'runtime:manage', ], org_admin: [ 'trace:read', @@ -57,6 +59,7 @@ const ROLE_PERMISSIONS: Record = { 'provider:manage_workspace', 'provider:manage_org', 'audit:read', + 'runtime:manage', ], }; diff --git a/backend/src/services/traceProcessorService.ts b/backend/src/services/traceProcessorService.ts index 97134209..e54704da 100644 --- a/backend/src/services/traceProcessorService.ts +++ b/backend/src/services/traceProcessorService.ts @@ -637,6 +637,20 @@ export class TraceProcessorService extends EventEmitter { return this.createProcessor(traceId, { leaseId, mode }); } + public async restartLease( + traceId: string, + leaseId: string, + mode: TraceProcessorLeaseMode | string, + leaseScope: EnterpriseRepositoryScope, + ): Promise { + return this.restartLeaseProcessor(traceId, { + traceId, + leaseId, + mode, + leaseScope, + }); + } + private async restartLeaseProcessor( traceId: string, leaseContext: TraceProcessorLeaseQueryContext,