[server][da-vinci] Honor IngestionPauseMode by unsubscribing Kafka consumers#2763
misyel merged 13 commits into linkedin:main
Conversation
Pull request overview
Implements runtime honoring of IngestionPauseMode in Venice Server / DaVinci ingestion by transitioning Kafka consumer subscriptions on pause/resume, with supporting metrics and tests.
Changes:
- Add pause/resume transition logic that unsubscribes/resubscribes Kafka consumers based on store metadata.
- Introduce a `store_level_paused_gauge` metric to surface pause effectiveness.
- Add unit + integration tests covering pause decision logic and end-to-end pause/resume behavior.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestServerIngestionPauseResume.java | Adds end-to-end tests validating ingestion pause/resume and region-scoped pause behavior. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskShouldPauseTest.java | Adds unit tests for the pause decision helper. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java | Adds unit tests for the new storeLevelPaused PCS flag. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStatsReporter.java | Registers the new store-level paused gauge for reporting. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStats.java | Adds the new gauge field + getter/setter. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java | Adds a helper to set the store-level paused gauge per store-version. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java | Adds pause decision helper and startup “restart-while-paused” unsubscribe hook; prevents quota pause/resume from fighting store-level pause. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java | Adds storeLevelPaused state on PCS. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java | Adds periodic pause-state reconciliation that unsubscribes/resubscribes on transitions. |
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 1 comment.
Phase 2 of the ingestion pause feature: server-side and DaVinci SITs now react to the store-level `IngestionPauseMode` (set in Phase 1, linkedin#2754) by fully unsubscribing their Kafka consumers when paused, and resubscribing from the persisted offset on resume.

Mechanism:
- `LeaderFollowerStoreIngestionTask.maybeTransitionPauseState` reconciles each PCS's pause state every `checkLongRunningTaskState` iteration. On enter: `consumerUnSubscribeAllTopics(pcs)` (covers the leader topic across all Kafka clusters for AA/NR). On exit: `resubscribe(pcs)`, reusing the existing leader/follower-aware resubscribe machinery.
- `StoreIngestionTask.shouldPauseForStore` is the static decision helper: `NOT_PAUSED`/null -> false; `ALL_VERSIONS` -> true; `CURRENT_VERSION` -> true only when `sitVersionNumber == store.getCurrentVersion()`. Applies uniformly to both Venice servers and DaVinci clients.
- Restart-while-paused: `validateAndSubscribePartition` unsubscribes immediately after subscribing if the store is already paused. Blob transfer is allowed to run, so the replica still comes up populated.
- The `PartitionConsumptionState.storeLevelPaused` flag is kept for observability and to make the disk-quota `pauseConsumption`/`resumeConsumption` callbacks no-op (so quota and store-level pause don't fight each other).
- `store_level_paused_gauge` metric exposed via `IngestionStats`/`AggVersionedIngestionStats`.

Tests:
- `StoreIngestionTaskShouldPauseTest`: 4 cases for the static decision helper.
- `PartitionConsumptionStateTest`: `storeLevelPaused` default + setter.
- `TestServerIngestionPauseResume` integration test: `testRealTimeConsumptionPausesAndResumesOnServer` verifies global pause stops ingestion and resume catches up; `testRegionScopedPauseOnlyAffectsTargetedRegion` verifies the region filter from Phase 1 isolates the pause to listed regions only.
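A minimal sketch of the static decision helper described above. The enum and `Store` stub are simplified stand-ins for the real Venice classes, modeling only the fields the decision needs; the decision logic itself mirrors the three cases stated in the PR.

```java
// Sketch of StoreIngestionTask.shouldPauseForStore as described in the PR.
// IngestionPauseMode and Store are simplified stand-ins, not the real classes.
public class PauseDecision {
  public enum IngestionPauseMode { NOT_PAUSED, ALL_VERSIONS, CURRENT_VERSION }

  /** Minimal stand-in for the store metadata the helper consults. */
  public static class Store {
    final IngestionPauseMode pauseMode;
    final int currentVersion;
    public Store(IngestionPauseMode pauseMode, int currentVersion) {
      this.pauseMode = pauseMode;
      this.currentVersion = currentVersion;
    }
  }

  /**
   * NOT_PAUSED / null -> false
   * ALL_VERSIONS      -> true
   * CURRENT_VERSION   -> true only when this SIT serves the current version
   */
  public static boolean shouldPauseForStore(Store store, int sitVersionNumber) {
    if (store == null || store.pauseMode == null
        || store.pauseMode == IngestionPauseMode.NOT_PAUSED) {
      return false;
    }
    if (store.pauseMode == IngestionPauseMode.ALL_VERSIONS) {
      return true;
    }
    // CURRENT_VERSION: pause only the SIT whose version is current
    return sitVersionNumber == store.currentVersion;
  }
}
```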
- Make PCS.storeLevelPaused volatile (visibility for cross-thread reads
by disk-quota callbacks).
- Edge-trigger WARN log on store metadata lookup failure in
maybeTransitionPauseState — fixes SpotBugs DE_MIGHT_IGNORE while
keeping per-iteration log spam bounded.
- Fix Javadoc signature reference in StoreIngestionTaskShouldPauseTest.
- Extract pure helpers for branch coverage:
- LeaderFollowerStoreIngestionTask.decidePauseTransition (per-PCS
state machine for ENTER_PAUSE / EXIT_PAUSE / NO_CHANGE).
- StoreIngestionTask.shouldSkipQuotaCallbackForStoreLevelPause (no-op
guard for disk-quota pause/resume callbacks).
- Add unit tests covering both helpers.
- `LeaderFollowerStoreIngestionTask`: replace the bespoke `storeRepoLookupFailureLogged` boolean with the shared `RedundantExceptionFilter` (default 60s window). Same goal (avoid spamming WARN once per SIT iteration when store metadata lookup fails) but uses the project's standard rate-limiting primitive and re-emits periodically while the failure persists.
- `testRegionScopedPauseOnlyAffectsTargetedRegion`: enable native-replication + active-active so dc1 leaders pull dc0's RT, then add the symmetric dc1 assertions:
  * dc1 does NOT see key1 while paused (>15s window).
  * dc1 catches up and serves key1 after resume.
  This proves the region-scoped pause actually pauses dc1's consumption end-to-end, not just the controller-level filter.
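The windowed rate-limiting contract described above (first occurrence per window emits, repeats within the window suppress, the line re-emits once the window expires) can be approximated by a per-key timestamp map. This is an illustrative sketch of the contract only, not Venice's actual `RedundantExceptionFilter` implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative per-key log-suppression window. The first occurrence of a key
// in each window is NOT redundant (so the WARN is emitted); repeats within
// the window are suppressed; the message re-emits after the window expires.
public class WindowedLogFilter {
  private final long windowMs;
  private final Map<String, Long> lastEmittedMs = new ConcurrentHashMap<>();

  public WindowedLogFilter(long windowMs) {
    this.windowMs = windowMs;
  }

  /** Returns true when a log line for this key should be suppressed. */
  public boolean isRedundant(String key, long nowMs) {
    Long last = lastEmittedMs.get(key);
    if (last != null && nowMs - last < windowMs) {
      return true; // already emitted within the current window
    }
    lastEmittedMs.put(key, nowMs);
    return false; // first occurrence in this window: emit
  }
}
```

Keying on the store name rather than the replica id (as the later log-throttling commit in this PR does) is what bounds the output to one line per store per window regardless of partition count.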
261b068 to e46d18a (force-push)
sixpluszero left a comment
Phase 2 looks well-structured overall. The static decision helpers (`shouldPauseForStore`, `shouldSkipQuotaCallbackForStoreLevelPause`, `decidePauseTransition`) are nicely extracted and unit-tested, the `volatile` on `PartitionConsumptionState.storeLevelPaused` is correct, and reusing the existing resubscribe machinery for resume is the right call. Below are findings from a closer pass; inline comments on each. Summary of what I think needs attention before merge:
Critical
- State-mutation ordering in `maybeTransitionPauseState` opens a race window with quota callbacks (the volatile flag is set on the wrong edge of the long-running op).
- A failure on one PCS abandons the rest of the loop and leaves the gauge reflecting intent rather than reality.
- The `validateAndSubscribePartition` restart-while-paused branch has no try/catch around `getStoreOrThrow`/`consumerUnSubscribeAllTopics` and zero unit-test coverage: silent partial state on failure.
- The `pauseConsumption`/`resumeConsumption` no-op silently desyncs `StorageUtilizationManager.pausedPartitions` from real consumer state; disk-quota enforcement can be bypassed for the duration of a store-level pause.
- `catch (Exception e)` on the store lookup is too broad and swallows real bugs (NPEs, re-wrapped interrupts).
Important (test coverage)
- The orchestrator `maybeTransitionPauseState` itself has no side-effect coverage, only the pure decision helper. A refactor that swaps the ENTER/EXIT arms or drops the unsubscribe would still pass every test.
- The restart-while-paused branch and the wired-up quota-callback suppression both have zero unit tests.
- The new `store_level_paused_gauge` is plumbed through but never asserted in any test.
- The PR title claims `[server][da-vinci]` but the integration test is server-only; no DaVinci-side test exists.
Important (other)
- `IngestionPauseMode.CURRENT_VERSION` Javadoc says "RT consumption only" but the implementation pauses VT too if the current version is still pre-EOP.
- `decidePauseTransition` is a static test surface with no null-PCS guard, while sibling `shouldSkipQuotaCallbackForStoreLevelPause` does guard; inconsistent contract.
- Several Javadoc inaccuracies ("all Kafka clusters", "all leader RTs", `StoreIngestionTask.run()` referenced as the call site).
- The first integration test races parent→child→server propagation with the immediate `sendStreamingRecord`; the second test correctly uses `waitForNonDeterministicAssertion`. Apply the same pattern to the first.
Happy to discuss any of these — full repro/reasoning in the inline threads.
Pause-state ordering & error handling (`LeaderFollowerStoreIngestionTask`):
- Narrow `catch (Exception)` to `catch (VeniceNoStoreException)` so a RuntimeException-wrapped InterruptedException no longer loses the shutdown signal; switch to the inherited REDUNDANT_LOGGING_FILTER.
- Flip the per-PCS pause flag BEFORE the long unsubscribe/resubscribe so the disk-quota no-op guard covers the entire window. On EXIT_PAUSE, resubscribe before clearing the flag; a failure leaves the flag set so the next iteration retries instead of leaving the partition dark.
- Wrap each per-PCS transition in try/catch so one failure no longer abandons the rest of the loop.
- Add a null-PCS guard to `decidePauseTransition` (mirrors sibling `shouldSkipQuotaCallbackForStoreLevelPause`).
- Rewrite the Javadoc to accurately describe AA/multi-region coverage: `consumerUnSubscribeAllTopics` only drops the OffsetRecord's leader topic, not all region RTs.

Restart-while-paused hook (`StoreIngestionTask`):
- Extract `maybeUnsubscribeOnStartupIfStorePaused` and wrap it in try/catch so a transient repo-lookup failure or unsubscribe error no longer fails SUBSCRIBE processing into ERROR.
- Reorder: flag set before unsubscribe (consistent with the new ordering).
- Extract the pure decision helper `shouldUnsubscribeOnStartup` so the branch is unit-testable.

Quota-callback observability + manager state-sync:
- Change `TopicPartitionConsumerFunction.execute` to return boolean. `StorageUtilizationManager` now gates `pausedPartitions` add/remove on the lambda's return value, so the manager's tracked state stays in sync with real consumer state when a higher-priority pause source (store-level `IngestionPauseMode`) owns the consumer.
- `pauseConsumption`/`resumeConsumption` return false when store-level pause suppresses the action; log at INFO (throttled) so operators investigating quota-vs-pause interactions are no longer in the dark.
`IngestionPauseMode` Javadoc:
- `CURRENT_VERSION`/`ALL_VERSIONS` now accurately describe the impl; no longer claims "RT consumption only".

Tests (consolidated via `@DataProvider` truth tables):
- `decidePauseTransitionTruthTable` (6 rows) replaces 5 separate methods.
- `shouldPauseForStoreTruthTable` (6 rows) replaces 4 methods.
- `shouldSkipQuotaCallbackTruthTable` (3 rows) replaces 3 methods.
- `shouldUnsubscribeOnStartupTruthTable` (7 rows, new) covers the restart-while-paused decision including null-PCS, null-Store, and CURRENT_VERSION × sitVersion combinations.
- New `currentVersionSwapPausesNewCurrentResumesOldCurrent` codifies V4->V5 swap behavior under CURRENT_VERSION mode.

Integration tests:
- `testRealTimeConsumptionPausesAndResumesOnServer`: wait for the child controller to observe ALL_VERSIONS before sending key2, eliminating the race between `updateStore` and `sendStreamingRecord`.
- `testRegionScopedPauseOnlyAffectsTargetedRegion`: update stale Javadoc to reflect the actual NR+AA setup.
- `StorageUtilizationManagerTest`: update existing lambdas to return true, matching the new boolean interface.
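The boolean-returning callback contract described above can be sketched as follows. Names are illustrative stand-ins for `TopicPartitionConsumerFunction` and `StorageUtilizationManager`: the tracker only records a partition as paused (or resumed) when the callback reports that it actually acted on the consumer, so a higher-priority pause source that suppresses the action no longer desyncs the tracked set from real consumer state.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the reworked quota pause/resume tracking: the manager's
// pausedPartitions set is only mutated when the callback really applied
// the action to the consumer (returned true).
public class QuotaPauseTracker {
  /** Mirrors the boolean-returning TopicPartitionConsumerFunction.execute. */
  public interface PartitionAction {
    boolean execute(int partition); // true = action actually applied
  }

  private final Set<Integer> pausedPartitions = new HashSet<>();

  public void pause(int partition, PartitionAction pauseFn) {
    if (pauseFn.execute(partition)) {
      pausedPartitions.add(partition); // only track what actually happened
    }
  }

  public void resume(int partition, PartitionAction resumeFn) {
    if (resumeFn.execute(partition)) {
      pausedPartitions.remove(partition);
    }
  }

  public boolean isTrackedAsPaused(int partition) {
    return pausedPartitions.contains(partition);
  }
}
```

When a store-level pause owns the consumer, the quota lambda returns false and the tracked state is left untouched instead of silently diverging.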
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.
- `LeaderFollowerStoreIngestionTask.consumerUnSubscribeAllTopics`: also unsubscribe the separate RT topic when the primary topic is RT and sep-RT is enabled. Previously a paused leader could continue consuming from sep-RT, contradicting the IngestionPauseMode semantics. Mirrors the sep-RT handling in `unsubscribeFromTopic` and also benefits the existing UNSUBSCRIBE consumer-action path.
- `TestServerIngestionPauseResume`: replace `Thread.sleep(15s) + assertNull` (which can pass on slow propagation or fail on a brief ingestion window) with `assertKeyRemainsAbsent`, which polls every 250ms over the full 15s window and fails fast if the key ever becomes visible. This caught a real timing issue where the server-side SIT hadn't yet observed the new pause mode by the time the produce landed; added a 10s buffer after the controller sees the new mode for the SIT's `checkLongRunningTaskState` loop to apply the unsubscribe before producing.
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.
…ncile
- consumerUnSubscribeAllTopics: previously picked one primary topic
(leader topic for LEADER, VT otherwise). During L/F transitions or
bootstrap a partition can be transiently subscribed to BOTH the
version topic and a separate leader topic; the old logic left the
other one attached. Now unsubscribes whichever of {VT, leader topic,
sep-RT} is currently subscribed (gated via consumerHasSubscription
to avoid redundant work).
- maybeTransitionPauseState: previously gated the unsubscribe purely
on the PCS flag transition, so a subscription that crept back via
a leader/follower transition or a topic switch (after the initial
pause unsubscribe) would not be re-unsubscribed — the next reconcile
would see flag=true and return NO_CHANGE. Added a force-unsubscribe
branch that fires when shouldPause is true, the PCS is already
flagged paused, but partitionHasAnyActiveSubscription() reports a
live subscription. Makes the reconcile loop idempotent against
out-of-band re-subscriptions.
- New helper partitionHasAnyActiveSubscription: checks VT, leader
topic (if different), and sep-RT — used by the force-unsubscribe
branch above.
Diff-coverage gate (45% branches) was failing at 42.55%, primarily because the inline `forceUnsubscribe` condition and its short-circuit branches lived in `maybeTransitionPauseState`, where the integration tests don't exercise the L/F-transition-reattach path.
- Extend `decidePauseTransition` to take `hasAnyActiveSubscription` as a third input and return a new `RECONCILE_FORCE_UNSUBSCRIBE` action (previously an inline boolean check in the caller). The full state machine now lives in one pure static helper.
- `maybeTransitionPauseState`: replace the inline `forceUnsubscribe` computation with a single call to the helper; pre-compute `partitionHasAnyActiveSubscription` once per PCS (with a null guard).
- `LeaderFollowerStoreIngestionTaskPauseTransitionTest`: expand the truth table to 11 rows covering the full (PCS-state × shouldPause × hasAnySub) cube, including the new RECONCILE_FORCE_UNSUBSCRIBE branch and the null-PCS guard. All cases covered by a single parametrized test.
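The three-input state machine described above can be sketched as a pure static helper. Names mirror the PR; the `Boolean` first parameter is an illustrative way to model a missing PCS (the null-guard case), and the ordering of the checks matches the described semantics: enter/exit transitions first, then the force-unsubscribe reconcile for an already-paused partition that picked up a subscription out of band.

```java
// Pure sketch of the extended decidePauseTransition helper: the third
// input lets the reconcile loop force an unsubscribe when an out-of-band
// resubscription re-attached a consumer to an already-paused partition.
public class PauseTransition {
  public enum Action { ENTER_PAUSE, EXIT_PAUSE, RECONCILE_FORCE_UNSUBSCRIBE, NO_CHANGE }

  public static Action decidePauseTransition(
      Boolean pcsStoreLevelPaused,      // null models a missing PCS
      boolean shouldPause,
      boolean hasAnyActiveSubscription) {
    if (pcsStoreLevelPaused == null) {
      return Action.NO_CHANGE;          // null-PCS guard
    }
    if (shouldPause && !pcsStoreLevelPaused) {
      return Action.ENTER_PAUSE;        // newly paused: unsubscribe
    }
    if (!shouldPause && pcsStoreLevelPaused) {
      return Action.EXIT_PAUSE;         // resumed: resubscribe
    }
    if (shouldPause && hasAnyActiveSubscription) {
      // already flagged paused, but a subscription crept back
      return Action.RECONCILE_FORCE_UNSUBSCRIBE;
    }
    return Action.NO_CHANGE;
  }
}
```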
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.
Per review feedback (sixpluszero): remove `maybeUnsubscribeOnStartupIfStorePaused` entirely. The reconcile loop in `maybeTransitionPauseState` now uses RECONCILE_FORCE_UNSUBSCRIBE to detect any partition with shouldPause=true plus an active subscription (whether subscribed at startup, after blob transfer, or via a leader/follower transition) and unsubscribes it on the next iteration. This:
- Simplifies the code (one reconcile path instead of two).
- Naturally handles blob-transfer-in-progress: the transfer completes normally, the post-transfer subscribe attaches Kafka, and the next reconcile tick unsubscribes; blob transfer is never aborted.
- Eliminates the need for `shouldUnsubscribeOnStartup` and its parametrized truth-table test (7 rows).

Also fix Javadoc per Copilot: `shouldSkipQuotaCallbackForStoreLevelPause` links now include the `(PartitionConsumptionState)` parameter signature in both `LeaderFollowerStoreIngestionTask` and `StoreIngestionTaskShouldPauseTest`.
5cda902 to 8750adc (force-push)
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 6 comments.
- `maybeTransitionPauseState`: short-circuit `partitionHasAnyActiveSubscription` to only run when `shouldPause && pcs.isStoreLevelPaused()` (the only case where the result can flip the decision to RECONCILE_FORCE_UNSUBSCRIBE). Avoids per-loop `consumerHasSubscription` scans on every partition for large stores.
- `shouldPauseForStore` Javadoc: replace the vague "this version's role" with the actual logic: pauses for ALL_VERSIONS unconditionally, for CURRENT_VERSION only when `sitVersionNumber` equals `store.getCurrentVersion()`.
- `TestServerIngestionPauseResume`: replace two `Thread.sleep(10s)` buffer waits with `waitForStoreLevelPausedGauge`, which polls the actual server-side metric (`.<store>_current--store_level_paused_gauge.IngestionStatsGauge`) to confirm the SIT has applied the pause before producing the test record. A direct signal instead of a fixed-duration guess; not flaky on slow CI runners.
The lazy probe was below the 45% diff-coverage gate (42.04%) because `partitionHasAnyActiveSubscription` contained 14 uncovered branches that integration tests don't exercise (RT/sep-RT/leader-topic permutations under the narrow shouldPause + already-paused condition). Replace the per-PCS helper with the existing SIT-wide `consumerHasAnySubscription()`: a single call against `aggKafkaConsumerService` with no branches.

The trade-off is coarseness: when *any* partition in the SIT has a subscription and we're in shouldPause state, every paused PCS gets the reconcile force-unsubscribe attempt. That's benign because `consumerUnSubscribeAllTopics` is self-gating per topic (it no-ops for partitions that aren't subscribed).

Also: hoist the probe out of the per-PCS loop since it's now SIT-wide; one call per reconcile tick instead of one per partition. Net coverage: 14 missed branches removed, lifting diff coverage above the gate without weakening the reconcile semantics.
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.
`maybeProcessResubscribeRequest` is the lag-detector path that calls `resubscribe(pcs)` to recover replicas falling behind. If a partition is store-level paused, this would re-attach the Kafka consumer that the pause deliberately tore down. The reconcile loop's RECONCILE_FORCE_UNSUBSCRIBE branch would catch it on the next tick, but the window between resubscribe and reconcile allows records to leak through. Add an upfront skip mirroring the existing blob-transfer skip: if `pcs.isStoreLevelPaused()` is true, log and continue. The reconcile loop's normal EXIT_PAUSE path will resubscribe on resume.
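The upfront skip described above amounts to filtering paused partitions out of the lag-recovery loop before any resubscribe happens. This sketch models that filter with illustrative stand-in types (the real code iterates PCSes and calls `resubscribe` inline).

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the lag-detector gate: store-level-paused partitions are
// skipped before resubscribe, mirroring the existing blob-transfer skip.
public class ResubscribeGate {
  /** Illustrative stand-in for PartitionConsumptionState. */
  public static class Pcs {
    final int partition;
    final boolean storeLevelPaused;
    public Pcs(int partition, boolean storeLevelPaused) {
      this.partition = partition;
      this.storeLevelPaused = storeLevelPaused;
    }
  }

  /** Returns the partitions the lag detector may actually resubscribe. */
  public static List<Integer> partitionsEligibleForResubscribe(List<Pcs> lagging) {
    List<Integer> eligible = new ArrayList<>();
    for (Pcs pcs : lagging) {
      if (pcs.storeLevelPaused) {
        continue; // paused: the EXIT_PAUSE path resubscribes on resume instead
      }
      eligible.add(pcs.partition);
    }
    return eligible;
  }
}
```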
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.
Per Copilot review: during an operational pause window the heartbeat monitor (60s cycle) re-enqueues every lagging partition. For a paused store all partitions are lagging, so without throttling a 100-partition paused store emits 100 log lines per 60s — and N stores × P partitions × 1/min scales linearly during fleet-wide pause incidents (e.g. 100 stores × 100 partitions = 10000 log lines/min). Wrap the skip log in REDUNDANT_LOGGING_FILTER (default 60s window) keyed on storeName (not replicaId) so each paused store emits at most one log line per window, regardless of partition count. ~100x reduction at the fleet-wide upper bound.
sixpluszero left a comment
overall lgtm, but I think it is better to address the two minor comments below. Otherwise the timeout will keep happening, even though we have already hardened the logic to not fail the SIT entirely.
…sep-RT gate
- `checkLongRunningTaskState`: move `maybeTransitionPauseState()` inside the timed window so the SIT-loop latency stat reflects this work.
- Skip the bootstrap-timeout check when a partition is store-level paused; reset `consumptionStartTimeInMs` on EXIT_PAUSE so paused time isn't counted against the bootstrap window. Make the field mutable and add `resetConsumptionStartTimeInMs()`.
- Drive `store_level_paused_gauge` from actual post-loop PCS state rather than the intended shouldPause; partial-failure cases shouldn't flip the gauge to 0 while some PCSes remain paused.
- `consumerUnSubscribeAllTopics`: gate the sep-RT unsubscribe on `consumerHasSubscription` to avoid WARN spam from the consumer delegator when sep-RT isn't currently attached.
- `StoreIngestionTaskShouldPauseTest`: clarify a comment.
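The bootstrap-timeout change described above can be sketched as follows: the timeout check is exempt while a partition is store-level paused, and the consumption start time is reset on EXIT_PAUSE so paused time is not counted against the bootstrap window. All names here are illustrative stand-ins for the real SIT/PCS fields.

```java
// Sketch of the bootstrap-timeout exemption + EXIT_PAUSE clock reset.
public class BootstrapTimeoutGate {
  private long consumptionStartTimeInMs; // mutable, per the commit above
  private boolean storeLevelPaused;

  public BootstrapTimeoutGate(long startMs) {
    this.consumptionStartTimeInMs = startMs;
  }

  public void setStoreLevelPaused(boolean paused, long nowMs) {
    if (this.storeLevelPaused && !paused) {
      // EXIT_PAUSE: restart the bootstrap clock so paused time isn't counted
      this.consumptionStartTimeInMs = nowMs;
    }
    this.storeLevelPaused = paused;
  }

  public boolean isBootstrapTimedOut(long nowMs, long timeoutMs) {
    if (storeLevelPaused) {
      return false; // paused partitions are exempt from the timeout check
    }
    return nowMs - consumptionStartTimeInMs > timeoutMs;
  }
}
```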
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated no new comments.
sixpluszero left a comment
lgtm! Thank you for the change!
Problem Statement
Phase 1 (#2754) added the `IngestionPauseMode` store-metadata field, region filter, admin-tool flags, and a parent-side push guard, but Venice servers and DaVinci clients still ignore the mode at runtime: they keep consuming Kafka regardless of what the controller has stored. We need the actual server-side mechanism that observes the pause mode and stops/resumes ingestion.
Solution
Server and DaVinci `StoreIngestionTask`s now honor the store-level `IngestionPauseMode` by fully unsubscribing their Kafka consumers when paused, and resubscribing from the persisted offset on resume.
- `LeaderFollowerStoreIngestionTask.maybeTransitionPauseState()` runs at the top of `checkLongRunningTaskState()` every iteration. On enter: `consumerUnSubscribeAllTopics(pcs)` (covers the leader topic across all Kafka clusters for AA/NR setups). On exit: `resubscribe(pcs)`, reusing the existing leader/follower-aware machinery to rewind from the persisted offset.
- `StoreIngestionTask.shouldPauseForStore` is the static decision helper:
  - `NOT_PAUSED` / null -> false
  - `ALL_VERSIONS` -> true
  - `CURRENT_VERSION` -> true only when `sitVersionNumber == store.getCurrentVersion()`
- `validateAndSubscribePartition` unsubscribes immediately after subscribe if the store is already paused. Blob transfer still runs so the replica comes up populated; only Kafka fetching is suppressed until resume.
- `pauseConsumption`/`resumeConsumption` callbacks no-op when the PCS is store-level paused, so quota logic and store-level pause don't fight.
- `store_level_paused_gauge` metric exposed via `IngestionStats`/`AggVersionedIngestionStats`.
Code changes
- New logging lives in `maybeTransitionPauseState` and the restart hook (not rate-limited because they only fire on state transitions, not per record).
Concurrency-Specific Checks
- `maybeTransitionPauseState` runs only on the SIT thread; the PCS `storeLevelPaused` flag is mutated only from that thread.
How was this PR tested?
- `StoreIngestionTaskShouldPauseTest` (4 cases for the static decision helper); `PartitionConsumptionStateTest` (`storeLevelPaused` default + setter).
- `TestServerIngestionPauseResume`:
  - `testRealTimeConsumptionPausesAndResumesOnServer`: hybrid store, pause globally, verify dc0 doesn't see new RT writes for >15s, resume, verify catch-up.
  - `testRegionScopedPauseOnlyAffectsTargetedRegion`: pause only dc1 via the region filter, verify the dc0 controller stays `NOT_PAUSED` while dc1 sees `ALL_VERSIONS`, and dc0 continues ingesting.
- `NOT_PAUSED` mode results in zero behavior change.
Does this PR introduce any user-facing or breaking changes?