[controller] Register PARTITION_STATE and STORE_VERSION_STATE schemas at startup#2745
[controller] Register PARTITION_STATE and STORE_VERSION_STATE schemas at startup#2745pthirun wants to merge 2 commits intolinkedin:mainfrom
Conversation
2087150 to
3c9b2f2
Compare
startup Add ControllerClientBackedSystemSchemaInitializer for PARTITION_STATE and STORE_VERSION_STATE, matching the existing pattern for KAFKA_MESSAGE_ENVELOPE. This allows any child controller to register new schema versions at startup, removing the dependency on deploying the system schema cluster controller first. Gated by new config controller.storage.protocol.schema.startup.registration.enabled (default: false) for intentional rollout. Requires the existing system.schema.initialization.at.start.time.enabled to also be true. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
3c9b2f2 to
6ccf2de
Compare
kvargha
left a comment
There was a problem hiding this comment.
Please add some tests to validate the behavior
| ControllerClientBackedSystemSchemaInitializer partitionStateSchemaInitializer = | ||
| new ControllerClientBackedSystemSchemaInitializer( | ||
| AvroProtocolDefinition.PARTITION_STATE, | ||
| systemStoreCluster, | ||
| null, | ||
| null, | ||
| false, | ||
| ((VeniceHelixAdmin) admin).getSslFactory(), | ||
| childControllerUrl, | ||
| d2ServiceName, | ||
| regionD2Client, | ||
| d2ZkHost, | ||
| sslOnly); | ||
| partitionStateSchemaInitializer.execute(); |
There was a problem hiding this comment.
Can we extract this out to a helper?
Address review feedback: - Extract createAndExecuteSchemaInitializer() helper to reduce duplication across KME, PARTITION_STATE, and STORE_VERSION_STATE initializers - Add config parsing tests for stateProtocolSchemaStartupRegistrationEnabled (default false + explicit true) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This PR aims to prevent child-region servers from crash-looping during rolling deployments by ensuring missing system-store schema versions for PARTITION_STATE and STORE_VERSION_STATE can be registered at controller startup (similar to the existing KAFKA_MESSAGE_ENVELOPE startup registration path).
Changes:
- Add a new controller config flag to gate startup registration of
PARTITION_STATEandSTORE_VERSION_STATEschemas. - Refactor controller startup schema initialization to reuse a helper for creating/executing
ControllerClientBackedSystemSchemaInitializer. - Add unit tests validating the new config flag default (
false) and enabled behavior.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceController.java | Adds helper-based startup schema registration; conditionally registers PARTITION_STATE and STORE_VERSION_STATE schemas. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java | Introduces a new boolean config field + getter for the startup registration gate. |
| internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java | Adds the new config key and its Javadoc describing intended behavior/dependency. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceControllerClusterConfig.java | Adds tests for the new config flag default and enabled cases. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| new ControllerClientBackedSystemSchemaInitializer( | ||
| protocolDefinition, | ||
| systemStoreCluster, | ||
| null, | ||
| null, | ||
| false, | ||
| ((VeniceHelixAdmin) admin).getSslFactory(), | ||
| childControllerUrl, | ||
| d2ServiceName, | ||
| regionD2Client, | ||
| d2ZkHost, | ||
| sslOnly).execute(); |
| private static void createAndExecuteSchemaInitializer( | ||
| AvroProtocolDefinition protocolDefinition, | ||
| String systemStoreCluster, | ||
| Admin admin, | ||
| String childControllerUrl, | ||
| String d2ServiceName, | ||
| Optional<D2Client> regionD2Client, | ||
| String d2ZkHost, | ||
| boolean sslOnly) { | ||
| new ControllerClientBackedSystemSchemaInitializer( | ||
| protocolDefinition, | ||
| systemStoreCluster, | ||
| null, | ||
| null, | ||
| false, | ||
| ((VeniceHelixAdmin) admin).getSslFactory(), | ||
| childControllerUrl, |
| /** | ||
| * Whether to register PARTITION_STATE and STORE_VERSION_STATE schemas via | ||
| * ControllerClientBackedSystemSchemaInitializer at controller startup. | ||
| * Requires {@link #SYSTEM_SCHEMA_INITIALIZATION_AT_START_TIME_ENABLED} to also be true. | ||
| * Default: false. | ||
| */ | ||
| public static final String CONTROLLER_STATE_PROTOCOL_SCHEMA_STARTUP_REGISTRATION_ENABLED = | ||
| "controller.state.protocol.schema.startup.registration.enabled"; |
| } | ||
|
|
||
| public boolean isStateProtocolSchemaStartupRegistrationEnabled() { | ||
| return stateProtocolSchemaStartupRegistrationEnabled; |
| ControllerClientBackedSystemSchemaInitializer metaSystemStoreSchemaInitializer = | ||
| new ControllerClientBackedSystemSchemaInitializer( | ||
| AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE, | ||
| systemStoreCluster, | ||
| AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE_KEY.getCurrentProtocolVersionSchema(), | ||
| VeniceSystemStoreUtils.DEFAULT_USER_SYSTEM_STORE_UPDATE_QUERY_PARAMS, | ||
| true, | ||
| ((VeniceHelixAdmin) admin).getSslFactory(), | ||
| childControllerUrl, | ||
| d2ServiceName, | ||
| regionD2Client, | ||
| d2ZkHost, | ||
| sslOnly); | ||
| metaSystemStoreSchemaInitializer.execute(); |
There was a problem hiding this comment.
Can this also use the helper?
| public void testStateProtocolSchemaStartupRegistrationDefaultsToFalse() { | ||
| Properties baseProps = getBaseSingleRegionProperties(false); | ||
| VeniceControllerClusterConfig config = new VeniceControllerClusterConfig(new VeniceProperties(baseProps)); | ||
| assertFalse(config.isStateProtocolSchemaStartupRegistrationEnabled()); | ||
| } |
There was a problem hiding this comment.
Can we add an integration test too where this is enabled?
Problem Statement
During rolling deployments, servers deployed before the system schema cluster controller
crash-loop when attempting to fetch new PartitionState or StoreVersionState schema versions.
The schemas are only registered by SystemSchemaInitializationRoutine, which runs exclusively
on the leader of the designated system schema cluster.
If that controller hasn't been deployed yet, the schema is missing and servers fail.
KAFKA_MESSAGE_ENVELOPE already has a ControllerClientBackedSystemSchemaInitializer that runs
on any child controller at startup, avoiding this deployment ordering issue. PARTITION_STATE
and STORE_VERSION_STATE do not.
Solution
Add ControllerClientBackedSystemSchemaInitializer for PARTITION_STATE and STORE_VERSION_STATE
alongside the existing KME initializer in VeniceController.java. Any child controller that
starts up will now register missing schema versions via the controller REST API.
Code changes
systemSchemaInitializationAtStartTimeEnabledgate.Concurrency-Specific Checks
How was this PR tested?
Does this PR introduce any user-facing or breaking changes?