Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ coverage
.DS_Store
globalConfig.json
*.log
.history
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@
"run-release": "yarn worker hawk-worker-release",
"run-email": "yarn worker hawk-worker-email",
"run-telegram": "yarn worker hawk-worker-telegram",
"run-limiter": "yarn worker hawk-worker-limiter"
"run-limiter": "yarn worker hawk-worker-limiter",
"run-performance": "yarn worker hawk-worker-performance"
},
"dependencies": {
"@hawk.so/nodejs": "^3.1.1",
Expand Down
63 changes: 63 additions & 0 deletions workers/performance/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Performance worker

This worker saves performance data uploaded by users to our DB.

## Performance delivery scheme

1. The user wants to deploy a project.
2. They run the deploy script on the server, and it runs a static builder, for example Webpack.
3. After Webpack finishes its job, our [Webpack Plugin](https://github.com/codex-team/hawk.webpack.plugin) gets the source maps for the new bundles and sends them to us.

Example request:

```bash
curl --location 'http://localhost:3000/performance' \
--header 'Content-Type: application/json' \
--data '{
"token": "eyJpbnRlZ3JhdGlvbklkIjoiZTU2ZTU5ODctN2JhZi00NTI3LWI4MmMtYjdkOWRhZDBiMDBmIiwic2VjcmV0IjoiZDQ5YTU0YjMtOWExZi00ZGI2LTkxZmYtMjk4M2JlMTVlODA0In0=",
"projectId": "67d4adeccf25fa00ab563c32",
"catcherType": "performance",
"payload": {
"projectId": "67d4adeccf25fa00ab563c32",
"transactionId": "drxEFnbxGc7OumTVl3FkCm1v9BvBC9OpBrEiE3qG",
"name": "complex-operation",
"timestamp": 1742075217,
"duration": 702.9999999999964,
"startTime": 15322.000000000002,
"endTime": 16024.999999999998,
"catcherVersion": "3.2.1",
"spans": [
{
"id": "6tk2UD4m0wDUjD99uvO1wylp3SnYumiWPlhRCZ2w",
"name": "step-1",
"duration": 400.9999999999982,
"startTime": 15322.000000000002,
"endTime": 15723,
"transactionId": "drxEFnbxGc7OumTVl3FkCm1v9BvBC9OpBrEiE3qG"
},
{
"id": "V2ZOiUWjtip5ZSATIr7VX4KInAZgGJxdUQNZot2j",
"name": "step-2",
"duration": 301.9999999999982,
"startTime": 15723,
"endTime": 16024.999999999998,
"transactionId": "drxEFnbxGc7OumTVl3FkCm1v9BvBC9OpBrEiE3qG"
},
{
"id": "JGIerBJqInvnvpIPBwsUIfuIxt3LcWQ6lFPwQJdN",
"name": "step-3",
"duration": 300.9999999999982,
"startTime": 15724,
"endTime": 16024.999999999998,
"transactionId": "drxEFnbxGc7OumTVl3FkCm1v9BvBC9OpBrEiE3qG"
}
],
"tags": {
"type": "background"
}
}
}'
```

4. The Collector accepts the data and gives the PerformanceWorker a task to save it to the DB.
5. The PerformanceWorker saves it to the DB.
15 changes: 15 additions & 0 deletions workers/performance/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"name": "hawk-worker-performance",
"description": "Collects and parses performance data",
"workerType": "performance",
"version": "0.0.1",
"main": "src/index.ts",
"repository": "https://github.com/codex-team/hawk.workers/tree/master/workers/performance",
"author": "CodeX",
"license": "UNLICENSED",
"private": true,
"devDependencies": {
"rimraf": "^3.0.0",
"webpack": "^4.39.3"
}
}
224 changes: 224 additions & 0 deletions workers/performance/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
import { DatabaseController } from '../../../lib/db/controller';
import { Worker } from '../../../lib/worker';
import { DatabaseReadWriteError } from '../../../lib/workerErrors';
import * as pkg from '../package.json';
import { Collection } from 'mongodb';
import type { PerformanceRecord, PerformanceDocument, AggregatedTransaction } from './types';

/**
* Performance worker
*/
/**
 * Performance worker
 *
 * Receives performance transactions collected by catchers, rounds their
 * numeric metrics, stores them in MongoDB and recomputes per-project
 * aggregated statistics (percentiles, failure rate) after each save.
 */
export default class PerformanceWorker extends Worker {
  /**
   * Worker type (will pull tasks from Registry queue with the same name)
   */
  public readonly type: string = pkg.workerType;

  /**
   * Database Controller
   */
  private db: DatabaseController = new DatabaseController(process.env.MONGO_EVENTS_DATABASE_URI);

  /**
   * Name of the MongoDB collection that stores performance transactions
   */
  private readonly dbCollectionName: string = 'performance_transactions';

  /**
   * Collection to save performance data
   */
  private performanceCollection: Collection<PerformanceDocument>;

  /**
   * Start consuming messages
   */
  public async start(): Promise<void> {
    await this.db.connect();
    this.db.createGridFsBucket(this.dbCollectionName);
    this.performanceCollection = this.db.getConnection().collection(this.dbCollectionName);
    await super.start();
  }

  /**
   * Finish everything
   */
  public async finish(): Promise<void> {
    await super.finish();
    await this.db.close();
  }

  /**
   * Message handle function
   *
   * @param task - Message object from consume method
   */
  public async handle(task: PerformanceRecord): Promise<void> {
    switch (task.catcherType) {
      case 'performance': await this.savePerformance(task); break;
    }
  }

  /**
   * Round a value to a fixed number of decimal places to avoid floating
   * point precision artifacts coming from the catcher
   * (e.g. 702.9999999999964 -> 703)
   *
   * @param value - number to round
   * @returns value rounded to three decimal places
   */
  private static round(value: number): number {
    const ROUND_DECIMALS = 3;
    const BASE = 10;
    const factor = Math.pow(BASE, ROUND_DECIMALS);

    return Math.round(value * factor) / factor;
  }

  /**
   * Save performance data to database
   *
   * @param data - Performance record containing project ID and performance metrics
   *
   * Key operations:
   * 1. Round all numeric values to 3 decimal places to avoid floating point precision issues
   * 2. Add timestamp to each transaction
   * 3. Round values in aggregatedSpans as well
   * 4. Use bulkWrite for efficient database operations
   * 5. Trigger aggregation after saving
   *
   * @throws {DatabaseReadWriteError} when the save or the aggregation fails
   */
  private async savePerformance(data: PerformanceRecord): Promise<void> {
    try {
      const { projectId, payload, catcherType } = data;

      /**
       * Defensive check: handle() already routes only 'performance' tasks here
       */
      if (catcherType !== 'performance') {
        throw new Error('Invalid catcher type');
      }

      const round = PerformanceWorker.round;

      /**
       * Attach the payload timestamp to every transaction and round all
       * numeric metrics, including those of the nested aggregated spans
       */
      const transactionsWithTimestamp = payload.transactions.map(transaction => ({
        ...transaction,
        timestamp: payload.timestamp,
        minStartTime: round(transaction.minStartTime),
        maxEndTime: round(transaction.maxEndTime),
        maxDuration: round(transaction.maxDuration),
        p50duration: round(transaction.p50duration),
        p95duration: round(transaction.p95duration),
        avgStartTime: round(transaction.avgStartTime),
        aggregatedSpans: transaction.aggregatedSpans.map(span => ({
          ...span,
          minStartTime: round(span.minStartTime),
          maxEndTime: round(span.maxEndTime),
          maxDuration: round(span.maxDuration),
          p50duration: round(span.p50duration),
          p95duration: round(span.p95duration),
        })),
      }));

      /**
       * One upsert per transaction name: push the new measurement into the
       * document's `transactions` array, creating the document on first use
       */
      const bulkOperations = transactionsWithTimestamp.map(transaction => ({
        updateOne: {
          filter: {
            projectId,
            name: transaction.name,
          },
          update: {
            $push: {
              transactions: transaction,
            },
            $setOnInsert: {
              projectId,
              name: transaction.name,
              createdAt: new Date(),
            },
          },
          upsert: true,
        },
      }));

      await this.performanceCollection.bulkWrite(bulkOperations);
      await this.aggregateTransactions(projectId);
    } catch (err) {
      this.logger.error(`Couldn't save performance data due to: ${err}`);
      throw new DatabaseReadWriteError(err);
    }
  }

  /**
   * Aggregate transactions data for a project
   *
   * @param projectId - Project ID to aggregate data for
   *
   * Key operations:
   * 1. Calculate min/max timestamps across all transactions
   * 2. Sort durations array to calculate percentiles
   * 3. Calculate p50 and p95 percentiles from sorted durations
   * 4. Calculate failure rate based on error count
   * 5. Round all numeric values to 3 decimal places
   * 6. Update documents with aggregated data
   */
  private async aggregateTransactions(projectId: string): Promise<void> {
    const PERCENTILE_50 = 0.5;
    const PERCENTILE_95 = 0.95;
    const ROUND_DECIMALS = 3;

    /**
     * Builds a rounded percentile expression over the collected durations:
     * sort ascending and take the element at floor(size * percentile)
     */
    const percentileExpression = (percentile: number): object => ({
      $round: [
        {
          $arrayElemAt: [
            { $sortArray: { input: '$durations', sortBy: 1 } },
            { $floor: { $multiply: [ { $size: '$durations' }, percentile ] } },
          ],
        },
        ROUND_DECIMALS,
      ],
    });

    const aggregationPipeline = [
      { $match: { projectId } },
      { $unwind: '$transactions' },
      {
        $group: {
          _id: '$name',
          minStartTime: { $min: '$transactions.minStartTime' },
          maxEndTime: { $max: '$transactions.maxEndTime' },
          // NOTE(review): percentiles are computed over each transaction's
          // maxDuration — confirm this shouldn't use an average/raw duration
          durations: { $push: '$transactions.maxDuration' },
          totalCount: { $sum: 1 },
          errorCount: {
            $sum: {
              $cond: [ { $eq: ['$transactions.status', 'error'] }, 1, 0],
            },
          },
        },
      },
      {
        $project: {
          _id: 0,
          name: '$_id',
          // minStartTime/maxEndTime are already scalars after $group
          minStartTime: { $round: ['$minStartTime', ROUND_DECIMALS] },
          maxEndTime: { $round: ['$maxEndTime', ROUND_DECIMALS] },
          p50duration: percentileExpression(PERCENTILE_50),
          p95duration: percentileExpression(PERCENTILE_95),
          maxDuration: {
            $round: [
              { $max: '$durations' },
              ROUND_DECIMALS,
            ],
          },
          failureRate: {
            $round: [
              { $divide: ['$errorCount', '$totalCount'] },
              ROUND_DECIMALS,
            ],
          },
        },
      },
    ];

    const aggregatedData = await this.performanceCollection.aggregate<AggregatedTransaction>(aggregationPipeline).toArray();

    /**
     * Store the aggregated snapshot on every document of the project
     */
    await this.performanceCollection.updateMany(
      { projectId },
      { $set: { aggregatedData } }
    );
  }
}
Loading
Loading