Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions backend/src/routes/__tests__/traceProcessorProxyRoutes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ let upstreamSockets: Set<NetSocket>;
let upstreamPort: number;
let lease: TraceProcessorLeaseRecord;
let queryRawMock: jest.MockedFunction<(traceId: string, body: Buffer, options?: any) => Promise<Buffer>>;
let restartLeaseMock: jest.MockedFunction<(
traceId: string,
leaseId: string,
mode: string,
scope: EnterpriseRepositoryScope,
) => Promise<unknown>>;

function restoreEnvValue(key: string, value: string | undefined): void {
if (value === undefined) {
Expand All @@ -69,6 +75,17 @@ function ssoHeaders(req: request.Test, workspaceId = 'workspace-a'): request.Tes
.set('X-Window-Id', 'window-a');
}

function adminHeaders(req: request.Test, workspaceId = 'workspace-a'): request.Test {
return req
.set('X-SmartPerfetto-SSO-User-Id', 'admin-a')
.set('X-SmartPerfetto-SSO-Email', 'admin-a@example.test')
.set('X-SmartPerfetto-SSO-Tenant-Id', 'tenant-a')
.set('X-SmartPerfetto-SSO-Workspace-Id', workspaceId)
.set('X-SmartPerfetto-SSO-Roles', 'workspace_admin')
.set('X-SmartPerfetto-SSO-Scopes', 'trace:read,trace:write,runtime:manage')
.set('X-Window-Id', 'admin-window');
}

function binaryParser(res: request.Response, callback: (err: Error | null, body: Buffer) => void): void {
const chunks: Buffer[] = [];
res.on('data', chunk => chunks.push(Buffer.from(chunk)));
Expand Down Expand Up @@ -176,6 +193,13 @@ beforeEach(async () => {
seedEnterpriseGraph();
lease = createReadyLease();
queryRawMock = jest.fn(async (_traceId: string, body: Buffer) => body);
restartLeaseMock = jest.fn(async (traceId, leaseId, _mode, restartScope) => {
const store = getTraceProcessorLeaseStore();
store.markCrashed(restartScope, leaseId);
store.markRestarting(restartScope, leaseId);
store.markReady(restartScope, leaseId);
return { id: 'restarted-processor', traceId };
});
setTraceProcessorServiceForTests({
getOrLoadTrace: jest.fn(async () => ({
id: 'trace-a',
Expand Down Expand Up @@ -204,6 +228,7 @@ beforeEach(async () => {
processor: {status: 'ready'},
})),
queryRaw: queryRawMock,
restartLease: restartLeaseMock,
} as any);
});

Expand Down Expand Up @@ -273,6 +298,78 @@ describe('trace processor lease proxy routes', () => {
expect(res.status).toBe(404);
});

it('requires runtime manage permission for lease admin actions', async () => {
const app = makeApp();

const res = await ssoHeaders(
request(app)
.post(`/api/tp/${lease.id}/restart`)
.send({ reason: 'hung query' }),
);

expect(res.status).toBe(403);
expect(restartLeaseMock).not.toHaveBeenCalled();
});

it('lets workspace admins drain a scoped lease and block new proxy work', async () => {
const app = makeApp();

const drainRes = await adminHeaders(
request(app)
.post(`/api/tp/${lease.id}/drain`)
.send({ reason: 'hung query' }),
);

expect(drainRes.status).toBe(200);
expect(drainRes.body).toMatchObject({
success: true,
action: 'drain',
reason: 'hung query',
lease: {
id: lease.id,
state: 'draining',
},
});

const blockedRes = await adminHeaders(
request(app).post(`/api/tp/${lease.id}/status`),
);
expect(blockedRes.status).toBe(409);
expect(blockedRes.body.error).toBe('Trace processor lease is draining');
});

it('lets workspace admins restart a scoped lease without changing the lease id', async () => {
const app = makeApp();

const restartRes = await adminHeaders(
request(app)
.post(`/api/tp/${lease.id}/restart`)
.send({ reason: 'operator restart after hung query' }),
);

expect(restartRes.status).toBe(200);
expect(restartRes.body).toMatchObject({
success: true,
action: 'restart',
reason: 'operator restart after hung query',
lease: {
id: lease.id,
traceId: 'trace-a',
state: 'active',
},
});
expect(restartLeaseMock).toHaveBeenCalledWith(
'trace-a',
lease.id,
'shared',
{
tenantId: 'tenant-a',
workspaceId: 'workspace-a',
userId: 'admin-a',
},
);
});

it('tunnels websocket upgrades to the leased trace processor port', async () => {
const app = makeApp();
const proxyServer = http.createServer(app);
Expand Down
90 changes: 90 additions & 0 deletions backend/src/routes/traceProcessorProxyRoutes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,17 @@ function ensureTraceRead(context: RequestContext): void {
}
}

function ensureRuntimeManage(context: RequestContext): void {
if (!hasRbacPermission(context, 'runtime:manage')) {
throw new TraceProcessorProxyError(403, 'Trace processor lease admin requires runtime:manage permission');
}
}

function leaseAdminReason(req: Request): string | undefined {
const reason = typeof req.body?.reason === 'string' ? req.body.reason.trim() : '';
return reason ? reason.slice(0, 500) : undefined;
}

async function resolveProxyTargetForContext(
context: RequestContext,
leaseId: string,
Expand Down Expand Up @@ -360,6 +371,69 @@ async function forwardQueryRpc(req: Request, res: Response): Promise<void> {
res.status(200).send(responseBody);
}

async function drainLease(req: Request, res: Response): Promise<void> {
const leaseId = sanitizeContextId(req.params.leaseId);
if (!leaseId) {
res.status(400).json({ success: false, error: 'leaseId is required' });
return;
}

const context = requireRequestContext(req);
ensureRuntimeManage(context);
const scope = leaseScopeFromContext(context);
const store = getTraceProcessorLeaseStore();
const lease = store.getLeaseById(scope, leaseId);
if (!lease) {
throw new TraceProcessorProxyError(404, 'Trace processor lease not found');
}
if (lease.state === 'released' || lease.state === 'failed') {
throw new TraceProcessorProxyError(409, `Trace processor lease is ${lease.state}`);
}

const drained = store.beginDraining(scope, lease.id);
res.json({
success: true,
action: 'drain',
reason: leaseAdminReason(req),
lease: drained,
});
}

async function restartLease(req: Request, res: Response): Promise<void> {
const leaseId = sanitizeContextId(req.params.leaseId);
if (!leaseId) {
res.status(400).json({ success: false, error: 'leaseId is required' });
return;
}

const context = requireRequestContext(req);
ensureRuntimeManage(context);
const scope = leaseScopeFromContext(context);
const store = getTraceProcessorLeaseStore();
const lease = store.getLeaseById(scope, leaseId);
if (!lease) {
throw new TraceProcessorProxyError(404, 'Trace processor lease not found');
}
if (CONFLICT_STATES.has(lease.state)) {
throw new TraceProcessorProxyError(409, `Trace processor lease is ${lease.state}`);
}

const traceProcessorService = getTraceProcessorService();
const trace = await traceProcessorService.getOrLoadTrace(lease.traceId);
if (!trace) {
throw new TraceProcessorProxyError(404, 'Trace not found for trace processor lease');
}

await traceProcessorService.restartLease(lease.traceId, lease.id, lease.mode, scope);
const restarted = store.getLeaseById(scope, lease.id);
res.json({
success: true,
action: 'restart',
reason: leaseAdminReason(req),
lease: restarted,
});
}

function sendProxyError(res: Response, error: unknown): void {
if (error instanceof TraceProcessorProxyError) {
if (error.statusCode === 403) {
Expand Down Expand Up @@ -472,6 +546,22 @@ router.post('/:leaseId/query', express.raw({ type: '*/*', limit: serverConfig.bo
}
});

router.post('/:leaseId/drain', express.json({ limit: '32kb' }), async (req, res) => {
try {
await drainLease(req, res);
} catch (error) {
sendProxyError(res, error);
}
});

router.post('/:leaseId/restart', express.json({ limit: '32kb' }), async (req, res) => {
try {
await restartLease(req, res);
} catch (error) {
sendProxyError(res, error);
}
});

export function handleTraceProcessorProxyUpgrade(
req: IncomingMessage,
socket: Duplex,
Expand Down
2 changes: 2 additions & 0 deletions backend/src/services/__tests__/rbac.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ describe('enterprise RBAC matrix', () => {
expect(hasRbacPermission(context('workspace_admin'), 'trace:delete_any')).toBe(true);
expect(hasRbacPermission(context('workspace_admin'), 'provider:manage_workspace')).toBe(true);
expect(hasRbacPermission(context('workspace_admin'), 'provider:manage_org')).toBe(false);
expect(hasRbacPermission(context('workspace_admin'), 'runtime:manage')).toBe(true);

expect(hasRbacPermission(context('org_admin'), 'provider:manage_org')).toBe(true);
expect(hasRbacPermission(context('org_admin'), 'runtime:manage')).toBe(true);
});

test('lets explicit scopes authorize API key contexts without granting unrelated permissions', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,53 @@ describe('TraceProcessorService lease restart supervisor', () => {
}
});

it('allows an explicit admin restart of a ready lease processor', async () => {
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'smartperfetto-lease-admin-restart-'));
try {
const traceId = 'trace-admin-restart';
const tracePath = path.join(tmpDir, `${traceId}.trace`);
await fs.writeFile(tracePath, 'trace bytes');

db = new Database(':memory:');
applyEnterpriseMinimalSchema(db);
seedEnterpriseGraph(db, traceId);
const store = new TraceProcessorLeaseStore(db);
setTraceProcessorLeaseStoreForTests(store);
const leaseId = createActiveLease(store, traceId);

const service = new TraceProcessorService(tmpDir, {
backoffMs: [0],
jitterMs: 0,
});
await service.initializeUploadWithId(traceId, 'trace-admin-restart.trace', 11, tracePath);

const current = fakeProcessor('current-processor', traceId);
(service as any).processors.set(`${traceId}:lease:${leaseId}`, current);

const restarted = fakeProcessor('restarted-processor', traceId);
const createSpy = jest
.spyOn(TraceProcessorFactory, 'create')
.mockResolvedValue(restarted as any);

await expect(service.restartLease(traceId, leaseId, 'isolated', scope))
.resolves.toBe(restarted);

expect(createSpy).toHaveBeenCalledTimes(1);
expect(createSpy).toHaveBeenCalledWith(traceId, tracePath, expect.objectContaining({
processorKey: `${traceId}:lease:${leaseId}`,
leaseId,
leaseMode: 'isolated',
}));
expect(current.destroy).toHaveBeenCalledTimes(1);
expect(store.getLeaseById(scope, leaseId)).toMatchObject({
id: leaseId,
state: 'active',
});
} finally {
await fs.rm(tmpDir, { recursive: true, force: true });
}
});

it('marks the lease failed after the 1s/5s/15s backoff restart attempts all fail', async () => {
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'smartperfetto-lease-restart-fail-'));
try {
Expand Down
5 changes: 4 additions & 1 deletion backend/src/services/rbac.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ export type RbacPermission =
| 'report:delete'
| 'provider:manage_workspace'
| 'provider:manage_org'
| 'audit:read';
| 'audit:read'
| 'runtime:manage';

const ROLE_PERMISSIONS: Record<string, RbacPermission[]> = {
viewer: ['trace:read', 'report:read'],
Expand All @@ -44,6 +45,7 @@ const ROLE_PERMISSIONS: Record<string, RbacPermission[]> = {
'report:delete',
'provider:manage_workspace',
'audit:read',
'runtime:manage',
],
org_admin: [
'trace:read',
Expand All @@ -57,6 +59,7 @@ const ROLE_PERMISSIONS: Record<string, RbacPermission[]> = {
'provider:manage_workspace',
'provider:manage_org',
'audit:read',
'runtime:manage',
],
};

Expand Down
14 changes: 14 additions & 0 deletions backend/src/services/traceProcessorService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,20 @@ export class TraceProcessorService extends EventEmitter {
return this.createProcessor(traceId, { leaseId, mode });
}

public async restartLease(
traceId: string,
leaseId: string,
mode: TraceProcessorLeaseMode | string,
leaseScope: EnterpriseRepositoryScope,
): Promise<TraceProcessor> {
return this.restartLeaseProcessor(traceId, {
traceId,
leaseId,
mode,
leaseScope,
});
}

private async restartLeaseProcessor(
traceId: string,
leaseContext: TraceProcessorLeaseQueryContext,
Expand Down