[controller] Add configs to tune replication factor based on current/future/backup version status#2740
misyel wants to merge 5 commits into linkedin:main
Conversation
…rsion transition phases
Pull request overview
Adds an RF/MinActiveReplicas tuning framework (behind controller.rf.tuning.enabled) so replication settings can be configured per version lifecycle stage (future/current/backup), and applies these settings to version metadata and Helix IdealStates during version creation and transitions.
Changes:
- Introduces new per-cluster config keys for RF tuning (future/current/backup RF + MinActiveReplicas) and reads them in `VeniceControllerClusterConfig`.
- Applies tuned RF/MinActiveReplicas when creating Helix resources (`ZkHelixAdminClient`) and during version lifecycle transitions (`VeniceHelixAdmin`), including a new `updateIdealState(..., minActive, numReplicas)` overload.
- Updates/extends unit tests to cover RF tuning enabled/disabled behaviors.
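As a sketch of how these per-cluster keys might be wired together, the snippet below builds a hypothetical controller configuration; the numeric values are illustrative only and are not the PR's defaults.

```java
import java.util.Properties;

public class RfTuningConfigExample {
  /** Build a hypothetical controller config enabling RF tuning; all values are illustrative. */
  public static Properties buildControllerProps() {
    Properties props = new Properties();
    props.setProperty("controller.rf.tuning.enabled", "true");
    // Keep future and current versions fully replicated...
    props.setProperty("controller.future.version.rf.count", "3");
    props.setProperty("controller.current.version.rf.count", "3");
    props.setProperty("controller.current.version.min.active.replica.count", "2");
    // ...but shrink demoted backup versions down to a single replica.
    props.setProperty("controller.backup.version.rf.count", "1");
    props.setProperty("controller.backup.version.min.active.replica.count", "1");
    return props;
  }
}
```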
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkHelixAdminClient.java | Use tuned future-version RF/minActive when creating Helix resources. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java | Apply tuned RF during version creation and lifecycle transitions; add IdealState update overload. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java | Add RF tuning config fields and defaults. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreBackupVersionCleanupService.java | Switch backup-version replica reduction behavior to RF tuning path. |
| internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java | Add new RF tuning config keys. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/TestZkHelixAdminClient.java | Add tests for resource creation RF tuning enabled/disabled. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdmin.java | Add version lifecycle transition tests for RF tuning enabled/disabled. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/TestStoreBackupVersionCleanupService.java | Add tests for backup replica reduction under RF tuning and update existing test expectations. |
…egacy fallback
- Extract an applyRfTuningOnVersionSwap() helper to deduplicate the RF tuning block across setCurrentVersion, rollForwardToFutureVersion, and rollbackToBackupVersion
- Add input validation (RF >= 1, 1 <= minActive <= RF) in both updateIdealState() and createVeniceStorageClusterResources()
- Clamp RF tuning config defaults in VeniceControllerClusterConfig to prevent invalid Helix IdealStates when RF=1
- Restore the legacy backupVersionReplicaReductionEnabled fallback path for clusters not using RF tuning
- Fix the Javadoc on updateIdealState to reflect a single atomic write
- Rename migrationClusterConfig to destClusterConfig for clarity

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
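The invariants named in this commit (RF >= 1, 1 <= minActive <= RF) can be sketched as a pair of hypothetical helpers: one that rejects invalid pairs outright, and one that clamps a configured default into range. Both names are illustrative, not the PR's actual methods.

```java
public class RfTuningValidation {
  /** Reject an invalid (minActive, RF) pair per the stated invariants: RF >= 1, 1 <= minActive <= RF. */
  public static void validate(int minActiveReplicas, int replicationFactor) {
    if (replicationFactor < 1) {
      throw new IllegalArgumentException("Replication factor must be >= 1, got: " + replicationFactor);
    }
    if (minActiveReplicas < 1 || minActiveReplicas > replicationFactor) {
      throw new IllegalArgumentException(
          "MinActiveReplicas must be in [1, " + replicationFactor + "], got: " + minActiveReplicas);
    }
  }

  /** Clamp a configured minActive default into the valid range instead of rejecting it (e.g. when RF=1). */
  public static int clampMinActive(int minActiveReplicas, int replicationFactor) {
    return Math.max(1, Math.min(minActiveReplicas, replicationFactor));
  }
}
```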
Existing test testRollForwardPartitionNotReady mocks VeniceHelixAdmin without setting up multiClusterConfigs, causing NPE when the helper method is called. Add null guard for multiClusterConfigs and clusterConfig at the top of applyRfTuningOnVersionSwap. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
HelixAdminClient, comprehensive tests
- Move backup RF reduction from StoreBackupVersionCleanupService to version swap time: backup versions are now reduced immediately when demoted, not lazily during cleanup
- Add updateIdealState(cluster, resource, minActive, numReplicas) to the HelixAdminClient interface with validation; VeniceHelixAdmin delegates to it
- Add RF tuning to the AbstractPushMonitor version swap paths (normal push and sequential roll forward), the primary version swap path
- Fix rollbackToBackupVersion to also reduce the demoted previous version's RF (it was passing NON_EXISTING_VERSION)
- Add 17 new tests covering: config clamping (4), updateIdealState validation/no-op (5), push monitor version swap (2), rollback RF tuning (2), setStoreCurrentVersion RF tuning (2), lifecycle transitions (2 existing)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
services/venice-controller/src/test/java/com/linkedin/venice/controller/TestStoreBackupVersionCleanupService.java:333
- This test no longer enables/validates the legacy backup-version replica reduction path (`controller.backup.version.replica.reduction.enabled`). Given the PR description states this flag remains as a fallback when RF tuning is disabled, please keep a test that asserts the legacy IdealState update still happens during the "not ready for deletion yet" window (or update the PR description if the fallback is intentionally removed).
```java
public void testCleanupBackupVersion_OnlyOneBackupVersion() {
  // Test that a store with only one backup version (two versions total) doesn't get cleaned up
  Map<Integer, VersionStatus> versions = new HashMap<>();
  versions.put(1, VersionStatus.ONLINE);
  versions.put(2, VersionStatus.ONLINE);
  // Create a store with two versions (one backup, one current)
  Store storeWithOneBackup = mockStore(360000, System.currentTimeMillis() - DEFAULT_RETENTION_MS * 2, versions, 2);
  doReturn(System.currentTimeMillis()).when(storeWithOneBackup).getLatestVersionPromoteToCurrentTimestamp();
  // Should not clean up since there's only one backup version
  Assert.assertFalse(service.cleanupBackupVersion(storeWithOneBackup, CLUSTER_NAME));
  // Verify no versions were deleted
  verify(admin, never()).deleteOldVersionInStore(CLUSTER_NAME, storeWithOneBackup.getName(), 1);
}
```
```java
if (!whetherStoreReadyToBeCleanup(
    store,
    admin.getBackupVersionDefaultRetentionMs(),
    time,
    currentVersion,
    this.minBackupVersionCleanupDelay)) {
  // not ready to clean up backup versions yet, update the backup version ideal state to use 2 replicas after
  // minimal delay
  if (multiClusterConfig.getControllerConfig(clusterName).isBackupVersionReplicaReductionEnabled()) {
    for (Version version: versions) {
      if (version.getNumber() >= currentVersion) {
        continue;
      }
      if (admin.updateIdealState(
          clusterName,
          Version.composeKafkaTopic(store.getName(), version.getNumber()),
          MIN_REPLICA)) {
        LOGGER.info(
            "Store {} version {} is updated to ideal state to use {} replicas",
            store.getName(),
            version.getNumber(),
            MIN_REPLICA);
      }
    }
  }
  return false;
}
```
cleanupBackupVersion no longer applies the legacy controller.backup.version.replica.reduction.enabled behavior when the store is not yet eligible for deletion (the previous block that reduced backup-version replicas/minActive during the waiting period is gone). This is both a behavior regression (the config is now effectively unused) and contradicts the PR description that the legacy flag is preserved as a fallback when RF tuning is disabled. Consider restoring the legacy reduction path (e.g., when RF tuning is disabled and isBackupVersionReplicaReductionEnabled() is true) or updating the PR description/config surface accordingly.
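One way to express the fallback the reviewer is asking for is a small decision helper: the legacy reduction path fires only when RF tuning is disabled and the legacy flag is on, and it only touches versions strictly below the current one. This is a hypothetical sketch, not the PR's code; both method names are invented for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

public class LegacyReductionGuard {
  /** Legacy replica reduction should fire only when RF tuning is disabled for the cluster. */
  public static boolean shouldApplyLegacyReduction(boolean rfTuningEnabled, boolean legacyReductionEnabled) {
    return !rfTuningEnabled && legacyReductionEnabled;
  }

  /** Backup versions eligible for reduction: strictly below the current version. */
  public static List<Integer> versionsToReduce(List<Integer> versionNumbers, int currentVersion) {
    return versionNumbers.stream().filter(v -> v < currentVersion).collect(Collectors.toList());
  }
}
```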
```java
// Update version metadata RF
store.getVersion(newCurrentVersion).setReplicationFactor(currentVersionRfCount);
if (previousVersion != Store.NON_EXISTING_VERSION && store.containsVersion(previousVersion)) {
  store.getVersion(previousVersion).setReplicationFactor(backupVersionRfCount);
}

// Update Helix IdealState
String currentVersionTopic = Version.composeKafkaTopic(storeName, newCurrentVersion);
helixAdminClient
    .updateIdealState(clusterName, currentVersionTopic, currentVersionMinActiveReplicaCount, currentVersionRfCount);

if (previousVersion != Store.NON_EXISTING_VERSION) {
  String backupVersionTopic = Version.composeKafkaTopic(storeName, previousVersion);
  helixAdminClient
      .updateIdealState(clusterName, backupVersionTopic, backupVersionMinActiveReplicaCount, backupVersionRfCount);
}
```
applyRfTuningOnVersionSwap is invoked while holding the per-store write lock (clusterLockManager.createStoreWriteLock(...)). The method performs synchronous Helix IdealState updates (ZK writes) which can prolong lock hold time and block other metadata operations for the store. Consider moving the Helix updates outside the lock (e.g., compute intended changes under the lock, persist the store update, then update IdealStates after releasing the lock), or otherwise ensure this cannot introduce long lock contention in production.
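The "compute under the lock, write after release" pattern the reviewer suggests can be sketched as a planning step that captures the intended IdealState writes as plain data; the caller then performs the ZK writes once the store lock is released. All names here are hypothetical, and the `"<store>_v<N>"` topic convention is an assumption standing in for `Version.composeKafkaTopic`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class DeferredIdealStateUpdates {
  static final int NON_EXISTING_VERSION = 0; // assumption: sentinel mirroring Store.NON_EXISTING_VERSION

  /** One IdealState write captured under the store lock and applied after the lock is released. */
  public static final class PendingUpdate {
    public final String topic;
    public final int minActiveReplicas;
    public final int numReplicas;

    PendingUpdate(String topic, int minActiveReplicas, int numReplicas) {
      this.topic = Objects.requireNonNull(topic);
      this.minActiveReplicas = minActiveReplicas;
      this.numReplicas = numReplicas;
    }
  }

  /** Compute the intended IdealState changes; the caller performs the ZK writes outside the lock. */
  public static List<PendingUpdate> planUpdates(
      String storeName,
      int newCurrentVersion,
      int previousVersion,
      int currentRf,
      int currentMinActive,
      int backupRf,
      int backupMinActive) {
    List<PendingUpdate> plan = new ArrayList<>();
    plan.add(new PendingUpdate(composeTopic(storeName, newCurrentVersion), currentMinActive, currentRf));
    if (previousVersion != NON_EXISTING_VERSION) {
      plan.add(new PendingUpdate(composeTopic(storeName, previousVersion), backupMinActive, backupRf));
    }
    return plan;
  }

  // Assumption: Venice version topics follow the "<store>_v<N>" convention.
  static String composeTopic(String storeName, int versionNumber) {
    return storeName + "_v" + versionNumber;
  }
}
```

Because the plan is immutable data, it can equally be applied synchronously after the lock or handed to an executor, without re-reading store state.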
Suggested change. Current code:

```java
// Update version metadata RF
store.getVersion(newCurrentVersion).setReplicationFactor(currentVersionRfCount);
if (previousVersion != Store.NON_EXISTING_VERSION && store.containsVersion(previousVersion)) {
  store.getVersion(previousVersion).setReplicationFactor(backupVersionRfCount);
}
// Update Helix IdealState
String currentVersionTopic = Version.composeKafkaTopic(storeName, newCurrentVersion);
helixAdminClient
    .updateIdealState(clusterName, currentVersionTopic, currentVersionMinActiveReplicaCount, currentVersionRfCount);
if (previousVersion != Store.NON_EXISTING_VERSION) {
  String backupVersionTopic = Version.composeKafkaTopic(storeName, previousVersion);
  helixAdminClient
      .updateIdealState(clusterName, backupVersionTopic, backupVersionMinActiveReplicaCount, backupVersionRfCount);
}
```

Proposed replacement:

```java
// Update version metadata RF synchronously; this is the portion that belongs in the store metadata path.
store.getVersion(newCurrentVersion).setReplicationFactor(currentVersionRfCount);
final boolean shouldUpdatePreviousVersion =
    previousVersion != Store.NON_EXISTING_VERSION && store.containsVersion(previousVersion);
if (shouldUpdatePreviousVersion) {
  store.getVersion(previousVersion).setReplicationFactor(backupVersionRfCount);
}
// Avoid blocking the caller on synchronous Helix/ZK writes. Capture the intended IdealState changes
// and apply them asynchronously so this method can safely be invoked from a locked metadata path.
final String currentVersionTopic = Version.composeKafkaTopic(storeName, newCurrentVersion);
final String backupVersionTopic =
    shouldUpdatePreviousVersion ? Version.composeKafkaTopic(storeName, previousVersion) : null;
java.util.concurrent.CompletableFuture.runAsync(() -> {
  try {
    helixAdminClient.updateIdealState(
        clusterName,
        currentVersionTopic,
        currentVersionMinActiveReplicaCount,
        currentVersionRfCount);
    if (backupVersionTopic != null) {
      helixAdminClient.updateIdealState(
          clusterName,
          backupVersionTopic,
          backupVersionMinActiveReplicaCount,
          backupVersionRfCount);
    }
  } catch (Exception e) {
    LOGGER.error(
        "Failed to apply RF tuning IdealState updates for store: {}, newCurrentVersion: {}, previousVersion: {}",
        storeName,
        newCurrentVersion,
        previousVersion,
        e);
  }
});
```
```java
if (multiClusterConfigs == null) {
  return;
}
VeniceControllerClusterConfig clusterConfig = multiClusterConfigs.getControllerConfig(clusterName);
if (clusterConfig == null || !clusterConfig.isRfTuningEnabled() || newCurrentVersion == NON_EXISTING_VERSION) {
  return;
}
```
RF tuning is described as a per-cluster feature flag for child controllers, but applyRfTuningOnVersionSwap does not guard against running in the parent controller. Since setStoreCurrentVersion(...) can run in parent when allowedInParent is true, enabling this flag in parent config could unexpectedly mutate version RF metadata and attempt IdealState updates. Consider adding an explicit isParent()/multiClusterConfigs.isParent() check (or otherwise enforcing the flag is only honored on child) to match the intended behavior.
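The parent-controller guard the reviewer asks for boils down to one predicate: honor the flag only on child controllers, and only for a real version. This is a hypothetical sketch of that check; the class, method name, and sentinel value are invented for illustration.

```java
public class ChildControllerGuard {
  static final int NON_EXISTING_VERSION = 0; // assumption: sentinel mirroring Store.NON_EXISTING_VERSION

  /** RF tuning is honored only on child controllers, which own the Helix IdealState. */
  public static boolean shouldApplyRfTuning(boolean isParent, boolean rfTuningEnabled, int newCurrentVersion) {
    return !isParent && rfTuningEnabled && newCurrentVersion != NON_EXISTING_VERSION;
  }
}
```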
…ntroller guard
- Split applyRfTuningOnVersionSwap into two methods: applyRfTuningMetadataUpdate (inside the lock) and applyRfTuningIdealStateUpdate (outside the lock) to avoid prolonged lock hold time on ZK writes
- Apply the same split to the AbstractPushMonitor version swap paths
- Add an isParent() guard to both methods: RF tuning only applies on child controllers, which manage the Helix IdealState
- Use getters instead of direct field access for testability (getRealTimeTopicSwitcher, getMultiClusterConfigs, getHelixAdminClient)
- Remove reflection from all tests; use doReturn on getters instead

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
```java
public static final String CONTROLLER_RF_TUNING_ENABLED = "controller.rf.tuning.enabled";
public static final String CONTROLLER_CURRENT_VERSION_RF_COUNT = "controller.current.version.rf.count";
public static final String CONTROLLER_CURRENT_VERSION_MIN_ACTIVE_REPLICA_COUNT =
    "controller.current.version.min.active.replica.count";
public static final String CONTROLLER_BACKUP_VERSION_RF_COUNT = "controller.backup.version.rf.count";
public static final String CONTROLLER_BACKUP_VERSION_MIN_ACTIVE_REPLICA_COUNT =
    "controller.backup.version.min.active.replica.count";
public static final String CONTROLLER_FUTURE_VERSION_RF_COUNT = "controller.future.version.rf.count";
public static final String CONTROLLER_FUTURE_VERSION_MIN_ACTIVE_REPLICA_COUNT =
    "controller.future.version.min.active.replica.count";
```
I just realized that we have a per-store RF definition, so we might need to refactor this as a delta over the default RF. For example, assuming store configs dictate an RF of 5:
- FUTURE_VERSION_RF_DELTA = 0: the future version keeps the same 5 replicas
- BACKUP_VERSION_RF_DELTA = -3: the backup version drops to (5 - 3) = 2 replicas

Of course, we still need the guard of max(1, calculated_rf).
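The delta-based scheme the reviewer proposes reduces to one line of arithmetic, including the max(1, ...) floor. A minimal sketch, with a hypothetical class and method name:

```java
public class RfDeltaSketch {
  /** Hypothetical delta-based tuning: stage RF = store-level RF + per-stage delta, floored at 1. */
  public static int tunedRf(int storeRf, int stageDelta) {
    return Math.max(1, storeRf + stageDelta);
  }
}
```

With a store RF of 5, a delta of 0 yields 5 replicas, a delta of -3 yields 2, and any delta below -4 is floored at 1.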
Problem Statement
Today, replication factor (RF) and MinActiveReplicas are tightly coupled: MinActiveReplicas is always derived as RF - 1 (hardcoded in VersionImpl.getMinActiveReplicas()), and neither can be tuned per version lifecycle stage (future, current, backup). This makes it impossible to, for example, run backup or future versions with fewer replicas than the current version.
Solution
Introduces a configurable RF tuning framework behind a per-cluster feature flag (controller.rf.tuning.enabled) on child controllers. When enabled, the controller uses configurable RF and MinActiveReplicas values for each version lifecycle stage instead of deriving them from the store-level replicationFactor.

New config properties:

| Config | Default |
|---|---|
| controller.rf.tuning.enabled | false |
| controller.future.version.rf.count | default.replica.factor |
| controller.future.version.min.active.replica.count | future RF - 1 |
| controller.current.version.rf.count | default.replica.factor |
| controller.current.version.min.active.replica.count | current RF - 1 |
| controller.backup.version.rf.count | default.replica.factor |
| controller.backup.version.min.active.replica.count | backup RF - 1 |

Behavior when enabled:
At each version lifecycle transition, the controller updates both the version metadata RF (persisted to ZK) and the Helix IdealState:
- Future version: created with future.version.rf.count; the Helix resource is created with the configured RF/MinActiveReplicas
- Current version: set to current.version.rf.count; the Helix IdealState is updated
- Backup version: reduced to backup.version.rf.count; the Helix IdealState is updated (MinActiveReplicas is lowered first, then numReplicas is reduced)

When disabled: all defaults fall back to the existing default.replica.factor value, making this a complete no-op. The existing controller.backup.version.replica.reduction.enabled behavior is preserved as a fallback.
NOTE: While the min.active.replica.count configs are accepted and applied to the Helix IdealState, VersionImpl.getMinActiveReplicas() still returns RF - 1. This means push monitoring, version swap readiness checks, and other controller-side decision logic always derive MinActiveReplicas from the version's RF rather than using the independently configured value. This will be addressed in a follow-up PR.

Code changes
- Enabling the flag alone, with no other config changes, is a no-op.
Concurrency-Specific Checks
- Version metadata RF updates run inside storeMetadataUpdate lambdas, which hold the store write lock; Helix IdealState updates are atomic ZK writes.
- New code follows the existing storeMetadataUpdate store write lock pattern.

How was this PR tested?
- TestStoreBackupVersionCleanupService: 2 new tests; verifies backup reduction uses the configured RF/MinActiveReplicas when tuning is enabled, and no reduction when disabled
- TestZkHelixAdminClient: 2 new tests; verifies Helix resource creation uses futureVersionRfCount when tuning is enabled, and store-level RF when disabled
- TestVeniceHelixAdmin: 1 parameterized test (2 cases); verifies full version lifecycle RF transitions during rollForwardToFutureVersion with tuning enabled/disabled
- Updated testCleanupBackupVersion_OnlyOneBackupVersion to use the RF tuning path
- The feature flag defaults to false.

Does this PR introduce any user-facing or breaking changes?