Skip to content

[controller] Add ValueSchemaCreatedListener for value schema events#2714

Merged
minhmo1620 merged 22 commits into
linkedin:mainfrom
minhmo1620:minnguye/notify-value-schema-created-lifecycle-event
May 11, 2026
Merged

[controller] Add ValueSchemaCreatedListener for value schema events#2714
minhmo1620 merged 22 commits into
linkedin:mainfrom
minhmo1620:minnguye/notify-value-schema-created-lifecycle-event

Conversation

@minhmo1620
Copy link
Copy Markdown
Contributor

@minhmo1620 minhmo1620 commented Apr 9, 2026

Problem Statement

Today the controller has no clean way to tell other systems that a new value schema has been registered for a store. Things that need to react to schema additions — for example, materialized views that have to restart with the latest superset schema — currently piggyback on version lifecycle events, which only fire on version creation/deletion. A schema-only update (no new version) slips through unnoticed.

Solution

Introduce a ValueSchemaCreatedListener interface on the schema repository, modeled after the existing StoreDataChangedListener on the store repository. Anything that needs to know "a new value schema just landed for store X" can subscribe via ReadWriteSchemaRepository#registerValueSchemaCreatedListener and receive (Store snapshot, SchemaEntry) on each successful schema write.

The schema repository is the right place to fire from. Every successful schema write goes through it, so subscribers are guaranteed to see exactly the events that were durably persisted — no need to thread the dispatch through the controller admin layer.

A few things to call out about how the dispatch behaves:

  • Listeners are notified only when a schema is actually new. Duplicates (returned with SchemaData.DUPLICATE_VALUE_SCHEMA_CODE) skip the callback.
  • Both addValueSchema overloads are synchronized, and dispatch runs inside that block — persist order and listener-dispatch order are identical, and listeners do not need to be thread-safe or tolerate out-of-order delivery. The tradeoff is that a slow listener extends the lock window and stalls concurrent value-schema writes on the same repo (one repo per cluster). Schema registration is a low-rate admin path, so this is acceptable; the listener interface explicitly documents that implementations must be non-blocking.
  • The Store snapshot is wrapped in ReadOnlyStore before dispatch, so listeners cannot mutate the controller's in-memory state. Listeners that need migration awareness can inspect Store#isMigrationDuplicateStore() on the snapshot — the same trick StoreDataChangedListener uses.
  • A subscriber that throws an Exception is logged and skipped — it doesn't take down the rest of the dispatch loop or the schema-write path. The persist is already durable in ZK by the time listeners fire, so listener failures never roll back a schema registration.
  • Fatal Errors (OOM, etc.) propagate out of the dispatch loop.

Wiring is done once, at controller startup, when the cluster's schema repository is created. No new config — listeners are passed in through the controller context.

Code changes

  • Added new code behind a config. No new config flag — listeners are injected via the controller context.
  • Introduced new log lines.
    • A warn when the store snapshot is missing at dispatch time (concurrent delete). Bounded by the schema-write rate.
    • An error per listener exception. Same rate ceiling.

Concurrency-Specific Checks

  • No race conditions or thread-safety issues.
  • Persist, snapshot read, and listener dispatch all happen under the schema-repo monitor (synchronized on both addValueSchema overloads), so the persisted order matches the order listeners observe.
  • Listener authors are required by the interface contract to be non-blocking; the dispatcher does not impose an explicit timeout.
  • Listeners are held in a copy-on-write set — register/unregister is safe against in-flight dispatch, and the dispatch loop sees a stable snapshot.
  • Per-listener Exceptions are caught so one bad subscriber can't terminate the dispatch loop or the calling thread. Fatal Errors propagate.

How was this PR tested?

  • Unit tests cover the dispatch contract end to end: fires on a new schema, stays quiet on a duplicate, still fires when only the doc field changes, isolates listener exceptions, respects unregister, and delivers a ReadOnlyStore snapshot (asserted via ArgumentCaptor).
  • An integration test exercises the full wiring through the controller admin's value-schema add path.
  • Extended the controller test harness so existing tests can plug in a capturing listener.
  • Backward compatible — the API is purely additive; existing callers see no behavior change.

Does this PR introduce any user-facing or breaking changes?

  • No.

Minh Nguyen and others added 5 commits April 9, 2026 10:38
…schema

registration

Add onValueSchemaCreated event to VeniceVersionLifecycleEventListener so that
listeners (e.g. Proteus) can restart jobs when a new value schema is
registered,
avoiding data missing due to ser/de process with stale schemas.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…sitory

into listeners

- onValueSchemaCreated is now an abstract method (not default) so all
implementors must handle it
- Add default setSchemaRepository() method to
VeniceVersionLifecycleEventListener so listeners
  can receive the ReadOnlySchemaRepository after cluster initialization
- VeniceControllerStateModel.initClusterResources() injects schemaRepository
into all registered
  listeners after HelixVeniceClusterResources is created, resolving the
chicken-and-egg issue
  where listeners are created before the schema repository is available
- Add no-op onValueSchemaCreated override in AbstractTestVeniceHelixAdmin

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Listeners resolve the effective schema ID via
schemaRepository.getSupersetOrLatestValueSchema()
rather than using the newly registered schema entry directly, so the
parameter is unnecessary.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
VeniceVersionLifecycleEventManager

Covers: dispatch to all registered listeners and correct isSourceCluster flag
forwarding.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
TestVeniceVersionLifecycleEventManager

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@minhmo1620 minhmo1620 marked this pull request as ready for review April 10, 2026 22:24
Copilot AI review requested due to automatic review settings April 10, 2026 22:24
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a new controller lifecycle callback so components can react when a store registers a new value schema, helping listeners keep schema state fresh and avoid ser/de issues.

Changes:

  • Extend VeniceVersionLifecycleEventListener with onValueSchemaCreated(...) and optional setSchemaRepository(...) injection.
  • Add dispatch support in VeniceVersionLifecycleEventManager and wire schema repository injection during controller cluster resource initialization.
  • Fire the new event from VeniceHelixAdmin.addValueSchema(...) and update/extend tests accordingly.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceVersionLifecycleEventListener.java Adds new schema-created callback + schema repo injection hook.
services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceVersionLifecycleEventManager.java Adds notification fan-out method for the new event.
services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java Fires value-schema-created notification after schema persistence.
services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerStateModel.java Injects schema repository into listeners post resource initialization.
services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceVersionLifecycleEventManager.java Adds unit tests for the new notify path.
internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/AbstractTestVeniceHelixAdmin.java Updates mock listener to implement the new abstract method.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Minh Nguyen and others added 2 commits April 10, 2026 15:37
explanation

Document the exact failure mode: Avro reader/writer resolution fills in
defaults
on read, but re-serialization with a stale writer schema silently drops new
fields
on the write path — causing irreversible data loss with no exception thrown.
Also note why listeners must use schemaRepository rather than
store.getLatestSuperSetValueSchemaId() to resolve the effective schema ID.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Fire notifyValueSchemaCreated from both addValueSchema overloads, not just
  the one with an explicit schemaId parameter
- Skip notification when schema is a duplicate (DUPLICATE_VALUE_SCHEMA_CODE)
- Compute isSourceCluster correctly for migrating stores instead of
hardcoding true
- Hoist HelixVeniceClusterResources lookup to avoid duplicate calls
- Assert dispatched Store is a ReadOnlyStore instance in the
passesReadOnlyStore test

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings April 10, 2026 22:40
Deduplicate the notification logic shared by both addValueSchema overloads
into a single private method: skip on duplicate, compute isSourceCluster
correctly for migrating stores.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Minh Nguyen and others added 2 commits April 10, 2026 15:50
…id for

duplicates

When addValueSchema returns DUPLICATE_VALUE_SCHEMA_CODE, look up the real
schema id via getValueSchemaId so callers (e.g. SchemaRoutes) always receive
a concrete id in the response rather than -2.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings April 10, 2026 22:53
notifyValueSchemaCreated helper

