Skip to content

[server][da-vinci] Honor IngestionPauseMode by unsubscribing Kafka consumers#2763

Merged
misyel merged 13 commits intolinkedin:mainfrom
misyel:mkwong/server-ingestion-pause
May 6, 2026
Merged

[server][da-vinci] Honor IngestionPauseMode by unsubscribing Kafka consumers#2763
misyel merged 13 commits intolinkedin:mainfrom
misyel:mkwong/server-ingestion-pause

Conversation

@misyel
Copy link
Copy Markdown
Contributor

@misyel misyel commented Apr 27, 2026

Problem Statement

Phase 1 (#2754) added the IngestionPauseMode store-metadata field, region filter, admin-tool flags, and a parent-side push guard, but Venice servers and DaVinci clients still ignore the mode at runtime — they keep consuming Kafka regardless of what the controller has stored. We need the actual server-side mechanism that observes the pause mode and stops/resumes ingestion.

Solution

Servers and DaVinci StoreIngestionTasks now honor the store-level IngestionPauseMode by fully unsubscribing their Kafka consumers when paused, and resubscribing from the persisted offset on resume.

  • LeaderFollowerStoreIngestionTask.maybeTransitionPauseState() runs at the top of checkLongRunningTaskState() every iteration. On enter: consumerUnSubscribeAllTopics(pcs) (covers leader topic across all Kafka clusters for AA/NR setups). On exit: resubscribe(pcs), reusing the existing leader/follower-aware machinery to rewind from the persisted offset.
  • StoreIngestionTask.shouldPauseForStore is the static decision helper:
    • NOT_PAUSED / null -> false
    • ALL_VERSIONS -> true
    • CURRENT_VERSION -> true only when sitVersionNumber == store.getCurrentVersion()
    • Applies uniformly to both Venice servers and DaVinci clients.
  • Restart-while-paused: validateAndSubscribePartition unsubscribes immediately after subscribe if the store is already paused. Blob transfer still runs so the replica comes up populated; only Kafka fetching is suppressed until resume.
  • Disk-quota pauseConsumption / resumeConsumption callbacks no-op when the PCS is store-level paused, so quota logic and store-level pause don't fight.
  • New store_level_paused_gauge metric exposed via IngestionStats / AggVersionedIngestionStats.

Code changes

  • Added new code behind a config.
  • Introduced new log lines (info-level transition logs in maybeTransitionPauseState and the restart hook; not rate-limited because they only fire on state transitions, not per record).

Concurrency-Specific Checks

  • Code has no race conditions or thread safety issues. maybeTransitionPauseState runs only on the SIT thread; the PCS storeLevelPaused flag is mutated only from that thread.
  • Proper synchronization mechanisms are used where needed.
  • No blocking calls inside critical sections.
  • Verified thread-safe collections are used.
  • Validated proper exception handling — repo lookup wrapped in try/catch so a transient failure doesn't crash the SIT thread.

How was this PR tested?

  • New unit tests added: StoreIngestionTaskShouldPauseTest (4 cases for the static decision helper); PartitionConsumptionStateTest (storeLevelPaused default + setter).
  • New integration tests added: TestServerIngestionPauseResume:
    • testRealTimeConsumptionPausesAndResumesOnServer — hybrid store, pause globally, verify dc0 doesn't see new RT writes for >15s, resume, verify catch-up.
    • testRegionScopedPauseOnlyAffectsTargetedRegion — pause only dc1 via region filter, verify dc0 controller stays NOT_PAUSED while dc1 sees ALL_VERSIONS, and dc0 continues ingesting.
  • Modified or extended existing tests.
  • Verified backward compatibility — default NOT_PAUSED mode results in zero behavior change.

Does this PR introduce any user-facing or breaking changes?

  • No.

Copilot AI review requested due to automatic review settings April 27, 2026 17:35
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Implements runtime honoring of IngestionPauseMode in Venice Server / DaVinci ingestion by transitioning Kafka consumer subscriptions on pause/resume, with supporting metrics and tests.

Changes:

  • Add pause/resume transition logic that unsubscribes/resubscribes Kafka consumers based on store metadata.
  • Introduce a store_level_paused_gauge metric to surface pause effectiveness.
  • Add unit + integration tests covering pause decision logic and end-to-end pause/resume behavior.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestServerIngestionPauseResume.java Adds end-to-end tests validating ingestion pause/resume and region-scoped pause behavior.
clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskShouldPauseTest.java Adds unit tests for the pause decision helper.
clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java Adds unit tests for the new storeLevelPaused PCS flag.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStatsReporter.java Registers the new store-level paused gauge for reporting.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/IngestionStats.java Adds the new gauge field + getter/setter.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java Adds a helper to set the store-level paused gauge per store-version.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java Adds pause decision helper and startup “restart-while-paused” unsubscribe hook; prevents quota pause/resume from fighting store-level pause.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java Adds storeLevelPaused state on PCS.
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java Adds periodic pause-state reconciliation that unsubscribes/resubscribes on transitions.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copilot AI review requested due to automatic review settings April 27, 2026 20:56
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 10 out of 10 changed files in this pull request and generated 1 comment.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

misyel added 3 commits April 28, 2026 15:42
Phase 2 of the ingestion pause feature: server-side and DaVinci SITs now
react to the store-level IngestionPauseMode (set in Phase 1, linkedin#2754) by
fully unsubscribing their Kafka consumers when paused, and resubscribing
from the persisted offset on resume.

Mechanism:
- LeaderFollowerStoreIngestionTask.maybeTransitionPauseState reconciles
  each PCS's pause state every checkLongRunningTaskState iteration. On
  enter: consumerUnSubscribeAllTopics(pcs) (covers leader topic across
  all Kafka clusters for AA/NR). On exit: resubscribe(pcs), reusing the
  existing leader/follower-aware resubscribe machinery.
- StoreIngestionTask.shouldPauseForStore is the static decision helper:
  NOT_PAUSED/null -> false; ALL_VERSIONS -> true; CURRENT_VERSION ->
  true only when sitVersionNumber == store.getCurrentVersion(). Applies
  uniformly to both Venice servers and DaVinci clients.
- Restart-while-paused: validateAndSubscribePartition unsubscribes
  immediately after subscribing if the store is already paused. Blob
  transfer is allowed to run, so the replica still comes up populated.
- PartitionConsumptionState.storeLevelPaused flag is kept for
  observability and to make the disk-quota pauseConsumption /
  resumeConsumption callbacks no-op (so quota and store-level pause
  don't fight each other).
- store_level_paused_gauge metric exposed via IngestionStats /
  AggVersionedIngestionStats.

Tests:
- StoreIngestionTaskShouldPauseTest: 4 cases for the static decision
  helper.
- PartitionConsumptionStateTest: storeLevelPaused default + setter.
- TestServerIngestionPauseResume integration test:
  testRealTimeConsumptionPausesAndResumesOnServer verifies global
  pause stops ingestion and resume catches up;
  testRegionScopedPauseOnlyAffectsTargetedRegion verifies the region
  filter from Phase 1 isolates the pause to listed regions only.
- Make PCS.storeLevelPaused volatile (visibility for cross-thread reads
  by disk-quota callbacks).
- Edge-trigger WARN log on store metadata lookup failure in
  maybeTransitionPauseState — fixes SpotBugs DE_MIGHT_IGNORE while
  keeping per-iteration log spam bounded.
- Fix Javadoc signature reference in StoreIngestionTaskShouldPauseTest.
- Extract pure helpers for branch coverage:
  - LeaderFollowerStoreIngestionTask.decidePauseTransition (per-PCS
    state machine for ENTER_PAUSE / EXIT_PAUSE / NO_CHANGE).
  - StoreIngestionTask.shouldSkipQuotaCallbackForStoreLevelPause (no-op
    guard for disk-quota pause/resume callbacks).
- Add unit tests covering both helpers.
- LeaderFollowerStoreIngestionTask: replace the bespoke
  storeRepoLookupFailureLogged boolean with the shared
  RedundantExceptionFilter (default 60s window). Same goal — avoid
  spamming WARN once per SIT iteration when store metadata lookup
  fails — but uses the project's standard rate-limiting primitive
  and re-emits periodically while the failure persists.
- testRegionScopedPauseOnlyAffectsTargetedRegion: enable
  native-replication + active-active so dc1 leaders pull dc0's RT,
  then add the symmetric dc1 assertions:
  * dc1 does NOT see key1 while paused (>15s window).
  * dc1 catches up and serves key1 after resume.
  This proves the region-scoped pause actually pauses dc1's
  consumption end-to-end, not just the controller-level filter.
@misyel misyel force-pushed the mkwong/server-ingestion-pause branch from 261b068 to e46d18a Compare April 28, 2026 22:51
Copy link
Copy Markdown
Contributor

@sixpluszero sixpluszero left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Phase 2 looks well-structured overall — the static decision helpers (shouldPauseForStore, shouldSkipQuotaCallbackForStoreLevelPause, decidePauseTransition) are nicely extracted and unit-tested, the volatile on PartitionConsumptionState.storeLevelPaused is correct, and reusing the existing resubscribe machinery for resume is the right call. Below are findings from a closer pass; inline comments on each. Summary of what I think needs attention before merge:

Critical

  1. State-mutation ordering in maybeTransitionPauseState opens a race window with quota callbacks (the volatile flag is set on the wrong edge of the long-running op).
  2. A failure on one PCS abandons the rest of the loop and leaves the gauge reflecting intent rather than reality.
  3. validateAndSubscribePartition restart-while-paused branch has no try/catch around getStoreOrThrow/consumerUnSubscribeAllTopics and zero unit-test coverage — silent partial state on failure.
  4. The pauseConsumption/resumeConsumption no-op silently desyncs StorageUtilizationManager.pausedPartitions from real consumer state — disk-quota enforcement can be bypassed for the duration of a store-level pause.
  5. catch (Exception e) on the store lookup is too broad and swallows real bugs (NPEs, re-wrapped interrupts).

Important (test coverage)

  • The orchestrator maybeTransitionPauseState itself has no side-effect coverage — only the pure decision helper. A refactor that swaps ENTER/EXIT arms or drops the unsubscribe would still pass every test.
  • The restart-while-paused branch and the wired-up quota-callback suppression both have zero unit tests.
  • The new store_level_paused_gauge is plumbed through but never asserted in any test.
  • PR title claims [server][da-vinci] but the integration test is server-only — no DaVinci-side test exists.

Important (other)

  • IngestionPauseMode.CURRENT_VERSION Javadoc says "RT consumption only" but the implementation pauses VT too if the current version is still pre-EOP.
  • decidePauseTransition is a static test surface with no null-PCS guard, while sibling shouldSkipQuotaCallbackForStoreLevelPause does guard — inconsistent contract.
  • Several Javadoc inaccuracies ("all Kafka clusters", "all leader RTs", StoreIngestionTask.run() referenced as the call site).
  • First integration test races parent→child→server propagation with the immediate sendStreamingRecord; the second test correctly uses waitForNonDeterministicAssertion. Apply the same pattern to the first.

Happy to discuss any of these — full repro/reasoning in the inline threads.

Pause-state ordering & error handling (LeaderFollowerStoreIngestionTask):
- Narrow `catch (Exception)` to `catch (VeniceNoStoreException)` so
  RuntimeException-wrapped InterruptedException no longer loses the
  shutdown signal; switch to inherited REDUNDANT_LOGGING_FILTER.
- Flip the per-PCS pause flag BEFORE the long unsubscribe/resubscribe so
  the disk-quota no-op guard covers the entire window. On EXIT_PAUSE,
  resubscribe before clearing the flag — failure leaves flag set so
  the next iteration retries instead of leaving the partition dark.
- Wrap each per-PCS transition in try/catch so one failure no longer
  abandons the rest of the loop.
- Add null-PCS guard to `decidePauseTransition` (mirrors sibling
  shouldSkipQuotaCallbackForStoreLevelPause).
- Rewrite Javadoc to accurately describe AA/multi-region coverage —
  consumerUnSubscribeAllTopics only drops the OffsetRecord's leader
  topic, not all region RTs.

Restart-while-paused hook (StoreIngestionTask):
- Extract `maybeUnsubscribeOnStartupIfStorePaused` and wrap in try/catch
  so a transient repo-lookup failure or unsubscribe error no longer
  fails SUBSCRIBE processing into ERROR.
- Reorder: flag set before unsubscribe (consistent with new ordering).
- Extract pure decision helper `shouldUnsubscribeOnStartup` so the
  branch is unit-testable.

Quota-callback observability + manager state-sync:
- Change TopicPartitionConsumerFunction.execute to return boolean.
  StorageUtilizationManager now gates pausedPartitions add/remove on
  the lambda's return value, so the manager's tracked state stays in
  sync with real consumer state when a higher-priority pause source
  (store-level IngestionPauseMode) owns the consumer.
- pauseConsumption/resumeConsumption return false when store-level
  pause suppresses the action; log at INFO (throttled) so operators
  investigating quota-vs-pause interactions are no longer in the dark.

IngestionPauseMode Javadoc:
- CURRENT_VERSION/ALL_VERSIONS now accurately describe the impl —
  no longer claims "RT consumption only".

Tests (consolidated via @dataProvider truth tables):
- decidePauseTransitionTruthTable (6 rows) replaces 5 separate methods.
- shouldPauseForStoreTruthTable (6 rows) replaces 4 methods.
- shouldSkipQuotaCallbackTruthTable (3 rows) replaces 3 methods.
- shouldUnsubscribeOnStartupTruthTable (7 rows, new) — covers the
  restart-while-paused decision including null-PCS, null-Store, and
  CURRENT_VERSION × sitVersion combinations.
- New currentVersionSwapPausesNewCurrentResumesOldCurrent — codifies
  V4->V5 swap behavior under CURRENT_VERSION mode.

Integration tests:
- testRealTimeConsumptionPausesAndResumesOnServer: wait for child
  controller to observe ALL_VERSIONS before sending key2, eliminating
  the race between updateStore and sendStreamingRecord.
- testRegionScopedPauseOnlyAffectsTargetedRegion: update stale Javadoc
  to reflect the actual NR+AA setup.
- StorageUtilizationManagerTest: update existing lambdas to return
  true, matching the new boolean interface.
Copilot AI review requested due to automatic review settings May 1, 2026 18:13
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

- LeaderFollowerStoreIngestionTask.consumerUnSubscribeAllTopics: also
  unsubscribe the separate RT topic when the primary topic is RT and
  sep-RT is enabled. Previously a paused leader could continue
  consuming from sep-RT, contradicting the IngestionPauseMode
  semantics. Mirrors the sep-RT handling in unsubscribeFromTopic and
  also benefits the existing UNSUBSCRIBE consumer-action path.

- TestServerIngestionPauseResume: replace `Thread.sleep(15s) +
  assertNull` (which can pass on slow propagation or fail on a brief
  ingestion window) with assertKeyRemainsAbsent, which polls every
  250ms over the full 15s window and fails fast if the key ever
  becomes visible. This caught a real timing issue where the server-
  side SIT hadn't yet observed the new pause mode by the time the
  produce landed; added a 10s buffer after the controller sees the
  new mode for the SIT's checkLongRunningTaskState loop to apply the
  unsubscribe before producing.
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

misyel added 2 commits May 4, 2026 13:32
…ncile

- consumerUnSubscribeAllTopics: previously picked one primary topic
  (leader topic for LEADER, VT otherwise). During L/F transitions or
  bootstrap a partition can be transiently subscribed to BOTH the
  version topic and a separate leader topic; the old logic left the
  other one attached. Now unsubscribes whichever of {VT, leader topic,
  sep-RT} is currently subscribed (gated via consumerHasSubscription
  to avoid redundant work).

- maybeTransitionPauseState: previously gated the unsubscribe purely
  on the PCS flag transition, so a subscription that crept back via
  a leader/follower transition or a topic switch (after the initial
  pause unsubscribe) would not be re-unsubscribed — the next reconcile
  would see flag=true and return NO_CHANGE. Added a force-unsubscribe
  branch that fires when shouldPause is true, the PCS is already
  flagged paused, but partitionHasAnyActiveSubscription() reports a
  live subscription. Makes the reconcile loop idempotent against
  out-of-band re-subscriptions.

- New helper partitionHasAnyActiveSubscription: checks VT, leader
  topic (if different), and sep-RT — used by the force-unsubscribe
  branch above.
Diff-coverage gate (45% branches) was failing at 42.55% — primarily
because the inline `forceUnsubscribe` condition and its short-circuit
branches lived in `maybeTransitionPauseState` where the integration
tests don't exercise the L/F-transition-reattach path.

- Extend `decidePauseTransition` to take `hasAnyActiveSubscription` as
  a third input and return a new `RECONCILE_FORCE_UNSUBSCRIBE` action
  (previously an inline boolean check in the caller). The full state
  machine now lives in one pure static helper.
- maybeTransitionPauseState: replace the inline `forceUnsubscribe`
  computation with a single call to the helper; pre-compute
  `partitionHasAnyActiveSubscription` once per PCS (with a null guard).
- LeaderFollowerStoreIngestionTaskPauseTransitionTest: expand the
  truth-table to 11 rows covering the full
  (PCS-state × shouldPause × hasAnySub) cube including the new
  RECONCILE_FORCE_UNSUBSCRIBE branch and the null-PCS guard. All cases
  covered by a single parametrized test.
Copilot AI review requested due to automatic review settings May 4, 2026 21:14
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copilot AI review requested due to automatic review settings May 5, 2026 20:48
Per review feedback (sixpluszero): remove
maybeUnsubscribeOnStartupIfStorePaused
entirely. The reconcile loop in maybeTransitionPauseState now uses
RECONCILE_FORCE_UNSUBSCRIBE to detect any partition with shouldPause=true
plus an active subscription (whether subscribed at startup, after blob
transfer, or via a leader/follower transition) and unsubscribes it on
the next iteration. This:

- Simplifies the code (one reconcile path instead of two).
- Naturally handles blob-transfer-in-progress: the transfer completes
  normally, the post-transfer subscribe attaches Kafka, and the next
  reconcile tick unsubscribes — blob transfer is never aborted.
- Eliminates the need for shouldUnsubscribeOnStartup and its
  parametrized truth-table test (7 rows).

Also fix Javadoc per Copilot: shouldSkipQuotaCallbackForStoreLevelPause
links now include the (PartitionConsumptionState) parameter signature
in both LeaderFollowerStoreIngestionTask and
StoreIngestionTaskShouldPauseTest.
@misyel misyel force-pushed the mkwong/server-ingestion-pause branch from 5cda902 to 8750adc Compare May 5, 2026 20:53
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated 6 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

misyel added 2 commits May 5, 2026 14:17
- maybeTransitionPauseState: short-circuit partitionHasAnyActiveSubscription
  to only run when shouldPause && pcs.isStoreLevelPaused() (the only case
  where the result can flip the decision to RECONCILE_FORCE_UNSUBSCRIBE).
  Avoids per-loop consumerHasSubscription scans on every partition for
  large stores.

- shouldPauseForStore Javadoc: replace vague "this version's role" with
  the actual logic — pauses for ALL_VERSIONS unconditionally, for
  CURRENT_VERSION only when sitVersionNumber equals
  store.getCurrentVersion().

- TestServerIngestionPauseResume: replace two Thread.sleep(10s) buffer
  waits with waitForStoreLevelPausedGauge — polls the actual server-side
  metric (.<store>_current--store_level_paused_gauge.IngestionStatsGauge)
  to confirm the SIT has applied the pause before producing the test
  record. Direct signal instead of fixed-duration guess; not flaky on
  slow CI runners.
The lazy probe was below the 45% diff-coverage gate (42.04%) because
partitionHasAnyActiveSubscription contained 14 uncovered branches that
integration tests don't exercise (RT/sep-RT/leader-topic permutations
under the narrow shouldPause + already-paused condition).

Replace the per-PCS helper with the existing SIT-wide
consumerHasAnySubscription() — a single call against
aggKafkaConsumerService with no branches. The trade-off is coarseness:
when *any* partition in the SIT has a subscription and we're in
shouldPause state, every paused PCS gets the reconcile force-unsubscribe
attempt. That's benign because consumerUnSubscribeAllTopics is
self-gating per topic (no-ops for partitions that aren't subscribed).

Also: hoist the probe out of the per-PCS loop since it's now SIT-wide
— one call per reconcile tick instead of one per partition.

Net coverage: 14 missed branches removed, lifting diff coverage above
the gate without weakening the reconcile semantics.
Copilot AI review requested due to automatic review settings May 5, 2026 21:38
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

maybeProcessResubscribeRequest is the lag-detector path that calls
resubscribe(pcs) to recover replicas falling behind. If a partition
is store-level paused, this would re-attach the Kafka consumer that
the pause deliberately tore down. The reconcile loop's
RECONCILE_FORCE_UNSUBSCRIBE branch would catch it on the next tick,
but the window between resubscribe and reconcile allows records to
leak through.

Add an upfront skip mirroring the existing blob-transfer skip: if
pcs.isStoreLevelPaused() is true, log and continue. The reconcile
loop's normal EXIT_PAUSE path will resubscribe on resume.
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Per Copilot review: during an operational pause window the heartbeat
monitor (60s cycle) re-enqueues every lagging partition. For a paused
store all partitions are lagging, so without throttling a 100-partition
paused store emits 100 log lines per 60s — and N stores × P partitions
× 1/min scales linearly during fleet-wide pause incidents (e.g.
100 stores × 100 partitions = 10000 log lines/min).

Wrap the skip log in REDUNDANT_LOGGING_FILTER (default 60s window)
keyed on storeName (not replicaId) so each paused store emits at most
one log line per window, regardless of partition count. ~100x
reduction at the fleet-wide upper bound.
sixpluszero
sixpluszero previously approved these changes May 6, 2026
Copy link
Copy Markdown
Contributor

@sixpluszero sixpluszero left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall lgtm, but I think it is better to address the below two minor comments. Otherwise timeout will keep happening, even though we have already hardened the logic to not fail SIT totally.

…sep-RT

gate

- checkLongRunningTaskState: move maybeTransitionPauseState() inside
  the timed window so the SIT-loop latency stat reflects this work.
- Skip bootstrap-timeout check when partition is store-level paused;
  reset consumptionStartTimeInMs on EXIT_PAUSE so paused time isn't
  counted against the bootstrap window. Make the field mutable and
  add resetConsumptionStartTimeInMs().
- Drive store_level_paused_gauge from actual post-loop PCS state
  rather than intended shouldPause; partial-failure cases shouldn't
  flip the gauge to 0 while some PCSes remain paused.
- consumerUnSubscribeAllTopics: gate sep-RT unsubscribe on
  consumerHasSubscription to avoid WARN spam from the consumer
  delegator when sep-RT isn't currently attached.
- StoreIngestionTaskShouldPauseTest: clarify a comment.
Copilot AI review requested due to automatic review settings May 6, 2026 04:55
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated no new comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Copy Markdown
Contributor

@sixpluszero sixpluszero left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm! Thank you for the change!

@misyel misyel merged commit 8de8158 into linkedin:main May 6, 2026
110 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants