diff --git a/.gitignore b/.gitignore index cb3f9358..89e21924 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ coverage .DS_Store globalConfig.json *.log +.history diff --git a/package.json b/package.json index a02a10d5..cd9db6e9 100644 --- a/package.json +++ b/package.json @@ -43,7 +43,8 @@ "run-release": "yarn worker hawk-worker-release", "run-email": "yarn worker hawk-worker-email", "run-telegram": "yarn worker hawk-worker-telegram", - "run-limiter": "yarn worker hawk-worker-limiter" + "run-limiter": "yarn worker hawk-worker-limiter", + "run-performance": "yarn worker hawk-worker-performance" }, "dependencies": { "@hawk.so/nodejs": "^3.1.1", diff --git a/workers/performance/README.md b/workers/performance/README.md new file mode 100644 index 00000000..26eab945 --- /dev/null +++ b/workers/performance/README.md @@ -0,0 +1,63 @@ +# Performance worker + +This worker is needed to save performance data uploaded from the user to our DB. + +## Performance delivery scheme + +1. The user wants to deploy a project +2. They run the deploy script on the server and it runs a static builder, for example Webpack. +3. After Webpack has finished its job, our [Webpack Plugin](https://github.com/codex-team/hawk.webpack.plugin) gets the source maps for new bundles and sends them to us. 
+

example request:

```bash
curl --location 'http://localhost:3000/performance' \
--header 'Content-Type: application/json' \
--data '{
    "token": "eyJpbnRlZ3JhdGlvbklkIjoiZTU2ZTU5ODctN2JhZi00NTI3LWI4MmMtYjdkOWRhZDBiMDBmIiwic2VjcmV0IjoiZDQ5YTU0YjMtOWExZi00ZGI2LTkxZmYtMjk4M2JlMTVlODA0In0=",
    "projectId": "67d4adeccf25fa00ab563c32",
    "catcherType": "performance",
    "payload": {
        "projectId": "67d4adeccf25fa00ab563c32",
        "transactionId": "drxEFnbxGc7OumTVl3FkCm1v9BvBC9OpBrEiE3qG",
        "name": "complex-operation",
        "timestamp": 1742075217,
        "duration": 702.9999999999964,
        "startTime": 15322.000000000002,
        "endTime": 16024.999999999998,
        "catcherVersion": "3.2.1",
        "spans": [
            {
                "id": "6tk2UD4m0wDUjD99uvO1wylp3SnYumiWPlhRCZ2w",
                "name": "step-1",
                "duration": 400.9999999999982,
                "startTime": 15322.000000000002,
                "endTime": 15723,
                "transactionId": "drxEFnbxGc7OumTVl3FkCm1v9BvBC9OpBrEiE3qG"
            },
            {
                "id": "V2ZOiUWjtip5ZSATIr7VX4KInAZgGJxdUQNZot2j",
                "name": "step-2",
                "duration": 301.9999999999982,
                "startTime": 15723,
                "endTime": 16024.999999999998,
                "transactionId": "drxEFnbxGc7OumTVl3FkCm1v9BvBC9OpBrEiE3qG"
            },
            {
                "id": "JGIerBJqInvnvpIPBwsUIfuIxt3LcWQ6lFPwQJdN",
                "name": "step-3",
                "duration": 300.9999999999982,
                "startTime": 15724,
                "endTime": 16024.999999999998,
                "transactionId": "drxEFnbxGc7OumTVl3FkCm1v9BvBC9OpBrEiE3qG"
            }
        ],
        "tags": {
            "type": "background"
        }
    }
}'
```

4. The collector accepts the data and gives a task to PerformanceWorker for saving it to DB
5. PerformanceWorker saves it to DB. 
diff --git a/workers/performance/package.json b/workers/performance/package.json new file mode 100644 index 00000000..d7484538 --- /dev/null +++ b/workers/performance/package.json @@ -0,0 +1,15 @@ +{ + "name": "hawk-worker-performance", + "description": "Collects and parses performance", + "workerType": "performance", + "version": "0.0.1", + "main": "src/index.ts", + "repository": "https://github.com/codex-team/hawk.workers/tree/master/workers/performance", + "author": "CodeX", + "license": "UNLICENSED", + "private": true, + "devDependencies": { + "rimraf": "^3.0.0", + "webpack": "^4.39.3" + } +} \ No newline at end of file diff --git a/workers/performance/src/index.ts b/workers/performance/src/index.ts new file mode 100644 index 00000000..4640fc12 --- /dev/null +++ b/workers/performance/src/index.ts @@ -0,0 +1,224 @@ +import { DatabaseController } from '../../../lib/db/controller'; +import { Worker } from '../../../lib/worker'; +import { DatabaseReadWriteError } from '../../../lib/workerErrors'; +import * as pkg from '../package.json'; +import { Collection } from 'mongodb'; +import type { PerformanceRecord, PerformanceDocument, AggregatedTransaction } from './types'; + +/** + * Performance worker + */ +export default class PerformanceWorker extends Worker { + /** + * Worker type (will pull tasks from Registry queue with the same name) + */ + public readonly type: string = pkg.workerType; + + /** + * Database Controller + */ + private db: DatabaseController = new DatabaseController(process.env.MONGO_EVENTS_DATABASE_URI); + + private readonly dbCollectionName: string = 'performance_transactions'; + + /** + * Collection to save performance data + */ + private performanceCollection: Collection; + + /** + * Start consuming messages + */ + public async start(): Promise { + await this.db.connect(); + this.db.createGridFsBucket(this.dbCollectionName); + this.performanceCollection = this.db.getConnection().collection(this.dbCollectionName); + await super.start(); + } + + 
/** + * Finish everything + */ + public async finish(): Promise { + await super.finish(); + await this.db.close(); + } + + /** + * Message handle function + * + * @param task - Message object from consume method + */ + public async handle(task: PerformanceRecord): Promise { + switch (task.catcherType) { + case 'performance': await this.savePerformance(task); break; + } + } + + /** + * Save performance data to database + * + * @param data - Performance record containing project ID and performance metrics + * + * Key operations: + * 1. Round all numeric values to 3 decimal places to avoid floating point precision issues + * 2. Add timestamp to each transaction + * 3. Round values in aggregatedSpans as well + * 4. Use bulkWrite for efficient database operations + * 5. Trigger aggregation after saving + */ + private async savePerformance(data: PerformanceRecord): Promise { + try { + const { projectId, payload, catcherType } = data; + + if (catcherType !== 'performance') { + throw new Error('Invalid catcher type'); + } + + const ROUND_DECIMALS = 3; + const BASE = 10; + const transactionsWithTimestamp = payload.transactions.map(transaction => ({ + ...transaction, + timestamp: payload.timestamp, + minStartTime: Math.round(transaction.minStartTime * Math.pow(BASE, ROUND_DECIMALS)) / Math.pow(BASE, ROUND_DECIMALS), + maxEndTime: Math.round(transaction.maxEndTime * Math.pow(BASE, ROUND_DECIMALS)) / Math.pow(BASE, ROUND_DECIMALS), + maxDuration: Math.round(transaction.maxDuration * Math.pow(BASE, ROUND_DECIMALS)) / Math.pow(BASE, ROUND_DECIMALS), + p50duration: Math.round(transaction.p50duration * Math.pow(BASE, ROUND_DECIMALS)) / Math.pow(BASE, ROUND_DECIMALS), + p95duration: Math.round(transaction.p95duration * Math.pow(BASE, ROUND_DECIMALS)) / Math.pow(BASE, ROUND_DECIMALS), + avgStartTime: Math.round(transaction.avgStartTime * Math.pow(BASE, ROUND_DECIMALS)) / Math.pow(BASE, ROUND_DECIMALS), + aggregatedSpans: transaction.aggregatedSpans.map(span => ({ + ...span, + 
minStartTime: Math.round(span.minStartTime * Math.pow(BASE, ROUND_DECIMALS)) / Math.pow(BASE, ROUND_DECIMALS), + maxEndTime: Math.round(span.maxEndTime * Math.pow(BASE, ROUND_DECIMALS)) / Math.pow(BASE, ROUND_DECIMALS), + maxDuration: Math.round(span.maxDuration * Math.pow(BASE, ROUND_DECIMALS)) / Math.pow(BASE, ROUND_DECIMALS), + p50duration: Math.round(span.p50duration * Math.pow(BASE, ROUND_DECIMALS)) / Math.pow(BASE, ROUND_DECIMALS), + p95duration: Math.round(span.p95duration * Math.pow(BASE, ROUND_DECIMALS)) / Math.pow(BASE, ROUND_DECIMALS), + })), + })); + + const bulkOperations = transactionsWithTimestamp.map(transaction => ({ + updateOne: { + filter: { + projectId, + name: transaction.name, + }, + update: { + $push: { + transactions: transaction, + }, + $setOnInsert: { + projectId, + name: transaction.name, + createdAt: new Date(), + }, + }, + upsert: true, + } + })); + + await this.performanceCollection.bulkWrite(bulkOperations); + await this.aggregateTransactions(projectId); + } catch (err) { + this.logger.error(`Couldn't save performance data due to: ${err}`); + throw new DatabaseReadWriteError(err); + } + } + + /** + * Aggregate transactions data for a project + * + * @param projectId - Project ID to aggregate data for + * + * Key operations: + * 1. Calculate min/max timestamps across all transactions + * 2. Sort durations array to calculate percentiles + * 3. Calculate p50 and p95 percentiles from sorted durations + * 4. Calculate failure rate based on error count + * 5. Round all numeric values to 3 decimal places + * 6. 
Update documents with aggregated data + */ + private async aggregateTransactions(projectId: string): Promise { + const PERCENTILE_50 = 0.5; + const PERCENTILE_95 = 0.95; + const ROUND_DECIMALS = 3; + + const aggregationPipeline = [ + { $match: { projectId } }, + { $unwind: '$transactions' }, + { + $group: { + _id: '$name', + minStartTime: { $min: '$transactions.minStartTime' }, + maxEndTime: { $max: '$transactions.maxEndTime' }, + durations: { $push: '$transactions.maxDuration' }, + maxDurations: { $push: '$transactions.maxDuration' }, + totalCount: { $sum: 1 }, + errorCount: { + $sum: { + $cond: [ { $eq: ['$transactions.status', 'error'] }, 1, 0], + }, + }, + }, + }, + { + $project: { + _id: 0, + name: '$_id', + minStartTime: { + $round: [ + { $min: '$minStartTime' }, + ROUND_DECIMALS, + ] + }, + maxEndTime: { + $round: [ + { $max: '$maxEndTime' }, + ROUND_DECIMALS, + ] + }, + p50duration: { + $round: [ + { + $arrayElemAt: [ + { $sortArray: { input: '$durations', sortBy: 1 } }, + { $floor: { $multiply: [ { $size: '$durations' }, PERCENTILE_50] } }, + ] + }, + ROUND_DECIMALS, + ] + }, + p95duration: { + $round: [ + { + $arrayElemAt: [ + { $sortArray: { input: '$durations', sortBy: 1 } }, + { $floor: { $multiply: [ { $size: '$durations' }, PERCENTILE_95] } }, + ] + }, + ROUND_DECIMALS, + ] + }, + maxDuration: { + $round: [ + { $max: '$maxDurations' }, + ROUND_DECIMALS, + ] + }, + failureRate: { + $round: [ + { $divide: ['$errorCount', '$totalCount'] }, + ROUND_DECIMALS, + ] + }, + }, + }, + ]; + + const aggregatedData = await this.performanceCollection.aggregate(aggregationPipeline).toArray(); + + await this.performanceCollection.updateMany( + { projectId }, + { $set: { aggregatedData } } + ); + } +} diff --git a/workers/performance/src/types.ts b/workers/performance/src/types.ts new file mode 100644 index 00000000..e2395b53 --- /dev/null +++ b/workers/performance/src/types.ts @@ -0,0 +1,88 @@ +/** + * Interface for aggregated span data + */ +interface 
AggregatedSpan { + aggregationId: string; + name: string; + minStartTime: number; + maxEndTime: number; + p50duration: number; + p95duration: number; + maxDuration: number; + failureRate: number; +} + +/** + * Interface for transaction data + */ +interface Transaction { + aggregationId: string; + name: string; + avgStartTime: number; + minStartTime: number; + maxEndTime: number; + p50duration: number; + p95duration: number; + maxDuration: number; + count: number; + failureRate: number; + aggregatedSpans: AggregatedSpan[]; + timestamp: number; +} + +/** + * Interface for performance data + */ +interface PerformancePayload { + transactions: Transaction[]; + timestamp: number; +} + +/** + * Main interface for performance record + */ +interface PerformanceRecord { + projectId: string; + payload: PerformancePayload; + catcherType: 'performance'; +} + +/** + * Interface for performance database document + */ +interface PerformanceDocument { + projectId: string; + name: string; + transactions: Transaction[]; + aggregatedData?: AggregatedTransaction[]; + createdAt: Date; +} + +/** + * Interface for aggregated transaction data + */ +interface AggregatedTransaction { + name: string; + minStartTime: number; + maxEndTime: number; + p50duration: number; + p95duration: number; + maxDuration: number; + failureRate: number; +} + +interface PerformanceSpansDocument extends AggregatedTransaction { + projectId: string; + transactionId: string; + timestamp: number; +} + +export type { + Transaction, + AggregatedSpan, + PerformanceRecord, + PerformancePayload, + PerformanceDocument, + AggregatedTransaction, + PerformanceSpansDocument +}; \ No newline at end of file