Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
"prepack": "npm run build",
"typecheck": "tsc --noEmit",
"test": "jest",
"test:core": "jest --runInBand --forceExit src/agent/communication/__tests__/agentMessageBus.test.ts src/agent/core/executors/__tests__/strategyExecutor.test.ts src/agent/core/executors/__tests__/hypothesisExecutor.test.ts src/agent/context/__tests__/enhancedSessionContext.test.ts src/tests/adbTools.test.ts src/services/__tests__/sessionLogger.test.ts src/services/__tests__/traceAnalysisSkillConfig.test.ts src/agent/agents/domain/__tests__/registry.test.ts src/agentv3/__tests__/sqlIncludeInjector.test.ts src/agentv3/__tests__/analysisPatternMemory.test.ts src/agentv3/__tests__/claudeRuntimeRuntimeSnapshots.test.ts src/middleware/__tests__/auth.test.ts src/services/__tests__/rbac.test.ts src/routes/__tests__/agentRoutesRbac.test.ts src/routes/__tests__/ownerGuardRoutes.test.ts src/routes/__tests__/requestContextRouteCoverage.test.ts src/middleware/__tests__/legacyApiCompatibility.test.ts src/services/__tests__/enterpriseDb.test.ts src/services/__tests__/enterpriseSchema.test.ts src/services/__tests__/enterpriseRepository.test.ts src/services/__tests__/processRss.test.ts src/services/__tests__/traceProcessorLeaseStore.test.ts src/scripts/__tests__/benchmarkTraceProcessorRss.test.ts src/services/__tests__/enterpriseKnowledgeScope.test.ts src/services/__tests__/enterpriseMigration.test.ts src/services/__tests__/runtimeSnapshotStore.test.ts src/services/providerManager/__tests__/localSecretStore.test.ts src/services/providerManager/__tests__/enterpriseProviderStore.test.ts src/routes/__tests__/enterpriseTraceMetadataRoutes.test.ts src/routes/__tests__/traceProcessorProxyRoutes.test.ts src/routes/__tests__/enterpriseReportRoutes.test.ts src/routes/__tests__/enterpriseRestartPersistence.test.ts",
"test:core": "jest --runInBand --forceExit src/agent/communication/__tests__/agentMessageBus.test.ts src/agent/core/executors/__tests__/strategyExecutor.test.ts src/agent/core/executors/__tests__/hypothesisExecutor.test.ts src/agent/context/__tests__/enhancedSessionContext.test.ts src/tests/adbTools.test.ts src/services/__tests__/sessionLogger.test.ts src/services/__tests__/traceAnalysisSkillConfig.test.ts src/agent/agents/domain/__tests__/registry.test.ts src/agentv3/__tests__/sqlIncludeInjector.test.ts src/agentv3/__tests__/analysisPatternMemory.test.ts src/agentv3/__tests__/claudeRuntimeRuntimeSnapshots.test.ts src/middleware/__tests__/auth.test.ts src/services/__tests__/rbac.test.ts src/routes/__tests__/agentRoutesRbac.test.ts src/routes/__tests__/ownerGuardRoutes.test.ts src/routes/__tests__/requestContextRouteCoverage.test.ts src/middleware/__tests__/legacyApiCompatibility.test.ts src/services/__tests__/enterpriseDb.test.ts src/services/__tests__/enterpriseSchema.test.ts src/services/__tests__/enterpriseRepository.test.ts src/services/__tests__/processRss.test.ts src/services/__tests__/traceProcessorLeaseStore.test.ts src/services/__tests__/traceProcessorSqlWorker.test.ts src/scripts/__tests__/benchmarkTraceProcessorRss.test.ts src/services/__tests__/enterpriseKnowledgeScope.test.ts src/services/__tests__/enterpriseMigration.test.ts src/services/__tests__/runtimeSnapshotStore.test.ts src/services/providerManager/__tests__/localSecretStore.test.ts src/services/providerManager/__tests__/enterpriseProviderStore.test.ts src/routes/__tests__/enterpriseTraceMetadataRoutes.test.ts src/routes/__tests__/traceProcessorProxyRoutes.test.ts src/routes/__tests__/enterpriseReportRoutes.test.ts src/routes/__tests__/enterpriseRestartPersistence.test.ts",
"test:watch": "jest --watch",
"test:coverage": "jest --coverage",
"test:unit": "jest --testPathPatterns=src/tests",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ let upstreamServer: Server;
let upstreamSockets: Set<NetSocket>;
let upstreamPort: number;
let lease: TraceProcessorLeaseRecord;
let queryRawMock: jest.MockedFunction<(traceId: string, body: Buffer, options?: any) => Promise<Buffer>>;

function restoreEnvValue(key: string, value: string | undefined): void {
if (value === undefined) {
Expand Down Expand Up @@ -174,6 +175,7 @@ beforeEach(async () => {

seedEnterpriseGraph();
lease = createReadyLease();
queryRawMock = jest.fn(async (_traceId: string, body: Buffer) => body);
setTraceProcessorServiceForTests({
getOrLoadTrace: jest.fn(async () => ({
id: 'trace-a',
Expand All @@ -191,6 +193,7 @@ beforeEach(async () => {
port: upstreamPort,
processor: {status: 'ready'},
})),
queryRaw: queryRawMock,
} as any);
});

Expand Down Expand Up @@ -233,6 +236,11 @@ describe('trace processor lease proxy routes', () => {
);
expect(queryRes.status).toBe(200);
expect(Buffer.from(queryRes.body)).toEqual(queryBody);
expect(queryRawMock).toHaveBeenCalledWith(
'trace-a',
queryBody,
{ priority: 'p0' },
);
});

it('hides leases from other workspaces', async () => {
Expand Down
11 changes: 8 additions & 3 deletions backend/src/routes/perfettoSqlRoutes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { PerfettoSqlSkill } from '../services/perfettoSqlSkill';
import { PerfettoSkillType } from '../types/perfettoSql';
import type { PerfettoSqlRequest } from '../types/perfettoSql';
import { SkillAnalysisAdapter, createSkillAnalysisAdapter } from '../services/skillEngine/skillAnalysisAdapter';
import { normalizeTraceProcessorQueryPriority } from '../services/traceProcessorSqlWorker';

const router = express.Router();

Expand Down Expand Up @@ -559,7 +560,7 @@ router.get('/skills', (_req, res) => {
*/
router.post('/sql', async (req, res) => {
try {
const { traceId, sql } = req.body;
const { traceId, sql, priority: priorityInput } = req.body;

if (!traceId) {
return res.status(400).json({
Expand All @@ -584,7 +585,11 @@ router.post('/sql', async (req, res) => {
});
}

const result = await traceProcessorService.query(traceId, sql);
const priority = normalizeTraceProcessorQueryPriority(
req.get('x-smartperfetto-query-priority') || priorityInput,
'p0',
);
const result = await traceProcessorService.query(traceId, sql, { priority });

if (result.error) {
return res.status(400).json({
Expand Down Expand Up @@ -803,4 +808,4 @@ router.post('/systemserver', async (req, res) => {
// Export services for use in other parts of the app
export { perfettoSqlSkill, traceProcessorService };

export default router;
export default router;
26 changes: 25 additions & 1 deletion backend/src/routes/traceProcessorProxyRoutes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
type TraceProcessorLeaseRecord,
type TraceProcessorLeaseState,
} from '../services/traceProcessorLeaseStore';
import { normalizeTraceProcessorQueryPriority } from '../services/traceProcessorSqlWorker';
import { hasRbacPermission, sendForbidden } from '../services/rbac';
import { EnterpriseSsoService } from '../services/enterpriseSsoService';
import { EnterpriseApiKeyService } from '../services/enterpriseApiKeyService';
Expand Down Expand Up @@ -319,6 +320,29 @@ async function forwardHttpRpc(req: Request, res: Response, upstreamPath: '/statu
res.status(upstream.status).send(responseBody);
}

async function forwardQueryRpc(req: Request, res: Response): Promise<void> {
const leaseId = sanitizeContextId(req.params.leaseId);
if (!leaseId) {
res.status(400).json({ success: false, error: 'leaseId is required' });
return;
}

const target = await resolveProxyTarget(req, leaseId);
const body = requestBody(req);
if (!body) {
res.status(400).json({ success: false, error: 'query protobuf body is required' });
return;
}

const priority = normalizeTraceProcessorQueryPriority(
req.get('x-smartperfetto-query-priority') || req.query.priority,
'p0',
);
const responseBody = await getTraceProcessorService().queryRaw(target.lease.traceId, body, { priority });
res.setHeader('content-type', 'application/x-protobuf');
res.status(200).send(responseBody);
}

function sendProxyError(res: Response, error: unknown): void {
if (error instanceof TraceProcessorProxyError) {
if (error.statusCode === 403) {
Expand Down Expand Up @@ -423,7 +447,7 @@ router.post('/:leaseId/status', express.raw({ type: '*/*', limit: serverConfig.b

router.post('/:leaseId/query', express.raw({ type: '*/*', limit: serverConfig.bodyLimit }), async (req, res) => {
try {
await forwardHttpRpc(req, res, '/query');
await forwardQueryRpc(req, res);
} catch (error) {
sendProxyError(res, error);
}
Expand Down
27 changes: 26 additions & 1 deletion backend/src/services/__tests__/traceProcessorProtobuf.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import { decodeQueryResult, decodeVarint, encodeVarint } from '../traceProcessorProtobuf';
import {
decodeQueryArgsSql,
decodeQueryResult,
decodeVarint,
encodeQueryArgs,
encodeQueryResult,
encodeVarint,
} from '../traceProcessorProtobuf';

describe('traceProcessorProtobuf', () => {
describe('varint encoding and decoding', () => {
Expand Down Expand Up @@ -40,5 +47,23 @@ describe('traceProcessorProtobuf', () => {
expect(result.columnNames).toEqual(['x']);
expect(result.rows).toEqual([[-1]]);
});

it('round-trips locally encoded query results', () => {
const encoded = encodeQueryResult({
columnNames: ['i', 'f', 's', 'n', 'b'],
rows: [[-1, 1.5, 'text', null, Buffer.from([1, 2, 3])]],
});

const result = decodeQueryResult(encoded);

expect(result.columnNames).toEqual(['i', 'f', 's', 'n', 'b']);
expect(result.rows).toEqual([[-1, 1.5, 'text', null, Buffer.from([1, 2, 3])]]);
});
});

describe('query args decoding', () => {
it('decodes the sql_query field', () => {
expect(decodeQueryArgsSql(encodeQueryArgs('SELECT 42'))).toBe('SELECT 42');
});
});
});
131 changes: 131 additions & 0 deletions backend/src/services/__tests__/traceProcessorSqlWorker.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// SPDX-License-Identifier: AGPL-3.0-or-later
// Copyright (C) 2024-2026 Gracker (Chris)
// This file is part of SmartPerfetto. See LICENSE for details.

import { afterEach, describe, expect, it } from '@jest/globals';
import {
decodeQueryArgsSql,
encodeQueryResult,
} from '../traceProcessorProtobuf';
import {
normalizeTraceProcessorQueryPriority,
TraceProcessorSqlWorker,
} from '../traceProcessorSqlWorker';

function deferred<T>() {
let resolve!: (value: T) => void;
let reject!: (reason?: unknown) => void;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve, reject };
}

async function flushPromises(): Promise<void> {
await Promise.resolve();
await Promise.resolve();
}

function encodedSqlResult(sql: string): Buffer {
return encodeQueryResult({
columnNames: ['sql'],
rows: [[sql]],
});
}

describe('TraceProcessorSqlWorker', () => {
let worker: TraceProcessorSqlWorker | null = null;

afterEach(() => {
worker?.destroy();
worker = null;
});

it('does not preempt the running query, but runs queued P0 before queued P1/P2', async () => {
const started: string[] = [];
const gates = new Map<string, ReturnType<typeof deferred<Buffer>>>();

worker = new TraceProcessorSqlWorker({
processorId: 'processor-a',
traceId: 'trace-a',
port: 1,
forceInline: true,
rawExecutor: async request => {
const sql = decodeQueryArgsSql(request.body);
started.push(sql);
const gate = gates.get(sql) || deferred<Buffer>();
gates.set(sql, gate);
return gate.promise;
},
});

const p2 = worker.query('SELECT p2', { priority: 'p2' });
await flushPromises();
expect(started).toEqual(['SELECT p2']);

const p1 = worker.query('SELECT p1', { priority: 'p1' });
const p0 = worker.query('SELECT p0', { priority: 'p0' });
await flushPromises();
expect(started).toEqual(['SELECT p2']);
expect(worker.getStats()).toMatchObject({
running: true,
queuedP0: 1,
queuedP1: 1,
queuedP2: 0,
});

gates.get('SELECT p2')!.resolve(encodedSqlResult('SELECT p2'));
await expect(p2).resolves.toMatchObject({ rows: [['SELECT p2']] });
await flushPromises();
expect(started).toEqual(['SELECT p2', 'SELECT p0']);

gates.get('SELECT p0')!.resolve(encodedSqlResult('SELECT p0'));
await expect(p0).resolves.toMatchObject({ rows: [['SELECT p0']] });
await flushPromises();
expect(started).toEqual(['SELECT p2', 'SELECT p0', 'SELECT p1']);

gates.get('SELECT p1')!.resolve(encodedSqlResult('SELECT p1'));
await expect(p1).resolves.toMatchObject({ rows: [['SELECT p1']] });
});

it('keeps FIFO order inside the same priority level', async () => {
const started: string[] = [];
const gates = new Map<string, ReturnType<typeof deferred<Buffer>>>();

worker = new TraceProcessorSqlWorker({
processorId: 'processor-b',
traceId: 'trace-b',
port: 1,
forceInline: true,
rawExecutor: async request => {
const sql = decodeQueryArgsSql(request.body);
started.push(sql);
const gate = gates.get(sql) || deferred<Buffer>();
gates.set(sql, gate);
return gate.promise;
},
});

const first = worker.query('SELECT first', { priority: 'p1' });
await flushPromises();
const second = worker.query('SELECT second', { priority: 'p1' });
await flushPromises();
expect(started).toEqual(['SELECT first']);

gates.get('SELECT first')!.resolve(encodedSqlResult('SELECT first'));
await expect(first).resolves.toMatchObject({ rows: [['SELECT first']] });
await flushPromises();
expect(started).toEqual(['SELECT first', 'SELECT second']);

gates.get('SELECT second')!.resolve(encodedSqlResult('SELECT second'));
await expect(second).resolves.toMatchObject({ rows: [['SELECT second']] });
});

it('normalizes public priority names', () => {
expect(normalizeTraceProcessorQueryPriority('interactive')).toBe('p0');
expect(normalizeTraceProcessorQueryPriority('agent')).toBe('p1');
expect(normalizeTraceProcessorQueryPriority('report')).toBe('p2');
expect(normalizeTraceProcessorQueryPriority('unknown', 'p2')).toBe('p2');
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {

function okResult(rows: unknown[][] = []): QueryResult {
return {
columns: [],
columns: rows[0]?.map((_, index) => `c${index}`) ?? [],
rows,
durationMs: 1,
};
Expand Down
80 changes: 80 additions & 0 deletions backend/src/services/traceProcessorHttpRpcClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// SPDX-License-Identifier: AGPL-3.0-or-later
// Copyright (C) 2024-2026 Gracker (Chris)
// This file is part of SmartPerfetto. See LICENSE for details.

import http from 'http';

export interface TraceProcessorHttpRpcRequest {
hostname?: string;
port: number;
path?: '/query' | '/status';
body: Buffer;
timeoutMs: number;
}

export async function executeTraceProcessorHttpRpcRaw(
request: TraceProcessorHttpRpcRequest,
): Promise<Buffer> {
const hostname = request.hostname || '127.0.0.1';
const path = request.path || '/query';

return new Promise((resolve, reject) => {
let settled = false;
let req: http.ClientRequest | null = null;
let wallClockTimer: NodeJS.Timeout | undefined;

const finish = (error: Error | null, body?: Buffer): void => {
if (settled) return;
settled = true;
clearTimeout(wallClockTimer);
if (error) {
reject(error);
} else {
resolve(body || Buffer.alloc(0));
}
};

wallClockTimer = setTimeout(() => {
req?.destroy();
finish(new Error('Query timeout'));
}, request.timeoutMs);
if (typeof (wallClockTimer as any).unref === 'function') {
(wallClockTimer as any).unref();
}

req = http.request({
hostname,
port: request.port,
path,
method: 'POST',
headers: {
'Content-Type': 'application/x-protobuf',
'Content-Length': request.body.length,
},
timeout: request.timeoutMs,
}, (res) => {
const chunks: Buffer[] = [];
res.on('data', chunk => chunks.push(Buffer.from(chunk)));
res.on('end', () => {
const responseBody = Buffer.concat(chunks);
if (res.statusCode !== 200) {
finish(new Error(`HTTP ${res.statusCode}: ${responseBody.toString('utf8')}`));
return;
}
finish(null, responseBody);
});
});

req.on('error', error => {
finish(error);
});

req.on('timeout', () => {
req?.destroy();
finish(new Error('Query timeout'));
});

req.write(request.body);
req.end();
});
}
Loading