Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ describe('verifyEnterpriseMultiTenantWindows script', () => {
await fs.rm(tempRoot, { recursive: true, force: true });
});

test('covers current D1/D2/D4/D5/D6/D7/D8/D9/D10 isolation invariants without invoking a real provider', async () => {
test('covers current D1-D10 isolation invariants without invoking a real provider', async () => {
const tracePath = path.join(tempRoot, 'fixture.pftrace');
const uploadRoot = path.join(tempRoot, 'uploads');
const outputPath = path.join(tempRoot, 'report.json');
Expand All @@ -48,6 +48,7 @@ describe('verifyEnterpriseMultiTenantWindows script', () => {
expect(report.checks).toEqual({
D1: true,
D2: true,
D3: true,
D4: true,
D5: true,
D6: true,
Expand All @@ -59,6 +60,7 @@ describe('verifyEnterpriseMultiTenantWindows script', () => {
expect(Object.values(report.scenarios.D1.checks)).toEqual(expect.arrayContaining([true]));
expect(Object.values(report.scenarios.D1.checks).every(Boolean)).toBe(true);
expect(Object.values(report.scenarios.D2.checks).every(Boolean)).toBe(true);
expect(Object.values(report.scenarios.D3.checks).every(Boolean)).toBe(true);
expect(Object.values(report.scenarios.D4.checks).every(Boolean)).toBe(true);
expect(Object.values(report.scenarios.D5.checks).every(Boolean)).toBe(true);
expect(Object.values(report.scenarios.D6.checks).every(Boolean)).toBe(true);
Expand All @@ -71,6 +73,15 @@ describe('verifyEnterpriseMultiTenantWindows script', () => {
reportUrl: expect.stringMatching(/^\/api\/reports\//),
crossTenantReplayCount: 0,
}));
expect(report.scenarios.D3.details).toEqual(expect.objectContaining({
queryDispatchOrder: ['running-p2', 'p0', 'p1', 'queued-p2'],
queuedStats: expect.objectContaining({
running: true,
queuedP0: 1,
queuedP1: 1,
queuedP2: 1,
}),
}));
expect(report.scenarios.D8.details).toEqual(expect.objectContaining({
providerSnapshotChanged: true,
sdkSessionReusable: false,
Expand All @@ -93,6 +104,6 @@ describe('verifyEnterpriseMultiTenantWindows script', () => {

const traceFiles = (await fs.readdir(path.join(uploadRoot, 'traces')))
.filter(file => file.endsWith('.trace'));
expect(traceFiles).toHaveLength(14);
expect(traceFiles).toHaveLength(16);
});
});
210 changes: 209 additions & 1 deletion backend/src/scripts/verifyEnterpriseMultiTenantWindows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@ import {
TP_RAM_BUDGET_BYTES_ENV,
decideTraceProcessorAdmission,
} from '../services/traceProcessorRamBudget';
import {
decodeQueryArgsSql,
encodeQueryResult,
} from '../services/traceProcessorProtobuf';
import { TraceProcessorSqlWorker } from '../services/traceProcessorSqlWorker';
import { ownerFieldsFromContext } from '../services/resourceOwnership';
import type { RequestContext } from '../middleware/auth';

type ScenarioName = 'D1' | 'D2' | 'D4' | 'D5' | 'D6' | 'D7' | 'D8' | 'D9' | 'D10';
type ScenarioName = 'D1' | 'D2' | 'D3' | 'D4' | 'D5' | 'D6' | 'D7' | 'D8' | 'D9' | 'D10';

interface VerifyOptions {
tracePath?: string;
Expand Down Expand Up @@ -546,6 +551,206 @@ async function scenarioD2(
};
}

async function scenarioD3(
db: Database.Database,
tracePath: string,
userAWindow1: WindowContext,
userBWindow1: WindowContext,
): Promise<ScenarioReport> {
const aUpload = await simulateUpload(db, userAWindow1, tracePath, 'd3-user-a-timeline.pftrace');
const bUpload = await simulateUpload(db, userBWindow1, tracePath, 'd3-user-b-full-agent.pftrace');
const bRun = createAnalysisRun(db, userBWindow1, bUpload, 'running');
const store = new TraceProcessorLeaseStore(db);
const aScope = leaseScope(userAWindow1);
const bScope = leaseScope(userBWindow1);
const frontendHolderRef = userAWindow1.context.windowId ?? userAWindow1.label;

let frontendLease = store.acquireHolder(aScope, aUpload.traceId, {
holderType: 'frontend_http_rpc',
holderRef: frontendHolderRef,
windowId: frontendHolderRef,
frontendVisibility: 'visible',
metadata: {
scenario: 'D3',
workload: 'timeline',
priority: 'p0',
},
});
store.markStarting(aScope, frontendLease.id);
frontendLease = store.markReady(aScope, frontendLease.id);

let agentLease = store.acquireHolder(bScope, bUpload.traceId, {
holderType: 'agent_run',
holderRef: bRun.runId,
sessionId: bRun.sessionId,
runId: bRun.runId,
metadata: {
scenario: 'D3',
workload: 'full-agent',
priority: 'p2',
},
});
store.markStarting(bScope, agentLease.id);
agentLease = store.markReady(bScope, agentLease.id);
agentLease = store.acquireHolderForLease(bScope, agentLease.id, {
holderType: 'report_generation',
holderRef: `report-${bRun.runId}`,
reportId: `report-${bRun.runId}`,
metadata: {
scenario: 'D3',
workload: 'background-report',
priority: 'p2',
},
});

const frontendProxyTargets = [
`/api/tp/${encodeURIComponent(frontendLease.id)}/websocket`,
`/api/tp/${encodeURIComponent(frontendLease.id)}/query?priority=p0`,
`/api/tp/${encodeURIComponent(frontendLease.id)}/heartbeat`,
];

const gates = new Map<string, {
promise: Promise<Buffer>;
resolve: (value: Buffer) => void;
}>();
const makeGate = (): {
promise: Promise<Buffer>;
resolve: (value: Buffer) => void;
} => {
let resolve!: (value: Buffer) => void;
const promise = new Promise<Buffer>((res) => {
resolve = res;
});
return { promise, resolve };
};
const gateFor = (sql: string) => {
let gate = gates.get(sql);
if (!gate) {
gate = makeGate();
gates.set(sql, gate);
}
return gate;
};
const flushPromises = async () => {
await Promise.resolve();
await Promise.resolve();
};
const encodeSqlResult = (sql: string) => encodeQueryResult({
columnNames: ['sql'],
rows: [[sql]],
});
const labelForSql = (sql: string) => {
if (sql.includes('p0')) return 'p0';
if (sql.includes('p1')) return 'p1';
if (sql.includes('queued_p2')) return 'queued-p2';
return 'running-p2';
};
const queryDispatchOrder: string[] = [];
const queryEventLog: string[] = [];
const worker = new TraceProcessorSqlWorker({
processorId: 'd3-priority-regression',
traceId: bUpload.traceId,
port: 1,
forceInline: true,
rawExecutor: async request => {
const sql = decodeQueryArgsSql(request.body);
const label = labelForSql(sql);
queryDispatchOrder.push(label);
queryEventLog.push(`start:${label}`);
const result = await gateFor(sql).promise;
queryEventLog.push(`finish:${label}`);
return result;
},
});

let queuedStats: ReturnType<TraceProcessorSqlWorker['getStats']>;
try {
const runningP2 = worker.query('SELECT d3_running_p2_full_agent', { priority: 'p2' });
await flushPromises();

const queuedP1 = worker.query('SELECT d3_p1_agent_context', { priority: 'p1' });
const queuedP2 = worker.query('SELECT d3_queued_p2_report', { priority: 'p2' });
const frontendP0 = worker.query('SELECT d3_p0_frontend_timeline', { priority: 'p0' });
await flushPromises();
queuedStats = worker.getStats();

gateFor('SELECT d3_running_p2_full_agent').resolve(encodeSqlResult('SELECT d3_running_p2_full_agent'));
const runningP2Result = await runningP2;
await flushPromises();

gateFor('SELECT d3_p0_frontend_timeline').resolve(encodeSqlResult('SELECT d3_p0_frontend_timeline'));
const frontendP0Result = await frontendP0;
await flushPromises();

gateFor('SELECT d3_p1_agent_context').resolve(encodeSqlResult('SELECT d3_p1_agent_context'));
const queuedP1Result = await queuedP1;
await flushPromises();

gateFor('SELECT d3_queued_p2_report').resolve(encodeSqlResult('SELECT d3_queued_p2_report'));
const queuedP2Result = await queuedP2;
await flushPromises();

const targetHasLeaseIdOnly = frontendProxyTargets.every(target =>
target.includes(encodeURIComponent(frontendLease.id))
&& !target.includes('127.0.0.1')
&& !target.includes('localhost')
&& !target.includes('://')
&& !/:\d{2,5}/.test(target),
);
const finishRunningP2Index = queryEventLog.indexOf('finish:running-p2');
const startP0Index = queryEventLog.indexOf('start:p0');
const queryRows = [
runningP2Result.rows[0]?.[0],
frontendP0Result.rows[0]?.[0],
queuedP1Result.rows[0]?.[0],
queuedP2Result.rows[0]?.[0],
];

const checks = {
frontendTimelineUsesLeaseProxy: targetHasLeaseIdOnly
&& frontendProxyTargets.every(target => target.startsWith('/api/tp/')),
frontendAndAgentUseLeaseHolders: frontendLease.state === 'active'
&& agentLease.state === 'active'
&& frontendLease.holders.some(holder => holder.holderType === 'frontend_http_rpc')
&& agentLease.holders.some(holder => holder.holderType === 'agent_run')
&& agentLease.holders.some(holder => holder.holderType === 'report_generation'),
p0DoesNotWaitBehindQueuedP1OrP2: queryDispatchOrder.join(',') === 'running-p2,p0,p1,queued-p2',
queueIsNonPreemptive: queryEventLog[0] === 'start:running-p2'
&& finishRunningP2Index > -1
&& startP0Index > finishRunningP2Index,
workerStatsExposeQueuedP0: queuedStats.running
&& queuedStats.queuedP0 === 1
&& queuedStats.queuedP1 === 1
&& queuedStats.queuedP2 === 1,
queryResultsStayAssociatedWithOriginalSql: queryRows.join('|') === [
'SELECT d3_running_p2_full_agent',
'SELECT d3_p0_frontend_timeline',
'SELECT d3_p1_agent_context',
'SELECT d3_queued_p2_report',
].join('|'),
};

return {
checks,
details: {
frontendTraceId: aUpload.traceId,
agentTraceId: bUpload.traceId,
frontendLeaseId: frontendLease.id,
agentLeaseId: agentLease.id,
frontendProxyTargets,
frontendHolderTypes: frontendLease.holders.map(holder => holder.holderType),
agentHolderTypes: agentLease.holders.map(holder => holder.holderType),
queuedStats,
queryDispatchOrder,
queryEventLog,
queryRows,
},
};
} finally {
worker.destroy();
}
}

async function scenarioD4(
db: Database.Database,
tracePath: string,
Expand Down Expand Up @@ -1545,6 +1750,7 @@ export async function runEnterpriseWindowRegression(
windows.userBWindow1,
input.longSqlMs ?? 100,
),
D3: await scenarioD3(db, tracePath, windows.userAWindow1, windows.userBWindow1),
D4: await scenarioD4(db, tracePath, windows.userAWindow1),
D5: await scenarioD5(db, tracePath, windows.userAWindow1),
D6: await scenarioD6(db, tracePath, windows.userAWindow1, windows.userCWindow1),
Expand All @@ -1559,6 +1765,7 @@ export async function runEnterpriseWindowRegression(
checks: {
D1: scenarioPassed(scenarios.D1),
D2: scenarioPassed(scenarios.D2),
D3: scenarioPassed(scenarios.D3),
D4: scenarioPassed(scenarios.D4),
D5: scenarioPassed(scenarios.D5),
D6: scenarioPassed(scenarios.D6),
Expand All @@ -1573,6 +1780,7 @@ export async function runEnterpriseWindowRegression(
coverageLimitations: [
'D1 covers same-name trace isolation across three users and two windows at the trace metadata, TraceAsset, workspace RBAC, and analysis session/run schema layers.',
'D2 covers a deterministic long-SQL window at the run/event metadata layer without invoking a real LLM provider.',
'D3 covers lease-proxy target shape plus TraceProcessorSqlWorker priority ordering; traceProcessorProxyRoutes tests cover authenticated live route priority forwarding.',
'D4 covers the lease state contract and frontend proxy target stability around a simulated trace_processor crash; TraceProcessorService restart backoff and single-supervisor behavior are covered by traceProcessorLeaseProcessorRouting tests.',
'D5 covers TraceProcessorLease holder grace and pageshow-style reacquire semantics after offline heartbeat expiry; frontend stale-lease reload signaling is covered by HttpRpcEngine unit tests.',
'D6 covers the persisted AgentEvent replay contract after a conclusion cursor; the live stream route path is covered by agentRoutesRbac tests.',
Expand Down
8 changes: 4 additions & 4 deletions docs/features/enterprise-multi-tenant/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
- [ ] 6.1 Unit:RequestContext / RBAC / owner guard / provider resolution / ProviderSnapshot hash
- [ ] 6.2 Integration:trace upload/list/delete/download;agent analyze/resume/respond/stream;report read/delete
- [ ] 6.3 Concurrency:多用户同时 upload / analyze / query / cancel / cleanup
- [ ] 6.4 Dual-window e2e:D1-D10 每个场景至少 1 个自动化用例(详见 §0.7)
- [x] 6.4 Dual-window e2e:D1-D10 每个场景至少 1 个自动化用例(详见 §0.7)
- [ ] 6.5 Security:ID 枚举、跨 tenant、无权限访问统一 404
- [ ] 6.6 Runtime:lease acquire / release / heartbeat / stale / crash recovery
- [ ] 6.7 Persistence:backend restart / queue shadow 恢复 / DB reconnect / SecretStore failure
Expand All @@ -105,9 +105,9 @@
- [ ] 6.11 PR Gate:合入前 `npm run verify:pr` 通过

### 0.7 §23 反证循环(双窗口阻断验收,每条都对应一个自动化测试)
- [ ] D1 两窗口分别上传同名 trace → temp file / TraceAsset / lease / session 不互相覆盖
- [ ] D2 A 长 SQL 中,B 上传并分析另一个 trace → A 的 SSE 不断、A lease 不被 destroy、B 能排队或运行
- [ ] D3 A 前端 timeline,B 跑 full agent → A WebSocket 走 lease;P0 不被 P2 长任务无限阻塞
- [x] D1 两窗口分别上传同名 trace → temp file / TraceAsset / lease / session 不互相覆盖
- [x] D2 A 长 SQL 中,B 上传并分析另一个 trace → A 的 SSE 不断、A lease 不被 destroy、B 能排队或运行
- [x] D3 A 前端 timeline,B 跑 full agent → A WebSocket 走 lease;P0 不被 P2 长任务无限阻塞
- [x] D4 trace_processor_shell crash → leaseId 稳定;前端不持有旧 port;supervisor 单点重启
- [x] D5 浏览器断网 / 休眠 30 分钟后恢复 → frontend grace 生效;reacquire lease 或自动 reload
- [x] D6 SSE 在 conclusion 后、analysis_completed 前断开 → AgentEvent replay 能补回 reportUrl
Expand Down