diff --git a/backend/src/index.ts b/backend/src/index.ts index ffb323ad..d8eba7bb 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -9,7 +9,7 @@ import cors from 'cors'; import path from 'path'; // Import configuration -import { serverConfig } from './config'; +import { resolveFeatureConfig, serverConfig } from './config'; // Import routes (now after dotenv.config()) import sqlRoutes from './routes/sql'; @@ -71,6 +71,7 @@ import { // Import cleanup utilities import { TraceProcessorFactory, killOrphanProcessors } from './services/workingTraceProcessor'; import { getPortPool, resetPortPool } from './services/portPool'; +import { failInterruptedAnalysisRunsOnStartup } from './services/analysisRunStore'; const app = express(); const PORT = serverConfig.port; @@ -271,6 +272,22 @@ app.use((err: any, req: express.Request, res: express.Response, next: express.Ne }); // Initialize services +function recoverInterruptedEnterpriseRuns(): void { + if (!resolveFeatureConfig().enterprise) return; + try { + const recovered = failInterruptedAnalysisRunsOnStartup(); + if (recovered.length > 0) { + console.warn( + `[EnterpriseRecovery] Marked ${recovered.length} interrupted analysis run(s) failed after backend startup`, + ); + } + } catch (error: any) { + console.warn('[EnterpriseRecovery] Failed to recover interrupted analysis runs:', error?.message || error); + } +} + +recoverInterruptedEnterpriseRuns(); + // Kill orphan trace_processor processes from previous runs killOrphanProcessors(); diff --git a/backend/src/routes/__tests__/enterpriseRestartPersistence.test.ts b/backend/src/routes/__tests__/enterpriseRestartPersistence.test.ts index f100644e..74bf4817 100644 --- a/backend/src/routes/__tests__/enterpriseRestartPersistence.test.ts +++ b/backend/src/routes/__tests__/enterpriseRestartPersistence.test.ts @@ -13,6 +13,12 @@ import { EnhancedSessionContext, sessionContextManager } from '../../agent/conte import { ENTERPRISE_DB_PATH_ENV, openEnterpriseDb } from '../../services/enterpriseDb'; import { ENTERPRISE_MIGRATION_PHASE_ENV } from '../../services/enterpriseMigration'; import { SessionPersistenceService } from '../../services/sessionPersistenceService'; +import { + failInterruptedAnalysisRunsOnStartup, + getAnalysisRunLifecycle, + persistAnalysisRunState, + resetAnalysisRunStoreForTests, +} from '../../services/analysisRunStore'; import { ENTERPRISE_DATA_DIR_ENV, writeTraceMetadata, @@ -37,6 +43,8 @@ const USER_ID = 'user-a'; const TRACE_ID = 'trace-restart-a'; const SESSION_ID = 'session-restart-a'; const RUN_ID = 'run-restart-a'; +const INTERRUPTED_SESSION_ID = 'session-restart-interrupted'; +const INTERRUPTED_RUN_ID = 'run-restart-interrupted'; const REPORT_ID = 'report-restart-a'; const GENERATED_AT = 1_700_000_000_000; @@ -75,7 +83,7 @@ function ssoHeaders(req: request.Test, workspaceId = WORKSPACE_ID): request.Test ); } -function readCount(table: 'trace_assets' | 'report_artifacts' | 'sessions'): number { +function readCount(table: 'trace_assets' | 'report_artifacts' | 'sessions' | 'analysis_runs'): number { const db = openEnterpriseDb(dbPath); try { const row = db.prepare(`SELECT COUNT(*) AS count FROM ${table}`).get() as { count: number }; @@ -142,6 +150,17 @@ async function seedRestartState(): Promise { jank_type: 'App Deadline Missed', }); persistence.saveSessionContext(SESSION_ID, context); + + persistAnalysisRunState({ + tenantId: TENANT_ID, + workspaceId: WORKSPACE_ID, + userId: USER_ID, + sessionId: INTERRUPTED_SESSION_ID, + runId: INTERRUPTED_RUN_ID, + traceId: TRACE_ID, + query: 'restart interrupted running run', + mode: 'agent', + }, 'running', { now: GENERATED_AT + 2000 }); } beforeEach(async () => { @@ -159,12 +178,14 @@ beforeEach(async () => { delete process.env.SMARTPERFETTO_API_KEY; SessionPersistenceService.resetForTests(); + resetAnalysisRunStoreForTests(); reportStore.clear(); sessionContextManager.remove(SESSION_ID); }); afterEach(async () => { SessionPersistenceService.resetForTests(); + resetAnalysisRunStoreForTests(); reportStore.clear(); sessionContextManager.remove(SESSION_ID); restoreEnvValue(ENTERPRISE_FEATURE_FLAG_ENV, originalEnv.enterprise); @@ -183,10 +204,16 @@ describe('enterprise restart persistence', () => { expect(readCount('trace_assets')).toBe(1); expect(readCount('report_artifacts')).toBe(1); expect(readCount('sessions')).toBe(1); + expect(readCount('analysis_runs')).toBe(2); reportStore.clear(); sessionContextManager.remove(SESSION_ID); SessionPersistenceService.resetForTests(); + resetAnalysisRunStoreForTests(); + const recoveredRuns = failInterruptedAnalysisRunsOnStartup({ + now: GENERATED_AT + 3000, + error: 'backend restart during active analysis', + }); const app = makeApp(); @@ -233,5 +260,28 @@ describe('enterprise restart persistence', () => { expect(turnsRes.body.turns[0]).toEqual(expect.objectContaining({ query: '分析 restart 后是否可恢复', })); + + expect(recoveredRuns).toEqual([ + expect.objectContaining({ + id: INTERRUPTED_RUN_ID, + previousStatus: 'running', + }), + ]); + expect(getAnalysisRunLifecycle({ + tenantId: TENANT_ID, + workspaceId: WORKSPACE_ID, + userId: USER_ID, + }, INTERRUPTED_RUN_ID)).toEqual(expect.objectContaining({ + status: 'failed', + completedAt: GENERATED_AT + 3000, + })); + expect(getAnalysisRunLifecycle({ + tenantId: TENANT_ID, + workspaceId: WORKSPACE_ID, + userId: USER_ID, + }, RUN_ID)).toEqual(expect.objectContaining({ + status: 'completed', + completedAt: GENERATED_AT, + })); }); }); diff --git a/backend/src/services/__tests__/analysisRunStore.test.ts b/backend/src/services/__tests__/analysisRunStore.test.ts index 58429a6c..622bdf73 100644 --- a/backend/src/services/__tests__/analysisRunStore.test.ts +++ b/backend/src/services/__tests__/analysisRunStore.test.ts @@ -12,6 +12,7 @@ import { } from '../enterpriseDb'; import { getAnalysisRunLifecycle, + failInterruptedAnalysisRunsOnStartup, heartbeatAnalysisRun, isAnalysisRunHeartbeatFresh, persistAnalysisRunState, @@ -101,4 +102,47 @@ describe('analysis run store', () => { })); expect(isAnalysisRunHeartbeatFresh(runScope, 'run-failed', 1_777_000_011_000, 60_000)).toBe(false); }); + + it('fails interrupted nonterminal runs on backend startup while preserving terminal runs', () => { + const pendingScope = scope({ sessionId: 'session-pending', runId: 'run-pending' }); + const runningScope = scope({ sessionId: 'session-running', runId: 'run-running' }); + const awaitingScope = scope({ sessionId: 'session-awaiting', runId: 'run-awaiting' }); + const completedScope = scope({ sessionId: 'session-completed', runId: 'run-completed' }); + + persistAnalysisRunState(pendingScope, 'pending', { now: 1_777_000_000_000 }); + persistAnalysisRunState(runningScope, 'running', { now: 1_777_000_001_000 }); + persistAnalysisRunState(awaitingScope, 'awaiting_user', { now: 1_777_000_002_000 }); + persistAnalysisRunState(completedScope, 'completed', { now: 1_777_000_003_000 }); + + const recovered = failInterruptedAnalysisRunsOnStartup({ + now: 1_777_000_100_000, + error: 'test restart', + }); + + expect(recovered.map(run => [run.id, run.previousStatus])).toEqual([ + ['run-pending', 'pending'], + ['run-running', 'running'], + ['run-awaiting', 'awaiting_user'], + ]); + expect(getAnalysisRunLifecycle(pendingScope, 'run-pending')).toEqual(expect.objectContaining({ + status: 'failed', + completedAt: 1_777_000_100_000, + errorJson: JSON.stringify({ message: 'test restart', source: 'backend_startup_recovery' }), + })); + expect(getAnalysisRunLifecycle(runningScope, 'run-running')?.status).toBe('failed'); + expect(getAnalysisRunLifecycle(awaitingScope, 'run-awaiting')?.status).toBe('failed'); + expect(getAnalysisRunLifecycle(completedScope, 'run-completed')?.status).toBe('completed'); + + const db = openEnterpriseDb(); + try { + expect(db.prepare('SELECT status FROM analysis_sessions WHERE id = ?').get('session-running')).toEqual({ + status: 'failed', + }); + expect(db.prepare('SELECT status FROM analysis_sessions WHERE id = ?').get('session-completed')).toEqual({ + status: 'completed', + }); + } finally { + db.close(); + } + }); }); diff --git a/backend/src/services/analysisRunStore.ts b/backend/src/services/analysisRunStore.ts index 0397e013..5fd29c85 100644 --- a/backend/src/services/analysisRunStore.ts +++ b/backend/src/services/analysisRunStore.ts @@ -32,6 +32,14 @@ export interface AnalysisRunLifecycle { errorJson: string | null; } +export interface InterruptedAnalysisRunRecovery { + id: string; + tenantId: string; + workspaceId: string; + sessionId: string; + previousStatus: PersistedAnalysisRunStatus | string; +} + interface AnalysisRunRow extends Record { id: string; status: string; @@ -42,6 +50,14 @@ interface AnalysisRunRow extends Record { error_json: string | null; } +interface InterruptedAnalysisRunRow extends Record { + id: string; + tenant_id: string; + workspace_id: string; + session_id: string; + status: string; +} + let singletonDb: Database.Database | null = null; let singletonDbPath: string | null = null; @@ -261,3 +277,63 @@ export function isAnalysisRunHeartbeatFresh( const heartbeatAt = lifecycle.heartbeatAt ?? lifecycle.updatedAt ?? lifecycle.startedAt; return now - heartbeatAt <= maxStaleMs; } + +export function failInterruptedAnalysisRunsOnStartup( + options: { now?: number; error?: string } = {}, +): InterruptedAnalysisRunRecovery[] { + const now = options.now ?? Date.now(); + const errorJson = JSON.stringify({ + message: options.error ?? 'Backend restarted before analysis completed', + source: 'backend_startup_recovery', + }); + const db = getAnalysisRunDb(); + return db.transaction(() => { + const interrupted = db.prepare(` + SELECT id, tenant_id, workspace_id, session_id, status + FROM analysis_runs + WHERE status IN ('pending', 'running', 'awaiting_user') + ORDER BY updated_at ASC, started_at ASC, id ASC + `).all(); + + for (const run of interrupted) { + db.prepare(` + UPDATE analysis_runs + SET status = 'failed', + heartbeat_at = ?, + updated_at = ?, + completed_at = COALESCE(completed_at, ?), + error_json = COALESCE(error_json, ?) + WHERE tenant_id = ? + AND workspace_id = ? + AND id = ? + AND status = ? + `).run( + now, + now, + now, + errorJson, + run.tenant_id, + run.workspace_id, + run.id, + run.status, + ); + db.prepare(` + UPDATE analysis_sessions + SET status = 'failed', + updated_at = ? + WHERE tenant_id = ? + AND workspace_id = ? + AND id = ? + AND status IN ('pending', 'running', 'awaiting_user') + `).run(now, run.tenant_id, run.workspace_id, run.session_id); + } + + return interrupted.map(run => ({ + id: run.id, + tenantId: run.tenant_id, + workspaceId: run.workspace_id, + sessionId: run.session_id, + previousStatus: run.status, + })); + })(); +}