
[common][da-vinci] Server-side record count verification at EOP#2665

Open
sushantmane wants to merge 7 commits into linkedin:main from sushantmane:sumane/pr3-server-record-count-verification

Conversation

@sushantmane
Contributor

VALIDATION_OVERRIDE

Summary

The server counts ingested logical records per partition and verifies them against the expected count carried in the EOP PubSub header. On a mismatch it logs an error, emits a metric, and fails ingestion (enabled by default).

Part 3 of 4 — depends on Part 1 (#2663) for the prc header.

Changes

  • PartitionState v21 schema: batchPushRecordCount field (persisted across restarts)
  • AvroProtocolDefinition: bump PARTITION_STATE 20→21
  • StoreIngestionTask: count non-chunk PUTs/DELETEs before EOP, verify at EOP
  • VeniceServerConfig: server.batch.push.record.count.verification.enabled (default true)
  • HostLevelIngestionStats: batch_push_record_count_mismatch metric

Chunking-aware

Only logical records are counted: chunk records (CHUNK_SCHEMA_ID) are skipped, while chunk manifests are counted, since each manifest corresponds to exactly one logical record.
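The chunk-skipping rule can be sketched as follows. This is an illustrative sketch, not the actual StoreIngestionTask code: the sentinel value for CHUNK_SCHEMA_ID and the method names are assumptions that mirror the PR's terminology.

```java
// Sketch of chunk-aware record counting (assumed signatures). Venice chunked
// writes tag chunk payloads with a sentinel schema id; only chunk manifests
// and regular records are logical records.
final class RecordCountSketch {
  static final int CHUNK_SCHEMA_ID = -10; // assumed sentinel value, for illustration only

  private long batchPushRecordCount;

  /** Count a PUT/DELETE seen before EOP; skip individual chunks. */
  void maybeCount(int valueSchemaId, boolean beforeEndOfPush) {
    if (!beforeEndOfPush) {
      return; // only batch-push (pre-EOP) records are verified
    }
    if (valueSchemaId == CHUNK_SCHEMA_ID) {
      return; // a chunk is a fragment of a record, not a logical record
    }
    batchPushRecordCount++; // chunk manifests and regular records both land here
  }

  long getBatchPushRecordCount() {
    return batchPushRecordCount;
  }

  public static void main(String[] args) {
    RecordCountSketch s = new RecordCountSketch();
    s.maybeCount(1, true);               // regular record -> counted
    s.maybeCount(CHUNK_SCHEMA_ID, true); // chunk -> skipped
    s.maybeCount(2, true);               // manifest or regular record -> counted
    s.maybeCount(3, false);              // post-EOP -> ignored
    System.out.println(s.getBatchPushRecordCount()); // prints 2
  }
}
```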

Test plan

  • StoreIngestionTaskRecordCountTest — 9 tests (all verification branches)
  • PartitionConsumptionStateTest — increment/get
  • TestOffsetRecord — serialization round-trip
  • TestBatchPushRecordCountVerification — E2E basic + chunked with strict verification

Copilot AI review requested due to automatic review settings March 26, 2026 23:40

Copilot AI left a comment


Pull request overview

Adds end-to-end batch push record-count verification by propagating per-partition expected counts on EOP (via PubSub headers) and validating them server-side during ingestion, with persistence across restarts via PartitionState.

Changes:

  • Add batchPushRecordCount to PartitionState (v21) and expose it through OffsetRecord/PartitionConsumptionState.
  • Track per-partition record counts in VPJ (MR + Spark) and attach them to EOP via a new prc PubSub header.
  • Verify expected vs. actual counts at EOP in StoreIngestionTask, emit a host-level mismatch metric, and optionally fail ingestion via a new server config flag.

Reviewed changes

Copilot reviewed 26 out of 26 changed files in this pull request and generated 1 comment.

  • internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatchPushRecordCountVerification.java: New E2E tests covering basic + chunked batch push with verification enabled.
  • internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java: Unit tests ensuring EOP broadcasts include/omit prc headers as expected.
  • internal/venice-common/src/test/java/com/linkedin/venice/offsets/TestOffsetRecord.java: Tests for new batch-push record-count getter/setter and serialization.
  • internal/venice-common/src/main/resources/avro/PartitionState/v21/PartitionState.avsc: New PartitionState schema v21 adding batchPushRecordCount.
  • internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java: Add EOP broadcast overload to attach per-partition record counts as PubSub headers.
  • internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java: Bump PARTITION_STATE protocol version 20 → 21.
  • internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java: Add VENICE_PARTITION_RECORD_COUNT_HEADER (prc) constant.
  • internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java: Initialize + expose persisted batchPushRecordCount in PartitionState.
  • internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java: Add server config key for strict verification enablement.
  • clients/venice-push-job/src/test/java/com/linkedin/venice/spark/datawriter/task/MapLongAccumulatorTest.java: Tests for Spark accumulator used to collect per-partition counts.
  • clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/counter/MRJobCounterHelperTest.java: Tests for MR per-partition record-count counters parsing/incrementing.
  • clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/SparkDataWriterTaskTracker.java: Implement per-partition record counting for Spark pushes.
  • clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/MapLongAccumulator.java: New Spark accumulator to aggregate per-partition counts.
  • clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/DataWriterAccumulators.java: Register new per-partition record-count accumulator.
  • clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/DataWriterTaskTracker.java: Extend tracker interface to support per-partition record counts.
  • clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java: Hook per-record production to increment per-partition tracking.
  • clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/task/ReporterBackedMapReduceDataWriterTaskTracker.java: Implement MR Reporter-backed per-partition counting.
  • clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/task/CounterBackedMapReduceDataWriterTaskTracker.java: Expose per-partition counts from MR counters.
  • clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/counter/MRJobCounterHelper.java: Add MR counter group + helpers for per-partition record counts.
  • clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java: Collect per-partition counts and pass them into broadcastEndOfPush.
  • clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskRecordCountTest.java: New unit tests for verification logic and strict/lenient behavior.
  • clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java: Test increment/get path for the persisted record count.
  • clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java: Add batch_push_record_count_mismatch metric.
  • clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java: Count logical records pre-EOP and verify against EOP header at EOP.
  • clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java: Add increment/get helpers for the persisted batch-push record count.
  • clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java: Add server.batch.push.record.count.verification.enabled (default true).


@sushantmane sushantmane force-pushed the sumane/pr3-server-record-count-verification branch from c3c1c21 to b1cef44 Compare April 10, 2026 01:16
verification

Track per-partition record counts during batch push and embed them as
PubSub message headers on the End-of-Push control message. This enables
server-side verification (in a follow-up PR) without requiring changes
to the EOP Avro schema.

Changes:
- PubSubMessageHeaders: new "prc" header constant
- VeniceWriter: new broadcastEndOfPush/sendControlMessage overloads with
headers
- DataWriterTaskTracker: per-partition tracking interface
- MR path: counter group in MRJobCounterHelper + tracker implementations
- Spark path: new MapLongAccumulator + tracker implementations
- VenicePushJob: collect per-partition counts, pass to broadcastEndOfPush
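Attaching the per-partition count to the EOP control message could look roughly like the following. The "prc" header key comes from the PR; the long-to-bytes encoding shown here is an assumption for illustration, not necessarily what VeniceWriter does.

```java
import java.nio.ByteBuffer;
import java.util.Map;

// Sketch of carrying a per-partition record count on the EOP control message
// as a PubSub header, avoiding any change to the EOP Avro schema.
final class EopHeaderSketch {
  static final String VENICE_PARTITION_RECORD_COUNT_HEADER = "prc";

  // Assumed encoding: big-endian 8-byte long in the header value.
  static byte[] encodeCount(long count) {
    return ByteBuffer.allocate(Long.BYTES).putLong(count).array();
  }

  static long decodeCount(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong();
  }

  public static void main(String[] args) {
    // Counts collected by the push job, keyed by partition (illustrative values).
    Map<Integer, Long> perPartitionCounts = Map.of(0, 1_000L, 1, 998L);
    // When broadcasting EOP to partition 0, the writer would attach:
    byte[] headerValue = encodeCount(perPartitionCounts.get(0));
    // The server reads it back at EOP time:
    System.out.println(decodeCount(headerValue)); // prints 1000
  }
}
```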
…Map,

rename test

Code review findings:
- VeniceWriter: make 5-param sendControlMessage delegate to 6-param overload,
  eliminating duplicated getDebugInfo/isEndOfSegment/synchronized logic
- MapLongAccumulator: ConcurrentHashMap → HashMap — Spark tasks are
  single-threaded (each gets a fresh copy()), no concurrent access occurs
- MRJobCounterHelperTest: rename misleading test to reflect what it actually
  tests (counter retrieval round-trip, not Reporter-based increment)
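The per-partition aggregation that MapLongAccumulator performs can be sketched without the Spark AccumulatorV2 plumbing. Class and method names here are illustrative; only the add/merge semantics are taken from the description above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the per-partition count aggregation behind the Spark
// MapLongAccumulator. A plain HashMap suffices inside a task because each
// Spark task works on its own copy; merging happens later on the driver.
final class MapLongAccumulatorSketch {
  private final Map<Integer, Long> counts = new HashMap<>();

  /** Called once per record produced by a task. */
  void add(int partition) {
    counts.merge(partition, 1L, Long::sum);
  }

  /** Driver-side merge of another task's accumulator copy. */
  void merge(MapLongAccumulatorSketch other) {
    other.counts.forEach((p, c) -> counts.merge(p, c, Long::sum));
  }

  Map<Integer, Long> value() {
    return counts;
  }

  public static void main(String[] args) {
    MapLongAccumulatorSketch task1 = new MapLongAccumulatorSketch();
    MapLongAccumulatorSketch task2 = new MapLongAccumulatorSketch();
    task1.add(0); task1.add(0); task1.add(1);
    task2.add(1); task2.add(2);
    task1.merge(task2);
    // Sort for a deterministic printout.
    System.out.println(new TreeMap<>(task1.value())); // prints {0=2, 1=2, 2=1}
  }
}
```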

Server counts ingested logical records per partition and verifies against
the expected count from the EOP PubSub header. On mismatch: logs error,
emits metric, fails ingestion (default enabled).

Changes:
- PartitionState v21: batchPushRecordCount field (remove v20 override)
- StoreIngestionTask: count non-chunk PUTs/DELETEs, verify at EOP
- VeniceServerConfig: feature flag (default true)
- HostLevelIngestionStats: mismatch metric

Kafka provides at-least-once delivery — producer retries may cause the
server to consume duplicate records. Changed verification from strict
equality (actualCount != expectedCount) to deficit check
(actualCount < expectedCount). This prevents false positives from
Kafka duplicates while still catching genuine record loss.

Also:
- Add testRecordCountAboveExpectedIsAccepted test (at-least-once case)
- Fix PartitionConsumptionStateTest constructor to match current API
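The resulting check can be sketched as follows; the names are illustrative, but the deficit-only semantics and the strict/lenient flag behavior are taken from the commit message above.

```java
// Sketch of the EOP-time verification: a deficit check rather than strict
// equality, because Kafka's at-least-once delivery can legitimately produce
// actualCount > expectedCount (duplicates), while a deficit means record loss.
final class EopVerificationSketch {
  static boolean hasRecordDeficit(long actualCount, long expectedCount) {
    return actualCount < expectedCount; // surplus from duplicates is accepted
  }

  /** Returns true when ingestion should fail (strict mode only). */
  static boolean shouldFailIngestion(long actual, long expected, boolean strictVerificationEnabled) {
    if (!hasRecordDeficit(actual, expected)) {
      return false;
    }
    // On a deficit: always log and emit the mismatch metric; fail only in strict mode.
    return strictVerificationEnabled;
  }

  public static void main(String[] args) {
    System.out.println(shouldFailIngestion(1000, 1000, true));  // false: exact match
    System.out.println(shouldFailIngestion(1002, 1000, true));  // false: duplicates tolerated
    System.out.println(shouldFailIngestion(998, 1000, true));   // true: genuine record loss
    System.out.println(shouldFailIngestion(998, 1000, false));  // false: lenient mode logs only
  }
}
```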
Copilot AI review requested due to automatic review settings April 13, 2026 22:13
@sushantmane sushantmane force-pushed the sumane/pr3-server-record-count-verification branch from b1cef44 to cb76ec2 Compare April 13, 2026 22:13

Copilot AI left a comment


Pull request overview

Copilot reviewed 25 out of 25 changed files in this pull request and generated 3 comments.



Comment on lines 448 to 474
private PartitionConsumptionState createPcsWithHll(int lgK) {
  OffsetRecord offsetRecord = mock(OffsetRecord.class);
  doReturn(null).when(offsetRecord).getUniqueIngestedKeyCountHllSketch();
  doReturn(null).when(offsetRecord).getLeaderTopic();
  PartitionConsumptionState pcs = new PartitionConsumptionState(TOPIC_PARTITION, offsetRecord, pubSubContext, false);
  pcs.initializeUniqueKeyCountHll(lgK);
  return pcs;

public void testBatchPushRecordCountIncrement() {
  InternalAvroSpecificSerializer<com.linkedin.venice.kafka.protocol.state.PartitionState> serializer =
      AvroProtocolDefinition.PARTITION_STATE.getSerializer();
  OffsetRecord offsetRecord = new OffsetRecord(serializer, pubSubContext);
  PartitionConsumptionState pcs = new PartitionConsumptionState(TOPIC_PARTITION, offsetRecord, pubSubContext, false);

  // Initial count should be 0
  assertEquals(pcs.getBatchPushRecordCount(), 0L);

  // Increment once
  pcs.incrementBatchPushRecordCount();
  assertEquals(pcs.getBatchPushRecordCount(), 1L);

  // Increment multiple times
  for (int i = 0; i < 99; i++) {
    pcs.incrementBatchPushRecordCount();
  }
  assertEquals(pcs.getBatchPushRecordCount(), 100L);
}

Copilot AI Apr 13, 2026


testBatchPushRecordCountIncrement is currently nested inside createPcsWithHll(int) due to a missing closing brace, and it is also missing a @Test annotation. As written, this will not compile and the test won't run. Close createPcsWithHll(int) before this method and annotate testBatchPushRecordCountIncrement with @Test (and ensure braces at the end of the class are balanced).

Comment on lines 309 to 314
{
"name": "uniqueIngestedKeyCountHllSketch",
"doc": "Serialized HyperLogLog sketch (Apache DataSketches) tracking unique keys ingested in this partition. Null if HLL tracking is not enabled or not yet initialized.",
"type": ["null", "bytes"],
"default": null
"name": "batchPushRecordCount",
"doc": "The number of logical data records consumed during batch push, used for record count verification at EOP.",
"type": "long",
"default": 0
}

Copilot AI Apr 13, 2026


The v21 PartitionState schema appears to have replaced the existing uniqueIngestedKeyCountHllSketch field with batchPushRecordCount. The codebase still reads/writes partitionState.uniqueIngestedKeyCountHllSketch (e.g., OffsetRecord getters/setters), so removing it will break Avro code generation/compilation and/or drop that data on serialization. Please add batchPushRecordCount as an additional field (preferably appended) without removing the existing HLL field, preserving backward/forward compatibility.
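A compatible v21 along the lines the comment suggests would append the new field rather than replace the HLL field. This fragment is a sketch assembled from the snippet above, not the actual patched schema:

```json
{
  "name": "uniqueIngestedKeyCountHllSketch",
  "doc": "Serialized HyperLogLog sketch (Apache DataSketches) tracking unique keys ingested in this partition. Null if HLL tracking is not enabled or not yet initialized.",
  "type": ["null", "bytes"],
  "default": null
},
{
  "name": "batchPushRecordCount",
  "doc": "The number of logical data records consumed during batch push, used for record count verification at EOP.",
  "type": "long",
  "default": 0
}
```

Keeping the existing field in place and appending the new one with a default preserves Avro backward/forward compatibility for readers on v20.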

Comment on lines +1185 to +1187
* Whether to strictly enforce batch push record count verification by failing ingestion when a mismatch is detected.
* Verification itself still occurs whenever the EOP header is present; when this flag is false,
* mismatches are only logged and recorded as metrics instead of failing ingestion. Default: true.

Copilot AI Apr 13, 2026


The Javadoc for SERVER_BATCH_PUSH_RECORD_COUNT_VERIFICATION_ENABLED says ingestion fails (or logs/metrics) “when a mismatch is detected”, but the implementation only flags a deficit (actualCount < expectedCount) and explicitly accepts actualCount > expectedCount due to at-least-once delivery. Consider updating this comment to describe the deficit-only behavior so it matches the verification logic.

Suggested change
- * Whether to strictly enforce batch push record count verification by failing ingestion when a mismatch is detected.
- * Verification itself still occurs whenever the EOP header is present; when this flag is false,
- * mismatches are only logged and recorded as metrics instead of failing ingestion. Default: true.
+ * Whether to strictly enforce batch push record count verification by failing ingestion when the actual record count
+ * is lower than the expected count.
+ * Verification itself still occurs whenever the EOP header is present; when this flag is false, count deficits are
+ * only logged and recorded as metrics instead of failing ingestion. Counts higher than expected may be accepted due
+ * to at-least-once delivery semantics. Default: true.
