Skip to content

[vpj] counting records produced per partition as part of push and adding it to the EOP message#2758

Merged
ymuppala merged 1 commit intolinkedin:mainfrom
ymuppala:vpj-collect-partition-record-counts
May 5, 2026
Merged

[vpj] counting records produced per partition as part of push and adding it to the EOP message#2758
ymuppala merged 1 commit intolinkedin:mainfrom
ymuppala:vpj-collect-partition-record-counts

Conversation

@ymuppala
Copy link
Copy Markdown
Collaborator

@ymuppala ymuppala commented Apr 24, 2026

Problem Statement

Venice batch pushes (VPJ) currently have no mechanism to verify that the number of records written by the push job matches what the server actually ingested per partition. If records are silently dropped or duplicated during ingestion, there's no detection — the push completes successfully and the version swaps in with incorrect data.

To enable server-side verification, the EOP (End-of-Push) control message needs to carry the per-partition record count as metadata. The server can then compare its own ingested count against the expected count from the EOP header and flag mismatches.

Solution

This PR implements per-partition record counting, which provides exactly-once semantics by piggybacking on Spark's task output guarantees - only the successful task's return value is collected.

Spark pipeline changes:

  • SparkPartitionWriterFactory now emits a single (partitionId, recordCount) summary row per partition after writing all records to Kafka, instead of returning the exhausted input iterator
  • AbstractDataWriterSparkJob replaces dataFrame.count() with dataFrame.collectAsList() to retrieve these summary rows on the driver. Memory overhead is bounded by partition count (~40 bytes per partition)
  • Per-partition counts are stored on SparkDataWriterTaskTracker via a new getPerPartitionRecordCounts() method on the DataWriterTaskTracker interface

EOP header encoding:

  • New PubSub message header prc (VENICE_PARTITION_RECORD_COUNT_HEADER) carries the partition size per partition
  • VeniceWriter.broadcastEndOfPush(debugInfo, partitionRecordCounts) — new overload that attaches the prc header to each partition's EOP message. No Avro schema changes needed — headers sit outside the control message payload, so old servers ignore unknown headers (backward compatible)

Both EOP paths covered:

  • Direct path (sendControlMessagesDirectly=true): VPJ driver passes counts directly to VeniceWriter
  • Controller path (sendControlMessagesDirectly=false): Counts are JSON-serialized as a partition_record_counts query parameter on the END_OF_PUSH REST POST. CreateVersion route handler deserializes and passes to VeniceHelixAdmin.writeEndOfPush(), which was refactored to eliminate code duplication — the 4-arg overload now delegates to the 5-arg overload

Code changes

  • Added new code behind a config. If so list the config names and their default values in the PR description.
  • Introduced new log lines.
    • Confirmed if logs need to be rate limited to avoid excessive logging.

Concurrency-Specific Checks

Both reviewer and PR author to verify

  • Code has no race conditions or thread safety issues.
  • Proper synchronization mechanisms (e.g., synchronized, RWLock) are used where needed.
  • No blocking calls inside critical sections that could lead to deadlocks or performance degradation.
  • Verified thread-safe collections are used (e.g., ConcurrentHashMap, CopyOnWriteArrayList).
  • Validated proper exception handling in multi-threaded code to avoid silent thread termination.

How was this PR tested?

  • New unit tests added.
  • New integration tests added.
  • Modified or extended existing tests.
  • Verified backward compatibility (if applicable).

Does this PR introduce any user-facing or breaking changes?

  • No. You can skip the rest of this section.
  • Yes. Clearly explain the behavior change and its impact.

@ymuppala ymuppala force-pushed the vpj-collect-partition-record-counts branch 3 times, most recently from e055b19 to 23e8fea Compare April 28, 2026 15:47
@ymuppala ymuppala marked this pull request as ready for review April 28, 2026 17:06
Copilot AI review requested due to automatic review settings April 28, 2026 17:06
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds per-partition record counting in Venice Push Job (VPJ) and propagates those counts through the controller/direct EOP paths by attaching them as a new PubSub message header (prc) on End-of-Push control messages, enabling server-side ingestion verification.

