Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import cors from 'cors';
import path from 'path';

// Import configuration
import { serverConfig } from './config';
import { resolveFeatureConfig, serverConfig } from './config';

// Import routes (now after dotenv.config())
import sqlRoutes from './routes/sql';
Expand Down Expand Up @@ -71,6 +71,7 @@ import {
// Import cleanup utilities
import { TraceProcessorFactory, killOrphanProcessors } from './services/workingTraceProcessor';
import { getPortPool, resetPortPool } from './services/portPool';
import { failInterruptedAnalysisRunsOnStartup } from './services/analysisRunStore';

const app = express();
const PORT = serverConfig.port;
Expand Down Expand Up @@ -271,6 +272,22 @@ app.use((err: any, req: express.Request, res: express.Response, next: express.Ne
});

// Initialize services
/**
 * Marks analysis runs that were still in flight when the previous backend
 * process stopped as failed.
 *
 * Does nothing unless the enterprise feature flag resolves to a truthy value.
 * Recovery is best-effort: any error raised by the run store is logged as a
 * warning and never propagates to the caller.
 *
 * NOTE(review): the store call fails every non-terminal run it finds; if more
 * than one backend instance can share the same enterprise database, confirm
 * this cannot fail runs that are still active on another instance.
 */
function recoverInterruptedEnterpriseRuns(): void {
  if (!resolveFeatureConfig().enterprise) return;
  try {
    const recovered = failInterruptedAnalysisRunsOnStartup();
    if (recovered.length > 0) {
      console.warn(
        `[EnterpriseRecovery] Marked ${recovered.length} interrupted analysis run(s) failed after backend startup`,
      );
    }
  } catch (error: unknown) {
    // Narrow instead of trusting `any`: prefer the message of a real Error,
    // otherwise log whatever value was thrown.
    const detail = error instanceof Error && error.message ? error.message : error;
    console.warn('[EnterpriseRecovery] Failed to recover interrupted analysis runs:', detail);
  }
}

// Run interrupted-run recovery here, ahead of the orphan trace_processor cleanup below.
recoverInterruptedEnterpriseRuns();

// Kill orphan trace_processor processes from previous runs
killOrphanProcessors();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ import { EnhancedSessionContext, sessionContextManager } from '../../agent/conte
import { ENTERPRISE_DB_PATH_ENV, openEnterpriseDb } from '../../services/enterpriseDb';
import { ENTERPRISE_MIGRATION_PHASE_ENV } from '../../services/enterpriseMigration';
import { SessionPersistenceService } from '../../services/sessionPersistenceService';
import {
failInterruptedAnalysisRunsOnStartup,
getAnalysisRunLifecycle,
persistAnalysisRunState,
resetAnalysisRunStoreForTests,
} from '../../services/analysisRunStore';
import {
ENTERPRISE_DATA_DIR_ENV,
writeTraceMetadata,
Expand All @@ -37,6 +43,8 @@ const USER_ID = 'user-a';
const TRACE_ID = 'trace-restart-a';
const SESSION_ID = 'session-restart-a';
const RUN_ID = 'run-restart-a';
const INTERRUPTED_SESSION_ID = 'session-restart-interrupted';
const INTERRUPTED_RUN_ID = 'run-restart-interrupted';
const REPORT_ID = 'report-restart-a';
const GENERATED_AT = 1_700_000_000_000;

Expand Down Expand Up @@ -75,7 +83,7 @@ function ssoHeaders(req: request.Test, workspaceId = WORKSPACE_ID): request.Test
);
}

