[server][common] server-side batch-push record count verification at EOP #2774

Open
ymuppala wants to merge 2 commits into linkedin:main from ymuppala:vpj-leader-forward-prc-header

@ymuppala ymuppala commented May 5, 2026

Problem Statement

Part 4

A Spark-based VPJ KIF re-push silently wrote batch data to the wrong datacenter. Two of three DCs lost batch data, yet the push job reported success, and the store served partial data for hours before anyone noticed.
There was no defense at the consumer end against the producer claiming "I sent N records" when the consumer received only M < N. Venice servers counted records but never compared their count against the producer's, so any failure mode that drops records between VPJ and the server (wrong-DC writes, partial broker connectivity loss, intermittent produce errors that never surface to VPJ) went undetected.

VPJ cannot see what the server actually received, so the check has to run server-side. It should add no extra round-trip and must tolerate Kafka's at-least-once semantics (a consumer can legitimately see surplus records from producer retries).

Solution

End-to-end record-count verification piggy-backed on the EOP control message.

PR #2758 (already merged) attached a new prc (per-partition record count) PubSub header to every EOP. This PR makes the server count its own data records into PartitionState.batchPushRecordCount (Avro schema bumped to v23), persists the count to RocksDB on every checkpoint, and compares it against the prc header at EOP time. The match path emits batch_push_record_count_match; the deficit path emits batch_push_record_count_mismatch.
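As a rough illustration of the counting side, the sketch below keeps a per-partition counter that can be restored from the last checkpointed value after a restart. `BatchRecordCounter` and its method names are hypothetical stand-ins, not the actual Venice classes, and the sketch assumes the count is checkpointed together with the consumed offset so that replayed records are recounted consistently.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the per-partition counter; not the actual Venice classes. */
final class BatchRecordCounter {
  private final AtomicLong count;

  /** Start from zero for a fresh partition, or from the last persisted value after a restart. */
  BatchRecordCounter(long restoredFromCheckpoint) {
    if (restoredFromCheckpoint < 0) {
      throw new IllegalArgumentException("count must be non-negative");
    }
    this.count = new AtomicLong(restoredFromCheckpoint);
  }

  /** Called once per batch data record; control messages are not counted. */
  long onDataRecord() {
    return count.incrementAndGet();
  }

  /** Value that would be written into the checkpointed partition state. */
  long snapshotForCheckpoint() {
    return count.get();
  }
}
```

A fresh partition would start with `new BatchRecordCounter(0)`; after a restart, the constructor argument would come from the persisted state.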

Coverage: every replica, every region. Cross-region followers consume EOP from local VT, but the leader's pass-through put in LeaderFollowerStoreIngestionTask was hardcoded to EmptyPubSubMessageHeaders.SINGLETON, stripping the prc header before re-emit. This PR forwards only the prc header to local VT.
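The allow-list behavior described here can be sketched as follows. This is a minimal illustration that models headers as a plain `Map<String, byte[]>`; the real code works with Venice's PubSub header types, and the literal key `"prc"` and the class name `PrcHeaderForwarder` are assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: forward only the record-count header on a leader re-emit. */
final class PrcHeaderForwarder {
  // Assumed header key; the source only says the header is called "prc".
  static final String PRC_HEADER = "prc";

  /** Returns a new map holding at most the prc header; every other upstream header is dropped. */
  static Map<String, byte[]> headersToForward(Map<String, byte[]> upstream) {
    Map<String, byte[]> forwarded = new LinkedHashMap<>();
    byte[] prc = upstream.get(PRC_HEADER);
    if (prc != null) {
      // Copy the value so the re-emitted message does not share a mutable array with the upstream one.
      forwarded.put(PRC_HEADER, prc.clone());
    }
    return forwarded;
  }
}
```

Forwarding an explicit allow-list rather than all upstream headers keeps the re-emitted message unchanged except for the one header that followers need.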

Per-store opt-in. New store-level config batchPushRecordCountVerificationEnabled (default false). When true and a deficit is detected, the server throws a VeniceException carrying the tagged error string.
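The decision at EOP time might look roughly like the sketch below. It is not the PR's code: `RecordCountVerifier`, the `Outcome` enum, the local exception type (standing in for VeniceException), and the exact message layout after the `RECORD_COUNT_DEFICIT:` prefix are all hypothetical. Two behaviors are assumptions rather than facts from the description: a surplus is treated as a match, and a missing header skips verification.

```java
import java.util.OptionalLong;

/** Hypothetical sketch of the EOP-time comparison; names and message layout are illustrative. */
final class RecordCountVerifier {
  enum Outcome { MATCH, MISMATCH, SKIPPED }

  static final String DEFICIT_TAG = "RECORD_COUNT_DEFICIT:";

  /** Local stand-in for the exception the server would throw. */
  static final class RecordCountDeficitException extends RuntimeException {
    RecordCountDeficitException(String message) {
      super(message);
    }
  }

  static Outcome verify(OptionalLong producerCount, long consumedCount, boolean failOnDeficit, String replicaId) {
    if (!producerCount.isPresent()) {
      return Outcome.SKIPPED; // assumption: no prc header means nothing to compare against
    }
    long expected = producerCount.getAsLong();
    if (consumedCount >= expected) {
      return Outcome.MATCH; // assumption: surplus from producer retries counts as a match
    }
    if (failOnDeficit) {
      // Only a deficit with the per-store flag enabled fails ingestion.
      throw new RecordCountDeficitException(
          DEFICIT_TAG + " replica=" + replicaId + " expected=" + expected + " actual=" + consumedCount);
    }
    return Outcome.MISMATCH; // flag disabled: report the deficit without failing
  }
}
```

In this sketch the caller would record the informational match or mismatch metric based on the outcome, and the failure metric only when the exception is thrown.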

End-to-end push-job integration. The tagged error string survives the full propagation chain:

  1. StoreIngestionTask.handleIngestionException includes e.getMessage() in the reportError message (without this fix the bare "Caught Exception during ingestion." masked the tag).
  2. PushStatusNotifier.error writes the message to ZK via OfflinePushAccessor.updateReplicaStatus.
  3. PushStatusDecider.findTaggedReplicaErrorMessage scans error replicas' messages for the RECORD_COUNT_DEFICIT: prefix and surfaces the cleaned tagged segment in ExecutionStatusWithDetails.details.
  4. JobStatusQueryResponse.statusDetails carries it to VPJ.
  5. VenicePushJob.pollStatusUntilCompletion detects the tag and sets PushJobCheckpoints.RECORD_COUNT_MISMATCH(-15) on pushJobDetails.pushJobLatestCheckpoint.
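Steps 3 and 5 of the chain above both come down to finding the tag inside a free-form message. The sketch below shows one way that scan could work. `TaggedErrorScanner` and its method names are hypothetical, and the rule that the tagged segment ends at the first newline is an assumption; only the `RECORD_COUNT_DEFICIT:` prefix comes from the description.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch of extracting and detecting the tagged error segment. */
final class TaggedErrorScanner {
  static final String TAG = "RECORD_COUNT_DEFICIT:";

  /** Returns the first tagged segment found in any replica error message, without leading noise. */
  static Optional<String> findTagged(List<String> replicaErrorMessages) {
    for (String message : replicaErrorMessages) {
      if (message == null) {
        continue;
      }
      int start = message.indexOf(TAG);
      if (start < 0) {
        continue;
      }
      String segment = message.substring(start);
      int newline = segment.indexOf('\n');
      // Assumption: anything after the first newline (for example a stack trace) is not part of the tag.
      return Optional.of((newline >= 0 ? segment.substring(0, newline) : segment).trim());
    }
    return Optional.empty();
  }

  /** Client-side check: do the status details carry the tag? */
  static boolean isRecordCountMismatch(String statusDetails) {
    return statusDetails != null && statusDetails.contains(TAG);
  }
}
```

A string tag is used here because the message passes through several layers that only carry text, so the prefix is the one thing every hop can preserve.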

Sensors. Two distinct sensors:

  • batch_push_record_count_match / batch_push_record_count_mismatch (informational — fired regardless of flag state).
  • record_count_mismatch_failure (failed-and-throwing — fires only when flag enabled AND deficit caused a throw).

Code changes

  • Added new code behind a config. If so list the config names and their default values in the PR description.
  • Introduced new log lines.
    • Confirmed if logs need to be rate limited to avoid excessive logging.

Concurrency-Specific Checks

Both reviewer and PR author to verify

  • Code has no race conditions or thread safety issues.
  • Proper synchronization mechanisms (e.g., synchronized, RWLock) are used where needed.
  • No blocking calls inside critical sections that could lead to deadlocks or performance degradation.
  • Verified thread-safe collections are used (e.g., ConcurrentHashMap, CopyOnWriteArrayList).
  • Validated proper exception handling in multi-threaded code to avoid silent thread termination.

How was this PR tested?

  • New unit tests added.
  • New integration tests added.
  • Modified or extended existing tests.
  • Verified backward compatibility (if applicable).

Does this PR introduce any user-facing or breaking changes?

  • No. You can skip the rest of this section.
  • Yes. Clearly explain the behavior change and its impact.

@ymuppala ymuppala force-pushed the vpj-leader-forward-prc-header branch from 20bc510 to ae3085e Compare May 5, 2026 21:28
@ymuppala ymuppala changed the title [server][common] server-side batch-push record count verification at… [server][common] server-side batch-push record count verification at EOP May 6, 2026
@ymuppala ymuppala force-pushed the vpj-leader-forward-prc-header branch 2 times, most recently from 264139e to eb5dc97 Compare May 6, 2026 21:04
@ymuppala ymuppala marked this pull request as ready for review May 6, 2026 21:04
Copilot AI review requested due to automatic review settings May 6, 2026 21:04

Copilot AI left a comment

Pull request overview

This PR adds server-side, end-to-end batch-push record-count verification at End-of-Push (EOP) by persisting a per-partition consumed record count and comparing it to the producer-provided per-partition record count (prc) header on the EOP control message. It also propagates deficit failures up to VPJ with a tagged error string, and introduces a per-store opt-in flag to fail ingestion on deficits.

Changes:

  • Add per-partition batchPushRecordCount persistence (PartitionState v23) and record counting during ingestion; compare against EOP prc header and emit match/mismatch/failure sensors.
  • Forward only the prc header on leader EOP re-emits to local VT so remote-region followers can verify counts.
  • Add per-store config batchPushRecordCountVerificationEnabled (StoreMetaValue v43 + AdminOperation v98) and propagate tagged deficit errors to push status / VPJ checkpoint classification.

Reviewed changes

Copilot reviewed 35 out of 35 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| services/venice-controller/src/main/resources/avro/AdminOperation/v98/AdminOperation.avsc | Bump AdminOperation schema to include the new per-store flag in UpdateStore payload. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java | Plumb new UpdateStoreQueryParams flag into SetStore admin message for child clusters. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java | Persist the new per-store flag into store metadata. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java | Apply new SetStore flag when consuming admin messages. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestRepushDiagnostics.java | Extend repush diagnostics to enable verification flag and assert match/mismatch sensors across DCs. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatchPushRecordCountVerification.java | New integration test covering mismatch behavior with flag disabled vs enabled, plus tagged error propagation. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatch.java | Enable verification in an existing batch test and add helper assertions for match/mismatch sensors. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/AbstractTestRepush.java | Verify prc header propagation to remote local VTs and add cross-DC sensor assertion helper. |
| internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java | Unit tests for the new pass-through put overload that forwards selected headers. |
| internal/venice-common/src/test/java/com/linkedin/venice/pushmonitor/PushStatusDeciderTaggedMessageTest.java | New unit tests for extracting tagged error messages from errored replicas. |
| internal/venice-common/src/test/java/com/linkedin/venice/PushJobCheckPointsTest.java | Add checkpoint mapping for RECORD_COUNT_MISMATCH (-15). |
| internal/venice-common/src/test/java/com/linkedin/venice/offsets/TestOffsetRecord.java | Validate batchPushRecordCount OffsetRecord serialization round-trip. |
| internal/venice-common/src/main/resources/avro/StoreMetaValue/v43/StoreMetaValue.avsc | Bump StoreMetaValue schema to include the per-store verification flag. |
| internal/venice-common/src/main/resources/avro/PartitionState/v23/PartitionState.avsc | Bump PartitionState schema to persist batchPushRecordCount. |
| internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java | Add pass-through put overload that can forward upstream headers to local VT. |
| internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java | Bump protocol versions for PartitionState, AdminOperation, and StoreMetaValue. |
| internal/venice-common/src/main/java/com/linkedin/venice/pushmonitor/PushStatusDecider.java | Surface tagged replica error strings in status details and add helper to extract them. |
| internal/venice-common/src/main/java/com/linkedin/venice/PushJobCheckpoints.java | Add RECORD_COUNT_MISMATCH checkpoint and tagged error prefix constant. |
| internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java | Add getter/setter for batchPushRecordCount. |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/ZKStore.java | Copy new store flag into ZKStore-backed store properties. |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java | Expose the new store flag via SystemStore read path (setter unsupported). |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreInfo.java | Include the new store flag in StoreInfo DTO. |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java | Add new store-level config API for batch push record count verification. |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java | Expose new flag on ReadOnlyStore delegate (setter unsupported). |
| internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java | Add UpdateStoreQueryParams getter/setter for the new flag. |
| internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java | Add REST query param constant for the new flag. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java | Classify tagged deficit failures as RECORD_COUNT_MISMATCH checkpoint. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskRecordCountTest.java | New unit tests for mismatch vs match behavior and failure sensor behavior. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java | Unit tests for default/monotonic/restored batchPushRecordCount. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java | Unit tests for forwarding only the prc header on EOP re-emits. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveKeyCountTest.java | Update test to match new processEndOfPush signature including headers. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java | Add match/mismatch/failure sensors for record-count verification. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java | Count batch records, persist count, compare at EOP, emit sensors, and optionally throw with tagged error. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java | Store and persist per-partition batchPushRecordCount. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java | Extract and forward prc header on leader EOP re-emits to local VT. |

ymuppala added a commit to ymuppala/venice that referenced this pull request May 7, 2026
1. SIT.verifyBatchPushRecordCount Javadoc: was referencing
   `VeniceServerConfig#isBatchPushRecordCountVerificationEnabled` (the
   server-level config that was removed when we converted to a per-store
   flag). Updated to reference Store#isBatchPushRecordCountVerificationEnabled
   so operators understand this is store-config driven, not a server-wide
   toggle.

2. TestBatchPushRecordCountVerification.testDeficitFailsPushWhenFlagEnabled:
   wrap the metric + statusDetails assertions in
   waitForNonDeterministicAssertion.
   Tehuti recordings are observed via the metrics repository which can lag
   behind the actual record() call (the throw runs on the SIT consumer thread,
   metric is sampled on the test thread). The statusDetails propagation chain
   (replica error message via OfflinePushAccessor -> push monitor aggregation)
   is also async and can lag the ERROR transition by a small margin.

Both items flagged by the Copilot reviewer.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
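The retry-until-deadline idea behind wrapping asynchronous assertions can be sketched as below. This is a generic illustration, not Venice's waitForNonDeterministicAssertion helper; `EventuallyAssert`, its signature, and the 20 ms poll interval are hypothetical.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical polling-assertion helper; retries until the assertion passes or the deadline expires. */
final class EventuallyAssert {
  static void eventually(long timeoutMs, Runnable assertion) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    while (true) {
      try {
        assertion.run();
        return; // assertion held, stop polling
      } catch (AssertionError failure) {
        if (System.nanoTime() >= deadline) {
          throw failure; // surface the most recent failure once time is up
        }
        try {
          Thread.sleep(20); // short pause before re-checking the asynchronous state
        } catch (InterruptedException interrupted) {
          Thread.currentThread().interrupt();
          throw failure;
        }
      }
    }
  }
}
```

A test would pass the metric and status-details checks as the `Runnable`, so a value that arrives slightly after the ERROR transition no longer makes the test flaky.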
Copilot AI review requested due to automatic review settings May 7, 2026 20:37
@ymuppala ymuppala force-pushed the vpj-leader-forward-prc-header branch from 58bfd72 to 6576b08 Compare May 7, 2026 20:37

Copilot AI left a comment

Pull request overview

Copilot reviewed 32 out of 33 changed files in this pull request and generated 2 comments.

@ymuppala ymuppala force-pushed the vpj-leader-forward-prc-header branch from 6576b08 to cca9d58 Compare May 7, 2026 20:45
Copilot AI review requested due to automatic review settings May 7, 2026 21:34

Copilot AI left a comment

Pull request overview

Copilot reviewed 32 out of 33 changed files in this pull request and generated 3 comments.

Comment thread internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java Outdated
@ymuppala ymuppala force-pushed the vpj-leader-forward-prc-header branch from 6f311eb to b5cfb07 Compare May 7, 2026 21:46
Copilot AI review requested due to automatic review settings May 7, 2026 22:25
@ymuppala ymuppala force-pushed the vpj-leader-forward-prc-header branch from b5cfb07 to a604f92 Compare May 7, 2026 22:25

Copilot AI left a comment

Pull request overview

Copilot reviewed 32 out of 33 changed files in this pull request and generated 2 comments.

Copilot AI review requested due to automatic review settings May 7, 2026 22:58
@ymuppala ymuppala force-pushed the vpj-leader-forward-prc-header branch from 8ee9162 to 62419d5 Compare May 7, 2026 22:58

Copilot AI left a comment

Pull request overview

Copilot reviewed 33 out of 34 changed files in this pull request and generated 2 comments.

Fix integration test failure in
TestBatchPushRecordCountVerification.testDeficitFailsPushWhenFlagEnabled:

  Got: 2/2 partitions completed.
  expected statusDetails to contain 'RECORD_COUNT_DEFICIT:'

Root cause: OfflinePushStatus.updateStatusDetails() walks replica statuses,
counts ones in any *terminal* state (which includes ERROR), and rewrites
statusDetails to "<X>/<Y> partitions completed." This races against the
authoritative path:

  PushStatusDecider.checkPushStatusAndDetailsByPartitionsStatus
    -> ExecutionStatusWithDetails(ERROR, "too many ERROR... + tagged details")
    -> handleTerminalOfflinePushUpdate -> handleErrorPush -> updatePushStatus
    -> OfflinePushStatus.updateStatus(ERROR, taggedDetails)
    -> statusDetails = taggedDetails

Once the push has transitioned to ERROR with tagged details, a *late*
partition status update (replica status arriving after the push transition)
calls setPartitionStatus(updateDetails=true) which calls updateStatusDetails,
clobbering the tagged details with the partition-progress counter.

Fix: short-circuit updateStatusDetails when currentStatus is already terminal.
Once terminal, statusDetails is authoritative (the success path leaves it
empty; the error path holds the tagged details VPJ relies on for
PushJobCheckpoints classification). The partition-progress counter is
meaningless after the push has already concluded.
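The short-circuit described in this commit message can be illustrated with a stripped-down model. `PushStatusSketch` is a hypothetical stand-in for OfflinePushStatus with a three-value status enum; only the guard on the terminal state mirrors the fix described above.

```java
/** Hypothetical, simplified model of the status-details race and its fix. */
final class PushStatusSketch {
  enum Status {
    STARTED, COMPLETED, ERROR;

    boolean isTerminal() {
      return this != STARTED;
    }
  }

  private Status currentStatus = Status.STARTED;
  private String statusDetails = "";

  /** Authoritative path: the push-level transition sets both the status and its details. */
  void updateStatus(Status newStatus, String details) {
    currentStatus = newStatus;
    statusDetails = details;
  }

  /** Partition-level path: rewrites the progress counter unless the push has already concluded. */
  void onPartitionStatusUpdate(int completedPartitions, int totalPartitions) {
    if (currentStatus.isTerminal()) {
      return; // the fix: a late replica update must not clobber terminal details
    }
    statusDetails = completedPartitions + "/" + totalPartitions + " partitions completed.";
  }

  String getStatusDetails() {
    return statusDetails;
  }
}
```

Without the guard, a replica update arriving after the ERROR transition would replace the tagged details with the progress counter, which is the failure shown in the test output above.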

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>