[controller] Add ValueSchemaCreatedListener for value schema events #2714
…schema registration Add onValueSchemaCreated event to VeniceVersionLifecycleEventListener so that listeners (e.g. Proteus) can restart jobs when a new value schema is registered, avoiding data loss caused by serializing or deserializing with stale schemas. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…sitory into listeners
- onValueSchemaCreated is now an abstract method (not default) so all implementors must handle it
- Add default setSchemaRepository() method to VeniceVersionLifecycleEventListener so listeners can receive the ReadOnlySchemaRepository after cluster initialization
- VeniceControllerStateModel.initClusterResources() injects schemaRepository into all registered listeners after HelixVeniceClusterResources is created, resolving the chicken-and-egg issue where listeners are created before the schema repository is available
- Add no-op onValueSchemaCreated override in AbstractTestVeniceHelixAdmin
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Listeners resolve the effective schema ID via schemaRepository.getSupersetOrLatestValueSchema() rather than using the newly registered schema entry directly, so the parameter is unnecessary. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
VeniceVersionLifecycleEventManager Covers: dispatch to all registered listeners and correct isSourceCluster flag forwarding. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
TestVeniceVersionLifecycleEventManager Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Pull request overview
Adds a new controller lifecycle callback so components can react when a store registers a new value schema, helping listeners keep schema state fresh and avoid ser/de issues.
Changes:
- Extend `VeniceVersionLifecycleEventListener` with `onValueSchemaCreated(...)` and optional `setSchemaRepository(...)` injection.
- Add dispatch support in `VeniceVersionLifecycleEventManager` and wire schema repository injection during controller cluster resource initialization.
- Fire the new event from `VeniceHelixAdmin.addValueSchema(...)` and update/extend tests accordingly.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceVersionLifecycleEventListener.java | Adds new schema-created callback + schema repo injection hook. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceVersionLifecycleEventManager.java | Adds notification fan-out method for the new event. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java | Fires value-schema-created notification after schema persistence. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerStateModel.java | Injects schema repository into listeners post resource initialization. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceVersionLifecycleEventManager.java | Adds unit tests for the new notify path. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/AbstractTestVeniceHelixAdmin.java | Updates mock listener to implement the new abstract method. |
explanation Document the exact failure mode: Avro reader/writer resolution fills in defaults on read, but re-serialization with a stale writer schema silently drops new fields on the write path — causing irreversible data loss with no exception thrown. Also note why listeners must use schemaRepository rather than store.getLatestSuperSetValueSchemaId() to resolve the effective schema ID. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
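The failure mode described in this commit can be illustrated with a toy model. The sketch below does not use the Avro API; `StaleWriterSchemaDemo`, `serialize`, and `deserialize` are hypothetical names, and a schema is reduced to a list of field names. It only shows why a writer pinned to an old schema drops a newer field without raising an exception, while the read path hides the loss by filling in a default.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of schema-driven serialization; NOT the Avro API. */
final class StaleWriterSchemaDemo {
  /** Encodes only the fields the writer schema knows about. */
  static Map<String, Object> serialize(List<String> writerSchemaFields, Map<String, Object> record) {
    Map<String, Object> encoded = new LinkedHashMap<>();
    for (String field : writerSchemaFields) {
      encoded.put(field, record.get(field));
    }
    // Fields absent from the writer schema are silently dropped; nothing is thrown.
    return encoded;
  }

  /** Decodes with a reader schema, substituting a default for fields missing from the payload. */
  static Map<String, Object> deserialize(
      List<String> readerSchemaFields, Map<String, Object> encoded, Object defaultValue) {
    Map<String, Object> decoded = new LinkedHashMap<>();
    for (String field : readerSchemaFields) {
      decoded.put(field, encoded.containsKey(field) ? encoded.get(field) : defaultValue);
    }
    return decoded;
  }
}
```

In this model, writing a record that carries a newer field through the older field list and reading it back with the newer list yields the default for that field, so the original value is gone even though both calls succeed.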
- Fire notifyValueSchemaCreated from both addValueSchema overloads, not just the one with an explicit schemaId parameter
- Skip notification when schema is a duplicate (DUPLICATE_VALUE_SCHEMA_CODE)
- Compute isSourceCluster correctly for migrating stores instead of hardcoding true
- Hoist HelixVeniceClusterResources lookup to avoid duplicate calls
- Assert dispatched Store is a ReadOnlyStore instance in the passesReadOnlyStore test
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Deduplicate the notification logic shared by both addValueSchema overloads into a single private method: skip on duplicate, compute isSourceCluster correctly for migrating stores. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
…id for duplicates When addValueSchema returns DUPLICATE_VALUE_SCHEMA_CODE, look up the real schema id via getValueSchemaId so callers (e.g. SchemaRoutes) always receive a concrete id in the response rather than -2. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
notifyValueSchemaCreated helper Both addValueSchema overloads now handle the duplicate case with an early return, so the helper no longer needs the schemaId param or the guard. Rename maybeNotify -> notify to reflect that it always fires. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (1)
services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java:7027
`maybeNotifyValueSchemaCreated` dereferences `resources.getStoreMetadataRepository().getStore(storeName)` without any store lock or null-handling. `ReadOnlyStoreRepository#getStore` is allowed to return null, and the listener interface docs state lifecycle callbacks are serialized under the store-level write lock. Consider acquiring `resources.getClusterLockManager().createStoreWriteLockOnly(storeName)` around the store fetch / `isMigrating()` check and using `getStoreOrThrow(...)` (or an explicit null check) before notifying listeners to avoid races/NPEs and to keep callback concurrency consistent with other lifecycle events.
}
resources.getVeniceVersionLifecycleEventManager().notifyValueSchemaCreated(store, isSourceCluster);
}
/**
* Add a new derived schema for the given store with all specified properties and return a new
Add four unit tests in TestVeniceHelixAdmin to verify the new behavior introduced in addValueSchema: both overloads fire notifyValueSchemaCreated exactly once for genuinely new schemas, and skip the notification for duplicate schema registrations. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…istener
Address review feedback (xunyin8 on linkedin#2714): schema-creation events shouldn't ride on VeniceVersionLifecycleEventListener, which is meant for version-level events. Introduce a dedicated ValueSchemaCreatedListener interface and a ValueSchemaCreatedEventManager dispatcher mirroring VeniceVersionLifecycleEventManager.
- Add ValueSchemaCreatedListener (storeName, SchemaEntry, isSourceCluster) in venice-common.
- Add ValueSchemaCreatedEventManager in venice-controller, held by HelixVeniceClusterResources alongside the version-lifecycle manager.
- Thread Optional<List<ValueSchemaCreatedListener>> through the controller registration chain (Context, Controller, Service, HelixAdmin, factory, state model).
- VeniceHelixAdmin.notifyValueSchemaCreated computes isSourceCluster (true for non-migrating stores; resources.isSourceCluster otherwise) and dispatches via the new manager after a successful, non-duplicate addValueSchema write.
- Remove onValueSchemaCreated and setSchemaRepository from VeniceVersionLifecycleEventListener; remove the matching notify path from VeniceVersionLifecycleEventManager.
Tests: schema-firing assertions live in TestVeniceHelixAdmin (verifying ValueSchemaCreatedEventManager dispatch); HelixReadWriteSchemaRepositoryTest returns to its pre-feature scope; obsolete schema cases in TestVeniceVersionLifecycleEventManager removed.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- VeniceHelixAdmin: extract `isSourceCluster(clusterName, storeName)` private helper. Three pre-existing call sites (ETL offboard during store delete, notifyVersionDeleted, ETL strategy update) and the new `maybeNotifyValueSchemaCreated` helper now share a single implementation of the canonical pattern (`!store.isMigrating() || resources.isSourceCluster(...)`).
- Rename `notifyValueSchemaCreated` → `maybeNotifyValueSchemaCreated` and move the duplicate-schema skip guard inside the helper, simplifying both `addValueSchema` overloads. Overload-1 also collapses its concrete-id lookup for duplicates into a ternary at the return.
- ValueSchemaCreatedEventManager: catch `Exception` (not `Throwable`) when isolating listener failures, so fatal JVM errors (OOM, StackOverflowError) propagate instead of being silently swallowed. Addresses Copilot review comment.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
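The `Exception`-versus-`Throwable` choice in the last bullet can be sketched in isolation. This is a minimal illustration, not the PR's `ValueSchemaCreatedEventManager`: `ListenerDispatchSketch` and `dispatch` are hypothetical names, and listeners are modeled as plain `Consumer`s.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical dispatcher showing per-listener exception isolation. */
final class ListenerDispatchSketch {
  /** Invokes every listener in order and returns how many threw an Exception. */
  static <E> int dispatch(List<Consumer<E>> listeners, E event) {
    int failures = 0;
    for (Consumer<E> listener : listeners) {
      try {
        listener.accept(event);
      } catch (Exception e) {
        // Deliberately Exception, not Throwable: an Error such as
        // OutOfMemoryError or StackOverflowError is not caught here
        // and propagates to the caller.
        failures++;
        System.err.println("Listener " + listener.getClass().getName() + " failed: " + e);
      }
    }
    return failures;
  }
}
```

With this shape, a listener that throws a runtime exception is logged and the loop moves on to the next listener, while a listener that raises an `Error` aborts the dispatch.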
…ration test
- New `TestValueSchemaCreatedEventManager` unit test covers the dispatcher's in-order listener fan-out and exception-isolation behavior. Brings diff coverage of the new manager from 0% to 100% (try + catch branches).
- New integration test `testValueSchemaCreatedEvents` in `TestVeniceHelixAdminWithSharedEnvironment` exercises the end-to-end notification path through `VeniceHelixAdmin`: a real `addValueSchema` call fires the listener exactly once for new schemas, skips duplicates, and propagates `isSourceCluster=true` for non-migrating stores.
- Add scaffolding (`mockValueSchemaCreatedListener`, capture list, reset helper) to `AbstractTestVeniceHelixAdmin` and wire the listener through the `VeniceHelixAdmin` constructor.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…a-event dispatch
- Refactor `isSourceCluster(...)` to accept the {@code Store} explicitly so the three pre-existing call sites (ETL offboard, version-deleted notify, ETL strategy update) reuse the {@code Store} they already have in scope without a second lookup.
- `maybeNotifyValueSchemaCreated` now fetches the store once (via the cluster resources it already has) and skips notification entirely if the store has been removed concurrently between the schema write and the listener dispatch, which protects against an NPE on a successful write.
- `isSourceCluster` keeps a defensive {@code null} fallback that delegates to the cross-cluster {@code StoreConfig}-based check, so other callers remain safe.
- Update javadoc on both helpers to reflect the new signatures and skip semantics.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… dispatch
Mirror the version-lifecycle pattern: ValueSchemaCreatedListener now receives the {@link Store} snapshot directly instead of the store name. Eliminates the need for downstream listeners to inject a ReadOnlyStoreRepository post-init just to look up the Store in their callback.
- {@code ValueSchemaCreatedListener.handleValueSchemaCreated} now takes {@code (Store, SchemaEntry, boolean)} and the dispatcher wraps the Store in a {@link ReadOnlyStore} before invoking listeners (matches the version-lifecycle manager).
- {@code ValueSchemaCreatedEventManager.notifyValueSchemaCreated} adds an explicit {@code storeName} parameter and skips dispatch with a descriptive warning if the {@code Store} snapshot is null (e.g. store concurrently deleted between the schema write and the listener dispatch). Listeners can rely on the contract that the snapshot is non-null.
- {@code VeniceHelixAdmin.maybeNotifyValueSchemaCreated} passes the already-fetched store and store name through.
- Tests updated:
  - {@code TestValueSchemaCreatedEventManager} adds {@code testNullStoreSkipsListeners} and {@code testListenerReceivesReadOnlyStoreSnapshot}.
  - {@code TestVeniceHelixAdmin} firing tests verify against the new 4-arg dispatch.
  - {@code AbstractTestVeniceHelixAdmin} capture struct stores the {@code Store} reference; integration test updated to assert via {@code store.getName()}.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address xunyin8's review (linkedin#2714 thread): register the listener directly on ReadWriteSchemaRepository, mirroring StoreDataChangedListener on ReadOnlyStoreRepository, instead of the VeniceVersionLifecycleEventManager pattern. The schema repo is the natural fire point: every successful schema write goes through it, so the invariant "if persisted, listeners fire" is guaranteed without threading dispatch through VeniceHelixAdmin.
- Add register/unregisterValueSchemaCreatedListener to ReadWriteSchemaRepository.
- HelixReadWriteSchemaRepository holds the listener Set, fires from addValueSchema after a successful (non-duplicate) write. Notification runs outside the synchronized block via an addValueSchemaLocked helper, with a null-store guard and exception isolation.
- HelixReadWriteSchemaRepositoryAdapter delegates register/unregister to the inner regular-store repo.
- Drop the listener's isSourceCluster parameter: the schema-repo lives in venice-common and has no view into cross-cluster routing. Listeners that need source filtering can inspect Store.isMigrationDuplicateStore() on the snapshot (mirrors how StoreDataChangedListener works).
- Delete ValueSchemaCreatedEventManager (no longer needed); drop the manager field/getter from HelixVeniceClusterResources; drop maybeNotifyValueSchemaCreated from VeniceHelixAdmin.
- VeniceControllerStateModel.initClusterResources() now registers configured listeners directly on clusterResources.getSchemaRepository().
- Tests: drop the firing tests from TestVeniceHelixAdmin and the TestValueSchemaCreatedEventManager file (path moved); add coverage in HelixReadWriteSchemaRepositoryTest (fires-on-new, skips-on-duplicate, fires-on-doc-field-update, exception isolation, unregister, store snapshot delivery). Integration test updated to assert via store.getName(); capture struct loses the isSourceCluster field.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
schema-event log
VeniceHelixAdmin no longer dispatches the value-schema-created event (the schema repository fires it now), so the only remaining isSourceCluster callers are the three pre-existing migration sites: store delete ETL offboard, notifyVersionDeleted, and the ETL strategy update path. Inline the canonical check at each site and drop the isSourceCluster(clusterName, storeName, store) helper. Keeping the helper only for those three sites was strictly less clear than the original inline form (each site has its own polarity).
HelixReadWriteSchemaRepository.maybeNotifyValueSchemaCreated:
- thread storeName through so the null-store warn names which store was affected (not just the schema id).
- listener-exception error now identifies the failing listener class, store, and schema id, and notes that dispatch continues to remaining listeners, which is sufficient to triage which subscriber broke without cross-referencing the addValueSchema caller.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Document the post-lock invariant, the two no-op cases (duplicate result and null store snapshot), the per-listener error-handling semantics, and the new storeName parameter that lets the null-store warn name which store was affected. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…wrap snapshot in ReadOnlyStore
Addresses PR review feedback:
- Move maybeNotifyValueSchemaCreated back inside the synchronized addValueSchema block (the "cheap fix" agreed on Slack). Serializing dispatch with the persist write removes the need for listeners to be thread-safe or tolerate out-of-order delivery; the tradeoff is that a slow listener extends the lock window, which is acceptable because schema evolution is not a hot path.
- Wrap the post-write Store snapshot in ReadOnlyStore before invoking listeners so the documented contract matches the runtime behavior (HelixReadWriteStoreRepositoryAdapter returns a mutable clone).
- Update ValueSchemaCreatedListener javadoc to reflect the new dispatch semantics (under-lock, sequential) and the read-only snapshot.
- Update HelixReadWriteSchemaRepositoryTest to actually assert the listener receives a ReadOnlyStore via ArgumentCaptor instead of just any(Store.class).
…est stub
- addValueSchemaLocked javadoc: drop the stale "fire listeners after releasing the lock" wording. Both addValueSchema overloads are now synchronized methods, so listeners run while the caller still holds the monitor.
- HelixReadWriteSchemaRepositoryTest newRepoWithStore helper: switch the accessor stub from doReturn(new ArrayList<>(initialSchemas)) to doAnswer(inv -> new ArrayList<>(initialSchemas)) so each getAllValueSchemas call returns a fresh copy, matching what the helper javadoc claims and what the production ZK-backed accessor does.
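The difference between the two stubbing styles comes down to when the copy is made. The sketch below reproduces that with plain `Supplier`s rather than Mockito; `FreshCopyStubSketch`, `eagerStub`, and `lazyStub` are hypothetical names used only for illustration. The eager form corresponds to passing an already-constructed list to `doReturn`, and the lazy form to building the list inside the `doAnswer` lambda.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Plain-Java analog of the two stub styles; not Mockito itself. */
final class FreshCopyStubSketch {
  /** The copy is made once, so every call hands back the same list instance. */
  static <T> Supplier<List<T>> eagerStub(List<T> initial) {
    List<T> single = new ArrayList<>(initial);
    return () -> single;
  }

  /** The copy is made per call, so callers never share a list. */
  static <T> Supplier<List<T>> lazyStub(List<T> initial) {
    return () -> new ArrayList<>(initial);
  }
}
```

With the eager form, a caller that mutates the returned list changes what the next caller sees; with the lazy form, each caller gets an independent copy of the initial contents.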
Problem Statement
Today the controller has no clean way to tell other systems that a new value schema has been registered for a store. Things that need to react to schema additions — for example, materialized views that have to restart with the latest superset schema — currently piggyback on version lifecycle events, which only fire on version creation/deletion. A schema-only update (no new version) slips through unnoticed.
Solution
Introduce a `ValueSchemaCreatedListener` interface on the schema repository, modeled after the existing `StoreDataChangedListener` on the store repository. Anything that needs to know "a new value schema just landed for store X" can subscribe via `ReadWriteSchemaRepository#registerValueSchemaCreatedListener` and receive `(Store snapshot, SchemaEntry)` on each successful schema write.
The schema repository is the right place to fire from. Every successful schema write goes through it, so subscribers are guaranteed to see exactly the events that were durably persisted, with no need to thread the dispatch through the controller admin layer.
A few things to call out about how the dispatch behaves:
- Duplicate registrations (`SchemaData.DUPLICATE_VALUE_SCHEMA_CODE`) skip the callback.
- Both `addValueSchema` overloads are `synchronized`, and dispatch runs inside that block, so persist order and listener-dispatch order are identical, and listeners do not need to be thread-safe or tolerate out-of-order delivery. The tradeoff is that a slow listener extends the lock window and stalls concurrent value-schema writes on the same repo (one repo per cluster). Schema registration is a low-rate admin path, so this is acceptable; the listener interface explicitly documents that implementations must be non-blocking.
- The `Store` snapshot is wrapped in `ReadOnlyStore` before dispatch, so listeners cannot mutate the controller's in-memory state. Listeners that need migration awareness can inspect `Store#isMigrationDuplicateStore()` on the snapshot, the same trick `StoreDataChangedListener` uses.
- A listener `Exception` is logged and skipped: it doesn't take down the rest of the dispatch loop or the schema-write path. The persist is already durable in ZK by the time listeners fire, so listener failures never roll back a schema registration. `Error`s (OOM, etc.) propagate out of the dispatch loop.
Wiring is done once, at controller startup, when the cluster's schema repository is created. No new config: listeners are passed in through the controller context.
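The dispatch semantics described above can be condensed into a small sketch. It is a simplified stand-in under stated assumptions, not the PR's `HelixReadWriteSchemaRepository`: `SchemaCreatedListenerSketch` and `SchemaRepoSketch` are hypothetical types, the repository tracks a single store in memory, and the callback takes a store name and schema id instead of the real `(Store, SchemaEntry)` pair. The `-2` duplicate code follows the value mentioned earlier in this PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for the listener; the real callback receives (Store, SchemaEntry). */
interface SchemaCreatedListenerSketch {
  void handleValueSchemaCreated(String storeName, int schemaId, String schemaStr);
}

/** Hypothetical in-memory repository for one store, illustrating under-lock dispatch. */
final class SchemaRepoSketch {
  static final int DUPLICATE_CODE = -2; // assumed to mirror the duplicate result code

  private final List<String> schemas = new ArrayList<>();
  private final Set<SchemaCreatedListenerSketch> listeners = new LinkedHashSet<>();

  synchronized void register(SchemaCreatedListenerSketch listener) {
    listeners.add(listener);
  }

  synchronized void unregister(SchemaCreatedListenerSketch listener) {
    listeners.remove(listener);
  }

  /** Persists the schema, then notifies listeners while still holding the monitor. */
  synchronized int addValueSchema(String storeName, String schemaStr) {
    if (schemas.contains(schemaStr)) {
      return DUPLICATE_CODE; // duplicate: nothing persisted, no callback
    }
    schemas.add(schemaStr);
    int schemaId = schemas.size();
    for (SchemaCreatedListenerSketch listener : listeners) {
      try {
        listener.handleValueSchemaCreated(storeName, schemaId, schemaStr);
      } catch (Exception e) {
        // One failing listener is logged and skipped; the write is not rolled back.
        System.err.println("Listener failed for " + storeName + " schema " + schemaId + ": " + e);
      }
    }
    return schemaId;
  }
}
```

Because the write and the callbacks share one monitor, two concurrent `addValueSchema` calls cannot interleave their notifications, which is the ordering guarantee the description relies on. The cost is that a slow callback holds the lock for its whole duration.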
Code changes
- `warn` when the store snapshot is missing at dispatch time (concurrent delete). Bounded by the schema-write rate.
- `error` per listener exception. Same rate ceiling.
Concurrency-Specific Checks
- Dispatch runs under the repository lock (`synchronized` on both `addValueSchema` overloads), so the persisted order matches the order listeners observe.
- `Exception`s are caught so one bad subscriber can't terminate the dispatch loop or the calling thread. Fatal `Error`s propagate.
How was this PR tested?
- Listeners receive a `ReadOnlyStore` snapshot (asserted via `ArgumentCaptor`).
Does this PR introduce any user-facing or breaking changes?