
[vpj] Add HLL-based repush pipeline integrity verification#2666

Closed
sushantmane wants to merge 10 commits into linkedin:main from sushantmane:sumane/pr4-repush-hll-verification

Conversation

@sushantmane
Contributor

Summary

During KIF repush, track read-side unique key cardinality via HLL and compare with write-side output count at the driver. Catches silent record loss within the VPJ pipeline.

Part 4 of 4 — depends on Part 1 (#2663) and Part 2 (#2664).

Two-tier verification

  • Aggregate HLL (always runs): HLL_cardinality ≈ outputCount + ttlFiltered. Fails push if divergence > tolerance.
  • Per-partition HLL (diagnostic): when partition counts match, compares per-partition HLL vs write count. Log-only.
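The aggregate check described above can be sketched as follows. This is an illustrative standalone helper with hypothetical names, not the actual driver code in DataWriterComputeJob:

```java
// Illustrative sketch of the aggregate verification; the actual driver
// logic lives in DataWriterComputeJob and may differ in detail.
public final class RepushHllCheck {
  /**
   * True when the read-side HLL cardinality estimate agrees with
   * outputCount + ttlFiltered within the configured relative tolerance.
   */
  public static boolean withinTolerance(long hllEstimate, long outputCount, long ttlFiltered, double tolerance) {
    long expected = outputCount + ttlFiltered;
    if (expected == 0) {
      return hllEstimate == 0; // nothing written: the estimate must also be zero
    }
    double divergence = Math.abs((double) (hllEstimate - expected)) / expected;
    return divergence <= tolerance;
  }
}
```

With the default 5% tolerance, an estimate of 995,000 against 1,000,000 written records passes, while 500,000 against 1,000,000 fails the push.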

Key design decisions

  • HLL accumulators isolated from DataWriterAccumulators to prevent Spark task serialization issues
  • Only created for KIF repush jobs (isSourceKafka=true)
  • Multi-split correctness: MapHyperLogLogAccumulator.merge() does per-partition HLL union (element-wise max)
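The per-partition union mentioned in the last bullet is the standard HLL merge: take the element-wise maximum of the two register arrays. A minimal sketch of the operation (this helper is illustrative, not the accumulator's actual code):

```java
// Element-wise max of two HLL register arrays -- the standard union that
// MapHyperLogLogAccumulator.merge() is described as performing per partition.
public final class HllRegisterUnion {
  public static byte[] union(byte[] a, byte[] b) {
    if (a.length != b.length) {
      throw new IllegalArgumentException("Sketches must share a precision (equal register counts)");
    }
    byte[] out = new byte[a.length];
    for (int i = 0; i < a.length; i++) {
      // Treat register values as unsigned when comparing.
      out[i] = (byte) Math.max(a[i] & 0xFF, b[i] & 0xFF);
    }
    return out;
  }
}
```

Because each register stores the maximum leading-zero run observed, max-merging produces exactly the sketch that would result from observing both input streams directly, which is why the union is associative, commutative, and idempotent.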

Config

  • repush.hll.verification.enabled (default true)
  • repush.hll.error.tolerance (default 5%)
  • Spark path only (MR inherits no-op defaults)
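Based on the keys and defaults listed above, the job properties might look like this (values illustrative; whether the tolerance is expressed as a fraction or a percentage is not shown in this excerpt):

```properties
# Enable HLL-based repush verification (default: true)
repush.hll.verification.enabled=true
# Fail the push when the HLL estimate and output count diverge by more than 5%
repush.hll.error.tolerance=0.05
```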

Test plan

  • HyperLogLogAccumulatorTest — 9 tests

Copilot AI review requested due to automatic review settings March 26, 2026 23:40

Copilot AI left a comment


Pull request overview

Adds HyperLogLog (HLL)-based integrity verification for Kafka Input Format (KIF) repush jobs by estimating read-side unique-key cardinality and comparing it against write-side output counts, with optional per-partition diagnostics.

Changes:

  • Introduces a standalone HyperLogLogSketch utility (plus comprehensive tests) and Spark accumulators to track unique-key cardinality (aggregate + per-partition).
  • Adds repush verification logic in the job driver to fail the push when divergence exceeds a configurable tolerance, and adds new repush configs/constants.
  • Extends batch push plumbing to carry per-partition output record counts (tracking + EOP header propagation) for verification.

Reviewed changes

Copilot reviewed 23 out of 23 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
internal/venice-common/src/main/java/com/linkedin/venice/utils/HyperLogLogSketch.java New pure-Java HLL implementation used for cardinality estimation.
internal/venice-common/src/test/java/com/linkedin/venice/utils/HyperLogLogSketchTest.java Extensive HLL correctness/merge/serialization tests.
clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/HyperLogLogAccumulator.java Spark accumulator wrapping HyperLogLogSketch for aggregate unique count.
clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/MapHyperLogLogAccumulator.java Per-partition HLL accumulator for diagnostic comparisons.
clients/venice-push-job/src/main/java/com/linkedin/venice/jobs/DataWriterComputeJob.java Adds driver-side repush HLL verification and diagnostic per-partition logging.
clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/jobs/AbstractDataWriterSparkJob.java Wires HLL accumulators and updates read path to feed HLL during repush.
clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java Reads new repush configs and passes per-partition record counts to EOP broadcast.
internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java Adds EOP broadcast overload that can attach per-partition record-count headers.
internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java Defines the prc header constant for per-partition record counts.
clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/mapreduce/counter/MRJobCounterHelper.java Adds MR counters read/write helpers for per-partition record counts.
clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/MapLongAccumulator.java New Spark accumulator for per-partition long counters (output record counts).
clients/venice-push-job/src/main/java/com/linkedin/venice/spark/datawriter/task/SparkDataWriterTaskTracker.java Exposes per-partition record counts + read-side HLL estimates via tracker API.
clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/DataWriterTaskTracker.java Extends tracker interface with default methods for per-partition counts + HLL.
clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java Records per-partition output counts on each PubSub write.
internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java Adds unit tests asserting EOP carries (or omits) prc header as expected.
clients/venice-push-job/src/test/java/.../HyperLogLogAccumulatorTest.java Tests Spark HLL accumulator semantics (merge/reset/accuracy).
clients/venice-push-job/src/test/java/.../MapLongAccumulatorTest.java Tests Spark per-partition counter accumulator semantics.
clients/venice-push-job/src/test/java/.../MRJobCounterHelperTest.java Tests MR per-partition counter extraction and null reporter behavior.
clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/VenicePushJobConstants.java Adds repush HLL verification config keys and defaults.
clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/PushJobSetting.java Stores new repush HLL verification settings on the job setting object.


@sushantmane sushantmane force-pushed the sumane/pr4-repush-hll-verification branch from 9c9427a to 69c0c85 Compare April 10, 2026 01:16
verification

Track per-partition record counts during batch push and embed them as
PubSub message headers on the End-of-Push control message. This enables
server-side verification (in a follow-up PR) without requiring changes
to the EOP Avro schema.

Changes:
- PubSubMessageHeaders: new "prc" header constant
- VeniceWriter: new broadcastEndOfPush/sendControlMessage overloads with
headers
- DataWriterTaskTracker: per-partition tracking interface
- MR path: counter group in MRJobCounterHelper + tracker implementations
- Spark path: new MapLongAccumulator + tracker implementations
- VenicePushJob: collect per-partition counts, pass to broadcastEndOfPush

Pure-Java HyperLogLog implementation in venice-common for estimating the
number of distinct elements in a multiset. Designed for use across VPJ
(Spark accumulators), server-side (PCS), and other components.

Features:
- Configurable precision (p=4..18, default 14 = ~0.8% error, 16KB)
- add(byte[]) and addHash(long) for flexible key input
- merge() for combining sketches (associative, commutative, idempotent)
- Compact serialization: toBytes/fromBytes, toByteBuffer/fromByteBuffer
  Format: [1 byte precision][2^p bytes registers]
- Static hash64() method (FNV-1a + MurmurHash3 fmix64)

46 tests: accuracy, merge (disjoint/overlap/identical), split-merge
simulation at max precision, serialization round-trip, custom precision,
error handling, hash quality.
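To make the feature list above concrete, here is a deliberately minimal, self-contained HLL in the same spirit. This is not the actual venice-common HyperLogLogSketch, which additionally has configurable precision bounds, byte[] key hashing (FNV-1a + fmix64), serialization, and a large-range correction beyond the linear-counting case shown here:

```java
/**
 * Minimal HyperLogLog for illustration only; the real HyperLogLogSketch
 * differs (precision validation, serialization, bias corrections, hashing).
 */
public final class MiniHll {
  private final int p;            // precision: number of index bits
  private final int m;            // number of registers, 2^p
  private final byte[] registers;

  public MiniHll(int p) {
    this.p = p;
    this.m = 1 << p;
    this.registers = new byte[m];
  }

  /** MurmurHash3 fmix64 finalizer; a stand-in for the sketch's hash64(). */
  public static long fmix64(long h) {
    h ^= h >>> 33;
    h *= 0xff51afd7ed558ccdL;
    h ^= h >>> 33;
    h *= 0xc4ceb9fe1a85ec53L;
    h ^= h >>> 33;
    return h;
  }

  /** Record one 64-bit hash: top p bits pick a register, the rest give rho. */
  public void addHash(long hash) {
    int idx = (int) (hash >>> (64 - p));
    long rest = hash << p;
    int rho = Long.numberOfLeadingZeros(rest) + 1; // position of first 1-bit
    if (rho > registers[idx]) {
      registers[idx] = (byte) rho;
    }
  }

  /** Harmonic-mean estimate with the small-range (linear counting) correction. */
  public long estimate() {
    double alpha = 0.7213 / (1 + 1.079 / m); // bias constant for m >= 128
    double sum = 0;
    int zeros = 0;
    for (byte r: registers) {
      sum += Math.pow(2.0, -(r & 0xFF));
      if (r == 0) {
        zeros++;
      }
    }
    double e = alpha * m * m / sum;
    if (e <= 2.5 * m && zeros > 0) {
      e = m * Math.log((double) m / zeros);
    }
    return Math.round(e);
  }
}
```

At the default precision of 14 (16384 registers), an estimate over 10,000 distinct hashes typically lands within about 1% of the true count.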
…Map, rename test

Code review findings:
- VeniceWriter: make 5-param sendControlMessage delegate to 6-param overload,
  eliminating duplicated getDebugInfo/isEndOfSegment/synchronized logic
- MapLongAccumulator: ConcurrentHashMap → HashMap — Spark tasks are
  single-threaded (each gets a fresh copy()), no concurrent access occurs
- MRJobCounterHelperTest: rename misleading test to reflect what it actually
  tests (counter retrieval round-trip, not Reporter-based increment)
Human reviewer feedback (m-nagarajan, pthirun):
- Add large-range correction in estimate() per Flajolet paper — fixes
  systematic underestimation above ~143M unique keys
- Use Arrays.copyOfRange in fromBytes() instead of manual arraycopy
- Extract updateRegister() private helper to deduplicate add/addHash
- Fix copy() double-allocation: create via public constructor + arraycopy
  instead of private constructor that was also allocating

Copilot feedback:
- Add null check in merge() with clear error message
- Add null check in hash64() with IllegalArgumentException
- Use registers[i] & 0xFF in estimate() to treat registers as unsigned
- Fix testHash64Distribution: check all 4 quadrants (top 2 bits)
- Replace 10M-key test with 1M-key test for CI speed
- Fix private constructor Javadoc to say "takes ownership"
- Add testHash64NullThrows and testMergeNullThrows
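The large-range correction referenced above, from the Flajolet et al. (2007) HyperLogLog paper, applies when the raw estimate exceeds 2^32/30 ≈ 143M — matching the threshold mentioned in the commit. One plausible form (the actual method in HyperLogLogSketch may differ):

```java
// Large-range correction per Flajolet et al. (2007): for raw estimates
// above 2^32/30, compensate for 32-bit hash-space collisions with
// E* = -2^32 * ln(1 - E / 2^32). Class and method names are illustrative.
public final class HllLargeRange {
  private static final double TWO_POW_32 = 4294967296.0; // 2^32

  public static double correct(double rawEstimate) {
    if (rawEstimate > TWO_POW_32 / 30.0) {
      return -TWO_POW_32 * Math.log(1.0 - rawEstimate / TWO_POW_32);
    }
    return rawEstimate;
  }
}
```

Below the threshold the estimate passes through unchanged; above it, the correction inflates the raw value, counteracting the systematic underestimation the commit describes.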
sumane/pr4-repush-hll-verification

During KIF repush, track read-side unique key cardinality via HLL and
compare with write-side output count at the driver. Two-tier verification:
aggregate HLL (fails push) + per-partition HLL (diagnostic log).
Copilot AI review requested due to automatic review settings April 13, 2026 22:14
@sushantmane sushantmane force-pushed the sumane/pr4-repush-hll-verification branch from 69c0c85 to 577c411 Compare April 13, 2026 22:14

Copilot AI left a comment


Pull request overview

Copilot reviewed 23 out of 23 changed files in this pull request and generated 5 comments.



Comment on lines +41 to +55
public class HyperLogLogSketch {
  /** Default precision. 2^14 = 16384 registers, ~0.8% standard error, 16 KB memory. */
  public static final int DEFAULT_PRECISION = 14;

  /** Minimum supported precision (2^4 = 16 registers, ~26% error). */
  public static final int MIN_PRECISION = 4;

  /** Maximum supported precision (2^18 = 262144 registers, ~0.2% error, 256 KB). */
  public static final int MAX_PRECISION = 18;

  private final int p;
  private final int m; // 2^p
  private final double alphaM;
  private final byte[] registers;


Copilot AI Apr 13, 2026


HyperLogLogSketch is used inside Spark AccumulatorV2 implementations (HyperLogLogAccumulator / MapHyperLogLogAccumulator). Since HyperLogLogSketch does not implement Serializable, Spark task/closure serialization can fail under the default Java serializer. Please make the sketch safely serializable (e.g., implement java.io.Serializable with a serialVersionUID, or implement custom serialization in the accumulator and keep only primitive/byte[] state).
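A minimal illustration of the fix this comment suggests — declare Serializable with a serialVersionUID and keep only primitive/byte[] state (class name hypothetical, not the actual sketch):

```java
import java.io.Serializable;

// Illustrative shape of a Spark-safe sketch: Serializable, with only
// primitive and byte[] fields, so the default Java serializer can handle it.
public class SerializableSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  private final int precision;
  private final byte[] registers;

  public SerializableSketch(int precision) {
    this.precision = precision;
    this.registers = new byte[1 << precision];
  }

  public int getPrecision() {
    return precision;
  }
}
```

Restricting the state to primitives and byte arrays also keeps the serialized form compact and version-stable across JVMs, which matters when Spark ships accumulators between driver and executors.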

  }

  @Override
  public void merge(AccumulatorV2<byte[], Long> other) {

Copilot AI Apr 13, 2026


merge(...) blindly casts other to HyperLogLogAccumulator, which can surface as a ClassCastException with an unhelpful stack trace if Spark ever merges an unexpected accumulator type. Consider adding an instanceof check (and a clear IllegalArgumentException) similar to MapLongAccumulator.merge(...).

Suggested change
  public void merge(AccumulatorV2<byte[], Long> other) {
    if (!(other instanceof HyperLogLogAccumulator)) {
      throw new IllegalArgumentException(
          "Cannot merge HyperLogLogAccumulator with "
              + (other == null ? "null" : other.getClass().getName()));
    }

  }

  @Override
  public void add(Tuple2<Integer, byte[]> v) {

Copilot AI Apr 13, 2026


add(...) always creates a per-partition HyperLogLogAccumulator via computeIfAbsent(...) even when the key is null/empty (which HyperLogLogAccumulator.add ignores). This can leave empty sketches in hllMap, making isZero() inaccurate and wasting memory. Consider checking v._2() for null/empty before allocating/adding, or delaying computeIfAbsent until after validating the key.

Suggested change
  public void add(Tuple2<Integer, byte[]> v) {
    if (v == null || v._2() == null || v._2().length == 0) {
      return;
    }

Comment on lines +49 to +58
  @Override
  public void merge(AccumulatorV2<Tuple2<Integer, byte[]>, Map<Integer, Long>> other) {
    MapHyperLogLogAccumulator otherAcc = (MapHyperLogLogAccumulator) other;
    otherAcc.hllMap.forEach((partition, otherHll) -> {
      hllMap.merge(partition, otherHll, (existing, incoming) -> {
        existing.merge(incoming);
        return existing;
      });
    });
  }

Copilot AI Apr 13, 2026


merge(...) casts other to MapHyperLogLogAccumulator without an instanceof check. If Spark (or future refactors) ever call merge with a different accumulator instance, this will throw ClassCastException. Please add a type check and throw a clear IllegalArgumentException (as done in MapLongAccumulator.merge).

Comment on lines +639 to +649
  @Test
  public void testHash64QualityAt1MKeys() {
    int n = 1_000_000;
    HyperLogLogSketch hll = new HyperLogLogSketch();
    for (int i = 0; i < n; i++) {
      hll.add(("key-" + i).getBytes(StandardCharsets.UTF_8));
    }
    long estimate = hll.estimate();
    double relativeError = Math.abs((double) (estimate - n)) / n;
    assertTrue(relativeError < 0.02, "At 1M keys, relative error " + relativeError + " exceeds 2%");
  }

Copilot AI Apr 13, 2026


testHash64QualityAt1MKeys adds 1,000,000 distinct keys, which can make unit test runs noticeably slower and more variable across CI environments. Consider either reducing n (while keeping enough signal for the assertion) or adding a timeOut to keep the suite from hanging on slow builders.

@sushantmane sushantmane closed this May 8, 2026