[common][da-vinci] Server-side record count verification at EOP #2665
sushantmane wants to merge 7 commits into linkedin:main
Conversation
Pull request overview
Adds end-to-end batch push record-count verification by propagating per-partition expected counts on EOP (via PubSub headers) and validating them server-side during ingestion, with persistence across restarts via PartitionState.
Changes:
- Add batchPushRecordCount to PartitionState (v21) and expose it through OffsetRecord/PartitionConsumptionState.
- Track per-partition record counts in VPJ (MR + Spark) and attach them to EOP via a new "prc" PubSub header.
- Verify expected vs. actual counts at EOP in StoreIngestionTask, emit a host-level mismatch metric, and optionally fail ingestion via a new server config flag.
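The handshake in the bullets above can be sketched end to end: the push job attaches the per-partition expected count as a PubSub header on the End-of-Push message, and the server decodes it at EOP to compare against what it actually ingested. This is a simplified stand-in, not the actual Venice classes; the header key "prc" comes from this PR, but the value encoding (a decimal string) is an assumption for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

final class EopRecordCountSketch {
  // Header key from the PR; the decimal-string value encoding is assumed.
  static final String PRC_HEADER = "prc";

  /** Push-job side: build EOP headers carrying the expected record count. */
  static Map<String, byte[]> buildEopHeaders(long expectedCount) {
    Map<String, byte[]> headers = new HashMap<>();
    headers.put(PRC_HEADER, Long.toString(expectedCount).getBytes(StandardCharsets.UTF_8));
    return headers;
  }

  /** Server side at EOP: decode the expected count, or -1 if the header is absent. */
  static long decodeExpectedCount(Map<String, byte[]> headers) {
    byte[] value = headers.get(PRC_HEADER);
    return value == null ? -1L : Long.parseLong(new String(value, StandardCharsets.UTF_8));
  }
}
```

Carrying the count in a header rather than in the EOP control-message payload is what lets the feature ship without an EOP Avro schema change.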
Reviewed changes
Copilot reviewed 26 out of 26 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatchPushRecordCountVerification.java | New E2E tests covering basic + chunked batch push with verification enabled. |
| internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java | Unit tests ensuring EOP broadcasts include/omit prc headers as expected. |
| internal/venice-common/src/test/java/com/linkedin/venice/offsets/TestOffsetRecord.java | Tests for new batch-push record-count getter/setter and serialization. |
| internal/venice-common/src/main/resources/avro/PartitionState/v21/PartitionState.avsc | New PartitionState schema v21 adding batchPushRecordCount. |
| internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java | Add EOP broadcast overload to attach per-partition record counts as PubSub headers. |
| internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java | Bump PARTITION_STATE protocol version 20 → 21. |
| internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java | Add VENICE_PARTITION_RECORD_COUNT_HEADER (prc) constant. |
| internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java | Initialize + expose persisted batchPushRecordCount in PartitionState. |
| internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java | Add server config key for strict verification enablement. |
| clients/venice-push-job/src/test/java/com/linkedin/venice/spark/datawriter/task/MapLongAccumulatorTest.java | Tests for Spark accumulator used to collect per-partition counts. |
| clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/mapreduce/counter/MRJobCounterHelperTest.java | Tests for MR per-partition record-count counters parsing/incrementing. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/SparkDataWriterTaskTracker.java | Implement per-partition record counting for Spark pushes. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/MapLongAccumulator.java | New Spark accumulator to aggregate per-partition counts. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/DataWriterAccumulators.java | Register new per-partition record-count accumulator. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/DataWriterTaskTracker.java | Extend tracker interface to support per-partition record counts. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java | Hook per-record production to increment per-partition tracking. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/task/ReporterBackedMapReduceDataWriterTaskTracker.java | Implement MR Reporter-backed per-partition counting. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/datawriter/task/CounterBackedMapReduceDataWriterTaskTracker.java | Expose per-partition counts from MR counters. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/counter/MRJobCounterHelper.java | Add MR counter group + helpers for per-partition record counts. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java | Collect per-partition counts and pass them into broadcastEndOfPush. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskRecordCountTest.java | New unit tests for verification logic and strict/lenient behavior. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java | Test increment/get path for the persisted record count. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java | Add batch_push_record_count_mismatch metric. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java | Count logical records pre-EOP and verify against EOP header at EOP. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionState.java | Add increment/get helpers for the persisted batch-push record count. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java | Add server.batch.push.record.count.verification.enabled (default true). |
Force-pushed from c3c1c21 to b1cef44.
verification

Track per-partition record counts during batch push and embed them as PubSub message headers on the End-of-Push control message. This enables server-side verification (in a follow-up PR) without requiring changes to the EOP Avro schema.

Changes:
- PubSubMessageHeaders: new "prc" header constant
- VeniceWriter: new broadcastEndOfPush/sendControlMessage overloads with headers
- DataWriterTaskTracker: per-partition tracking interface
- MR path: counter group in MRJobCounterHelper + tracker implementations
- Spark path: new MapLongAccumulator + tracker implementations
- VenicePushJob: collect per-partition counts, pass to broadcastEndOfPush
…Map, rename test

Code review findings:
- VeniceWriter: make 5-param sendControlMessage delegate to 6-param overload, eliminating duplicated getDebugInfo/isEndOfSegment/synchronized logic
- MapLongAccumulator: ConcurrentHashMap → HashMap — Spark tasks are single-threaded (each gets a fresh copy()), no concurrent access occurs
- MRJobCounterHelperTest: rename misleading test to reflect what it actually tests (counter retrieval round-trip, not Reporter-based increment)
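The ConcurrentHashMap → HashMap change rests on Spark's accumulator model: each task works on its own copy() single-threaded, and the driver combines copies via merge(), so no per-copy synchronization is needed. A stdlib-only sketch of that copy/merge semantics (not the actual MapLongAccumulator, which extends Spark's AccumulatorV2):

```java
import java.util.HashMap;
import java.util.Map;

/** Per-partition record counting with Spark-style copy/merge semantics. */
final class PartitionCountAccumulator {
  // Plain HashMap is safe: each task copy is only touched by one thread.
  private final Map<Integer, Long> counts = new HashMap<>();

  /** Called once per record produced to the given Venice partition. */
  void add(int partition) {
    counts.merge(partition, 1L, Long::sum);
  }

  /** Driver-side combine of another task's accumulator copy. */
  void merge(PartitionCountAccumulator other) {
    other.counts.forEach((p, c) -> counts.merge(p, c, Long::sum));
  }

  long get(int partition) {
    return counts.getOrDefault(partition, 0L);
  }
}
```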
Server counts ingested logical records per partition and verifies against the expected count from the EOP PubSub header. On mismatch: logs error, emits metric, fails ingestion (default enabled).

Changes:
- PartitionState v21: batchPushRecordCount field (remove v20 override)
- StoreIngestionTask: count non-chunk PUTs/DELETEs, verify at EOP
- VeniceServerConfig: feature flag (default true)
- HostLevelIngestionStats: mismatch metric
Kafka provides at-least-once delivery — producer retries may cause the server to consume duplicate records. Changed verification from strict equality (actualCount != expectedCount) to deficit check (actualCount < expectedCount). This prevents false positives from Kafka duplicates while still catching genuine record loss.

Also:
- Add testRecordCountAboveExpectedIsAccepted test (at-least-once case)
- Fix PartitionConsumptionStateTest constructor to match current API
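The deficit-only rule from this commit boils down to a one-line predicate. A sketch with illustrative names (not the actual StoreIngestionTask API):

```java
final class DeficitCheck {
  /**
   * Under Kafka's at-least-once delivery, actual >= expected is acceptable
   * (retries can produce duplicates); only a shortfall signals record loss.
   */
  static boolean hasDeficit(long actualCount, long expectedCount) {
    return actualCount < expectedCount;
  }
}
```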
Force-pushed from b1cef44 to cb76ec2.
Pull request overview
Copilot reviewed 25 out of 25 changed files in this pull request and generated 3 comments.
```java
private PartitionConsumptionState createPcsWithHll(int lgK) {
  OffsetRecord offsetRecord = mock(OffsetRecord.class);
  doReturn(null).when(offsetRecord).getUniqueIngestedKeyCountHllSketch();
  doReturn(null).when(offsetRecord).getLeaderTopic();
  PartitionConsumptionState pcs = new PartitionConsumptionState(TOPIC_PARTITION, offsetRecord, pubSubContext, false);
  pcs.initializeUniqueKeyCountHll(lgK);
  return pcs;

public void testBatchPushRecordCountIncrement() {
  InternalAvroSpecificSerializer<com.linkedin.venice.kafka.protocol.state.PartitionState> serializer =
      AvroProtocolDefinition.PARTITION_STATE.getSerializer();
  OffsetRecord offsetRecord = new OffsetRecord(serializer, pubSubContext);
  PartitionConsumptionState pcs = new PartitionConsumptionState(TOPIC_PARTITION, offsetRecord, pubSubContext, false);

  // Initial count should be 0
  assertEquals(pcs.getBatchPushRecordCount(), 0L);

  // Increment once
  pcs.incrementBatchPushRecordCount();
  assertEquals(pcs.getBatchPushRecordCount(), 1L);

  // Increment multiple times
  for (int i = 0; i < 99; i++) {
    pcs.incrementBatchPushRecordCount();
  }
  assertEquals(pcs.getBatchPushRecordCount(), 100L);
}
```
testBatchPushRecordCountIncrement is currently nested inside createPcsWithHll(int) due to a missing closing brace, and it is also missing a @Test annotation. As written, this will not compile and the test won't run. Close createPcsWithHll(int) before this method and annotate testBatchPushRecordCountIncrement with @Test (and ensure braces at the end of the class are balanced).
```json
{
  "name": "uniqueIngestedKeyCountHllSketch",
  "doc": "Serialized HyperLogLog sketch (Apache DataSketches) tracking unique keys ingested in this partition. Null if HLL tracking is not enabled or not yet initialized.",
  "type": ["null", "bytes"],
  "default": null
  "name": "batchPushRecordCount",
  "doc": "The number of logical data records consumed during batch push, used for record count verification at EOP.",
  "type": "long",
  "default": 0
}
```
The v21 PartitionState schema appears to have replaced the existing uniqueIngestedKeyCountHllSketch field with batchPushRecordCount. The codebase still reads/writes partitionState.uniqueIngestedKeyCountHllSketch (e.g., OffsetRecord getters/setters), so removing it will break Avro code generation/compilation and/or drop that data on serialization. Please add batchPushRecordCount as an additional field (preferably appended) without removing the existing HLL field, preserving backward/forward compatibility.
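A compatibility-preserving v21 would instead append the new field after the existing HLL field, along these lines (fragment only; field definitions copied from the diff above):

```json
{
  "name": "uniqueIngestedKeyCountHllSketch",
  "doc": "Serialized HyperLogLog sketch (Apache DataSketches) tracking unique keys ingested in this partition. Null if HLL tracking is not enabled or not yet initialized.",
  "type": ["null", "bytes"],
  "default": null
},
{
  "name": "batchPushRecordCount",
  "doc": "The number of logical data records consumed during batch push, used for record count verification at EOP.",
  "type": "long",
  "default": 0
}
```

Because batchPushRecordCount has a default of 0, readers on the new schema can still decode records written with older versions, which is what Avro schema evolution requires here.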
```java
 * Whether to strictly enforce batch push record count verification by failing ingestion when a mismatch is detected.
 * Verification itself still occurs whenever the EOP header is present; when this flag is false,
 * mismatches are only logged and recorded as metrics instead of failing ingestion. Default: true.
```
The Javadoc for SERVER_BATCH_PUSH_RECORD_COUNT_VERIFICATION_ENABLED says ingestion fails (or logs/metrics) “when a mismatch is detected”, but the implementation only flags a deficit (actualCount < expectedCount) and explicitly accepts actualCount > expectedCount due to at-least-once delivery. Consider updating this comment to describe the deficit-only behavior so it matches the verification logic.
Suggested change:

```java
 * Whether to strictly enforce batch push record count verification by failing ingestion when the actual record count
 * is lower than the expected count.
 * Verification itself still occurs whenever the EOP header is present; when this flag is false, count deficits are
 * only logged and recorded as metrics instead of failing ingestion. Counts higher than expected may be accepted due
 * to at-least-once delivery semantics. Default: true.
```
VALIDATION_OVERRIDE
Summary
Server counts ingested logical records per partition and verifies against the expected count from the EOP PubSub header. On mismatch: logs error, emits metric, fails ingestion (default enabled).
Part 3 of 4 — depends on Part 1 (#2663) for the prc header.

Changes
- PartitionState v21 schema: batchPushRecordCount field (persisted across restarts)
- AvroProtocolDefinition: bump PARTITION_STATE 20 → 21
- StoreIngestionTask: count non-chunk PUTs/DELETEs before EOP, verify at EOP
- VeniceServerConfig: server.batch.push.record.count.verification.enabled (default true)
- HostLevelIngestionStats: batch_push_record_count_mismatch metric

Chunking-aware
Only counts logical records (skips CHUNK_SCHEMA_ID). Chunk manifests are counted.

Test plan
- StoreIngestionTaskRecordCountTest — 9 tests (all verification branches)
- PartitionConsumptionStateTest — increment/get
- TestOffsetRecord — serialization round-trip
- TestBatchPushRecordCountVerification — E2E basic + chunked with strict verification
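The "Chunking-aware" rule above can be sketched as a single predicate: value chunks carry a sentinel schema id and are skipped, while the chunk manifest, written once per logical record, is counted, so chunked and non-chunked pushes both count each logical record exactly once. The sentinel constants and method name below are illustrative assumptions, not the actual Venice API.

```java
final class ChunkAwareCounting {
  // Assumed sentinel values for illustration; the PR only names CHUNK_SCHEMA_ID.
  static final int CHUNK_SCHEMA_ID = -10;
  static final int CHUNKED_VALUE_MANIFEST_SCHEMA_ID = -20;

  /** True when a PUT with this schema id represents one logical record. */
  static boolean countsAsLogicalRecord(int schemaId) {
    // Skip raw value chunks; count manifests and ordinary (non-chunked) values.
    return schemaId != CHUNK_SCHEMA_ID;
  }
}
```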