[vpj] Add HLL-based repush pipeline integrity verification #2666

sushantmane wants to merge 10 commits into linkedin:main
Conversation
Pull request overview
Adds HyperLogLog (HLL)-based integrity verification for Kafka Input Format (KIF) repush jobs by estimating read-side unique-key cardinality and comparing it against write-side output counts, with optional per-partition diagnostics.
Changes:
- Introduces a standalone HyperLogLogSketch utility (plus comprehensive tests) and Spark accumulators to track unique-key cardinality (aggregate + per-partition).
- Adds repush verification logic in the job driver to fail the push when divergence exceeds a configurable tolerance, and adds new repush configs/constants.
- Extends batch push plumbing to carry per-partition output record counts (tracking + EOP header propagation) for verification.
Reviewed changes
Copilot reviewed 23 out of 23 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| internal/venice-common/src/main/java/com/linkedin/venice/utils/HyperLogLogSketch.java | New pure-Java HLL implementation used for cardinality estimation. |
| internal/venice-common/src/test/java/com/linkedin/venice/utils/HyperLogLogSketchTest.java | Extensive HLL correctness/merge/serialization tests. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/HyperLogLogAccumulator.java | Spark accumulator wrapping HyperLogLogSketch for aggregate unique count. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/MapHyperLogLogAccumulator.java | Per-partition HLL accumulator for diagnostic comparisons. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/jobs/DataWriterComputeJob.java | Adds driver-side repush HLL verification and diagnostic per-partition logging. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/jobs/AbstractDataWriterSparkJob.java | Wires HLL accumulators and updates read path to feed HLL during repush. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java | Reads new repush configs and passes per-partition record counts to EOP broadcast. |
| internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java | Adds EOP broadcast overload that can attach per-partition record-count headers. |
| internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java | Defines the prc header constant for per-partition record counts. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/counter/MRJobCounterHelper.java | Adds MR counters read/write helpers for per-partition record counts. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/MapLongAccumulator.java | New Spark accumulator for per-partition long counters (output record counts). |
| clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/SparkDataWriterTaskTracker.java | Exposes per-partition record counts + read-side HLL estimates via tracker API. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/DataWriterTaskTracker.java | Extends tracker interface with default methods for per-partition counts + HLL. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java | Records per-partition output counts on each PubSub write. |
| internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java | Adds unit tests asserting EOP carries (or omits) prc header as expected. |
| clients/venice-push-job/src/test/java/.../HyperLogLogAccumulatorTest.java | Tests Spark HLL accumulator semantics (merge/reset/accuracy). |
| clients/venice-push-job/src/test/java/.../MapLongAccumulatorTest.java | Tests Spark per-partition counter accumulator semantics. |
| clients/venice-push-job/src/test/java/.../MRJobCounterHelperTest.java | Tests MR per-partition counter extraction and null reporter behavior. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/VenicePushJobConstants.java | Adds repush HLL verification config keys and defaults. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/PushJobSetting.java | Stores new repush HLL verification settings on the job setting object. |
9c9427a to 69c0c85
…verification

Track per-partition record counts during batch push and embed them as PubSub message headers on the End-of-Push control message. This enables server-side verification (in a follow-up PR) without requiring changes to the EOP Avro schema.

Changes:
- PubSubMessageHeaders: new "prc" header constant
- VeniceWriter: new broadcastEndOfPush/sendControlMessage overloads with headers
- DataWriterTaskTracker: per-partition tracking interface
- MR path: counter group in MRJobCounterHelper + tracker implementations
- Spark path: new MapLongAccumulator + tracker implementations
- VenicePushJob: collect per-partition counts, pass to broadcastEndOfPush
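The per-partition counts attached to the EOP control message could be packed as fixed-width (partition, count) pairs. The sketch below is a hypothetical encoding for illustration only; the actual wire format of the "prc" header is defined by VeniceWriter in this PR.

```java
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical encoding of per-partition record counts into a single byte
 * payload, e.g. for a PubSub message header. Illustration only: the real
 * "prc" header format used by VeniceWriter is defined in the PR itself.
 */
class PartitionRecordCounts {
  /** Pack each entry as a 4-byte partition id followed by an 8-byte count. */
  static byte[] toBytes(Map<Integer, Long> counts) {
    ByteBuffer buf = ByteBuffer.allocate(counts.size() * (Integer.BYTES + Long.BYTES));
    counts.forEach((partition, count) -> buf.putInt(partition).putLong(count));
    return buf.array();
  }

  /** Reverse the packing above; insertion order is preserved. */
  static Map<Integer, Long> fromBytes(byte[] bytes) {
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    Map<Integer, Long> counts = new LinkedHashMap<>();
    while (buf.remaining() >= Integer.BYTES + Long.BYTES) {
      counts.put(buf.getInt(), buf.getLong());
    }
    return counts;
  }
}
```

A fixed-width layout like this keeps the header compact (12 bytes per partition) and avoids any schema dependency, which matches the stated goal of not touching the EOP Avro schema.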
Pure-Java HyperLogLog implementation in venice-common for estimating the number of distinct elements in a multiset. Designed for use across VPJ (Spark accumulators), server-side (PCS), and other components.

Features:
- Configurable precision (p=4..18; default 14 = ~0.8% error, 16 KB)
- add(byte[]) and addHash(long) for flexible key input
- merge() for combining sketches (associative, commutative, idempotent)
- Compact serialization: toBytes/fromBytes, toByteBuffer/fromByteBuffer. Format: [1 byte precision][2^p bytes registers]
- Static hash64() method (FNV-1a + MurmurHash3 fmix64)

46 tests: accuracy, merge (disjoint/overlap/identical), split-merge simulation at max precision, serialization round-trip, custom precision, error handling, hash quality.
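The mechanics the commit message describes (register update, small-range correction, element-wise-max merge) can be illustrated with a minimal, self-contained sketch. This is a simplified stand-in, not the actual HyperLogLogSketch from venice-common: it omits the large-range correction, serialization, and argument validation.

```java
/**
 * Minimal HyperLogLog sketch, for illustration only. Assumes the same hash
 * recipe the commit message names (FNV-1a 64-bit finalized with MurmurHash3's
 * fmix64), but the real class's exact behavior may differ.
 */
class MiniHll {
  private final int p;          // precision: number of index bits
  private final int m;          // number of registers, 2^p
  private final double alphaM;  // bias-correction constant (approximation valid for m >= 128)
  private final byte[] registers;

  MiniHll(int p) {
    this.p = p;
    this.m = 1 << p;
    this.alphaM = 0.7213 / (1 + 1.079 / m);
    this.registers = new byte[m];
  }

  /** FNV-1a 64-bit, finalized with MurmurHash3's fmix64 for better avalanche. */
  static long hash64(byte[] data) {
    long h = 0xcbf29ce484222325L;
    for (byte b : data) {
      h ^= (b & 0xFF);
      h *= 0x100000001b3L;
    }
    h ^= h >>> 33;               // MurmurHash3 fmix64 finalizer
    h *= 0xff51afd7ed558ccdL;
    h ^= h >>> 33;
    h *= 0xc4ceb9fe1a85ec53L;
    h ^= h >>> 33;
    return h;
  }

  void add(byte[] key) {
    long hash = hash64(key);
    int idx = (int) (hash >>> (64 - p));  // top p bits select a register
    long rest = hash << p;                // remaining 64 - p bits
    // Rank = position of the first 1-bit in the remaining bits, capped at 64 - p + 1
    int rank = Math.min(Long.numberOfLeadingZeros(rest) + 1, 64 - p + 1);
    if (rank > (registers[idx] & 0xFF)) {
      registers[idx] = (byte) rank;
    }
  }

  long estimate() {
    double sum = 0;
    int zeros = 0;
    for (byte r : registers) {
      sum += 1.0 / (1L << (r & 0xFF));   // treat registers as unsigned
      if (r == 0) {
        zeros++;
      }
    }
    double e = alphaM * m * (double) m / sum;
    if (e <= 2.5 * m && zeros > 0) {
      e = m * Math.log((double) m / zeros); // small-range (linear counting) correction
    }
    return Math.round(e);
  }

  /** Union of two sketches: element-wise max over registers. */
  void merge(MiniHll other) {
    for (int i = 0; i < m; i++) {
      if ((other.registers[i] & 0xFF) > (registers[i] & 0xFF)) {
        registers[i] = other.registers[i];
      }
    }
  }
}
```

Because merge is an element-wise max, it is associative, commutative, and idempotent, which is exactly what lets disjoint per-task sketches be combined into one driver-side estimate.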
…Map, rename test

Code review findings:
- VeniceWriter: make the 5-param sendControlMessage delegate to the 6-param overload, eliminating duplicated getDebugInfo/isEndOfSegment/synchronized logic
- MapLongAccumulator: ConcurrentHashMap → HashMap — Spark tasks are single-threaded (each gets a fresh copy()), so no concurrent access occurs
- MRJobCounterHelperTest: rename the misleading test to reflect what it actually tests (counter retrieval round-trip, not Reporter-based increment)
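The merge semantics a MapLongAccumulator-style per-partition counter needs can be sketched without Spark. The class and method names below are illustrative, not the actual accumulator API; the point is that a plain HashMap suffices when each task mutates its own copy single-threaded and merging happens driver-side, one accumulator at a time.

```java
import java.util.Map;

/**
 * Spark-free sketch of per-partition count merging. Names are hypothetical;
 * the real logic lives in MapLongAccumulator.merge().
 */
class PartitionCountMerge {
  /** Sum counts per partition; partitions absent from target are inserted as-is. */
  static Map<Integer, Long> mergeInto(Map<Integer, Long> target, Map<Integer, Long> source) {
    source.forEach((partition, count) -> target.merge(partition, count, Long::sum));
    return target;
  }
}
```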
Human reviewer feedback (m-nagarajan, pthirun):
- Add large-range correction in estimate() per the Flajolet paper — fixes systematic underestimation above ~143M unique keys
- Use Arrays.copyOfRange in fromBytes() instead of manual arraycopy
- Extract an updateRegister() private helper to deduplicate add/addHash
- Fix copy() double-allocation: create via the public constructor + arraycopy instead of the private constructor that was also allocating

Copilot feedback:
- Add null check in merge() with a clear error message
- Add null check in hash64() with IllegalArgumentException
- Use registers[i] & 0xFF in estimate() to treat registers as unsigned
- Fix testHash64Distribution: check all 4 quadrants (top 2 bits)
- Replace the 10M-key test with a 1M-key test for CI speed
- Fix the private constructor Javadoc to say "takes ownership"
- Add testHash64NullThrows and testMergeNullThrows
sumane/pr4-repush-hll-verification
During KIF repush, track read-side unique key cardinality via HLL and compare with write-side output count at the driver. Two-tier verification: aggregate HLL (fails push) + per-partition HLL (diagnostic log).
69c0c85 to 577c411
Pull request overview
Copilot reviewed 23 out of 23 changed files in this pull request and generated 5 comments.
```java
public class HyperLogLogSketch {
  /** Default precision. 2^14 = 16384 registers, ~0.8% standard error, 16 KB memory. */
  public static final int DEFAULT_PRECISION = 14;

  /** Minimum supported precision (64 registers, ~26% error). */
  public static final int MIN_PRECISION = 4;

  /** Maximum supported precision (2^18 = 262144 registers, ~0.1% error, 256 KB). */
  public static final int MAX_PRECISION = 18;

  private final int p;
  private final int m; // 2^p
  private final double alphaM;
  private final byte[] registers;
```
HyperLogLogSketch is used inside Spark AccumulatorV2 implementations (HyperLogLogAccumulator / MapHyperLogLogAccumulator). Since HyperLogLogSketch does not implement Serializable, Spark task/closure serialization can fail under the default Java serializer. Please make the sketch safely serializable (e.g., implement java.io.Serializable with a serialVersionUID, or implement custom serialization in the accumulator and keep only primitive/byte[] state).
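Copilot's fix can be as small as the sketch below: all of the state shown in the snippet is primitive or byte[], so implementing Serializable with an explicit serialVersionUID is sufficient. The class name here is hypothetical; this is not the actual venice-common class.

```java
import java.io.Serializable;

/**
 * Sketch of the serializability fix: keep only primitive and byte[] fields
 * (all JDK-serializable as-is) and declare an explicit serialVersionUID.
 * Field names mirror the snippet above; the class itself is illustrative.
 */
class SerializableSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  private final int p;
  private final int m;
  private final double alphaM;
  private final byte[] registers;

  SerializableSketch(int p) {
    this.p = p;
    this.m = 1 << p;
    this.alphaM = 0.7213 / (1 + 1.079 / m);
    this.registers = new byte[m];
  }
}
```

Pinning serialVersionUID also protects accumulators from InvalidClassException if task and driver ever run slightly different builds of the class.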
```java
  }

  @Override
  public void merge(AccumulatorV2<byte[], Long> other) {
```
merge(...) blindly casts other to HyperLogLogAccumulator, which can surface as a ClassCastException with an unhelpful stack trace if Spark ever merges an unexpected accumulator type. Consider adding an instanceof check (and a clear IllegalArgumentException) similar to MapLongAccumulator.merge(...).
Suggested change:

```java
  public void merge(AccumulatorV2<byte[], Long> other) {
    if (!(other instanceof HyperLogLogAccumulator)) {
      throw new IllegalArgumentException(
          "Cannot merge HyperLogLogAccumulator with "
              + (other == null ? "null" : other.getClass().getName()));
    }
```
```java
  }

  @Override
  public void add(Tuple2<Integer, byte[]> v) {
```
add(...) always creates a per-partition HyperLogLogAccumulator via computeIfAbsent(...) even when the key is null/empty (which HyperLogLogAccumulator.add ignores). This can leave empty sketches in hllMap, making isZero() inaccurate and wasting memory. Consider checking v._2() for null/empty before allocating/adding, or delaying computeIfAbsent until after validating the key.
Suggested change:

```java
  public void add(Tuple2<Integer, byte[]> v) {
    if (v == null || v._2() == null || v._2().length == 0) {
      return;
    }
```
```java
  @Override
  public void merge(AccumulatorV2<Tuple2<Integer, byte[]>, Map<Integer, Long>> other) {
    MapHyperLogLogAccumulator otherAcc = (MapHyperLogLogAccumulator) other;
    otherAcc.hllMap.forEach((partition, otherHll) -> {
      hllMap.merge(partition, otherHll, (existing, incoming) -> {
        existing.merge(incoming);
        return existing;
      });
    });
  }
```
merge(...) casts other to MapHyperLogLogAccumulator without an instanceof check. If Spark (or future refactors) ever call merge with a different accumulator instance, this will throw ClassCastException. Please add a type check and throw a clear IllegalArgumentException (as done in MapLongAccumulator.merge).
```java
  @Test
  public void testHash64QualityAt1MKeys() {
    int n = 1_000_000;
    HyperLogLogSketch hll = new HyperLogLogSketch();
    for (int i = 0; i < n; i++) {
      hll.add(("key-" + i).getBytes(StandardCharsets.UTF_8));
    }
    long estimate = hll.estimate();
    double relativeError = Math.abs((double) (estimate - n)) / n;
    assertTrue(relativeError < 0.02, "At 1M keys, relative error " + relativeError + " exceeds 2%");
  }
```
testHash64QualityAt1MKeys adds 1,000,000 distinct keys, which can make unit test runs noticeably slower and more variable across CI environments. Consider either reducing n (while keeping enough signal for the assertion) or adding a timeOut to keep the suite from hanging on slow builders.
Summary

During KIF repush, track read-side unique key cardinality via HLL and compare with write-side output count at the driver. Catches silent record loss within the VPJ pipeline.

Part 4 of 4 — depends on Part 1 (#2663) and Part 2 (#2664).

Two-tier verification

HLL_cardinality ≈ outputCount + ttlFiltered. Fails the push if divergence > tolerance.

Key design decisions
- HLL accumulators are kept out of DataWriterAccumulators to prevent Spark task serialization issues
- Verification runs only during repush (isSourceKafka=true)
- MapHyperLogLogAccumulator.merge() does per-partition HLL union (element-wise max)

Config
- repush.hll.verification.enabled (default: true)
- repush.hll.error.tolerance (default: 5%)

Test plan
- HyperLogLogAccumulatorTest — 9 tests
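The verification condition above (HLL_cardinality ≈ outputCount + ttlFiltered, fail if divergence exceeds tolerance) can be sketched as follows. The method name and signature are illustrative, not the actual DataWriterComputeJob API.

```java
/**
 * Illustrative sketch of the driver-side divergence check: the HLL estimate
 * of read-side unique keys should roughly equal the write-side output count
 * plus records intentionally dropped by TTL filtering.
 */
class RepushVerification {
  static boolean withinTolerance(long hllEstimate, long outputCount, long ttlFiltered, double tolerance) {
    long expected = outputCount + ttlFiltered;
    if (expected == 0) {
      return hllEstimate == 0; // nothing written and nothing filtered: expect an empty sketch
    }
    double divergence = Math.abs((double) (hllEstimate - expected)) / expected;
    return divergence <= tolerance;
  }
}
```

The default 5% tolerance gives comfortable headroom over the sketch's ~0.8% standard error at p=14, so legitimate estimation noise should not fail a healthy push.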