Changes:

  • Update VPJ Spark write pipeline to emit one (partitionId, recordCount) summary row per partition and collect it on the driver into DataWriterTaskTracker.
  • Add controller + writer plumbing to send per-partition counts via REST (partition_record_counts) or direct VeniceWriter.broadcastEndOfPush(..., partitionRecordCounts).
  • Add unit/integration test coverage and new test utilities to read/verify prc headers from EOP messages.

Reviewed changes

Copilot reviewed 24 out of 24 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
services/venice-controller/src/test/java/com/linkedin/venice/controller/server/CreateVersionTest.java Adds tests covering END_OF_PUSH route behavior with/without partition_record_counts.
services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java Adds tests validating writeEndOfPush overload delegation and behavior with partition counts.
services/venice-controller/src/main/java/com/linkedin/venice/controller/server/CreateVersion.java Parses partition_record_counts JSON and calls appropriate Admin.writeEndOfPush overload.
services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java Plumbs new writeEndOfPush overload through the parent admin wrapper.
services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java Refactors writeEndOfPush to delegate 4-arg to 5-arg and pass counts to VeniceWriter.
services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java Adds default 5-arg writeEndOfPush overload for backward compatibility.
internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/IntegrationTestPushUtils.java Adds utilities to consume EOP messages and extract/verify prc headers.
internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestRepushDiagnostics.java Extends repush E2E test to assert EOP prc headers across versions.
internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatch.java Adds E2E verification helper for EOP prc headers in batch push tests.
internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/AbstractTestRepush.java Adds shared helper to verify EOP prc headers for repush tests.
internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java Adds unit tests for the new broadcastEndOfPush(..., partitionRecordCounts) behavior.
internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java Implements EOP broadcast overload that conditionally attaches prc header per partition.
internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java Introduces header key constant VENICE_PARTITION_RECORD_COUNT_HEADER (prc).
internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java Adds writeEndOfPush overload sending partition_record_counts param.
internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java Adds REST parameter constant PARTITION_RECORD_COUNTS.
clients/venice-push-job/src/test/java/com/linkedin/venice/spark/datawriter/task/SparkDataWriterTaskTrackerTest.java Adds tests for default/immutability/set-get behavior of per-partition counts.
clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/writer/SparkPartitionWriterFactory.java Emits a single summary row (partitionId, recordCount) per Spark partition.
clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/writer/SparkPartitionWriter.java Exposes record count from the underlying partition writer.
clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/SparkDataWriterTaskTracker.java Stores and exposes per-partition record counts.
clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/jobs/AbstractDataWriterSparkJob.java Collects partition summary rows and stores them into the task tracker.
clients/venice-push-job/src/main/java/com/linkedin/venice/spark/SparkConstants.java Adds schema/column constants for partition record count summary output.
clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/DataWriterTaskTracker.java Adds getPerPartitionRecordCounts() to the tracker interface (default empty).
clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java Exposes messageSent via a protected getter for record counting.
clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java Sends EOP with counts through both direct and controller paths.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java Outdated
@ymuppala ymuppala force-pushed the vpj-collect-partition-record-counts branch from 23e8fea to d650174 Compare April 28, 2026 23:05
@ymuppala ymuppala enabled auto-merge (squash) May 1, 2026 15:50
@sushantmane sushantmane added the review-in-progress Reviewer is actively reviewing label May 1, 2026
Copy link
Copy Markdown
Contributor

@sushantmane sushantmane left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall the feature design is solid and the test infrastructure in IntegrationTestPushUtils is well-done. A few things need fixing before merge, mostly around segment-state semantics in VeniceWriter and missing error handling on the controller EOP path.

Comment thread internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java Outdated
Comment thread internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java Outdated
Copilot AI review requested due to automatic review settings May 4, 2026 20:16
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 25 out of 25 changed files in this pull request and generated 5 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

sushantmane
sushantmane previously approved these changes May 4, 2026
Copy link
Copy Markdown
Contributor

@sushantmane sushantmane left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

few more nits. Ovearall LGTM. Thanks

Comment thread internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java Outdated
@ymuppala ymuppala force-pushed the vpj-collect-partition-record-counts branch from 5547946 to a352ca8 Compare May 4, 2026 22:57
…ing it

to the EOP message

Adds per-partition record counts to End-of-Push (EOP) control messages via
PubSub headers, so consumers can verify whether a partition received the
expected number of data records without scanning the topic.

VeniceWriter:
- New broadcastEndOfPush(Map debugInfo, Map<Integer, Long>
partitionRecordCounts)
  overload that attaches each partition's count as the
  VENICE_PARTITION_RECORD_COUNT_HEADER ("prc") header — 8-byte big-endian lo
ng.
  Both EOP overloads now route through sendControlMessage so isEndOfSegment
  derivation, partition-lock semantics, and debugInfo handling live in one
  place. The 5-arg sendControlMessage delegates to a new 6-arg overload
  taking PubSubMessageHeaders.
- buildPartitionRecordCountHeaders() always returns a fresh
  PubSubMessageHeaders instance (never EmptyPubSubMessageHeaders.SINGLETON)
  so any future code that mutates the headers reference cannot pollute
  process-wide state.
- WARN log when partitionRecordCounts covers fewer than numberOfPartitions
  so partial coverage is visible.

Spark data writer:
- SparkPartitionWriter exposes getRecordCount() (= getMessageSent),
  counting only data records (PUT/DELETE), not control messages.
- SparkPartitionWriterFactory emits a single (partitionId, recordCount)
  Row per Spark partition.
- AbstractDataWriterSparkJob collects those rows via collectAsList(),
  builds the Map, and stores it on SparkDataWriterTaskTracker. Speculative-
  execution collisions WARN-log existing/incoming/sum so silent
  double-counting is visible.

VPJ:
- VenicePushJob retrieves per-partition counts from the task tracker and
  passes them to either broadcastEndOfPush (direct) or
  controllerClient.writeEndOfPush (via controller). Controller-side errors
  now fail fast with a descriptive VeniceException instead of timing out.
- WARN with the compute job class name when the counts map is empty.

Controller:
- ControllerClient.writeEndOfPush(store, version, partitionRecordCounts)
  serializes via writeValueAsString and surfaces JsonProcessingException as
  a VeniceException with context.
- CreateVersion always calls the 5-arg admin.writeEndOfPush overload (empty
  map when no JSON), eliminating the dual call site that hardcoded
  alsoWriteStartOfPush=false in two places. Malformed JSON now throws
  VeniceHttpException(SC_BAD_REQUEST) instead of being absorbed as a 500.

Integration test helpers:
- IntegrationTestPushUtils.getEopPartitionRecordCounts reads EOP messages
  from every partition of a version topic and returns the per-partition
  prc values; verifyPerPartitionCounts compares against expected counts
  computed by running the Venice partitioner over the pushed keys.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 4, 2026 23:37
@ymuppala ymuppala force-pushed the vpj-collect-partition-record-counts branch from a352ca8 to f5e8a28 Compare May 4, 2026 23:37
@ymuppala ymuppala disabled auto-merge May 4, 2026 23:41
@ymuppala ymuppala enabled auto-merge (squash) May 4, 2026 23:41
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 26 out of 26 changed files in this pull request and generated 1 comment.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Copy Markdown
Contributor

@sushantmane sushantmane left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

@ymuppala ymuppala merged commit 8f922d1 into linkedin:main May 5, 2026
110 checks passed
ymuppala added a commit to ymuppala/venice that referenced this pull request May 6, 2026
…count

verification

Schema-only PR. Adds two of the three schema versions needed for upcoming
server-side batch-push record-count verification (prc) work. AdminOperation
v98 lands in a separate follow-up PR (split to stay under the
enforce-lines-added 2000-line cap).

Schemas added:
- PartitionState v22 -> v23: adds `batchPushRecordCount` (long, default 0).
  Persists the server-side count of batch-push data records, used at EOP
  to verify against the producer-side `prc` PubSub header (PR linkedin#2758).
- StoreMetaValue v42 -> v43: adds `batchPushRecordCountVerificationEnabled`
  (boolean, default false). Per-store opt-in for the throw path on
  record-count deficits.

Both additive with safe defaults — backward-compatible.

`build.gradle` pins compileAvro to v22/v42 via `versionOverrides`. The
follow-up consumer-code PR will remove the pins and bump
`AvroProtocolDefinition.PARTITION_STATE` (22->23) and
`METADATA_SYSTEM_SCHEMA_STORE` (42->43) constants in lockstep.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

review-in-progress Reviewer is actively reviewing

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants