
[common][vpj] Tag EOP with per-partition record counts for batch push verification#2663

Closed
sushantmane wants to merge 8 commits into linkedin:main from sushantmane:sumane/pr1-eop-record-count-tagging

Conversation

@sushantmane
Contributor

Summary

Track per-partition record counts during batch push and embed them as PubSub message headers on the End-of-Push control message. This enables server-side verification (in a follow-up PR) without requiring changes to the EOP Avro schema.

Part 1 of 4 in the batch push record count verification series.

Changes

  • PubSubMessageHeaders: new prc header constant
  • VeniceWriter: new broadcastEndOfPush/sendControlMessage overloads with per-partition headers (8-byte long per partition)
  • DataWriterTaskTracker: per-partition tracking interface (trackRecordSentToPubSubForPartition, getPerPartitionRecordCounts)
  • MR path: counter group in MRJobCounterHelper + tracker implementations
  • Spark path: new MapLongAccumulator + tracker implementations
  • VenicePushJob: collect per-partition counts, pass to broadcastEndOfPush

Backward compatibility

Old servers ignore unknown PubSub headers — no impact.
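The header described above (an 8-byte long per partition, attached to the EOP control message) can be sketched as a small codec. This is an illustrative assumption: the `prc` constant name follows the PR, but the class and the big-endian `ByteBuffer` encoding below are hypothetical, not taken from the diff.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the "prc" header value: one 8-byte long per
// partition, carried on that partition's End-of-Push message.
class PrcHeaderCodec {
  static final String VENICE_PARTITION_RECORD_COUNT_HEADER = "prc";

  // Serialize the record count to an 8-byte big-endian value (assumed layout).
  static byte[] encode(long recordCount) {
    return ByteBuffer.allocate(Long.BYTES).putLong(recordCount).array();
  }

  // Deserialize the 8-byte header value back to a long.
  static long decode(byte[] value) {
    return ByteBuffer.wrap(value).getLong();
  }
}
```

A server that does not recognize the header simply never reads it, which is what makes the change backward compatible.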

Test plan

  • VeniceWriterUnitTest — broadcastEndOfPush with/without counts
  • MRJobCounterHelperTest — per-partition counter round-trip
  • MapLongAccumulatorTest — add, merge, reset, copy


Copilot AI left a comment


Pull request overview

Adds per-partition record-count tracking in VPJ and propagates those counts on the End-of-Push (EOP) control message via a new PubSub header (prc), enabling downstream batch-push verification without changing the EOP Avro schema.

Changes:

  • Introduces VENICE_PARTITION_RECORD_COUNT_HEADER (prc) and wires VeniceWriter.broadcastEndOfPush(...) to optionally attach per-partition counts as PubSub headers.
  • Implements per-partition record counting in both MR (counters) and Spark (custom MapLongAccumulator) via DataWriterTaskTracker additions.
  • Updates VenicePushJob to collect per-partition counts from the task tracker and pass them to broadcastEndOfPush, with unit tests for writer, MR counters, and Spark accumulator.

Reviewed changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.

Summary per file:

- internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java: Adds the prc header constant for per-partition record counts.
- internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java: Adds EOP/control-message overloads to emit per-partition record counts in PubSub headers.
- internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java: Tests broadcastEndOfPush with/without per-partition record-count headers.
- clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/DataWriterTaskTracker.java: Extends the tracker interface with per-partition tracking and retrieval methods.
- clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java: Calls the new per-partition tracking hook when sending a record.
- clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/counter/MRJobCounterHelper.java: Adds an MR counter group and helpers for per-partition record counts.
- clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/task/ReporterBackedMapReduceDataWriterTaskTracker.java: Tracks per-partition counts via Reporter-backed MR counters.
- clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/task/CounterBackedMapReduceDataWriterTaskTracker.java: Exposes per-partition counts from MR Counters after job completion.
- clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/MapLongAccumulator.java: Adds a Spark accumulator for per-key (partition) long aggregation.
- clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/DataWriterAccumulators.java: Registers the new per-partition accumulator with Spark.
- clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/SparkDataWriterTaskTracker.java: Tracks and returns per-partition record counts via the Spark accumulator.
- clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java: Collects per-partition record counts from the tracker and passes them into the EOP broadcast.
- clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/counter/MRJobCounterHelperTest.java: Adds tests for per-partition counter parsing (and related behaviors).
- clients/venice-push-job/src/test/java/com/linkedin/venice/spark/datawriter/task/MapLongAccumulatorTest.java: Adds unit tests for accumulator add/merge/reset/copy/immutability.


Copilot AI review requested due to automatic review settings March 31, 2026 17:25

Copilot AI left a comment


Pull request overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.



Copilot AI review requested due to automatic review settings April 10, 2026 01:16
@sushantmane sushantmane force-pushed the sumane/pr1-eop-record-count-tagging branch from c83652b to 9b2b905 Compare April 10, 2026 01:16

Copilot AI left a comment


Pull request overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.



@sushantmane sushantmane requested a review from ymuppala April 10, 2026 17:22
@sushantmane sushantmane added the needs-reviewer Looking for a reviewer to pick this up label Apr 10, 2026
@ymuppala
Collaborator

ymuppala commented Apr 13, 2026

The PR's accumulator-based counting is unreliable under Spark speculative execution and task retries caused by failures. We shouldn't rely on an inaccurate signal to validate the data on the server side. Replacing the accumulator with the more deterministic dataframe.count() will improve the reliability of the data. There is an additional cost to running the count() action, but IMO it's worthwhile in this case.

Another thought while you work on the server-side PR: we should account for at-least-once semantics for Kafka, as in server_consumed_count >= expected_count in EOP.

@sushantmane sushantmane added requested-changes Reviewer requested changes and removed needs-reviewer Looking for a reviewer to pick this up labels Apr 13, 2026
@sushantmane
Contributor Author

sushantmane commented Apr 13, 2026

> The PR's accumulator-based counting is unreliable under Spark speculative execution and task retries due to failures. We shouldn't rely on an inaccurate signal to validate the data on server side. Replacing the accumulator with more deterministic dataframe.count() will improve reliability of the data. There will be additional cost incurred in running the count() action, but IMO it's worthwhile in this case
>
> Another thought while you work on the server side PR. We should account for at least once semantics for Kafka as in server_consumed_count >= expected_count in EOP

IMO verification is a best-effort mechanism. I do not think that is a good justification for using dataFrame.count().

As for the Kafka duplicate-delivery issue, we already detect those cases and drop duplicate records as part of DIV, which happens before we increment counters.

Copilot AI review requested due to automatic review settings April 13, 2026 22:13
@sushantmane sushantmane force-pushed the sumane/pr1-eop-record-count-tagging branch from a4f4fbb to eeca5d2 Compare April 13, 2026 22:13

Copilot AI left a comment


Pull request overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated no new comments.



@ymuppala
Collaborator

ymuppala commented Apr 13, 2026

> > The PR's accumulator-based counting is unreliable under Spark speculative execution and task retries due to failures. We shouldn't rely on an inaccurate signal to validate the data on server side. Replacing the accumulator with more deterministic dataframe.count() will improve reliability of the data. There will be additional cost incurred in running the count() action, but IMO it's worthwhile in this case
> > Another thought while you work on the server side PR. We should account for at least once semantics for Kafka as in server_consumed_count >= expected_count in EOP
>
> IMO verification is a best-effort mechanism. I do not think that is a good justification for using dataFrame.count().
>
> As for Kafka duplicate delivery issue, we already detect those cases and drop records that are duplicates as part of DIV, which will happen before we increment counters

I mean, if we are going to fail a push job based on the counts, the counts have to be accurate. This automated validation cannot be best-effort; it has to be reliable. Currently, the only Spark action in VPJ is the count() action. We can change it to a per-partition count. What is your argument against using count()?

Sounds good on the Kafka duplicates.

Tag EOP with per-partition record counts for batch push verification

Track per-partition record counts during batch push and embed them as PubSub message headers on the End-of-Push control message. This enables server-side verification (in a follow-up PR) without requiring changes to the EOP Avro schema.

Changes:
- PubSubMessageHeaders: new "prc" header constant
- VeniceWriter: new broadcastEndOfPush/sendControlMessage overloads with headers
- DataWriterTaskTracker: per-partition tracking interface
- MR path: counter group in MRJobCounterHelper + tracker implementations
- Spark path: new MapLongAccumulator + tracker implementations
- VenicePushJob: collect per-partition counts, pass to broadcastEndOfPush
…Map,

rename test

Code review findings:
- VeniceWriter: make 5-param sendControlMessage delegate to 6-param overload, eliminating duplicated getDebugInfo/isEndOfSegment/synchronized logic
- MapLongAccumulator: ConcurrentHashMap → HashMap — Spark tasks are single-threaded (each gets a fresh copy()), no concurrent access occurs
- MRJobCounterHelperTest: rename misleading test to reflect what it actually tests (counter retrieval round-trip, not Reporter-based increment)
Move the accumulator/counter update from the per-record hot path (sendMessageToKafka) to the close() method. The local messageSent field already tracks the exact count — flush it to the tracker once when the partition writer closes. This eliminates per-record Tuple2 allocations (Spark) and counter increments (MR).

Changed trackRecordSentToPubSubForPartition(int) to trackRecordSentToPubSubForPartition(int, long) to accept a bulk count.
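The flush-at-close pattern in this commit can be sketched as follows. This is a hedged, self-contained illustration: the tracker interface mirrors trackRecordSentToPubSubForPartition(int, long) from the PR, but the toy in-memory tracker and writer below are stand-ins, not the Venice implementations.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of "count locally, flush once at close()" (assumed shape, not the real API).
interface TaskTracker {
  void trackRecordSentToPubSubForPartition(int partition, long count);
}

class InMemoryTracker implements TaskTracker {
  final Map<Integer, Long> counts = new HashMap<>();

  public void trackRecordSentToPubSubForPartition(int partition, long count) {
    counts.merge(partition, count, Long::sum);
  }
}

class PartitionWriterSketch implements AutoCloseable {
  private final int partition;
  private final TaskTracker tracker;
  private long messageSent; // local hot-path counter; no shared state touched per record

  PartitionWriterSketch(int partition, TaskTracker tracker) {
    this.partition = partition;
    this.tracker = tracker;
  }

  void sendMessageToKafka(byte[] key, byte[] value) {
    messageSent++; // cheap per-record work only; no tracker call here
  }

  @Override
  public void close() {
    // One bulk update per task instead of one per record.
    tracker.trackRecordSentToPubSubForPartition(partition, messageSent);
  }
}
```

The tracker is touched exactly once per partition writer, which is what removes the per-record allocation and counter-increment overhead the commit describes.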
Copilot AI review requested due to automatic review settings April 14, 2026 18:05

Copilot AI left a comment


Pull request overview

Copilot reviewed 14 out of 14 changed files in this pull request and generated no new comments.



TestBatchPushEopRecordCount runs a batch push with
SEND_CONTROL_MESSAGES_DIRECTLY=true and verifies:
- Per-partition record counts are populated (non-empty map)
- Sum of per-partition counts equals total records pushed (100)
- Each partition has a non-negative count
- Push completes successfully (data is readable)

Uses a VenicePushJob subclass to capture the per-partition counts
via reflection on the private getPerPartitionRecordCounts() method.
Copilot AI review requested due to automatic review settings April 14, 2026 21:46

Copilot AI left a comment


Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 4 comments.



Comment on lines +120 to +129
  public void run() {
    super.run();
    // Capture the counts after the job completes but before close
    try {
      java.lang.reflect.Method method = VenicePushJob.class.getDeclaredMethod("getPerPartitionRecordCounts");
      method.setAccessible(true);
      capturedCounts = (Map<Integer, Long>) method.invoke(this);
    } catch (Exception e) {
      throw new RuntimeException("Failed to capture per-partition record counts", e);
    }

Copilot AI Apr 14, 2026


This test uses reflection to call the private VenicePushJob#getPerPartitionRecordCounts() method. That’s brittle (method rename/visibility changes will break at runtime) and bypasses compile-time checks. Prefer exposing a test-friendly hook (e.g., @VisibleForTesting protected Map<Integer,Long> getPerPartitionRecordCounts() or a protected accessor on the task tracker) so the subclass can call it directly without reflection.

Suggested change: replace the reflective capture in run():

  public void run() {
    super.run();
    // Capture the counts after the job completes but before close
    try {
      java.lang.reflect.Method method = VenicePushJob.class.getDeclaredMethod("getPerPartitionRecordCounts");
      method.setAccessible(true);
      capturedCounts = (Map<Integer, Long>) method.invoke(this);
    } catch (Exception e) {
      throw new RuntimeException("Failed to capture per-partition record counts", e);
    }

with an overridden broadcastEndOfPush:

  protected void broadcastEndOfPush(Map<Integer, Long> perPartitionRecordCounts) {
    capturedCounts =
        perPartitionRecordCounts == null ? null : new java.util.HashMap<>(perPartitionRecordCounts);
    super.broadcastEndOfPush(perPartitionRecordCounts);

Comment on lines +1374 to +1384
  // Capture all sendMessage calls (SOS + EOP + EOS for each partition)
  ArgumentCaptor<Integer> partitionCaptor = ArgumentCaptor.forClass(Integer.class);
  ArgumentCaptor<KafkaMessageEnvelope> kmeCaptor = ArgumentCaptor.forClass(KafkaMessageEnvelope.class);
  ArgumentCaptor<PubSubMessageHeaders> headersCaptor = ArgumentCaptor.forClass(PubSubMessageHeaders.class);
  verify(mockedProducer, atLeast(6)).sendMessage(
      anyString(),
      partitionCaptor.capture(),
      any(),
      kmeCaptor.capture(),
      headersCaptor.capture(),
      any());

Copilot AI Apr 14, 2026


The verification threshold atLeast(6) is too low for what this test claims to capture (“SOS + EOP + EOS for each partition”). broadcastEndOfPush() will typically produce 3 control messages per partition (SOS triggered by first message, then EOP, then EOS), so for 3 partitions you can assert times(9) (or otherwise assert the expected minimum per-partition) to make the test catch regressions where EOS/SOS stops being emitted.

  // Capture all sendMessage calls
  ArgumentCaptor<PubSubMessageHeaders> headersCaptor = ArgumentCaptor.forClass(PubSubMessageHeaders.class);
  ArgumentCaptor<KafkaMessageEnvelope> kmeCaptor = ArgumentCaptor.forClass(KafkaMessageEnvelope.class);
  verify(mockedProducer, atLeast(4))

Copilot AI Apr 14, 2026


Similarly here, verify(..., atLeast(4)) is lower than the expected number of control-message sends for 2 partitions when calling broadcastEndOfPush() (typically SOS + EOP + EOS per partition). Tightening this to the expected count (or at least >= 6) will make the test meaningfully assert the full control-message sequence, not just that “something” was sent.

Suggested change:

  - verify(mockedProducer, atLeast(4))
  + verify(mockedProducer, times(partitionCount * 3))

Comment on lines +81 to +102
  Assert.assertEquals(
      totalFromPartitions,
      DEFAULT_USER_DATA_RECORD_COUNT,
      "Sum of per-partition record counts (" + perPartitionCounts + ") should equal total records pushed ("
          + DEFAULT_USER_DATA_RECORD_COUNT + ")");

  // Verify each partition has a non-negative count
  for (Map.Entry<Integer, Long> entry: perPartitionCounts.entrySet()) {
    Assert.assertTrue(
        entry.getValue() >= 0,
        "Partition " + entry.getKey() + " should have non-negative count, got " + entry.getValue());
  }

  // Verify push actually succeeded — data is readable
  veniceCluster.useControllerClient(controllerClient -> {
    TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, true, () -> {
      int currentVersion = controllerClient.getStore(storeName).getStore().getCurrentVersion();
      Assert.assertTrue(currentVersion > 0, "Store should have a current version");
    });
  });

  pushJob.close();

Copilot AI Apr 14, 2026


pushJob.close() is only called at the end of the test. If pushJob.run() or any assertion throws, the job/underlying resources won’t be closed, which can leak threads and make subsequent integration tests flaky. Wrap the push job in a try/finally (or try-with-resources if applicable) so close() is guaranteed to run.

Suggested change: wrap the assertions in try/finally so close() always runs.

Before:

  Assert.assertEquals(
      totalFromPartitions,
      DEFAULT_USER_DATA_RECORD_COUNT,
      "Sum of per-partition record counts (" + perPartitionCounts + ") should equal total records pushed ("
          + DEFAULT_USER_DATA_RECORD_COUNT + ")");
  // Verify each partition has a non-negative count
  for (Map.Entry<Integer, Long> entry: perPartitionCounts.entrySet()) {
    Assert.assertTrue(
        entry.getValue() >= 0,
        "Partition " + entry.getKey() + " should have non-negative count, got " + entry.getValue());
  }
  // Verify push actually succeeded — data is readable
  veniceCluster.useControllerClient(controllerClient -> {
    TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, true, () -> {
      int currentVersion = controllerClient.getStore(storeName).getStore().getCurrentVersion();
      Assert.assertTrue(currentVersion > 0, "Store should have a current version");
    });
  });
  pushJob.close();

After:

  try {
    Assert.assertEquals(
        totalFromPartitions,
        DEFAULT_USER_DATA_RECORD_COUNT,
        "Sum of per-partition record counts (" + perPartitionCounts + ") should equal total records pushed ("
            + DEFAULT_USER_DATA_RECORD_COUNT + ")");
    // Verify each partition has a non-negative count
    for (Map.Entry<Integer, Long> entry: perPartitionCounts.entrySet()) {
      Assert.assertTrue(
          entry.getValue() >= 0,
          "Partition " + entry.getKey() + " should have non-negative count, got " + entry.getValue());
    }
    // Verify push actually succeeded — data is readable
    veniceCluster.useControllerClient(controllerClient -> {
      TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, true, () -> {
        int currentVersion = controllerClient.getStore(storeName).getStore().getCurrentVersion();
        Assert.assertTrue(currentVersion > 0, "Store should have a current version");
      });
    });
  } finally {
    pushJob.close();
  }

@sushantmane
Contributor Author

Analysis: Accumulator vs cache()+count() for per-partition record counts

Current approach: Accumulator (flush at close)

Each partition writer maintains a local messageSent counter and flushes it to the Spark accumulator (or MR counter) once at close(), after veniceWriter.flush() confirms all records were acked by Kafka.

Spark accumulator guarantees (from official docs):

"For accumulator updates performed inside actions only, Spark guarantees that each task's update to the accumulator will only be applied once, i.e. restarted tasks will not update the value."

Our accumulator update happens inside mapPartitions (an action), so Spark guarantees exactly-once merge for successful tasks. Failed/speculative tasks' accumulators are discarded.

Known risk — silent merge failure:

"When a Spark task finishes, Spark will try to merge the accumulated updates. If it fails, Spark will ignore the failure and still mark the task successful."

If merge() throws, the count is silently wrong. Mitigation: our merge() is trivial (HashMap.merge with Long::sum) — extremely unlikely to fail. Worst case: empty counts → no prc header on EOP → server skips verification (silent degradation, not false failure).

Task retry scenario: If an executor dies after writing to Kafka but before reporting task success, Spark discards the accumulator and retries the task. The retry writes records again (Kafka duplicates). The accumulator reflects the retry's count. The server-side >= check (at-least-once semantics) handles this correctly.
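The merge semantics this analysis leans on can be illustrated with a small stand-in. This is plain Java with no Spark dependency; it mirrors the AccumulatorV2-style add/merge contract and the "HashMap.merge with Long::sum" logic described above, but it is not the PR's actual MapLongAccumulator.

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for the per-partition count accumulator: each task adds its
// counts, and the driver merges successful tasks' copies with Long::sum.
class MapLongAccumulatorSketch {
  private final Map<Integer, Long> counts = new HashMap<>();

  // Task-side: record a bulk count for one partition.
  void add(int partition, long delta) {
    counts.merge(partition, delta, Long::sum);
  }

  // Driver-side: fold another task's accumulator copy into this one.
  void merge(MapLongAccumulatorSketch other) {
    other.counts.forEach((p, c) -> counts.merge(p, c, Long::sum));
  }

  Map<Integer, Long> value() {
    return counts;
  }
}
```

Because merge() is just map folding with addition, a merge failure is indeed unlikely, which is the mitigation argument made above.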

Alternative: cache() + count()

input → transform → repartition → cache() → countByPartition() → mapPartitions(write)

Pros:

  • Deterministic: count() is a Spark action with guaranteed correctness
  • No accumulator dependency

Cons:

  • Memory cost: Caching the entire repartitioned dataset in executor memory. For a 100GB push across executors, this is significant. Venice pushes already operate near executor memory limits. If memory is insufficient, Spark spills to disk, negating the performance benefit.
  • Without cache(): The repartition shuffle runs twice (once for count, once for write) — roughly 2x shuffle I/O overhead.
  • Per-partition counts require groupBy(partition).count(): An additional shuffle, or counting after repartition which means caching post-shuffle data.
  • Current DAG uses count() as the terminal action (line 809 in AbstractDataWriterSparkJob): restructuring the DAG to insert a counting step before the write mapPartitions is a non-trivial change.

Recommendation

Keep the accumulator approach. The exactly-once guarantee for action-side updates, combined with the close() verification (messageSent == messageCompleted) and server-side >= check, provides sufficient reliability without the memory/shuffle cost of cache() + count(). The worst-case failure mode (silent merge failure) degrades gracefully to "no verification" rather than incorrect verification.
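The server-side check this recommendation assumes (which lands in a follow-up PR) can be sketched as follows. The class and method names are hypothetical; only the >= comparison and the skip-when-header-absent behavior come from the discussion above.

```java
import java.util.Optional;

// Hypothetical sketch of the follow-up server-side verification: with
// at-least-once Kafka delivery, consumed >= expected passes; a missing
// "prc" header degrades gracefully to skipping verification.
class EopCountVerifier {
  enum Result { PASSED, FAILED, SKIPPED }

  static Result verify(long serverConsumedCount, Optional<Long> expectedFromEopHeader) {
    if (!expectedFromEopHeader.isPresent()) {
      return Result.SKIPPED; // no prc header on EOP: no verification, no false failure
    }
    return serverConsumedCount >= expectedFromEopHeader.get() ? Result.PASSED : Result.FAILED;
  }
}
```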

@ymuppala
Collaborator

> Analysis: Accumulator vs cache()+count() for per-partition record counts [full analysis quoted above]

I do not think mapPartitions is an action

@sushantmane
Contributor Author

Closing this as changes from this PR were shipped in #2758

@sushantmane sushantmane closed this May 8, 2026