Both addValueSchema overloads now handle the duplicate case with an early
return, so the helper no longer needs the schemaId param or the guard.
Rename maybeNotify -> notify to reflect that it always fires.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.

Comments suppressed due to low confidence (1)

services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java:7027

  • maybeNotifyValueSchemaCreated dereferences resources.getStoreMetadataRepository().getStore(storeName) without any store lock or null-handling. ReadOnlyStoreRepository#getStore is allowed to return null, and the listener interface docs state lifecycle callbacks are serialized under the store-level write lock. Consider acquiring resources.getClusterLockManager().createStoreWriteLockOnly(storeName) around the store fetch / isMigrating() check and using getStoreOrThrow(...) (or an explicit null check) before notifying listeners to avoid races/NPEs and to keep callback concurrency consistent with other lifecycle events.
    }
    resources.getVeniceVersionLifecycleEventManager().notifyValueSchemaCreated(store, isSourceCluster);
  }

  /**
   * Add a new derived schema for the given store with all specified properties and return a new

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Add four unit tests in TestVeniceHelixAdmin to verify the new behavior
introduced in addValueSchema: both overloads fire notifyValueSchemaCreated
exactly once for genuinely new schemas, and skip the notification for
duplicate schema registrations.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…istener

Address review feedback (xunyin8 on linkedin#2714): schema-creation events shouldn't
ride on VeniceVersionLifecycleEventListener, which is meant for version-level
events. Introduce a dedicated ValueSchemaCreatedListener interface and a
ValueSchemaCreatedEventManager dispatcher mirroring
VeniceVersionLifecycleEventManager.

- Add ValueSchemaCreatedListener (storeName, SchemaEntry, isSourceCluster)
  in venice-common.
- Add ValueSchemaCreatedEventManager in venice-controller, held by
  HelixVeniceClusterResources alongside the version-lifecycle manager.
- Thread Optional<List<ValueSchemaCreatedListener>> through the controller
  registration chain (Context, Controller, Service, HelixAdmin, factory,
  state model).
- VeniceHelixAdmin.notifyValueSchemaCreated computes isSourceCluster (true
  for non-migrating stores; resources.isSourceCluster otherwise) and
  dispatches via the new manager after a successful, non-duplicate
  addValueSchema write.
- Remove onValueSchemaCreated and setSchemaRepository from
  VeniceVersionLifecycleEventListener; remove the matching notify path
  from VeniceVersionLifecycleEventManager.

Tests: schema-firing assertions live in TestVeniceHelixAdmin (verifying
ValueSchemaCreatedEventManager dispatch); HelixReadWriteSchemaRepositoryTest
returns to its pre-feature scope; obsolete schema cases in
TestVeniceVersionLifecycleEventManager removed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 7, 2026 18:46
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 2 comments.

@minhmo1620 minhmo1620 changed the title [controller] Notify VeniceVersionLifecycleEventListener on new value schema registration [controller] Add ValueSchemaCreatedListener for value schema events May 7, 2026
Minh Nguyen and others added 2 commits May 7, 2026 13:11
- VeniceHelixAdmin: extract `isSourceCluster(clusterName, storeName)` private
  helper. Three pre-existing call sites (ETL offboard during store delete,
  notifyVersionDeleted, ETL strategy update) and the new
  `maybeNotifyValueSchemaCreated` helper now share a single implementation
  of the canonical pattern (`!store.isMigrating() ||
  resources.isSourceCluster(...)`).
- Rename `notifyValueSchemaCreated` → `maybeNotifyValueSchemaCreated` and
  move the duplicate-schema skip guard inside the helper, simplifying both
  `addValueSchema` overloads. Overload-1 also collapses its concrete-id
  lookup for duplicates into a ternary at the return.
- ValueSchemaCreatedEventManager: catch `Exception` (not `Throwable`) when
  isolating listener failures, so fatal JVM errors (OOM, StackOverflowError)
  propagate instead of being silently swallowed. Addresses Copilot review
  comment.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ration

test

- New `TestValueSchemaCreatedEventManager` unit test covers the dispatcher's
  in-order listener fan-out and exception-isolation behavior. Brings diff
  coverage of the new manager from 0% to 100% (try + catch branches).
- New integration test `testValueSchemaCreatedEvents` in
  `TestVeniceHelixAdminWithSharedEnvironment` exercises the end-to-end
  notification path through `VeniceHelixAdmin`: a real `addValueSchema`
  call fires the listener exactly once for new schemas, skips duplicates,
  and propagates `isSourceCluster=true` for non-migrating stores.
- Add scaffolding (`mockValueSchemaCreatedListener`, capture list, reset
  helper) to `AbstractTestVeniceHelixAdmin` and wire the listener through
  the `VeniceHelixAdmin` constructor.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 7, 2026 20:56
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 1 comment.

Minh Nguyen and others added 2 commits May 7, 2026 14:25
…a-event

dispatch

- Refactor `isSourceCluster(...)` to accept the {@code Store} explicitly so
the
  three pre-existing call sites (ETL offboard, version-deleted notify, ETL
  strategy update) reuse the {@code Store} they already have in scope without
  a second lookup.
- `maybeNotifyValueSchemaCreated` now fetches the store once (via the
  cluster resources it already has) and skips notification entirely if the
  store has been removed concurrently between the schema write and the
  listener dispatch — protects against an NPE on a successful write.
- `isSourceCluster` keeps a defensive {@code null} fallback that delegates to
  the cross-cluster {@code StoreConfig}-based check, so other callers
  remain safe.
- Update javadoc on both helpers to reflect the new signatures and skip
  semantics.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… dispatch

Mirror the version-lifecycle pattern: ValueSchemaCreatedListener now receives
the
{@link Store} snapshot directly instead of the store name. Eliminates the
need for
downstream listeners to inject a ReadOnlyStoreRepository post-init just to
look up
the Store in their callback.

- {@code ValueSchemaCreatedListener.handleValueSchemaCreated} now takes
  {@code (Store, SchemaEntry, boolean)} and the dispatcher wraps the Store in
a
  {@link ReadOnlyStore} before invoking listeners (matches the
version-lifecycle
  manager).
- {@code ValueSchemaCreatedEventManager.notifyValueSchemaCreated} adds an
  explicit {@code storeName} parameter and skips dispatch with a descriptive
  warning if the {@code Store} snapshot is null (e.g. store concurrently
  deleted between the schema write and the listener dispatch). Listeners can
  rely on the contract that the snapshot is non-null.
- {@code VeniceHelixAdmin.maybeNotifyValueSchemaCreated} passes the
already-fetched
  store and store name through.
- Tests updated:
  - {@code TestValueSchemaCreatedEventManager} adds {@code
testNullStoreSkipsListeners}
    and {@code testListenerReceivesReadOnlyStoreSnapshot}.
  - {@code TestVeniceHelixAdmin} firing tests verify against the new 4-arg
dispatch.
  - {@code AbstractTestVeniceHelixAdmin} capture struct stores the {@code
Store}
    reference; integration test updated to assert via {@code store.getName()}.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 7, 2026 22:14
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 2 comments.

Minh Nguyen and others added 2 commits May 7, 2026 18:06
Address xunyin8's review (linkedin#2714 thread): register the listener directly on
ReadWriteSchemaRepository, mirroring StoreDataChangedListener on
ReadOnlyStoreRepository, instead of the VeniceVersionLifecycleEventManager
pattern. The schema repo is the natural fire point — every successful schema
write goes through it, so the invariant "if persisted, listeners fire" is
guaranteed without threading dispatch through VeniceHelixAdmin.

- Add register/unregisterValueSchemaCreatedListener to
ReadWriteSchemaRepository.
- HelixReadWriteSchemaRepository holds the listener Set, fires from
  addValueSchema after a successful (non-duplicate) write. Notification runs
  outside the synchronized block via an addValueSchemaLocked helper, with a
  null-store guard and exception isolation.
- HelixReadWriteSchemaRepositoryAdapter delegates register/unregister to the
  inner regular-store repo.
- Drop the listener's isSourceCluster parameter — the schema-repo lives in
  venice-common and has no view into cross-cluster routing. Listeners that
  need source filtering can inspect Store.isMigrationDuplicateStore() on
  the snapshot (mirrors how StoreDataChangedListener works).
- Delete ValueSchemaCreatedEventManager (no longer needed); drop the
  manager field/getter from HelixVeniceClusterResources; drop
  maybeNotifyValueSchemaCreated from VeniceHelixAdmin.
- VeniceControllerStateModel.initClusterResources() now registers
  configured listeners directly on clusterResources.getSchemaRepository().
- Tests: drop the firing tests from TestVeniceHelixAdmin and the
  TestValueSchemaCreatedEventManager file (path moved); add coverage in
  HelixReadWriteSchemaRepositoryTest (fires-on-new, skips-on-duplicate,
  fires-on-doc-field-update, exception isolation, unregister, store
  snapshot delivery). Integration test updated to assert via
  store.getName(); capture struct loses the isSourceCluster field.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
schema-event log

VeniceHelixAdmin no longer dispatches the value-schema-created event (the
schema repository fires it now), so the only remaining isSourceCluster
callers are the three pre-existing migration sites: store delete ETL
offboard, notifyVersionDeleted, and the ETL strategy update path. Inline
the canonical check at each site and drop the
isSourceCluster(clusterName, storeName, store) helper — keeping the helper
only for those three sites was strictly less clear than the original
inline form (each site has its own polarity).

HelixReadWriteSchemaRepository.maybeNotifyValueSchemaCreated:
- thread storeName through so the null-store warn names which store was
  affected (not just the schema id).
- listener-exception error now identifies the failing listener class,
  store, and schema id, and notes that dispatch continues to remaining
  listeners — sufficient to triage which subscriber broke without
  cross-referencing the addValueSchema caller.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 8, 2026 17:06
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 3 comments.

Document the post-lock invariant, the two no-op cases (duplicate result
and null store snapshot), the per-listener error-handling semantics, and
the new storeName parameter that lets the null-store warn name which
store was affected.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…wrap

snapshot in ReadOnlyStore

Addresses PR review feedback:

- Move maybeNotifyValueSchemaCreated back inside the synchronized
  addValueSchema block (the "cheap fix" agreed on Slack). Serializing
  dispatch with the persist write removes the need for listeners to be
  thread-safe or tolerate out-of-order delivery; the tradeoff is that a
  slow listener extends the lock window, which is acceptable because
  schema evolution is not a hot path.
- Wrap the post-write Store snapshot in ReadOnlyStore before invoking
  listeners so the documented contract matches the runtime behavior
  (HelixReadWriteStoreRepositoryAdapter returns a mutable clone).
- Update ValueSchemaCreatedListener javadoc to reflect the new dispatch
  semantics (under-lock, sequential) and the read-only snapshot.
- Update HelixReadWriteSchemaRepositoryTest to actually assert the
  listener receives a ReadOnlyStore via ArgumentCaptor instead of just
  any(Store.class).
Copilot AI review requested due to automatic review settings May 11, 2026 20:09
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 3 comments.

…est stub

- addValueSchemaLocked javadoc: drop the stale "fire listeners after
  releasing the lock" wording. Both addValueSchema overloads are now
  synchronized methods, so listeners run while the caller still holds
  the monitor.
- HelixReadWriteSchemaRepositoryTest newRepoWithStore helper: switch
  the accessor stub from doReturn(new ArrayList<>(initialSchemas)) to
  doAnswer(inv -> new ArrayList<>(initialSchemas)) so each getAllValueSchemas
  call returns a fresh copy, matching what the helper javadoc claims and
  what the production ZK-backed accessor does.
Copy link
Copy Markdown
Contributor

@xunyin8 xunyin8 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!

@minhmo1620 minhmo1620 merged commit 19a86d0 into linkedin:main May 11, 2026
131 of 134 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants