[vpj] counting records produced per partition as part of push and adding it to the EOP message #2758
Conversation
Pull request overview
Adds per-partition record counting in Venice Push Job (VPJ) and propagates those counts through the controller/direct EOP paths by attaching them as a new PubSub message header (prc) on End-of-Push control messages, enabling server-side ingestion verification.
Changes:
- Update the VPJ Spark write pipeline to emit one `(partitionId, recordCount)` summary row per partition and collect it on the driver into `DataWriterTaskTracker`.
- Add controller + writer plumbing to send per-partition counts via REST (`partition_record_counts`) or directly via `VeniceWriter.broadcastEndOfPush(..., partitionRecordCounts)`.
- Add unit/integration test coverage and new test utilities to read/verify `prc` headers from EOP messages.
Reviewed changes
Copilot reviewed 24 out of 24 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| services/venice-controller/src/test/java/com/linkedin/venice/controller/server/CreateVersionTest.java | Adds tests covering END_OF_PUSH route behavior with/without partition_record_counts. |
| services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java | Adds tests validating writeEndOfPush overload delegation and behavior with partition counts. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateVersion.java | Parses partition_record_counts JSON and calls appropriate Admin.writeEndOfPush overload. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java | Plumbs new writeEndOfPush overload through the parent admin wrapper. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java | Refactors writeEndOfPush to delegate 4-arg to 5-arg and pass counts to VeniceWriter. |
| services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java | Adds default 5-arg writeEndOfPush overload for backward compatibility. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/IntegrationTestPushUtils.java | Adds utilities to consume EOP messages and extract/verify prc headers. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestRepushDiagnostics.java | Extends repush E2E test to assert EOP prc headers across versions. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatch.java | Adds E2E verification helper for EOP prc headers in batch push tests. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/AbstractTestRepush.java | Adds shared helper to verify EOP prc headers for repush tests. |
| internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java | Adds unit tests for the new broadcastEndOfPush(..., partitionRecordCounts) behavior. |
| internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java | Implements EOP broadcast overload that conditionally attaches prc header per partition. |
| internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java | Introduces header key constant VENICE_PARTITION_RECORD_COUNT_HEADER (prc). |
| internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java | Adds writeEndOfPush overload sending partition_record_counts param. |
| internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java | Adds REST parameter constant PARTITION_RECORD_COUNTS. |
| clients/venice-push-job/src/test/java/com/linkedin/venice/spark/datawriter/task/SparkDataWriterTaskTrackerTest.java | Adds tests for default/immutability/set-get behavior of per-partition counts. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/writer/SparkPartitionWriterFactory.java | Emits a single summary row (partitionId, recordCount) per Spark partition. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/writer/SparkPartitionWriter.java | Exposes record count from the underlying partition writer. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/SparkDataWriterTaskTracker.java | Stores and exposes per-partition record counts. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/jobs/AbstractDataWriterSparkJob.java | Collects partition summary rows and stores them into the task tracker. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/spark/SparkConstants.java | Adds schema/column constants for partition record count summary output. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/DataWriterTaskTracker.java | Adds getPerPartitionRecordCounts() to the tracker interface (default empty). |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java | Exposes messageSent via a protected getter for record counting. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java | Sends EOP with counts through both direct and controller paths. |
sushantmane
left a comment
Overall the feature design is solid and the test infrastructure in IntegrationTestPushUtils is well done. A few things need fixing before merge, mostly around segment-state semantics in VeniceWriter and missing error handling on the controller EOP path.
Pull request overview
Copilot reviewed 25 out of 25 changed files in this pull request and generated 5 comments.
…ing it to the EOP message
Adds per-partition record counts to End-of-Push (EOP) control messages via
PubSub headers, so consumers can verify whether a partition received the
expected number of data records without scanning the topic.
VeniceWriter:
- New broadcastEndOfPush(Map debugInfo, Map<Integer, Long> partitionRecordCounts)
  overload that attaches each partition's count as the
  VENICE_PARTITION_RECORD_COUNT_HEADER ("prc") header: an 8-byte big-endian long.
Both EOP overloads now route through sendControlMessage so isEndOfSegment
derivation, partition-lock semantics, and debugInfo handling live in one
place. The 5-arg sendControlMessage delegates to a new 6-arg overload
taking PubSubMessageHeaders.
- buildPartitionRecordCountHeaders() always returns a fresh
PubSubMessageHeaders instance (never EmptyPubSubMessageHeaders.SINGLETON)
so any future code that mutates the headers reference cannot pollute
process-wide state.
- WARN log when partitionRecordCounts covers fewer than numberOfPartitions
so partial coverage is visible.
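The header value format described above (an 8-byte big-endian long) can be sketched as follows. This is an illustrative codec, not the actual VeniceWriter code; class and method names here are hypothetical.

```java
import java.nio.ByteBuffer;

/**
 * Minimal sketch of encoding a per-partition record count as the value of the
 * "prc" PubSub header: an 8-byte big-endian long, as described in the commit
 * message. Illustrative only; not the actual Venice implementation.
 */
public class PrcHeaderCodec {
  /** Encodes the count as 8 big-endian bytes (ByteBuffer default byte order). */
  public static byte[] encode(long recordCount) {
    return ByteBuffer.allocate(Long.BYTES).putLong(recordCount).array();
  }

  /** Decodes the 8-byte big-endian header value back into a long. */
  public static long decode(byte[] headerValue) {
    return ByteBuffer.wrap(headerValue).getLong();
  }
}
```

Big-endian is ByteBuffer's default byte order in Java, so no explicit `order(...)` call is needed.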
Spark data writer:
- SparkPartitionWriter exposes getRecordCount() (= getMessageSent),
counting only data records (PUT/DELETE), not control messages.
- SparkPartitionWriterFactory emits a single (partitionId, recordCount)
Row per Spark partition.
- AbstractDataWriterSparkJob collects those rows via collectAsList(),
builds the Map, and stores it on SparkDataWriterTaskTracker. Speculative-
execution collisions WARN-log existing/incoming/sum so silent
double-counting is visible.
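The driver-side aggregation described above can be sketched in plain Java. This is a stand-in for the actual AbstractDataWriterSparkJob logic: summary rows are modeled as `long[]{partitionId, recordCount}` pairs rather than Spark Rows, and collisions (e.g. from speculative execution) are logged instead of silently summed.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrative sketch (not the actual Venice code) of folding collected
 * (partitionId, recordCount) summary rows into a map on the driver, making
 * duplicate rows for the same partition visible via a WARN-style log
 * instead of silently double-counting.
 */
public class PartitionCountAggregator {
  public static Map<Integer, Long> aggregate(List<long[]> summaryRows) {
    Map<Integer, Long> counts = new HashMap<>();
    for (long[] row : summaryRows) {
      int partitionId = (int) row[0];
      long incoming = row[1];
      Long existing = counts.get(partitionId);
      if (existing != null) {
        // Collision: keep only the latest count, but log existing/incoming/sum
        // so a speculative-execution overlap is visible.
        System.err.printf(
            "WARN duplicate summary for partition %d: existing=%d incoming=%d sum=%d%n",
            partitionId, existing, incoming, existing + incoming);
      }
      counts.put(partitionId, incoming);
    }
    return counts;
  }
}
```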
VPJ:
- VenicePushJob retrieves per-partition counts from the task tracker and
passes them to either broadcastEndOfPush (direct) or
controllerClient.writeEndOfPush (via controller). Controller-side errors
now fail fast with a descriptive VeniceException instead of timing out.
- WARN with the compute job class name when the counts map is empty.
Controller:
- ControllerClient.writeEndOfPush(store, version, partitionRecordCounts)
serializes via writeValueAsString and surfaces JsonProcessingException as
a VeniceException with context.
- CreateVersion always calls the 5-arg admin.writeEndOfPush overload (empty
map when no JSON), eliminating the dual call site that hardcoded
alsoWriteStartOfPush=false in two places. Malformed JSON now throws
VeniceHttpException(SC_BAD_REQUEST) instead of being absorbed as a 500.
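The wire format implied by the controller path is a JSON object mapping partition id to count, sent as the `partition_record_counts` parameter. The PR serializes via Jackson's `writeValueAsString`; the stdlib-only sketch below only illustrates the shape of the payload and is not the actual ControllerClient code.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/**
 * Illustrative sketch of the partition_record_counts payload shape:
 * a JSON object of {"partitionId": recordCount}. The real code uses
 * Jackson; this hand-rolled version exists only to show the format.
 */
public class PartitionCountsParam {
  public static String toJson(Map<Integer, Long> counts) {
    // TreeMap gives a deterministic key order for the example output.
    return new TreeMap<>(counts).entrySet().stream()
        .map(e -> "\"" + e.getKey() + "\":" + e.getValue())
        .collect(Collectors.joining(",", "{", "}"));
  }
}
```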
Integration test helpers:
- IntegrationTestPushUtils.getEopPartitionRecordCounts reads EOP messages
from every partition of a version topic and returns the per-partition
prc values; verifyPerPartitionCounts compares against expected counts
computed by running the Venice partitioner over the pushed keys.
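The expected-count computation described above can be sketched as: run a partitioner over every pushed key and tally per partition. The modulo-hash partitioner below is a stand-in assumption; Venice's real partitioner hashes the serialized key bytes, and the helper names here are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the verification idea behind verifyPerPartitionCounts:
 * recompute expected per-partition counts from the pushed keys, then
 * compare against counts read back from the EOP "prc" headers.
 * The partition() function is a simplified stand-in partitioner.
 */
public class ExpectedCounts {
  // Stand-in partitioner; the real one hashes serialized key bytes.
  static int partition(String key, int numPartitions) {
    return Math.abs(key.hashCode() % numPartitions);
  }

  public static Map<Integer, Long> compute(Iterable<String> keys, int numPartitions) {
    Map<Integer, Long> expected = new HashMap<>();
    for (String key : keys) {
      expected.merge(partition(key, numPartitions), 1L, Long::sum);
    }
    return expected;
  }
}
```

Whatever the partitioner, the per-partition expected counts always sum to the total number of pushed keys, which is the invariant the EOP headers let the server check partition by partition.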
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pull request overview
Copilot reviewed 26 out of 26 changed files in this pull request and generated 1 comment.
…count verification

Schema-only PR. Adds two of the three schema versions needed for upcoming server-side batch-push record-count verification (prc) work. AdminOperation v98 lands in a separate follow-up PR (split to stay under the enforce-lines-added 2000-line cap).

Schemas added:
- PartitionState v22 -> v23: adds `batchPushRecordCount` (long, default 0). Persists the server-side count of batch-push data records, used at EOP to verify against the producer-side `prc` PubSub header (PR linkedin#2758).
- StoreMetaValue v42 -> v43: adds `batchPushRecordCountVerificationEnabled` (boolean, default false). Per-store opt-in for the throw path on record-count deficits.

Both are additive with safe defaults, so backward-compatible. `build.gradle` pins compileAvro to v22/v42 via `versionOverrides`. The follow-up consumer-code PR will remove the pins and bump the `AvroProtocolDefinition.PARTITION_STATE` (22->23) and `METADATA_SYSTEM_SCHEMA_STORE` (42->43) constants in lockstep.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Problem Statement
Venice batch pushes (VPJ) currently have no mechanism to verify that the number of records written by the push job matches what the server actually ingested per partition. If records are silently dropped or duplicated during ingestion, there's no detection — the push completes successfully and the version swaps in with incorrect data.
To enable server-side verification, the EOP (End-of-Push) control message needs to carry the per-partition record count as metadata. The server can then compare its own ingested count against the expected count from the EOP header and flag mismatches.
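The comparison the server would perform at EOP is straightforward. The sketch below is hypothetical: the server-side consumer of the `prc` header is not part of this PR, and the class and method names are illustrative.

```java
/**
 * Sketch of the verification step this PR enables: at EOP, compare the
 * locally ingested record count for a partition against the expected
 * count carried in the "prc" header. Illustrative only; the server-side
 * verification code lands in follow-up work.
 */
public class EopVerifier {
  /** Returns null when counts match, otherwise a human-readable mismatch description. */
  public static String verify(int partitionId, long ingested, long expectedFromHeader) {
    if (ingested == expectedFromHeader) {
      return null;
    }
    return String.format(
        "Partition %d record-count mismatch: ingested=%d expected=%d (delta=%d)",
        partitionId, ingested, expectedFromHeader, ingested - expectedFromHeader);
  }
}
```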
Solution
This PR implements per-partition record counting with exactly-once semantics by piggybacking on Spark's task output guarantees: only the successful task attempt's return value is collected on the driver.
Spark pipeline changes:
- `SparkPartitionWriterFactory` now emits a single `(partitionId, recordCount)` summary row per partition after writing all records to Kafka, instead of returning the exhausted input iterator.
- `AbstractDataWriterSparkJob` replaces `dataFrame.count()` with `dataFrame.collectAsList()` to retrieve these summary rows on the driver. Memory overhead is bounded by partition count (~40 bytes per partition).
- The counts are stored on `SparkDataWriterTaskTracker` via a new `getPerPartitionRecordCounts()` method on the `DataWriterTaskTracker` interface.

EOP header encoding:
- `prc` (`VENICE_PARTITION_RECORD_COUNT_HEADER`) carries each partition's record count.
- `VeniceWriter.broadcastEndOfPush(debugInfo, partitionRecordCounts)` is a new overload that attaches the `prc` header to each partition's EOP message. No Avro schema changes needed: headers sit outside the control message payload, so old servers ignore unknown headers (backward compatible).

Both EOP paths covered:
- Direct path (`sendControlMessagesDirectly=true`): the VPJ driver passes counts directly to `VeniceWriter`.
- Controller path (`sendControlMessagesDirectly=false`): counts are JSON-serialized as a `partition_record_counts` query parameter on the `END_OF_PUSH` REST POST. The `CreateVersion` route handler deserializes them and passes them to `VeniceHelixAdmin.writeEndOfPush()`, which was refactored to eliminate code duplication: the 4-arg overload now delegates to the 5-arg overload.

Code changes
Concurrency-Specific Checks
Both reviewer and PR author to verify
- Appropriate synchronization primitives (e.g. `synchronized`, `RWLock`) are used where needed.
- Thread-safe collections are used where applicable (e.g. `ConcurrentHashMap`, `CopyOnWriteArrayList`).

How was this PR tested?
Does this PR introduce any user-facing or breaking changes?