[server][common] server-side batch-push record count verification at EOP#2774
Open
ymuppala wants to merge 2 commits intolinkedin:mainfrom
Open
[server][common] server-side batch-push record count verification at EOP#2774ymuppala wants to merge 2 commits intolinkedin:mainfrom
ymuppala wants to merge 2 commits intolinkedin:mainfrom
Conversation
20bc510 to
ae3085e
Compare
264139e to
eb5dc97
Compare
There was a problem hiding this comment.
Pull request overview
This PR adds server-side, end-to-end batch-push record-count verification at End-of-Push (EOP) by persisting a per-partition consumed record count and comparing it to the producer-provided per-partition record count (prc) header on the EOP control message. It also propagates deficit failures up to VPJ with a tagged error string, and introduces a per-store opt-in flag to fail ingestion on deficits.
Changes:
- Add per-partition
batchPushRecordCountpersistence (PartitionState v23) and record counting during ingestion; compare against EOPprcheader and emit match/mismatch/failure sensors. - Forward only the
prcheader on leader EOP re-emits to local VT so remote-region followers can verify counts. - Add per-store config
batchPushRecordCountVerificationEnabled(StoreMetaValue v43 + AdminOperation v98) and propagate tagged deficit errors to push status / VPJ checkpoint classification.
Reviewed changes
Copilot reviewed 35 out of 35 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| services/venice-controller/src/main/resources/avro/AdminOperation/v98/AdminOperation.avsc | Bump AdminOperation schema to include the new per-store flag in UpdateStore payload. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java | Plumb new UpdateStoreQueryParams flag into SetStore admin message for child clusters. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java | Persist the new per-store flag into store metadata. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminExecutionTask.java | Apply new SetStore flag when consuming admin messages. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestRepushDiagnostics.java | Extend repush diagnostics to enable verification flag and assert match/mismatch sensors across DCs. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatchPushRecordCountVerification.java | New integration test covering mismatch behavior with flag disabled vs enabled, plus tagged error propagation. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatch.java | Enable verification in an existing batch test and add helper assertions for match/mismatch sensors. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/AbstractTestRepush.java | Verify prc header propagation to remote local VTs and add cross-DC sensor assertion helper. |
| internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java | Unit tests for the new pass-through put overload that forwards selected headers. |
| internal/venice-common/src/test/java/com/linkedin/venice/pushmonitor/PushStatusDeciderTaggedMessageTest.java | New unit tests for extracting tagged error messages from errored replicas. |
| internal/venice-common/src/test/java/com/linkedin/venice/PushJobCheckPointsTest.java | Add checkpoint mapping for RECORD_COUNT_MISMATCH (-15). |
| internal/venice-common/src/test/java/com/linkedin/venice/offsets/TestOffsetRecord.java | Validate batchPushRecordCount OffsetRecord serialization round-trip. |
| internal/venice-common/src/main/resources/avro/StoreMetaValue/v43/StoreMetaValue.avsc | Bump StoreMetaValue schema to include the per-store verification flag. |
| internal/venice-common/src/main/resources/avro/PartitionState/v23/PartitionState.avsc | Bump PartitionState schema to persist batchPushRecordCount. |
| internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java | Add pass-through put overload that can forward upstream headers to local VT. |
| internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java | Bump protocol versions for PartitionState, AdminOperation, and StoreMetaValue. |
| internal/venice-common/src/main/java/com/linkedin/venice/pushmonitor/PushStatusDecider.java | Surface tagged replica error strings in status details and add helper to extract them. |
| internal/venice-common/src/main/java/com/linkedin/venice/PushJobCheckpoints.java | Add RECORD_COUNT_MISMATCH checkpoint and tagged error prefix constant. |
| internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java | Add getter/setter for batchPushRecordCount. |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/ZKStore.java | Copy new store flag into ZKStore-backed store properties. |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java | Expose the new store flag via SystemStore read path (setter unsupported). |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreInfo.java | Include the new store flag in StoreInfo DTO. |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java | Add new store-level config API for batch push record count verification. |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java | Expose new flag on ReadOnlyStore delegate (setter unsupported). |
| internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java | Add UpdateStoreQueryParams getter/setter for the new flag. |
| internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java | Add REST query param constant for the new flag. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java | Classify tagged deficit failures as RECORD_COUNT_MISMATCH checkpoint. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskRecordCountTest.java | New unit tests for mismatch vs match behavior and failure sensor behavior. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java | Unit tests for default/monotonic/restored batchPushRecordCount. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java | Unit tests for forwarding only the prc header on EOP re-emits. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveKeyCountTest.java | Update test to match new processEndOfPush signature including headers. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java | Add match/mismatch/failure sensors for record-count verification. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java | Count batch records, persist count, compare at EOP, emit sensors, and optionally throw with tagged error. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java | Store and persist per-partition batchPushRecordCount. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java | Extract and forward prc header on leader EOP re-emits to local VT. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
This was referenced May 6, 2026
Merged
ymuppala
added a commit
to ymuppala/venice
that referenced
this pull request
May 7, 2026
1. SIT.verifyBatchPushRecordCount Javadoc: was referencing `VeniceServerConfig#isBatchPushRecordCountVerificationEnabled` (the server-level config that was removed when we converted to a per-store flag). Updated to reference Store#isBatchPushRecordCountVerificationEnabled so operators understand this is store-config driven, not a server-wide toggle. 2. TestBatchPushRecordCountVerification.testDeficitFailsPushWhenFlagEnabled: wrap the metric + statusDetails assertions in waitForNonDeterministicAssertion. Tehuti recordings are observed via the metrics repository which can lag behind the actual record() call (the throw runs on the SIT consumer thread, metric is sampled on the test thread). The statusDetails propagation chain (replica error message via OfflinePushAccessor -> push monitor aggregation) is also async and can lag the ERROR transition by a small margin. Both items flagged by the Copilot reviewer. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
ymuppala
added a commit
to ymuppala/venice
that referenced
this pull request
May 7, 2026
1. SIT.verifyBatchPushRecordCount Javadoc: was referencing `VeniceServerConfig#isBatchPushRecordCountVerificationEnabled` (the server-level config that was removed when we converted to a per-store flag). Updated to reference Store#isBatchPushRecordCountVerificationEnabled so operators understand this is store-config driven, not a server-wide toggle. 2. TestBatchPushRecordCountVerification.testDeficitFailsPushWhenFlagEnabled: wrap the metric + statusDetails assertions in waitForNonDeterministicAssertion. Tehuti recordings are observed via the metrics repository which can lag behind the actual record() call (the throw runs on the SIT consumer thread, metric is sampled on the test thread). The statusDetails propagation chain (replica error message via OfflinePushAccessor -> push monitor aggregation) is also async and can lag the ERROR transition by a small margin. Both items flagged by the Copilot reviewer. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
58bfd72 to
6576b08
Compare
6576b08 to
cca9d58
Compare
6f311eb to
b5cfb07
Compare
b5cfb07 to
a604f92
Compare
8ee9162 to
62419d5
Compare
Fix integration test failure in
TestBatchPushRecordCountVerification.testDeficitFailsPushWhenFlagEnabled:
Got: 2/2 partitions completed.
expected statusDetails to contain 'RECORD_COUNT_DEFICIT:'
Root cause: OfflinePushStatus.updateStatusDetails() walks replica statuses,
counts ones in any *terminal* state (which includes ERROR), and rewrites
statusDetails to "<X>/<Y> partitions completed." This races against the
authoritative path:
PushStatusDecider.checkPushStatusAndDetailsByPartitionsStatus
-> ExecutionStatusWithDetails(ERROR, "too many ERROR... + tagged details")
-> handleTerminalOfflinePushUpdate -> handleErrorPush -> updatePushStatus
-> OfflinePushStatus.updateStatus(ERROR, taggedDetails)
-> statusDetails = taggedDetails
Once the push has transitioned to ERROR with tagged details, a *late*
partition status update (replica status arriving after the push transition)
calls setPartitionStatus(updateDetails=true) which calls updateStatusDetails,
clobbering the tagged details with the partition-progress counter.
Fix: short-circuit updateStatusDetails when currentStatus is already terminal.
Once terminal, statusDetails is authoritative (the success path leaves it
empty; the error path holds the tagged details VPJ relies on for
PushJobCheckpoints classification). The partition-progress counter is
meaningless after the push has already concluded.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem Statement
Part 4
A Spark-based VPJ KIF re-push silently wrote batch data to the wrong datacenter. Two of three DCs lost batch data while the push completed cleanly. The push job reported success. The store served partial data for hours before anyone noticed.
There was no defense at the consumer end against the producer claiming "I sent N records" when the consumer only received M < N. Venice servers count records but never compared their count against the producer's. Any failure mode that drops records between VPJ and the server — wrong-DC writes, partial broker
connectivity loss, intermittent produce errors not surfacing to VPJ.
VPJ doesn't see what the server actually received. It has to be a server-side check, but with no extra round-trip and tolerant of Kafka's at-least-once semantics (consumer can legitimately see surplus from producer retries).
Solution
End-to-end record-count verification piggy-backed on the EOP control message.
PR #2758 (already merged) attached a new
prc(per-partition record count) PubSub header to every EOP. This PR makes the server count its own data records intoPartitionState.batchPushRecordCount (Avro schema bumped to v23), persists it to RocksDB on every checkpoint, and compares against theprcheader at EOP time. Match path emitsbatch_push_record_count_match; deficit path emitsbatch_push_record_count_mismatchCoverage every replica, every region. Cross-region followers consume EOP from local VT — but the leader's pass-through put in LeaderFollowerStoreIngestionTask was hardcoded to
EmptyPubSubMessageHeaders.SINGLETON, stripping theprcheader before re-emit. This PR forwardsonly the
prcheader to local VT.Per-store opt-in. New store-level config
batchPushRecordCountVerificationEnabled (default false). When true and a deficit is detected, the server throws aVeniceExceptioncarrying the tagged error string.End-to-end push-job integration. The tagged error string survives the full propagation chain:
StoreIngestionTask.handleIngestionExceptionincludese.getMessage()in the reportError message (without this fix the bare "Caught Exception during ingestion." masked the tag).PushStatusNotifier.errorwrites the message to ZK viaOfflinePushAccessor.updateReplicaStatus.PushStatusDecider.findTaggedReplicaErrorMessagescans error replicas' messages for theRECORD_COUNT_DEFICIT: prefix and surfaces the cleaned tagged segment in ExecutionStatusWithDetails.details.JobStatusQueryResponse.statusDetailscarries it to VPJ.VenicePushJob.pollStatusUntilCompletiondetects the tag and setsPushJobCheckpoints.RECORD_COUNT_MISMATCH(-15)onpushJobDetails.pushJobLatestCheckpoint.Sensors. Two distinct sensors:
Code changes
Concurrency-Specific Checks
Both reviewer and PR author to verify
synchronized,RWLock) are used where needed.ConcurrentHashMap,CopyOnWriteArrayList).How was this PR tested?
Does this PR introduce any user-facing or breaking changes?