diff --git a/backend/src/routes/__tests__/enterpriseTraceMetadataRoutes.test.ts b/backend/src/routes/__tests__/enterpriseTraceMetadataRoutes.test.ts index 4375d0d6..a8c3bce0 100644 --- a/backend/src/routes/__tests__/enterpriseTraceMetadataRoutes.test.ts +++ b/backend/src/routes/__tests__/enterpriseTraceMetadataRoutes.test.ts @@ -405,6 +405,149 @@ describe('enterprise trace metadata routes', () => { })); }); + it('reports isolated report-generation lease queue length separately from the frontend shared queue', async () => { + const app = makeApp(); + const sourceTracePath = path.join(tmpDir, 'report-queue.trace'); + await fs.writeFile(sourceTracePath, 'report-queue-content'); + let currentTraceId: string | null = null; + let reportLeaseId: string | null = null; + + fakeTraceProcessorService.getTraceWithPort.mockImplementation((...args: unknown[]) => { + const traceId = String(args[0]); + currentTraceId = traceId; + return { + id: traceId, + filename: 'report-queue.trace', + size: 'report-queue-content'.length, + uploadTime: new Date('2026-05-08T00:00:00.000Z'), + status: 'ready', + port: 9124, + processor: { status: 'ready' }, + }; + }); + fakeTraceProcessorService.getAllTraces.mockImplementation(() => currentTraceId ? [{ + id: currentTraceId, + filename: 'report-queue.trace', + size: 'report-queue-content'.length, + uploadTime: new Date('2026-05-08T00:00:00.000Z'), + status: 'ready', + }] : []); + jest.spyOn(TraceProcessorFactory, 'getStats').mockImplementation(() => ({ + count: currentTraceId ? (reportLeaseId ? 2 : 1) : 0, + traceIds: currentTraceId ? [currentTraceId] : [], + processorKeys: currentTraceId + ? [ + currentTraceId, + ...(reportLeaseId ? [`${currentTraceId}:lease:${reportLeaseId}`] : []), + ] + : [], + processors: currentTraceId ? [ + { + kind: 'owned_process', + processorId: 'shared-processor', + traceId: currentTraceId, + status: 'ready', + activeQueries: 0, + httpPort: 9124, + leaseMode: 'shared', + rssBytes: 32 * 1024 * 1024, + rssSampleSource: 'ps' as const, + sqlWorker: { + running: true, + queuedP0: 1, + queuedP1: 0, + queuedP2: 0, + usesWorkerThread: true, + }, + }, + ...(reportLeaseId ? [{ + kind: 'owned_process' as const, + processorId: 'report-processor', + traceId: currentTraceId, + status: 'ready' as const, + activeQueries: 0, + httpPort: 9125, + leaseId: reportLeaseId, + leaseMode: 'isolated', + rssBytes: 96 * 1024 * 1024, + rssSampleSource: 'ps' as const, + sqlWorker: { + running: true, + queuedP0: 0, + queuedP1: 0, + queuedP2: 7, + usesWorkerThread: true, + }, + }] : []), + ] : [], + ramBudget: { + enabled: true, + totalMemoryBytes: 8 * 1024 * 1024 * 1024, + nodeRssBytes: 128 * 1024 * 1024, + osSafetyReserveBytes: 1024 * 1024 * 1024, + uploadReserveBytes: 0, + machineFactor: 0.60, + budgetBytes: 2 * 1024 * 1024 * 1024, + observedProcessorRssBytes: reportLeaseId ? 128 * 1024 * 1024 : 32 * 1024 * 1024, + availableForNewLeaseBytes: 1900 * 1024 * 1024, + activeProcessorCount: currentTraceId ? (reportLeaseId ? 2 : 1) : 0, + unknownRssProcessorCount: 0, + estimateMultiplier: 1.5, + minEstimateBytes: 128 * 1024 * 1024, + }, + })); + + const uploadRes = await ssoHeaders( + request(app) + .post('/api/traces/upload') + .attach('file', sourceTracePath), + ); + + expect(uploadRes.status).toBe(200); + const traceId = uploadRes.body.trace.id as string; + const scope = { + tenantId: 'tenant-a', + workspaceId: 'workspace-a', + userId: 'user-a', + }; + const store = getTraceProcessorLeaseStore(); + const reportLease = store.acquireHolder(scope, traceId, { + holderType: 'report_generation', + holderRef: 'report-queue-a', + reportId: 'report-queue-a', + }, { mode: 'isolated' }); + reportLeaseId = reportLease.id; + store.markStarting(scope, reportLease.id); + store.markReady(scope, reportLease.id); + + const statsRes = await ssoHeaders(request(app).get('/api/traces/stats')); + + expect(statsRes.status).toBe(200); + expect(statsRes.body.stats.processors.queueLength).toBe(8); + const leaseItems = statsRes.body.stats.leases.items as Array<{ + id: string; + mode: string; + queueLength: number; + holders: Array<{ holderType: string }>; + }>; + const frontendLease = leaseItems.find(item => item.mode === 'shared'); + const reportLeaseItem = leaseItems.find(item => item.id === reportLeaseId); + expect(frontendLease).toEqual(expect.objectContaining({ + mode: 'shared', + queueLength: 1, + })); + expect(frontendLease?.holders).toEqual([ + expect.objectContaining({ holderType: 'frontend_http_rpc' }), + ]); + expect(reportLeaseItem).toEqual(expect.objectContaining({ + mode: 'isolated', + queueLength: 7, + })); + expect(reportLeaseItem?.holders).toEqual([ + expect.objectContaining({ holderType: 'report_generation' }), + ]); + }); + it('streams URL uploads into scoped trace storage without buffering the response body', async () => { const app = makeApp(); const traceBytes = 'url-trace-content'; diff --git a/backend/src/routes/simpleTraceRoutes.ts b/backend/src/routes/simpleTraceRoutes.ts index 55b9b938..3dec422d 100644 --- a/backend/src/routes/simpleTraceRoutes.ts +++ b/backend/src/routes/simpleTraceRoutes.ts @@ -16,7 +16,7 @@ import { resolveFeatureConfig } from '../config'; import { attachRequestContext, requireRequestContext, type RequestContext } from '../middleware/auth'; import { getTraceProcessorService } from '../services/traceProcessorService'; import { getPortPool } from '../services/portPool'; -import { TraceProcessorFactory } from '../services/workingTraceProcessor'; +import { TraceProcessorFactory, type TraceProcessorRuntimeStats } from '../services/workingTraceProcessor'; import { openEnterpriseDb } from '../services/enterpriseDb'; import { recordEnterpriseAuditEvent } from '../services/enterpriseAuditService'; import { @@ -151,8 +151,21 @@ function recordLeaseRssFromProcessor( return getTraceProcessorLeaseStore().recordRss(leaseScopeFromContext(context), lease.id, rssBytes); } -function queueLengthForTrace(traceId: string): number { - return sharedQueueLengthForTrace(traceId, TraceProcessorFactory.getStats().processors); +function processorQueueLength(processor: TraceProcessorRuntimeStats): number { + const worker = processor.sqlWorker; + return worker ? worker.queuedP0 + worker.queuedP1 + worker.queuedP2 : 0; +} + +function queueLengthForLease( + lease: TraceProcessorLeaseRecord, + processors: TraceProcessorRuntimeStats[], +): number { + if (lease.mode === 'isolated') { + return processors + .filter(processor => processor.traceId === lease.traceId && processor.leaseId === lease.id) + .reduce((sum, processor) => sum + processorQueueLength(processor), 0); + } + return sharedQueueLengthForTrace(lease.traceId, processors); } function recordTraceCleanupAudit( @@ -769,7 +782,7 @@ router.get('/stats', async (req, res) => { mode: lease.mode, state: lease.state, rssBytes: lease.rssBytes, - queueLength: queueLengthForTrace(lease.traceId), + queueLength: queueLengthForLease(lease, processors), holderCount: lease.holderCount, holders: lease.holders.map(holder => ({ holderType: holder.holderType,