Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 6 additions & 51 deletions api/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,6 @@ const moment = require("moment");
const tracker = require('./parts/mgmt/tracker.js');
require("./init_configs.js");

// EXTENSIVE DEBUGGING - Print configuration
console.log('=== COUNTLY API STARTUP DEBUG ===');
console.log('Process ENV SERVICE_TYPE:', process.env.SERVICE_TYPE);
console.log('Config loaded:', !!countlyConfig);
console.log('Config keys:', Object.keys(countlyConfig));
console.log('Full config:', JSON.stringify(countlyConfig, null, 2));
console.log('API config:', JSON.stringify(countlyConfig.api, null, 2));
console.log('MongoDB config:', JSON.stringify(countlyConfig.mongodb, null, 2));
console.log('ClickHouse config:', JSON.stringify(countlyConfig.clickhouse, null, 2));
console.log('Logging config:', JSON.stringify(countlyConfig.logging, null, 2));
console.log('=== END CONFIG DEBUG ===');

var granuralQueries = require('./parts/queries/coreAggregation.js');

//Add deletion manager endpoint
Expand All @@ -36,24 +24,16 @@ require('./utils/mutationManager.js');
var t = ["countly:", "api"];
common.processRequest = processRequest;

console.log("Starting Countly", "version", versionInfo.version, "package", pack.version);
console.log('=== DATABASE CONFIG CHECK ===');
console.log('countlyConfig.mongodb:', JSON.stringify(countlyConfig.mongodb, null, 2));
console.log('frontendConfig.mongodb:', JSON.stringify(frontendConfig.mongodb, null, 2));
log.i("Starting Countly", "version", versionInfo.version, "package", pack.version);
if (!common.checkDatabaseConfigMatch(countlyConfig.mongodb, frontendConfig.mongodb)) {
log.w('API AND FRONTEND DATABASE CONFIGS ARE DIFFERENT');
console.log('WARNING: Database configs do not match!');
}
console.log('=== END DATABASE CONFIG CHECK ===');

// Finaly set the visible title
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in comment: "Finaly" should be "Finally".

Suggested change
// Finaly set the visible title
// Finally set the visible title

Copilot uses AI. Check for mistakes.
process.title = t.join(' ');

