Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { VKApp } from './services/vk_app';

export interface AppService {
run(): Promise<any> | any;
stop?(): Promise<any> | any;
}

const services = {
Expand Down Expand Up @@ -114,4 +115,35 @@ export class App {
public getServiceList(): Array<string> {
return Array.from(this.services.keys());
}

public async stop(options: { timeoutMs?: number } = {}): Promise<void> {
const timeoutMs = options.timeoutMs ?? 15_000;
this.logger.log('Остановка...');

const order = Array.from(this.services.entries()).reverse();
for (const [name, service] of order) {
if (typeof service.stop !== 'function') continue;
let timer: NodeJS.Timeout | undefined;
try {
await Promise.race([
Promise.resolve(service.stop()),
new Promise<void>((_resolve, reject) => {
timer = setTimeout(() => reject(new Error(`stop timeout after ${timeoutMs}ms`)), timeoutMs);
})
]);
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
this.logger.log(`Остановлено: ${name}`);
} catch (error) {
this.logger.error('service_stop_failed', { service: name, error });
} finally {
if (timer) clearTimeout(timer);
}
}

try {
await sequelize.close();
this.logger.log('БД отключена');
} catch (error) {
this.logger.error('db_close_failed', { error });
}
}
}
13 changes: 12 additions & 1 deletion src/http.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import express, { Application, NextFunction, Request, Response } from 'express';
import { Server } from 'http';
import { config } from '../config';
import { App, AppService } from './app';
import { newTraceId, runWithLogContext } from './logging';
Expand All @@ -12,6 +13,7 @@ export class HttpService implements AppService {
public logger: Logger = new Logger('HTTP');

private http: Application;
private server?: Server;

public ignoreJsonParserUrls: string[] = [];

Expand Down Expand Up @@ -61,11 +63,20 @@ export class HttpService implements AppService {

this.http.use(this.errorHandler.bind(this));

this.http.listen(config.http.port, () => {
this.server = this.http.listen(config.http.port, () => {
this.logger.log(`Сервер запущен на порту: ${config.http.port}`);
});
}

public async stop(): Promise<void> {
const server = this.server;
if (!server) return;
await new Promise<void>((resolve, reject) => {
server.close((err) => (err ? reject(err) : resolve()));
});
this.server = undefined;
}

private setupOriginHeaders() {
this.http.use((req, res, next) => {
res.header('Access-Control-Allow-Origin', '*');
Expand Down
28 changes: 28 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { config } from '../config';
import { App } from './app';
import { validateConfig } from './config/validate';
import { startVanishCronJob as setupVanishCron } from './db/clean';
import { Logger } from './logger';

validateConfig(config);

Expand All @@ -11,4 +12,31 @@ app.runServices();

setupVanishCron(app);

const shutdownLogger = new Logger('CORE');
let shuttingDown = false;
const shutdown = async (signal: NodeJS.Signals) => {
if (shuttingDown) return;
shuttingDown = true;
shutdownLogger.log(`Получен ${signal}, завершаем работу...`);

const forceExit = setTimeout(() => {
shutdownLogger.error('shutdown_force_exit', { reason: 'timeout' });
process.exit(1);
}, 30_000);
forceExit.unref();

try {
await app.stop();
clearTimeout(forceExit);
process.exit(0);
} catch (error) {
shutdownLogger.error('shutdown_failed', { error });
clearTimeout(forceExit);
process.exit(1);
}
};

process.once('SIGTERM', () => void shutdown('SIGTERM'));
process.once('SIGINT', () => void shutdown('SIGINT'));

export { app };
14 changes: 9 additions & 5 deletions src/services/bots/tg/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,7 @@ export class TgBot extends AbstractBot implements AppService {
this.logger.error('tg_set_commands_error', { error: e });
});

const stopPolling = () => this.tg.stop();
process.once('SIGINT', stopPolling);
process.once('SIGTERM', stopPolling);

await this.tg
this.tg
.start({
drop_pending_updates: false,
onStart: () => {
Expand All @@ -149,6 +145,14 @@ export class TgBot extends AbstractBot implements AppService {
});
}

public async stop(): Promise<void> {
try {
await this.tg.stop();
} catch (err) {
this.logger.error('tg_stop_error', { error: err });
}
}

public async getChat(
peerId: number,
creationDefaults?: Partial<CreationAttributes<BotChat>>
Expand Down
17 changes: 15 additions & 2 deletions src/services/parser/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ export class ParserService implements AppService {
private _clearKeys: boolean = false;
private _forceCallsParse: boolean = false;
private _lastHtmlByUrl: Map<string, string> = new Map();
private _stopping: boolean = false;
private _loopDone?: Promise<void>;

constructor(private app: App) {
loadCache();
Expand Down Expand Up @@ -119,7 +121,7 @@ export class ParserService implements AppService {
}

await this.loadCallsSettings();
this.runLoop();
this._loopDone = this.runLoop();
}

public lastSuccessUpdate(): number {
Expand Down Expand Up @@ -268,7 +270,7 @@ export class ParserService implements AppService {
}

private async runLoop() {
while (true) {
while (!this._stopping) {
const { error } = await runWithLogContext(
{
traceId: newTraceId(),
Expand All @@ -278,11 +280,22 @@ export class ParserService implements AppService {
() => this.parse()
);

if (this._stopping) break;

this.delayPromise = getDelayTime(error);
await this.delayPromise.promise;
}
}

public async stop(): Promise<void> {
this._stopping = true;
this.delayPromise?.resolve();
if (this._loopDone) {
await this._loopDone;
this._loopDone = undefined;
}
}
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.

public async parse() {
setLogContext({ event: 'parse' });
let error: boolean = false;
Expand Down
108 changes: 108 additions & 0 deletions tests/app/shutdown.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';

vi.mock('../../config', () => ({
config: {
http: { port: 0 },
dev: false,
api: { url: '/api', rateLimit: { enabled: false } }
}
}));

const closeMock = vi.fn().mockResolvedValue(undefined);
vi.mock('../../src/db', () => ({
sequelize: { close: closeMock, sync: vi.fn().mockResolvedValue(undefined) }
}));
vi.mock('../../src/db/migrator', () => ({
runMigrations: vi.fn().mockResolvedValue([])
}));

vi.mock('../../src/http', () => ({ HttpService: class {} }));
vi.mock('../../src/services/alice', () => ({ AliceApp: class {} }));
vi.mock('../../src/services/api', () => ({ Api: class {} }));
vi.mock('../../src/services/bots', () => ({ BotService: class {} }));
vi.mock('../../src/services/bots/tg', () => ({ TgBot: class {} }));
vi.mock('../../src/services/bots/viber', () => ({ ViberBot: class {} }));
vi.mock('../../src/services/bots/vk', () => ({ VkBot: class {} }));
vi.mock('../../src/services/google', () => ({ GoogleService: class {} }));
vi.mock('../../src/services/image', () => ({ ImageService: class {} }));
vi.mock('../../src/services/parser', () => ({ ParserService: class {} }));
vi.mock('../../src/services/timetable', () => ({ Timetable: class {} }));
vi.mock('../../src/services/vk_app', () => ({ VKApp: class {} }));

const loadApp = async () => {
const { App } = await import('../../src/app');
return App;
};

describe('App.stop', () => {
beforeEach(() => vi.clearAllMocks());
afterEach(() => vi.restoreAllMocks());

it('calls stop on services in reverse registration order and closes the DB', async () => {
const App = await loadApp();
const app = new App([], { validate: false });
const order: string[] = [];

const mkSvc = (name: string, withStop = true) => ({
run: vi.fn(),
stop: withStop
? vi.fn().mockImplementation(async () => {
order.push(name);
})
: undefined
});

(app as any).services.set('http', mkSvc('http'));
(app as any).services.set('parser', mkSvc('parser'));
(app as any).services.set('tg', mkSvc('tg'));
(app as any).services.set('timetable', mkSvc('timetable', false));

await app.stop();

expect(order).toEqual(['tg', 'parser', 'http']);
});

it('continues when one service throws and still closes the DB', async () => {
const App = await loadApp();
const app = new App([], { validate: false });
closeMock.mockClear();

(app as any).services.set('http', {
run: vi.fn(),
stop: vi.fn().mockResolvedValue(undefined)
});
(app as any).services.set('tg', {
run: vi.fn(),
stop: vi.fn().mockRejectedValue(new Error('boom'))
});

await app.stop();

expect((app as any).services.get('http').stop).toHaveBeenCalled();
expect(closeMock).toHaveBeenCalledTimes(1);
});

it('enforces a per-service timeout and moves on to the next', async () => {
const App = await loadApp();
const app = new App([], { validate: false });

let resolveStuck: () => void = () => {};
(app as any).services.set('stuck', {
run: vi.fn(),
stop: vi.fn().mockImplementation(
() =>
new Promise<void>((resolve) => {
resolveStuck = resolve;
})
)
});
const fast = { run: vi.fn(), stop: vi.fn().mockResolvedValue(undefined) };
(app as any).services.set('fast', fast);

const done = app.stop({ timeoutMs: 50 });
await done;
resolveStuck();

expect(fast.stop).toHaveBeenCalled();
});
});
Loading