function readCount(table: 'trace_assets' | 'report_artifacts' | 'sessions'): number {
function readCount(table: 'trace_assets' | 'report_artifacts' | 'sessions' | 'analysis_runs'): number {
const db = openEnterpriseDb(dbPath);
try {
const row = db.prepare(`SELECT COUNT(*) AS count FROM ${table}`).get() as { count: number };
Expand Down Expand Up @@ -142,6 +150,17 @@ async function seedRestartState(): Promise<void> {
jank_type: 'App Deadline Missed',
});
persistence.saveSessionContext(SESSION_ID, context);

persistAnalysisRunState({
tenantId: TENANT_ID,
workspaceId: WORKSPACE_ID,
userId: USER_ID,
sessionId: INTERRUPTED_SESSION_ID,
runId: INTERRUPTED_RUN_ID,
traceId: TRACE_ID,
query: 'restart interrupted running run',
mode: 'agent',
}, 'running', { now: GENERATED_AT + 2000 });
}

beforeEach(async () => {
Expand All @@ -159,12 +178,14 @@ beforeEach(async () => {
delete process.env.SMARTPERFETTO_API_KEY;

SessionPersistenceService.resetForTests();
resetAnalysisRunStoreForTests();
reportStore.clear();
sessionContextManager.remove(SESSION_ID);
});

afterEach(async () => {
SessionPersistenceService.resetForTests();
resetAnalysisRunStoreForTests();
reportStore.clear();
sessionContextManager.remove(SESSION_ID);
restoreEnvValue(ENTERPRISE_FEATURE_FLAG_ENV, originalEnv.enterprise);
Expand All @@ -183,10 +204,16 @@ describe('enterprise restart persistence', () => {
expect(readCount('trace_assets')).toBe(1);
expect(readCount('report_artifacts')).toBe(1);
expect(readCount('sessions')).toBe(1);
expect(readCount('analysis_runs')).toBe(2);

reportStore.clear();
sessionContextManager.remove(SESSION_ID);
SessionPersistenceService.resetForTests();
resetAnalysisRunStoreForTests();
const recoveredRuns = failInterruptedAnalysisRunsOnStartup({
now: GENERATED_AT + 3000,
error: 'backend restart during active analysis',
});

const app = makeApp();

Expand Down Expand Up @@ -233,5 +260,28 @@ describe('enterprise restart persistence', () => {
expect(turnsRes.body.turns[0]).toEqual(expect.objectContaining({
query: '分析 restart 后是否可恢复',
}));

expect(recoveredRuns).toEqual([
expect.objectContaining({
id: INTERRUPTED_RUN_ID,
previousStatus: 'running',
}),
]);
expect(getAnalysisRunLifecycle({
tenantId: TENANT_ID,
workspaceId: WORKSPACE_ID,
userId: USER_ID,
}, INTERRUPTED_RUN_ID)).toEqual(expect.objectContaining({
status: 'failed',
completedAt: GENERATED_AT + 3000,
}));
expect(getAnalysisRunLifecycle({
tenantId: TENANT_ID,
workspaceId: WORKSPACE_ID,
userId: USER_ID,
}, RUN_ID)).toEqual(expect.objectContaining({
status: 'completed',
completedAt: GENERATED_AT,
}));
});
});
44 changes: 44 additions & 0 deletions backend/src/services/__tests__/analysisRunStore.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
} from '../enterpriseDb';
import {
getAnalysisRunLifecycle,
failInterruptedAnalysisRunsOnStartup,
heartbeatAnalysisRun,
isAnalysisRunHeartbeatFresh,
persistAnalysisRunState,
Expand Down Expand Up @@ -101,4 +102,47 @@ describe('analysis run store', () => {
}));
expect(isAnalysisRunHeartbeatFresh(runScope, 'run-failed', 1_777_000_011_000, 60_000)).toBe(false);
});

it('fails interrupted nonterminal runs on backend startup while preserving terminal runs', () => {
  // Timestamps: runs are seeded one second apart, recovery happens later.
  const seededAt = 1_777_000_000_000;
  const restartAt = 1_777_000_100_000;

  // One run per non-terminal state, plus one already-completed run.
  const scopes = {
    pending: scope({ sessionId: 'session-pending', runId: 'run-pending' }),
    running: scope({ sessionId: 'session-running', runId: 'run-running' }),
    awaiting: scope({ sessionId: 'session-awaiting', runId: 'run-awaiting' }),
    completed: scope({ sessionId: 'session-completed', runId: 'run-completed' }),
  };

  persistAnalysisRunState(scopes.pending, 'pending', { now: seededAt });
  persistAnalysisRunState(scopes.running, 'running', { now: seededAt + 1_000 });
  persistAnalysisRunState(scopes.awaiting, 'awaiting_user', { now: seededAt + 2_000 });
  persistAnalysisRunState(scopes.completed, 'completed', { now: seededAt + 3_000 });

  const recovered = failInterruptedAnalysisRunsOnStartup({
    now: restartAt,
    error: 'test restart',
  });

  // Only the three non-terminal runs are reported, oldest update first.
  const recoveredPairs = recovered.map(({ id, previousStatus }) => [id, previousStatus]);
  expect(recoveredPairs).toEqual([
    ['run-pending', 'pending'],
    ['run-running', 'running'],
    ['run-awaiting', 'awaiting_user'],
  ]);

  // The recovered run carries the recovery timestamp and a structured error.
  const pendingLifecycle = getAnalysisRunLifecycle(scopes.pending, 'run-pending');
  expect(pendingLifecycle).toEqual(
    expect.objectContaining({
      status: 'failed',
      completedAt: restartAt,
      errorJson: JSON.stringify({ message: 'test restart', source: 'backend_startup_recovery' }),
    }),
  );
  expect(getAnalysisRunLifecycle(scopes.running, 'run-running')?.status).toBe('failed');
  expect(getAnalysisRunLifecycle(scopes.awaiting, 'run-awaiting')?.status).toBe('failed');
  expect(getAnalysisRunLifecycle(scopes.completed, 'run-completed')?.status).toBe('completed');

  // Session rows follow their runs: interrupted -> failed, completed untouched.
  const db = openEnterpriseDb();
  try {
    const sessionStatus = db.prepare('SELECT status FROM analysis_sessions WHERE id = ?');
    expect(sessionStatus.get('session-running')).toEqual({ status: 'failed' });
    expect(sessionStatus.get('session-completed')).toEqual({ status: 'completed' });
  } finally {
    db.close();
  }
});
});
76 changes: 76 additions & 0 deletions backend/src/services/analysisRunStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ export interface AnalysisRunLifecycle {
errorJson: string | null;
}