console.log('=== CONNECTING TO DATABASES ===');
plugins.connectToAllDatabases().then(function() {
console.log('✓ Database connection successful');
console.log('common.db available:', !!common.db);
console.log('common.drillDb available:', !!common.drillDb);
log.d('Database connection successful');

plugins.loadConfigs(common.db, function() {
tracker.enable();
Expand All @@ -62,21 +42,18 @@ plugins.connectToAllDatabases().then(function() {
common.readBatcher = new ReadBatcher(common.db);
common.insertBatcher = new InsertBatcher(common.db);
common.queryRunner = new QueryRunner();
console.log('✓ Batchers and QueryRunner initialized');

common.drillQueryRunner = granuralQueries;
if (common.drillDb) {
common.drillReadBatcher = new ReadBatcher(common.drillDb, {configs_db: common.db});
console.log('✓ Drill database components initialized');
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leftover console.log statement that should be converted to use the log utility like the rest of the file. This is inconsistent with the cleanup done in the rest of this file where console.log statements were replaced with log.* methods.

Suggested change
console.log('✓ Drill database components initialized');
log.d('✓ Drill database components initialized');

Copilot uses AI. Check for mistakes.
common.drillReadBatcher = new ReadBatcher(common.drillDb);
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The common.drillReadBatcher is initialized twice. Line 48 creates it with configuration options {configs_db: common.db}, but then line 50 immediately overwrites it without those options. This appears to be a merge error or leftover debug code. The second initialization should be removed, or if the configuration options are no longer needed, only the first initialization should be removed.

Suggested change
common.drillReadBatcher = new ReadBatcher(common.drillDb);

Copilot uses AI. Check for mistakes.
}

/**
* Set Max Sockets
*/
console.log('=== SETTING MAX SOCKETS ===');
console.log('countlyConfig.api.max_sockets:', countlyConfig.api.max_sockets);
http.globalAgent.maxSockets = countlyConfig.api.max_sockets || 1024;
console.log('✓ Max sockets set to:', http.globalAgent.maxSockets);

// mutation manager default settings
[
Expand Down Expand Up @@ -116,16 +93,10 @@ plugins.connectToAllDatabases().then(function() {
});
}

/**
* Initialize Plugins
*/
console.log('=== INITIALIZING PLUGINS ===');

/**
* Initialize Plugins
*/
plugins.init();
console.log('✓ Plugins initialized');

/**
* Trying to gracefully handle the batch state
Expand Down Expand Up @@ -216,17 +187,13 @@ plugins.connectToAllDatabases().then(function() {

plugins.dispatch("/master", {}); // init hook

console.log('=== CREATING SERVER ===');
console.log('common.config.api:', JSON.stringify(common.config.api, null, 2));
const serverOptions = {
port: common.config.api.port,
host: common.config.api.host || ''
};
console.log('Server options:', serverOptions);

let server;
if (common.config.api.ssl && common.config.api.ssl.enabled) {
console.log('Creating HTTPS server with SSL');
const sslOptions = {
key: fs.readFileSync(common.config.api.ssl.key),
cert: fs.readFileSync(common.config.api.ssl.cert)
Expand All @@ -237,32 +204,20 @@ plugins.connectToAllDatabases().then(function() {
server = https.createServer(sslOptions, handleRequest);
}
else {
console.log('Creating HTTP server');
server = http.createServer(handleRequest);
}

console.log('Starting server on', serverOptions.host + ':' + serverOptions.port);
server.listen(serverOptions.port, serverOptions.host, () => {
console.log('✓ Server listening on', serverOptions.host + ':' + serverOptions.port);
log.i('Server listening on', serverOptions.host + ':' + serverOptions.port);
});

server.timeout = common.config.api.timeout || 120000;
server.keepAliveTimeout = common.config.api.timeout || 120000;
server.headersTimeout = (common.config.api.timeout || 120000) + 1000; // Slightly higher
console.log('✓ Server timeouts configured:', {
timeout: server.timeout,
keepAliveTimeout: server.keepAliveTimeout,
headersTimeout: server.headersTimeout
});

server.headersTimeout = (common.config.api.timeout || 120000) + 1000;

console.log('=== LOADING PLUGIN CONFIGS ===');
plugins.loadConfigs(common.db);
console.log('✓ Plugin configs loaded');
console.log('=== API STARTUP COMPLETE ===');
}).catch(function(error) {
console.error('❌ DATABASE CONNECTION FAILED:', error);
console.error('Error details:', error.stack);
log.e('Database connection failed:', error);
process.exit(1);
});

Expand Down
57 changes: 54 additions & 3 deletions api/config.sample.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,22 @@ var countlyConfig = {
},
max_open_connections: 10,
// Dictionary configuration for DictionaryManager
// dictionary: {
// enableMongoDBSource: true // Enable/disable MongoDB as a dictionary source (default: true, auto-disabled if mongodb driver not available)
// },
dictionary: {
enableMongoDBSource: true, // Enable/disable MongoDB as a dictionary source (auto-disabled if mongodb driver not available)
nativePort: 9000, // Native TCP port for dictionary connections (use 9440 for Cloud with TLS)
host: null, // Override host for dictionary connections (defaults to ClickHouse URL host)
secure: false // Enable TLS for dictionary connections (required for ClickHouse Cloud)
},
// Identity configuration for user merging and dictionary data retention
identity: {
daysOld: 30, // Number of days after which identity mappings are baked into cold partitions.
// Dictionary only loads mappings from the last (daysOld + 1) days.
// Used by both identity dictionary and ColdPartitionMerging job.
lifetime: {
min: 60, // Minimum dictionary cache lifetime in seconds (dictionary won't reload before this time)
max: 120 // Maximum dictionary cache lifetime in seconds (dictionary will reload after this time)
}
},
clickhouse_settings: {
idle_connection_timeout: 11000 + '',
async_insert: 1,
Expand All @@ -139,6 +152,44 @@ var countlyConfig = {
allow_suspicious_types_in_order_by: 1,
optimize_move_to_prewhere: 1,
query_plan_optimize_lazy_materialization: 1
},
/**
* Cluster configuration for distributed ClickHouse deployments
*
* Configuration uses boolean flags for clarity:
* - shards: false, replicas: false → single mode (default)
* - shards: false, replicas: true → replicated mode (HA, recommended)
* - shards: true, replicas: false → sharded mode (horizontal scaling, no HA)
* - shards: true, replicas: true → ha mode (full HA with sharding)
*
* @property {string} name - Cluster name (must match ClickHouse cluster config)
* @property {boolean} shards - Enable sharding (horizontal scaling across multiple shards)
* @property {boolean} replicas - Enable replication (high availability with multiple replicas)
* @property {boolean} isCloud - ClickHouse Cloud mode (skip DDL, validate schema exists)
*/
cluster: {
name: 'countly_cluster',
shards: false, // Enable sharding (horizontal scaling)
replicas: false, // Enable replication (high availability)
isCloud: false // Set to true for ClickHouse Cloud
},
// Replication configuration (used when cluster.replicas=true)
replication: {
coordinatorType: 'keeper', // 'keeper' (ClickHouse Keeper) or 'zookeeper'
zkPath: '/clickhouse/tables/{shard}/{database}/{table}',
replicaName: '{replica}'
},
// Parallel replicas configuration for query acceleration
// Only effective when cluster.replicas=true or cluster.isCloud=true
parallelReplicas: {
enabled: false, // Enable parallel replica queries
maxParallelReplicas: 2, // Number of replicas to use for parallel queries
clusterForParallelReplicas: null // null = auto-detect from cluster.name
},
// Distributed table configuration
distributed: {
writeThrough: true, // Write through distributed tables (not direct to local)
insertDistributedSync: true // Wait for data to be written to all shards
}
},
/**
Expand Down
5 changes: 5 additions & 0 deletions api/configextender.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ const OVERRIDES = {
}
},

// ClickHouse configuration overrides.
// Explicitly mapped keys are listed below. Additional nested keys (like cluster.*,
// parallelReplicas.*, replication.*, distributed.*, dictionary.*) work via the
// recursive dig() fallback mechanism, which creates nested objects from underscore-
// separated env var names (e.g., COUNTLY_CONFIG__CLICKHOUSE_CLUSTER_ENABLED -> clickhouse.cluster.enabled).
CLICKHOUSE: {
URL: 'url',
USERNAME: 'username',
Expand Down
25 changes: 24 additions & 1 deletion api/jobs/mutationManagerJob.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ catch {
//
}

let ClusterManager = null;
try {
ClusterManager = require('../../plugins/clickhouse/api/managers/ClusterManager');
}
catch {
// ClusterManager not available (clickhouse plugin not loaded)
}

const countlyConfig = require('../config');

/**
* Normalize mutation manager job configuration values
* @param {Object} cfg - Raw configuration object from the DB
Expand Down Expand Up @@ -311,7 +321,20 @@ class MutationManagerJob extends Job {
for (const task of awaiting) {
try {
if (chHealth && typeof chHealth.getMutationStatus === 'function') {
const status = await chHealth.getMutationStatus({ validation_command_id: task.validation_command_id, table: task.collection, database: task.db });
// In cluster mode, mutations target _local tables, so validation must check _local
let validationTable = task.collection;
if (ClusterManager) {
try {
const cm = new ClusterManager(countlyConfig.clickhouse || {});
if (cm.isClusterMode()) {
validationTable = task.collection + '_local';
}
}
catch (e) {
log.w('Could not determine cluster mode for validation table', e?.message);
}
}
Comment on lines +326 to +336
Copy link

Copilot AI Dec 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ClusterManager is instantiated inside a loop for every task being processed. This is inefficient and could impact performance when processing many mutation tasks. The cluster mode determination should be done once before the loop, either as a class property or by checking once at the start of the method.

Copilot uses AI. Check for mistakes.
const status = await chHealth.getMutationStatus({ validation_command_id: task.validation_command_id, table: validationTable, database: task.db });
if (status && status.is_done) {
await common.db.collection("mutation_manager").updateOne(
{ _id: task._id },
Expand Down
10 changes: 8 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@
},
"scripts": {
"test": "grunt --verbose",
"test:unit": "NODE_PATH=./node_modules/ npx mocha 'test/unit/*.unit.js' 'test/unit/*.unit.test.js' 'plugins/*/tests/**/*.unit.js' --reporter spec --timeout 50000",
"test:unit": "NODE_PATH=./node_modules/ npx mocha --preserve-symlinks --preserve-symlinks-main 'test/unit/*.unit.js' 'test/unit/*.unit.test.js' 'plugins/*/tests/**/*.unit.js' --reporter spec --timeout 50000",
"test:clickhouse:cluster": "node plugins/clickhouse/tests/run-cluster-tests.js",
"prepare": "husky",
"start:api": "node api/api.js",
"start:frontend": "node frontend/express/app.js",
"start:aggregator": "node api/aggregator.js",
"start:ingestor": "node api/ingestor.js",
"start:jobserver": "node jobServer/index.js"
"start:jobserver": "node jobServer/index.js",
"start:api:dev": "COUNTLY_CONFIG__SYMLINKED=true NODE_ENV=development COUNTLY_CONTAINER=api node --preserve-symlinks --preserve-symlinks-main --trace-warnings api/api.js",
"start:frontend:dev": "COUNTLY_CONFIG__SYMLINKED=true NODE_ENV=development COUNTLY_CONTAINER=api node --preserve-symlinks --preserve-symlinks-main --trace-warnings frontend/express/app.js",
"start:aggregator:dev": "COUNTLY_CONFIG__SYMLINKED=true NODE_ENV=development COUNTLY_CONTAINER=api node --preserve-symlinks --preserve-symlinks-main --trace-warnings api/aggregator.js",
"start:ingestor:dev": "COUNTLY_CONFIG__SYMLINKED=true NODE_ENV=development COUNTLY_CONTAINER=api node --preserve-symlinks --preserve-symlinks-main --trace-warnings api/ingestor.js",
"start:jobserver:dev": "COUNTLY_CONFIG__SYMLINKED=true NODE_ENV=development COUNTLY_CONTAINER=api node --preserve-symlinks --preserve-symlinks-main --trace-warnings jobServer/index.js"
},
"lint-staged": {
"*.{js,vue}": [
Expand Down
Loading