diff --git a/src/app.ts b/src/app.ts index 2d71e53..8978c97 100644 --- a/src/app.ts +++ b/src/app.ts @@ -17,6 +17,7 @@ import { VKApp } from './services/vk_app'; export interface AppService { run(): Promise | any; + stop?(): Promise | any; } const services = { @@ -114,4 +115,35 @@ export class App { public getServiceList(): Array { return Array.from(this.services.keys()); } + + public async stop(options: { timeoutMs?: number } = {}): Promise { + const timeoutMs = options.timeoutMs ?? 15_000; + this.logger.log('Остановка...'); + + const order = Array.from(this.services.entries()).reverse(); + for (const [name, service] of order) { + if (typeof service.stop !== 'function') continue; + let timer: NodeJS.Timeout | undefined; + try { + await Promise.race([ + Promise.resolve(service.stop()), + new Promise((_resolve, reject) => { + timer = setTimeout(() => reject(new Error(`stop timeout after ${timeoutMs}ms`)), timeoutMs); + }) + ]); + this.logger.log(`Остановлено: ${name}`); + } catch (error) { + this.logger.error('service_stop_failed', { service: name, error }); + } finally { + if (timer) clearTimeout(timer); + } + } + + try { + await sequelize.close(); + this.logger.log('БД отключена'); + } catch (error) { + this.logger.error('db_close_failed', { error }); + } + } } diff --git a/src/http.ts b/src/http.ts index 13cea8f..8d7c34c 100644 --- a/src/http.ts +++ b/src/http.ts @@ -1,4 +1,5 @@ import express, { Application, NextFunction, Request, Response } from 'express'; +import { Server } from 'http'; import { config } from '../config'; import { App, AppService } from './app'; import { newTraceId, runWithLogContext } from './logging'; @@ -12,6 +13,7 @@ export class HttpService implements AppService { public logger: Logger = new Logger('HTTP'); private http: Application; + private server?: Server; public ignoreJsonParserUrls: string[] = []; @@ -61,11 +63,20 @@ export class HttpService implements AppService { this.http.use(this.errorHandler.bind(this)); - this.http.listen(config.http.port, () => { + this.server = this.http.listen(config.http.port, () => { this.logger.log(`Сервер запущен на порту: ${config.http.port}`); }); } + public async stop(): Promise { + const server = this.server; + if (!server) return; + await new Promise((resolve, reject) => { + server.close((err) => (err ? reject(err) : resolve())); + }); + this.server = undefined; + } + private setupOriginHeaders() { this.http.use((req, res, next) => { res.header('Access-Control-Allow-Origin', '*'); diff --git a/src/index.ts b/src/index.ts index aaf4648..367e2eb 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,6 +2,7 @@ import { config } from '../config'; import { App } from './app'; import { validateConfig } from './config/validate'; import { startVanishCronJob as setupVanishCron } from './db/clean'; +import { Logger } from './logger'; validateConfig(config); @@ -11,4 +12,31 @@ app.runServices(); setupVanishCron(app); +const shutdownLogger = new Logger('CORE'); +let shuttingDown = false; +const shutdown = async (signal: NodeJS.Signals) => { + if (shuttingDown) return; + shuttingDown = true; + shutdownLogger.log(`Получен ${signal}, завершаем работу...`); + + const forceExit = setTimeout(() => { + shutdownLogger.error('shutdown_force_exit', { reason: 'timeout' }); + process.exit(1); + }, 30_000); + forceExit.unref(); + + try { + await app.stop(); + clearTimeout(forceExit); + process.exit(0); + } catch (error) { + shutdownLogger.error('shutdown_failed', { error }); + clearTimeout(forceExit); + process.exit(1); + } +}; + +process.once('SIGTERM', () => void shutdown('SIGTERM')); +process.once('SIGINT', () => void shutdown('SIGINT')); + export { app }; diff --git a/src/services/bots/tg/index.ts b/src/services/bots/tg/index.ts index 668caac..fbecb48 100644 --- a/src/services/bots/tg/index.ts +++ b/src/services/bots/tg/index.ts @@ -133,11 +133,7 @@ export class TgBot extends AbstractBot implements AppService { this.logger.error('tg_set_commands_error', { error: e }); }); - const stopPolling = () => this.tg.stop(); - process.once('SIGINT', stopPolling); - process.once('SIGTERM', stopPolling); - - await this.tg + this.tg .start({ drop_pending_updates: false, onStart: () => { @@ -149,6 +145,14 @@ export class TgBot extends AbstractBot implements AppService { }); } + public async stop(): Promise { + try { + await this.tg.stop(); + } catch (err) { + this.logger.error('tg_stop_error', { error: err }); + } + } + public async getChat( peerId: number, creationDefaults?: Partial> diff --git a/src/services/parser/index.ts b/src/services/parser/index.ts index e14f890..3eb30a1 100644 --- a/src/services/parser/index.ts +++ b/src/services/parser/index.ts @@ -88,6 +88,8 @@ export class ParserService implements AppService { private _clearKeys: boolean = false; private _forceCallsParse: boolean = false; private _lastHtmlByUrl: Map = new Map(); + private _stopping: boolean = false; + private _loopDone?: Promise; constructor(private app: App) { loadCache(); @@ -119,7 +121,7 @@ export class ParserService implements AppService { } await this.loadCallsSettings(); - this.runLoop(); + this._loopDone = this.runLoop(); } public lastSuccessUpdate(): number { @@ -268,7 +270,7 @@ export class ParserService implements AppService { } private async runLoop() { - while (true) { + while (!this._stopping) { const { error } = await runWithLogContext( { traceId: newTraceId(), @@ -278,11 +280,22 @@ export class ParserService implements AppService { () => this.parse() ); + if (this._stopping) break; + this.delayPromise = getDelayTime(error); await this.delayPromise.promise; } } + public async stop(): Promise { + this._stopping = true; + this.delayPromise?.resolve(); + if (this._loopDone) { + await this._loopDone; + this._loopDone = undefined; + } + } + public async parse() { setLogContext({ event: 'parse' }); let error: boolean = false; diff --git a/tests/app/shutdown.test.ts b/tests/app/shutdown.test.ts new file mode 100644 index 0000000..0378733 --- /dev/null +++ b/tests/app/shutdown.test.ts @@ -0,0 +1,108 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +vi.mock('../../config', () => ({ + config: { + http: { port: 0 }, + dev: false, + api: { url: '/api', rateLimit: { enabled: false } } + } +})); + +const closeMock = vi.fn().mockResolvedValue(undefined); +vi.mock('../../src/db', () => ({ + sequelize: { close: closeMock, sync: vi.fn().mockResolvedValue(undefined) } +})); +vi.mock('../../src/db/migrator', () => ({ + runMigrations: vi.fn().mockResolvedValue([]) +})); + +vi.mock('../../src/http', () => ({ HttpService: class {} })); +vi.mock('../../src/services/alice', () => ({ AliceApp: class {} })); +vi.mock('../../src/services/api', () => ({ Api: class {} })); +vi.mock('../../src/services/bots', () => ({ BotService: class {} })); +vi.mock('../../src/services/bots/tg', () => ({ TgBot: class {} })); +vi.mock('../../src/services/bots/viber', () => ({ ViberBot: class {} })); +vi.mock('../../src/services/bots/vk', () => ({ VkBot: class {} })); +vi.mock('../../src/services/google', () => ({ GoogleService: class {} })); +vi.mock('../../src/services/image', () => ({ ImageService: class {} })); +vi.mock('../../src/services/parser', () => ({ ParserService: class {} })); +vi.mock('../../src/services/timetable', () => ({ Timetable: class {} })); +vi.mock('../../src/services/vk_app', () => ({ VKApp: class {} })); + +const loadApp = async () => { + const { App } = await import('../../src/app'); + return App; +}; + +describe('App.stop', () => { + beforeEach(() => vi.clearAllMocks()); + afterEach(() => vi.restoreAllMocks()); + + it('calls stop on services in reverse registration order and closes the DB', async () => { + const App = await loadApp(); + const app = new App([], { validate: false }); + const order: string[] = []; + + const mkSvc = (name: string, withStop = true) => ({ + run: vi.fn(), + stop: withStop + ? vi.fn().mockImplementation(async () => { + order.push(name); + }) + : undefined + }); + + (app as any).services.set('http', mkSvc('http')); + (app as any).services.set('parser', mkSvc('parser')); + (app as any).services.set('tg', mkSvc('tg')); + (app as any).services.set('timetable', mkSvc('timetable', false)); + + await app.stop(); + + expect(order).toEqual(['tg', 'parser', 'http']); + }); + + it('continues when one service throws and still closes the DB', async () => { + const App = await loadApp(); + const app = new App([], { validate: false }); + closeMock.mockClear(); + + (app as any).services.set('http', { + run: vi.fn(), + stop: vi.fn().mockResolvedValue(undefined) + }); + (app as any).services.set('tg', { + run: vi.fn(), + stop: vi.fn().mockRejectedValue(new Error('boom')) + }); + + await app.stop(); + + expect((app as any).services.get('http').stop).toHaveBeenCalled(); + expect(closeMock).toHaveBeenCalledTimes(1); + }); + + it('enforces a per-service timeout and moves on to the next', async () => { + const App = await loadApp(); + const app = new App([], { validate: false }); + + let resolveStuck: () => void = () => {}; + (app as any).services.set('stuck', { + run: vi.fn(), + stop: vi.fn().mockImplementation( + () => + new Promise((resolve) => { + resolveStuck = resolve; + }) + ) + }); + const fast = { run: vi.fn(), stop: vi.fn().mockResolvedValue(undefined) }; + (app as any).services.set('fast', fast); + + const done = app.stop({ timeoutMs: 50 }); + await done; + resolveStuck(); + + expect(fast.stop).toHaveBeenCalled(); + }); +});