/**
 * Describes one analysis run that `failInterruptedAnalysisRunsOnStartup`
 * selected as interrupted.
 */
export interface InterruptedAnalysisRunRecovery {
// Identifier of the analysis run.
id: string;
// Tenant and workspace the run row belongs to.
tenantId: string;
workspaceId: string;
// Session the run is attached to.
sessionId: string;
// Status the run had when it was selected, before being marked failed.
// NOTE(review): the union with `string` widens this to plain `string` for the type checker.
previousStatus: PersistedAnalysisRunStatus | string;
}

interface AnalysisRunRow extends Record<string, unknown> {
id: string;
status: string;
Expand All @@ -42,6 +50,14 @@ interface AnalysisRunRow extends Record<string, unknown> {
error_json: string | null;
}

/**
 * Raw row shape returned by the interrupted-run SELECT on `analysis_runs`
 * (snake_case column names as selected by the query).
 */
interface InterruptedAnalysisRunRow extends Record<string, unknown> {
id: string;
tenant_id: string;
workspace_id: string;
session_id: string;
// Status value as stored at the time of the SELECT.
status: string;
}

let singletonDb: Database.Database | null = null;
let singletonDbPath: string | null = null;

Expand Down Expand Up @@ -261,3 +277,63 @@ export function isAnalysisRunHeartbeatFresh(
const heartbeatAt = lifecycle.heartbeatAt ?? lifecycle.updatedAt ?? lifecycle.startedAt;
return now - heartbeatAt <= maxStaleMs;
}

/**
 * Fails every analysis run that is still `pending`, `running`, or
 * `awaiting_user`, and fails the owning session when it is in one of those
 * states too. Everything happens inside a single database transaction.
 *
 * Existing `completed_at` and `error_json` values on a run are preserved
 * (`COALESCE`); only missing values are filled in.
 *
 * @param options.now   Timestamp written to `heartbeat_at`, `updated_at` and
 *                      (when unset) `completed_at`. Defaults to `Date.now()`.
 * @param options.error Message stored in the run's error JSON. Defaults to
 *                      'Backend restarted before analysis completed'.
 * @returns One entry per run selected as interrupted, ordered by
 *          `updated_at`, `started_at`, then `id`, with its pre-recovery status.
 */
export function failInterruptedAnalysisRunsOnStartup(
  options: { now?: number; error?: string } = {},
): InterruptedAnalysisRunRecovery[] {
  const now = options.now ?? Date.now();
  const errorJson = JSON.stringify({
    message: options.error ?? 'Backend restarted before analysis completed',
    source: 'backend_startup_recovery',
  });
  const db = getAnalysisRunDb();
  return db.transaction(() => {
    const interrupted = db.prepare<unknown[], InterruptedAnalysisRunRow>(`
      SELECT id, tenant_id, workspace_id, session_id, status
      FROM analysis_runs
      WHERE status IN ('pending', 'running', 'awaiting_user')
      ORDER BY updated_at ASC, started_at ASC, id ASC
    `).all();

    if (interrupted.length > 0) {
      // Prepare each UPDATE once and reuse it for every row instead of
      // re-preparing inside the loop. Preparation stays behind the length
      // check so the empty case touches nothing beyond the SELECT.
      const failRun = db.prepare(`
        UPDATE analysis_runs
        SET status = 'failed',
            heartbeat_at = ?,
            updated_at = ?,
            completed_at = COALESCE(completed_at, ?),
            error_json = COALESCE(error_json, ?)
        WHERE tenant_id = ?
          AND workspace_id = ?
          AND id = ?
          AND status = ?
      `);
      const failSession = db.prepare(`
        UPDATE analysis_sessions
        SET status = 'failed',
            updated_at = ?
        WHERE tenant_id = ?
          AND workspace_id = ?
          AND id = ?
          AND status IN ('pending', 'running', 'awaiting_user')
      `);

      for (const run of interrupted) {
        // The `status = ?` guard only updates the row if it still has the
        // status that was read by the SELECT above.
        failRun.run(
          now,
          now,
          now,
          errorJson,
          run.tenant_id,
          run.workspace_id,
          run.id,
          run.status,
        );
        // Sessions already in a terminal state are left untouched.
        failSession.run(now, run.tenant_id, run.workspace_id, run.session_id);
      }
    }

    return interrupted.map(run => ({
      id: run.id,
      tenantId: run.tenant_id,
      workspaceId: run.workspace_id,
      sessionId: run.session_id,
      previousStatus: run.status,
    }));
  })();
}