diff --git a/build.gradle b/build.gradle
index 73710610972..9cf526949d3 100644
--- a/build.gradle
+++ b/build.gradle
@@ -329,17 +329,15 @@ subprojects {
// when actually using the new protocol. Example to pin KME to v12 when introducing v13:
// project(':internal:venice-common').file('src/main/resources/avro/KafkaMessageEnvelope/v12', PathValidation.DIRECTORY)
def versionOverrides = [
- // PartitionState v23 introduces `batchPushRecordCount` for server-side batch-push record-count
- // verification. Pinned to v22 until the consumer code (PartitionConsumptionState +
- // StoreIngestionTask) lands in a follow-up PR.
- project(':internal:venice-common').file('src/main/resources/avro/PartitionState/v22', PathValidation.DIRECTORY),
- // StoreMetaValue v43 introduces `batchPushRecordCountVerificationEnabled`. Pinned to v42
- // until the per-store flag wiring (Store / ZKStore / ReadOnlyStore / SystemStore / StoreInfo /
- // UpdateStoreQueryParams) lands in a follow-up PR.
+ // StoreMetaValue v43 added `batchPushRecordCountVerificationEnabled` for the originally-
+ // planned per-store flag, but the design pivoted to a server-level config gate
+ // (see `server.batch.push.record.count.verification.fail.on.mismatch.enabled`) and the
+ // field is no longer read or written by anyone. Pin to v42 so the field doesn't enter
+ // the active protocol.
project(':internal:venice-common').file('src/main/resources/avro/StoreMetaValue/v42', PathValidation.DIRECTORY),
- // AdminOperation v98 carries `batchPushRecordCountVerificationEnabled` on the SetStore admin op
- // for upcoming server-side batch-push record-count verification. Pinned to v97 until the
- // controller-side wiring (VeniceParentHelixAdmin / AdminExecutionTask) lands in a follow-up PR.
+ // AdminOperation v98 carried `batchPushRecordCountVerificationEnabled` on the SetStore
+ // admin op for the same originally-planned per-store flag. Pin to v97 for the same
+ // reason as StoreMetaValue v43 above.
project(':services:venice-controller').file('src/main/resources/avro/AdminOperation/v97', PathValidation.DIRECTORY)
]
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java
index 2888a1e4769..4a714a07417 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java
@@ -78,6 +78,7 @@
import static com.linkedin.venice.ConfigKeys.SERVER_ADAPTIVE_THROTTLER_SIGNAL_REFRESH_INTERVAL_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_ADAPTIVE_THROTTLER_SINGLE_GET_LATENCY_THRESHOLD;
import static com.linkedin.venice.ConfigKeys.SERVER_ADD_RMD_TO_BATCH_PUSH_FOR_HYBRID_STORES;
+import static com.linkedin.venice.ConfigKeys.SERVER_BATCH_PUSH_RECORD_COUNT_VERIFICATION_FAIL_ON_MISMATCH_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_BATCH_REPORT_END_OF_INCREMENTAL_PUSH_STATUS_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_BLOB_TRANSFER_ADAPTIVE_THROTTLER_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_BLOB_TRANSFER_ADAPTIVE_THROTTLER_UPDATE_PERCENTAGE;
@@ -623,6 +624,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final NearlineLatencyTimestampSource nearlineLatencyTimestampSource;
private final boolean uniqueIngestedKeyCountHllEnabled;
private final int uniqueIngestedKeyCountHllLog2K;
+ private final boolean batchPushRecordCountVerificationFailOnMismatchEnabled;
private final long leaderCompleteStateCheckInFollowerValidIntervalMs;
private final boolean requireLeaderCompleteForCatchUpVtRts;
private final boolean stuckConsumerRepairEnabled;
@@ -1084,6 +1086,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, MapHLL at Venice's default precision (lgK=13) has a standard error of ~1.15%. The 3-sigma tail
+ * is ~3.45% — i.e. random HLL noise will deviate by more than 3.45% in only ~0.27% of cases (two-
+ * sided). We use 5% (comfortably above 3-sigma) for a clean safety margin: random HLL noise alone
+ * is very unlikely to trigger a spurious mismatch, while real deviations larger than 5% of the
+ * producer's count — either side — are still flagged.
+ */
+ static final double HLL_ERROR_TOLERANCE = 0.05;
+
/** Chunk fragment — not a logical key (skipped for key counting). */
protected static boolean isChunkFragment(int schemaId) {
return schemaId == CHUNK_SCHEMA_ID;
@@ -3570,6 +3583,7 @@ private void syncOffset(PartitionConsumptionState pcs) {
// Update the push job info
offsetRecord.setTrackingIncrementalPushStatus(pcs.getTrackingIncrementalPushStatus());
offsetRecord.setActiveKeyCount(pcs.getActiveKeyCount());
+ offsetRecord.setBatchPushRecordCount(pcs.getBatchPushRecordCount());
// Serialize HLL sketch for unique key count persistence
if (uniqueIngestedKeyCountHllEnabled && pcs.hasUniqueIngestedKeyCountHll()) {
byte[] hllBytes = pcs.serializeUniqueIngestedKeyCountHll();
@@ -3918,7 +3932,8 @@ protected void processEndOfPush(
KafkaMessageEnvelope endOfPushKME,
PubSubPosition offset,
PartitionConsumptionState partitionConsumptionState,
- EndOfPush endOfPush) {
+ EndOfPush endOfPush,
+ PubSubMessageHeaders headers) {
// Do not process duplication EOP messages.
if (partitionConsumptionState.getOffsetRecord().isEndOfPushReceived()) {
@@ -3982,6 +3997,120 @@ protected void processEndOfPush(
partitionConsumptionState.setDataRecoveryCompleted(true);
ingestionNotificationDispatcher.reportDataRecoveryCompleted(partitionConsumptionState);
}
+
+ verifyBatchPushRecordCount(partitionConsumptionState, headers);
+ }
+
+ /**
+ * Compare the partition's locally-counted batchPushRecordCount AND its HLL-estimated unique key
+ * count against the producer's count carried on the EOP message's "prc" PubSub header.
+ *
+ * <p>The check has two legs and BOTH must pass for the push to be considered a match:
+ * <ol>
+ * <li>{@code counter >= expected} — exact PUT/DELETE count is at least the producer's count.
+ * Asymmetric (over-count tolerated) because raw counts legitimately inflate from at-least-
+ * once delivery, spec-exec dups, and cross-colo re-replication.
+ * <li>{@code |hll_estimate - expected| <= (expected * HLL_ERROR_TOLERANCE)} — the HLL-estimated
+ * unique-key count is within ±HLL_ERROR_TOLERANCE of the producer's count. Symmetric: an HLL estimate
+ * materially above the producer count is also flagged, since structurally HLL counts unique
+ * keys and unique keys ≤ raw ops; a large over-estimate signals a bug (wrong data, count
+ * mis-stamping, etc.) rather than benign duplicate inflation. Skipped if HLL tracking is
+ * disabled.
+ * </ol>
+ *
+ * <p>If either leg fails: increments {@code batch_push_record_count_mismatch} (informational —
+ * fires regardless of strict-mode state) and logs a tagged error string. On a non-DaVinci
+ * replica, if the server-level config
+ * {@code server.batch.push.record.count.verification.fail.on.mismatch.enabled} is {@code true}
+ * (default), also increments {@code record_count_mismatch_failure} and throws
+ * {@link VeniceException} (failing ingestion). DaVinci replicas skip both the failure sensor
+ * and the throw.
+ *
+ * <p>Skip cases (no-op, no metric):
+ * <ul>
+ * <li>{@code headers == null} or no "prc" header — producer didn't stamp the header.
+ * <li>"prc" header value is malformed (not 8 bytes).
+ * <li>"prc" header value equals the sentinel {@code -1L} — producer signaled "count not
+ * available."
+ * <li>{@code versionTopic.isViewTopic()} — view-topic ingestion paths re-emit records via
+ * separate writers.
+ * <li>This store-version is NOT the future version — verification only runs while a
+ * push is in progress (i.e., {@code store.getCurrentVersion() < this version}). Already-
+ * current and backup versions skip verification, since their EOP was already processed in
+ * a prior lifecycle and a re-emit (e.g., re-ingestion from snapshot) shouldn't re-fire it.
+ * </ul>
+ */
+ void verifyBatchPushRecordCount(PartitionConsumptionState pcs, PubSubMessageHeaders headers) {
+ if (headers == null) {
+ return;
+ }
+ // View-topic ingestion paths re-emit records via separate writers; their counts won't match
+ // the base store's prc. Defensive — currently a StoreIngestionTask should always be on a VT.
+ if (versionTopic != null && versionTopic.isViewTopic()) {
+ return;
+ }
+ PubSubMessageHeader prcHeader = headers.get(PubSubMessageHeaders.VENICE_PARTITION_RECORD_COUNT_HEADER);
+ if (prcHeader == null || prcHeader.value() == null || prcHeader.value().length != Long.BYTES) {
+ return;
+ }
+ long expectedCount = ByteBuffer.wrap(prcHeader.value()).getLong();
+ if (expectedCount == PubSubMessageHeaders.PRC_HEADER_UNAVAILABLE_SENTINEL) {
+ return;
+ }
+ /*
+ * Only verify while the push is in progress (future-version state). Once the version has been
+ * promoted to current or demoted to backup, the count has already been judged in this server's
+ * prior lifecycle and shouldn't be re-judged on any re-emit / re-ingestion path.
+ */
+ if (versionRole != VersionRole.FUTURE) {
+ return;
+ }
+
+ long actualCount = pcs.getBatchPushRecordCount();
+ boolean counterOk = actualCount >= expectedCount;
+
+ /*
+ * Second leg: HLL-based unique-key check with a symmetric ±5% tolerance. Random HLL noise at
+ * default precision (lgK=13) lives well inside this band; a deviation larger than 5% in either
+ * direction is a real signal — under-count is data loss, over-count means HLL saw materially
+ * more unique keys than the producer claims to have written (structurally impossible without
+ * a bug). If HLL tracking is disabled on this server (rare), fall back to the counter alone.
+ */
+ boolean hllOk;
+ long hllEstimate;
+ long hllThreshold = (long) Math.ceil(expectedCount * HLL_ERROR_TOLERANCE);
+ if (uniqueIngestedKeyCountHllEnabled) {
+ hllEstimate = pcs.getEstimatedUniqueIngestedKeyCount();
+ hllOk = Math.abs(hllEstimate - expectedCount) <= hllThreshold;
+ } else {
+ hllEstimate = -1; // not tracked
+ hllOk = true;
+ }
+
+ if (!counterOk || !hllOk) {
+ String taggedMsg = "RECORD_COUNT_DEFICIT:counterOk=" + counterOk + ":hllOk=" + hllOk + ":expected="
+ + expectedCount + ":actual=" + actualCount + ":hll=" + hllEstimate + ":hllThreshold=" + hllThreshold
+ + ":replica=" + pcs.getReplicaId() + ":topic=" + kafkaVersionTopic;
+ LOGGER.error(taggedMsg);
+ versionedIngestionStats.recordBatchPushRecordCountMismatch(storeName, versionNumber);
+ // Server-side strict-mode is controlled by the server-level config
+ // `server.batch.push.record.count.verification.fail.on.mismatch.enabled` (default: true).
+ // DaVinci replicas unconditionally skip the throw — DVC failure aggregation is handled
+ // separately via the push status store, and throwing here would convert a single noisy
+ // subscriber into a hard local replica ERROR.
+ if (serverConfig.isBatchPushRecordCountVerificationFailOnMismatchEnabled() && !isDaVinciClient) {
+ versionedIngestionStats.recordRecordCountMismatchFailure(storeName, versionNumber);
+ throw new VeniceException(taggedMsg);
+ }
+ } else {
+ versionedIngestionStats.recordBatchPushRecordCountMatch(storeName, versionNumber);
+ LOGGER.debug(
+ "Record count verification passed for replica: {}. Expected: {}, Actual: {}, HLL: {}",
+ pcs.getReplicaId(),
+ expectedCount,
+ actualCount,
+ hllEstimate);
+ }
}
protected void processStartOfIncrementalPush(
@@ -4056,7 +4185,8 @@ private void processControlMessage(
int partition,
PubSubPosition offset,
long pubSubMessageTime,
- PartitionConsumptionState partitionConsumptionState) {
+ PartitionConsumptionState partitionConsumptionState,
+ PubSubMessageHeaders headers) {
/**
* If leader consumes control messages from topics other than version topic, it should produce
* them to version topic; however, START_OF_SEGMENT and END_OF_SEGMENT should not be forwarded
@@ -4081,7 +4211,7 @@ private void processControlMessage(
break;
case END_OF_PUSH:
EndOfPush endOfPush = (EndOfPush) controlMessage.controlMessageUnion;
- processEndOfPush(kafkaMessageEnvelope, offset, partitionConsumptionState, endOfPush);
+ processEndOfPush(kafkaMessageEnvelope, offset, partitionConsumptionState, endOfPush, headers);
if (recordTransformer != null) {
recordTransformer.onControlMessage(partition, offset, controlMessage, pubSubMessageTime);
}
@@ -4211,7 +4341,8 @@ private int internalProcessConsumerRecord(
consumerRecord.getTopicPartition().getPartitionNumber(),
consumerRecord.getPosition(),
consumerRecord.getPubSubMessageTime(),
- partitionConsumptionState);
+ partitionConsumptionState,
+ consumerRecord.getPubSubMessageHeaders());
try {
if (controlMessage.controlMessageType == START_OF_SEGMENT.getValue()) {
if (Arrays.equals(consumerRecord.getKey().getKey(), KafkaKey.HEART_BEAT.getKey())) {
@@ -4860,6 +4991,37 @@ private void invalidateActiveKeyCountAndLog(
}
}
+ /**
+ * Increment {@link PartitionConsumptionState#batchPushRecordCount} for one consumed user-data
+ * record during batch ingestion. Counted on every replica so the count is comparable, at EOP,
+ * against the "prc" PubSub header VPJ stamps onto the EOP message.
+ *
+ * Filters:
+ * - Only PUT and DELETE message types — control messages are excluded.
+ * - Skip after EOP — RT replay records do not contribute to the batch count.
+ * - Skip chunk fragments (CHUNK_SCHEMA_ID); chunk manifests count as one logical record.
+ * - Skip Global RT DIV PUTs (key-level filter; produced metadata for the DIV, not user data).
+ */
+ private void trackBatchPushRecordCount(
+ DefaultPubSubMessage consumerRecord,
+ PartitionConsumptionState partitionConsumptionState,
+ MessageType messageType,
+ int writerSchemaId) {
+ if (partitionConsumptionState.isEndOfPushReceived()) {
+ return;
+ }
+ if (consumerRecord.getKey().isGlobalRtDiv()) {
+ return;
+ }
+ if (messageType == MessageType.PUT && isChunkFragment(writerSchemaId)) {
+ return;
+ }
+ if (messageType != MessageType.PUT && messageType != MessageType.DELETE) {
+ return;
+ }
+ partitionConsumptionState.incrementBatchPushRecordCount();
+ }
+
/** Records active-key-count invalidation on both the OTel (per-version) and Tehuti (host-level) paths. */
protected final void recordActiveKeyCountInvalidation() {
versionedIngestionStats.recordActiveKeyCountInvalidation(storeName, versionNumber);
@@ -5152,6 +5314,8 @@ private int processKafkaDataMessage(
messageType,
writerSchemaId);
+ trackBatchPushRecordCount(consumerRecord, partitionConsumptionState, messageType, writerSchemaId);
+
// Track key in HLL for unique key count estimation.
// Only count user data operations (PUT/DELETE), skip chunk fragments, internal metadata, etc.
if (uniqueIngestedKeyCountHllEnabled && keyLen > 0 && !isChunkFragment(writerSchemaId)
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java
index 50a4cb12ee3..d299ef58233 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java
@@ -538,6 +538,18 @@ public void recordChecksumVerificationFailureCount(String storeName, int version
getIngestionOtelStats(storeName).recordChecksumVerificationFailureCount(version, 1);
}
+ public void recordBatchPushRecordCountMatch(String storeName, int version) {
+ getIngestionOtelStats(storeName).recordBatchPushRecordCountMatch(version, 1);
+ }
+
+ public void recordBatchPushRecordCountMismatch(String storeName, int version) {
+ getIngestionOtelStats(storeName).recordBatchPushRecordCountMismatch(version, 1);
+ }
+
+ public void recordRecordCountMismatchFailure(String storeName, int version) {
+ getIngestionOtelStats(storeName).recordRecordCountMismatchFailure(version, 1);
+ }
+
// Count methods with 2nd enum dimension
public void recordIngestionFailureCount(String storeName, int version, VeniceIngestionFailureReason reason) {
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelMetricEntity.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelMetricEntity.java
index 685318fb38a..b37aa352126 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelMetricEntity.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelMetricEntity.java
@@ -290,6 +290,41 @@ public enum IngestionOtelMetricEntity implements ModuleMetricEntityInterface {
"Count of checksum verification failures", setOf(VENICE_STORE_NAME, VENICE_CLUSTER_NAME, VENICE_VERSION_ROLE)
),
+ /**
+ * Server-side batch-push record-count verification: the consumer-counted PUT/DELETE total at
+ * EOP was at least the producer's "prc" count (and the HLL leg also passed, if HLL tracking is
+ * on). Informational — fires whether or not the per-server fail-on-mismatch flag is enabled.
+ */
+ BATCH_PUSH_RECORD_COUNT_MATCH_COUNT(
+ "ingestion.batch_push_record_count_match.count", MetricType.COUNTER, MetricUnit.NUMBER,
+ "Count of batch-push EOPs where the consumer-side record count matched the producer's count",
+ setOf(VENICE_STORE_NAME, VENICE_CLUSTER_NAME, VENICE_VERSION_ROLE)
+ ),
+
+ /**
+ * Server-side batch-push record-count verification: either leg (counter or HLL) failed at EOP.
+ * Informational — fires whether or not the per-server fail-on-mismatch flag is enabled. Use
+ * this for visibility / dashboards; use {@link #RECORD_COUNT_MISMATCH_FAILURE_COUNT} to alert
+ * on strict-mode failed-ingestion events.
+ */
+ BATCH_PUSH_RECORD_COUNT_MISMATCH_COUNT(
+ "ingestion.batch_push_record_count_mismatch.count", MetricType.COUNTER, MetricUnit.NUMBER,
+ "Count of batch-push EOPs where the consumer-side record count did not match the producer's count",
+ setOf(VENICE_STORE_NAME, VENICE_CLUSTER_NAME, VENICE_VERSION_ROLE)
+ ),
+
+ /**
+ * Strict-mode record-count mismatch — fires when a mismatch is detected on a non-DaVinci
+ * replica AND the server's fail-on-mismatch flag is enabled; it is always paired with a thrown
+ * VeniceException (ingestion fails). On DaVinci replicas both the throw and this counter are
+ * suppressed (DVC failure is aggregated separately via the push status store).
+ */
+ RECORD_COUNT_MISMATCH_FAILURE_COUNT(
+ "ingestion.record_count_mismatch_failure.count", MetricType.COUNTER, MetricUnit.NUMBER,
+ "Count of strict-mode record-count mismatches that failed ingestion (servers only; DaVinci is excluded)",
+ setOf(VENICE_STORE_NAME, VENICE_CLUSTER_NAME, VENICE_VERSION_ROLE)
+ ),
+
DCR_LOOKUP_CACHE_HIT_COUNT(
"ingestion.dcr.lookup.cache.hit_count", MetricType.COUNTER, MetricUnit.NUMBER,
"Count of cache hits when looking up existing value bytes or replication metadata before conflict resolution",
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelStats.java
index 48bff951f16..8796c2fd7fa 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelStats.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelStats.java
@@ -6,6 +6,8 @@
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BATCH_PROCESSING_REQUEST_ERROR_COUNT;
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BATCH_PROCESSING_REQUEST_RECORD_COUNT;
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BATCH_PROCESSING_REQUEST_TIME;
+import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BATCH_PUSH_RECORD_COUNT_MATCH_COUNT;
+import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BATCH_PUSH_RECORD_COUNT_MISMATCH_COUNT;
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BYTES_CONSUMED_AS_UNCOMPRESSED_SIZE;
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.CHECKSUM_VERIFICATION_FAILURE_COUNT;
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.CONSUMER_ACTION_TIME;
@@ -42,6 +44,7 @@
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.PRODUCER_SYNCHRONIZE_TIME;
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RECORD_ASSEMBLED_SIZE;
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RECORD_ASSEMBLED_SIZE_RATIO;
+import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RECORD_COUNT_MISMATCH_FAILURE_COUNT;
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RECORD_KEY_SIZE;
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RECORD_VALUE_SIZE;
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RESUBSCRIPTION_FAILURE_COUNT;
@@ -183,6 +186,9 @@ public class IngestionOtelStats {
private final MetricEntityStateOneEnum checksumVerificationFailureCountMetric;
private final MetricEntityStateOneEnum partialUpdateAmplificationAlertCountMetric;
private final MetricEntityStateOneEnum activeKeyCountInvalidationMetric;
+ private final MetricEntityStateOneEnum batchPushRecordCountMatchMetric;
+ private final MetricEntityStateOneEnum batchPushRecordCountMismatchMetric;
+ private final MetricEntityStateOneEnum recordCountMismatchFailureMetric;
// Counter metrics with 2nd enum dimension
private final MetricEntityStateTwoEnums ingestionFailureCountMetric;
@@ -257,6 +263,9 @@ public class IngestionOtelStats {
this.checksumVerificationFailureCountMetric = null;
this.partialUpdateAmplificationAlertCountMetric = null;
this.activeKeyCountInvalidationMetric = null;
+ this.batchPushRecordCountMatchMetric = null;
+ this.batchPushRecordCountMismatchMetric = null;
+ this.recordCountMismatchFailureMetric = null;
this.ingestionFailureCountMetric = null;
this.dcrLookupCacheHitCountMetric = null;
this.bytesConsumedAsUncompressedSizeMetric = null;
@@ -386,6 +395,9 @@ public IngestionOtelStats(
createOneEnumMetric(PARTIAL_UPDATE_AMPLIFICATION_ALERT_COUNT.getMetricEntity());
activeKeyCountInvalidationMetric =
activeKeyCountEnabled ? createOneEnumMetric(ACTIVE_KEY_COUNT_INVALIDATION.getMetricEntity()) : null;
+ batchPushRecordCountMatchMetric = createOneEnumMetric(BATCH_PUSH_RECORD_COUNT_MATCH_COUNT.getMetricEntity());
+ batchPushRecordCountMismatchMetric = createOneEnumMetric(BATCH_PUSH_RECORD_COUNT_MISMATCH_COUNT.getMetricEntity());
+ recordCountMismatchFailureMetric = createOneEnumMetric(RECORD_COUNT_MISMATCH_FAILURE_COUNT.getMetricEntity());
// Initialize HostLevelIngestionStats OTel metrics - counters with 2nd enum dimension
ingestionFailureCountMetric =
@@ -770,6 +782,18 @@ public void recordChecksumVerificationFailureCount(int version, long value) {
checksumVerificationFailureCountMetric.record(value, classifyVersion(version, versionInfo));
}
+ public void recordBatchPushRecordCountMatch(int version, long value) {
+ batchPushRecordCountMatchMetric.record(value, classifyVersion(version, versionInfo));
+ }
+
+ public void recordBatchPushRecordCountMismatch(int version, long value) {
+ batchPushRecordCountMismatchMetric.record(value, classifyVersion(version, versionInfo));
+ }
+
+ public void recordRecordCountMismatchFailure(int version, long value) {
+ recordCountMismatchFailureMetric.record(value, classifyVersion(version, versionInfo));
+ }
+
// Count methods with 2nd enum dimension
public void recordIngestionFailureCount(int version, VeniceIngestionFailureReason reason, long value) {
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/NoOpIngestionOtelStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/NoOpIngestionOtelStats.java
index 112fa6f7da2..16dead1d870 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/NoOpIngestionOtelStats.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/NoOpIngestionOtelStats.java
@@ -217,6 +217,18 @@ public void recordPartialUpdateCacheHitCount(int version, long value) {
public void recordChecksumVerificationFailureCount(int version, long value) {
}
+ @Override
+ public void recordBatchPushRecordCountMatch(int version, long value) {
+ }
+
+ @Override
+ public void recordBatchPushRecordCountMismatch(int version, long value) {
+ }
+
+ @Override
+ public void recordRecordCountMismatchFailure(int version, long value) {
+ }
+
@Override
public void recordIngestionFailureCount(int version, VeniceIngestionFailureReason reason, long value) {
}
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveKeyCountTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveKeyCountTest.java
index dea1af7bfdf..fe42fd0ab96 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveKeyCountTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveKeyCountTest.java
@@ -262,7 +262,7 @@ private PubSubMessageHeaders createSignalHeaders(byte signalValue) {
private StoreIngestionTask setupProcessEndOfPush(boolean activeKeyCountEnabled) throws Exception {
StoreIngestionTask sitMock = mock(StoreIngestionTask.class);
- doCallRealMethod().when(sitMock).processEndOfPush(any(), any(), any(), any());
+ doCallRealMethod().when(sitMock).processEndOfPush(any(), any(), any(), any(), any());
VeniceServerConfig mockServerConfig = mock(VeniceServerConfig.class);
doReturn(activeKeyCountEnabled).when(mockServerConfig).isActiveKeyCountForAllBatchPushEnabled();
doReturn(activeKeyCountEnabled).when(mockServerConfig).isActiveKeyCountForHybridStoreEnabled();
@@ -940,7 +940,7 @@ public void testProcessEndOfPush() throws Exception {
KafkaMessageEnvelope kme = new KafkaMessageEnvelope();
kme.producerMetadata = new ProducerMetadata();
kme.producerMetadata.messageTimestamp = System.currentTimeMillis();
- sitMock.processEndOfPush(kme, mock(PubSubPosition.class), mockPcs, new EndOfPush());
+ sitMock.processEndOfPush(kme, mock(PubSubPosition.class), mockPcs, new EndOfPush(), null);
if (enabled) {
verify(mockPcs).cleanupBatchKeyCountState();
} else {
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java
index 09ad77ec091..f6ea4a9b702 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java
@@ -27,6 +27,9 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNotSame;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -92,6 +95,9 @@
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.common.ApacheKafkaOffsetPosition;
import com.linkedin.venice.pubsub.api.DefaultPubSubMessage;
+import com.linkedin.venice.pubsub.api.EmptyPubSubMessageHeaders;
+import com.linkedin.venice.pubsub.api.PubSubMessageHeader;
+import com.linkedin.venice.pubsub.api.PubSubMessageHeaders;
import com.linkedin.venice.pubsub.api.PubSubPosition;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
@@ -3867,4 +3873,60 @@ public void testResolveFollowerSourceRegion(
expected,
description);
}
+
+ @DataProvider(name = "extractPrcHeaderToForwardCases")
+ public static Object[][] extractPrcHeaderToForwardCases() {
+ PubSubMessageHeaders prcOnly = new PubSubMessageHeaders().add(
+ PubSubMessageHeaders.VENICE_PARTITION_RECORD_COUNT_HEADER,
+ ByteBuffer.allocate(Long.BYTES).putLong(1234L).array());
+ PubSubMessageHeaders prcWithOthers = new PubSubMessageHeaders()
+ .add(
+ PubSubMessageHeaders.VENICE_PARTITION_RECORD_COUNT_HEADER,
+ ByteBuffer.allocate(Long.BYTES).putLong(7L).array())
+ .add(PubSubMessageHeaders.VENICE_TRANSPORT_PROTOCOL_HEADER, "vtp-bytes".getBytes())
+ .add(PubSubMessageHeaders.VENICE_LEADER_COMPLETION_STATE_HEADER, new byte[] { (byte) 1 });
+ PubSubMessageHeaders othersOnly =
+ new PubSubMessageHeaders().add(PubSubMessageHeaders.VENICE_TRANSPORT_PROTOCOL_HEADER, "vtp-bytes".getBytes())
+ .add(PubSubMessageHeaders.VENICE_LEADER_COMPLETION_STATE_HEADER, new byte[] { (byte) 1 });
+ // { description, inputHeaders, expectedPrcValueOrNull (null = expect SINGLETON returned) }
+ return new Object[][] { { "null headers returns SINGLETON without NPE", null, null },
+ { "empty headers (no prc) returns SINGLETON", new PubSubMessageHeaders(), null },
+ { "vtp/lcs present but no prc returns SINGLETON (no allocation)", othersOnly, null },
+ { "prc alone returns fresh headers with prc value", prcOnly, 1234L },
+ { "prc among other headers — only prc is forwarded; vtp/lcs are stripped", prcWithOthers, 7L } };
+ }
+
+ /**
+ * The leader's helper must forward only the "prc" partition-record-count header when present so
+ * remote-fabric followers can run record-count verification at EOP. Non-prc headers
+ * (vtp/lcs/etc.) are leader-regenerated downstream and must NOT be carried over from upstream.
+ * When no prc is present, the helper returns {@link EmptyPubSubMessageHeaders#SINGLETON} to
+ * avoid hot-path allocation.
+ */
+ @Test(dataProvider = "extractPrcHeaderToForwardCases")
+ public void testExtractPrcHeaderToForward(
+ String description,
+ PubSubMessageHeaders inputHeaders,
+ Long expectedPrcValue) {
+ DefaultPubSubMessage consumerRecord = mock(DefaultPubSubMessage.class);
+ when(consumerRecord.getPubSubMessageHeaders()).thenReturn(inputHeaders);
+
+ PubSubMessageHeaders forwarded = LeaderFollowerStoreIngestionTask.extractPrcHeaderToForward(consumerRecord);
+
+ if (expectedPrcValue == null) {
+ assertSame(forwarded, EmptyPubSubMessageHeaders.SINGLETON, description);
+ } else {
+ assertNotSame(forwarded, EmptyPubSubMessageHeaders.SINGLETON, description);
+ PubSubMessageHeader prc = forwarded.get(PubSubMessageHeaders.VENICE_PARTITION_RECORD_COUNT_HEADER);
+ assertNotNull(prc, "prc header must be in the forwarded set for case: " + description);
+ assertEquals(ByteBuffer.wrap(prc.value()).getLong(), expectedPrcValue.longValue(), description);
+ // Non-prc headers are leader-regenerated and must NOT propagate.
+ assertNull(
+ forwarded.get(PubSubMessageHeaders.VENICE_TRANSPORT_PROTOCOL_HEADER),
+ "vtp must not propagate: " + description);
+ assertNull(
+ forwarded.get(PubSubMessageHeaders.VENICE_LEADER_COMPLETION_STATE_HEADER),
+ "lcs must not propagate: " + description);
+ }
+ }
}
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java
index 45d45fcf1bb..1c197040338 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java
@@ -35,6 +35,7 @@
import org.apache.avro.generic.GenericRecord;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -607,4 +608,30 @@ public void testGetOrCreateCachedHeartbeatKeyResolvesRegularUnchunked() {
assertEquals(k.getChunkingStatus(), VeniceChunkingStatus.UNCHUNKED);
assertEquals(k.getLocality(), VeniceRegionLocality.LOCAL);
}
+
+ @DataProvider(name = "batchPushRecordCountCases")
+ public static Object[][] batchPushRecordCountCases() {
+ // { description, priorCountInOffsetRecord, incrementsAfterConstruction, expectedFinalCount }
+ return new Object[][] { { "default count is 0", 0L, 0, 0L }, { "increments are monotonic", 0L, 10, 10L },
+ { "count restored from OffsetRecord on construction (restart safety)", 57L, 0, 57L },
+ { "restored count continues to accumulate further increments", 57L, 5, 62L } };
+ }
+
+  /**
+   * Builds a {@link PartitionConsumptionState} on top of an {@link OffsetRecord} that may already
+   * carry a batch-push record count, applies a number of increments, and checks the final count.
+   *
+   * @param description label of the data-provider case, used as the assertion message
+   * @param priorCountInOffsetRecord count written to the offset record before construction; the
+   *        setter is not called at all when this is zero
+   * @param incrementsAfterConstruction how many times {@code incrementBatchPushRecordCount()} runs
+   * @param expectedFinalCount value {@code getBatchPushRecordCount()} must report afterwards
+   */
+  @Test(dataProvider = "batchPushRecordCountCases")
+  public void testBatchPushRecordCountLifecycle(
+      String description,
+      long priorCountInOffsetRecord,
+      int incrementsAfterConstruction,
+      long expectedFinalCount) {
+    OffsetRecord checkpoint = new OffsetRecord(AvroProtocolDefinition.PARTITION_STATE.getSerializer(), pubSubContext);
+    boolean hasPriorCount = priorCountInOffsetRecord != 0L;
+    if (hasPriorCount) {
+      checkpoint.setBatchPushRecordCount(priorCountInOffsetRecord);
+    }
+
+    PartitionConsumptionState state =
+        new PartitionConsumptionState(TOPIC_PARTITION, checkpoint, pubSubContext, false, false, false, null);
+    for (int remaining = incrementsAfterConstruction; remaining > 0; remaining--) {
+      state.incrementBatchPushRecordCount();
+    }
+
+    long finalCount = state.getBatchPushRecordCount();
+    assertEquals(finalCount, expectedFinalCount, description);
+  }
}
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskRecordCountTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskRecordCountTest.java
new file mode 100644
index 00000000000..d2a145220cc
--- /dev/null
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskRecordCountTest.java
@@ -0,0 +1,445 @@
+package com.linkedin.davinci.kafka.consumer;
+
+import static com.linkedin.davinci.kafka.consumer.ActiveKeyCountTestUtils.setField;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
+import com.linkedin.davinci.config.VeniceServerConfig;
+import com.linkedin.davinci.stats.AggVersionedIngestionStats;
+import com.linkedin.venice.exceptions.VeniceException;
+import com.linkedin.venice.pubsub.api.EmptyPubSubMessageHeaders;
+import com.linkedin.venice.pubsub.api.PubSubMessageHeaders;
+import com.linkedin.venice.pubsub.api.PubSubTopic;
+import com.linkedin.venice.server.VersionRole;
+import java.nio.ByteBuffer;
+import org.testng.annotations.Test;
+
+
+public class StoreIngestionTaskRecordCountTest {
+ private static final String TEST_TOPIC = "test_store_v1";
+ private static final String TEST_STORE = "test_store";
+ private static final int TEST_VERSION = 1;
+
+ private static StoreIngestionTask buildSit(boolean failOnMismatchEnabled, AggVersionedIngestionStats statsMock)
+ throws Exception {
+ return buildSit(failOnMismatchEnabled, statsMock, VersionRole.FUTURE, false, false);
+ }
+
+  /**
+   * Creates a mocked {@link StoreIngestionTask} whose fields are injected reflectively and on which
+   * only {@code verifyBatchPushRecordCount} runs its real implementation. The injected version
+   * topic reports {@code isViewTopic() == false}.
+   *
+   * @param failOnMismatchEnabled value the mocked server config returns for
+   *        {@code isBatchPushRecordCountVerificationFailOnMismatchEnabled()}, i.e. the server-level
+   *        {@code server.batch.push.record.count.verification.fail.on.mismatch.enabled} setting
+   *        (default {@code true} in production).
+   * @param statsMock stats object injected into the task so tests can verify sensor calls.
+   * @param versionRole role of the SIT's version: only {@link VersionRole#FUTURE} runs the
+   *        verification — current/backup skip the entire check (re-emit-after-promotion safety).
+   * @param hllEnabled value injected into {@code uniqueIngestedKeyCountHllEnabled}. When false the
+   *        HLL leg of the dual check is bypassed.
+   * @param isDaVinciClient value injected into {@code isDaVinciClient}; toggles the DaVinci
+   *        skip-throw branch on the failure path.
+   */
+  private static StoreIngestionTask buildSit(
+      boolean failOnMismatchEnabled,
+      AggVersionedIngestionStats statsMock,
+      VersionRole versionRole,
+      boolean hllEnabled,
+      boolean isDaVinciClient) throws Exception {
+    // Collaborator mocks first, then wire everything into the task.
+    VeniceServerConfig serverConfig = mock(VeniceServerConfig.class);
+    doReturn(failOnMismatchEnabled).when(serverConfig).isBatchPushRecordCountVerificationFailOnMismatchEnabled();
+
+    PubSubTopic regularVersionTopic = mock(PubSubTopic.class);
+    doReturn(false).when(regularVersionTopic).isViewTopic();
+
+    StoreIngestionTask task = mock(StoreIngestionTask.class);
+    setField(task, "versionedIngestionStats", statsMock);
+    setField(task, "kafkaVersionTopic", TEST_TOPIC);
+    setField(task, "storeName", TEST_STORE);
+    setField(task, "versionNumber", TEST_VERSION);
+    setField(task, "uniqueIngestedKeyCountHllEnabled", hllEnabled);
+    setField(task, "isDaVinciClient", isDaVinciClient);
+    setField(task, "versionRole", versionRole);
+    setField(task, "serverConfig", serverConfig);
+    setField(task, "versionTopic", regularVersionTopic);
+
+    // Everything else on the task stays a Mockito stub.
+    doCallRealMethod().when(task).verifyBatchPushRecordCount(any(), any());
+    return task;
+  }
+
+  /**
+   * Same as {@code buildSit(true, statsMock)}, except that the injected version topic reports
+   * {@code isViewTopic() == true}.
+   */
+  private static StoreIngestionTask buildSitOnViewTopic(AggVersionedIngestionStats statsMock) throws Exception {
+    PubSubTopic viewTopic = mock(PubSubTopic.class);
+    doReturn(true).when(viewTopic).isViewTopic();
+
+    StoreIngestionTask task = buildSit(true, statsMock);
+    // Replace the regular version topic that buildSit injected.
+    setField(task, "versionTopic", viewTopic);
+    return task;
+  }
+
+  /**
+   * Returns a mocked {@link PartitionConsumptionState} that reports {@code count} from
+   * {@code getBatchPushRecordCount()} and {@code "test_replica"} from {@code getReplicaId()}.
+   */
+  private static PartitionConsumptionState pcsWithCount(long count) {
+    PartitionConsumptionState state = mock(PartitionConsumptionState.class);
+    doReturn("test_replica").when(state).getReplicaId();
+    doReturn(count).when(state).getBatchPushRecordCount();
+    return state;
+  }
+
+  /**
+   * Like {@code pcsWithCount(count)}, and additionally stubs
+   * {@code getEstimatedUniqueIngestedKeyCount()} to return {@code hllEstimate}.
+   */
+  private static PartitionConsumptionState pcsWithCountAndHll(long count, long hllEstimate) {
+    PartitionConsumptionState state = pcsWithCount(count);
+    doReturn(hllEstimate).when(state).getEstimatedUniqueIngestedKeyCount();
+    return state;
+  }
+
+  /**
+   * Builds a header set holding only the partition-record-count header, whose value is
+   * {@code expectedCount} written as 8 bytes via {@link ByteBuffer#putLong(long)}.
+   */
+  private static PubSubMessageHeaders headersWithPrc(long expectedCount) {
+    byte[] encodedCount = ByteBuffer.allocate(Long.BYTES).putLong(expectedCount).array();
+    PubSubMessageHeaders headers = new PubSubMessageHeaders();
+    return headers.add(PubSubMessageHeaders.VENICE_PARTITION_RECORD_COUNT_HEADER, encodedCount);
+  }
+
+  /** A {@code null} header argument records neither the match nor the mismatch sensor. */
+  @Test
+  public void testVerifySkipsOnNullHeaders() throws Exception {
+    AggVersionedIngestionStats ingestionStats = mock(AggVersionedIngestionStats.class);
+    StoreIngestionTask task = buildSit(true, ingestionStats);
+    PartitionConsumptionState state = pcsWithCount(100L);
+
+    task.verifyBatchPushRecordCount(state, null);
+
+    verify(ingestionStats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION);
+  }
+
+  /** A header set without the record-count header records neither sensor. */
+  @Test
+  public void testVerifySkipsOnMissingPrcHeader() throws Exception {
+    AggVersionedIngestionStats ingestionStats = mock(AggVersionedIngestionStats.class);
+    StoreIngestionTask task = buildSit(true, ingestionStats);
+    PartitionConsumptionState state = pcsWithCount(100L);
+    PubSubMessageHeaders headersWithoutPrc = new PubSubMessageHeaders();
+
+    task.verifyBatchPushRecordCount(state, headersWithoutPrc);
+
+    verify(ingestionStats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION);
+  }
+
+ @Test
+ public void testVerifySkipsOnMalformedPrcHeader() throws Exception {
+ AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class);
+ StoreIngestionTask sit = buildSit(true, stats);
+ PubSubMessageHeaders headers = new PubSubMessageHeaders()
+ .add(PubSubMessageHeaders.VENICE_PARTITION_RECORD_COUNT_HEADER, new byte[] { 1, 2, 3 }); // not 8 bytes
+ sit.verifyBatchPushRecordCount(pcsWithCount(100L), headers);
+ verify(stats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION);
+ verify(stats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+ }
+
+  /**
+   * An expected count equal to {@code PubSubMessageHeaders.PRC_HEADER_UNAVAILABLE_SENTINEL}
+   * records neither sensor.
+   */
+  @Test
+  public void testVerifySkipsOnSentinelExpectedCount() throws Exception {
+    AggVersionedIngestionStats ingestionStats = mock(AggVersionedIngestionStats.class);
+    StoreIngestionTask task = buildSit(true, ingestionStats);
+    PartitionConsumptionState state = pcsWithCount(100L);
+    PubSubMessageHeaders sentinelHeaders = headersWithPrc(PubSubMessageHeaders.PRC_HEADER_UNAVAILABLE_SENTINEL);
+
+    task.verifyBatchPushRecordCount(state, sentinelHeaders);
+
+    verify(ingestionStats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION);
+  }
+
+  /** On a view topic even a clear deficit (50 consumed vs. 100 expected) records neither sensor. */
+  @Test
+  public void testVerifySkipsOnViewTopic() throws Exception {
+    AggVersionedIngestionStats ingestionStats = mock(AggVersionedIngestionStats.class);
+    StoreIngestionTask viewTopicTask = buildSitOnViewTopic(ingestionStats);
+    PartitionConsumptionState deficitState = pcsWithCount(50L);
+    PubSubMessageHeaders headers = headersWithPrc(100L);
+
+    // would otherwise fail
+    viewTopicTask.verifyBatchPushRecordCount(deficitState, headers);
+
+    verify(ingestionStats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION);
+  }
+
+  /** Passing {@link EmptyPubSubMessageHeaders#SINGLETON} records neither sensor. */
+  @Test
+  public void testVerifySkipsOnEmptyPubSubMessageHeadersSingleton() throws Exception {
+    AggVersionedIngestionStats ingestionStats = mock(AggVersionedIngestionStats.class);
+    StoreIngestionTask task = buildSit(true, ingestionStats);
+    PartitionConsumptionState state = pcsWithCount(100L);
+
+    task.verifyBatchPushRecordCount(state, EmptyPubSubMessageHeaders.SINGLETON);
+
+    verify(ingestionStats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION);
+  }
+
+  /**
+   * The check only applies while the push is still in progress. For a version that is already
+   * current (or backup), a re-emitted EOP must not trigger it again: no sensor is recorded and
+   * nothing is thrown, even on a clear deficit.
+   */
+  @Test
+  public void testVerifySkipsWhenNotFutureVersion() throws Exception {
+    AggVersionedIngestionStats ingestionStats = mock(AggVersionedIngestionStats.class);
+    // CURRENT rather than FUTURE, so the verification should be skipped.
+    VersionRole nonFutureRole = VersionRole.CURRENT;
+    StoreIngestionTask task = buildSit(
+        /* failOnMismatchEnabled */ true,
+        ingestionStats,
+        nonFutureRole,
+        /* hllEnabled */ false,
+        /* isDaVinciClient */ false);
+
+    // would otherwise fail
+    task.verifyBatchPushRecordCount(pcsWithCount(50L), headersWithPrc(100L));
+
+    verify(ingestionStats, never()).recordRecordCountMismatchFailure(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION);
+  }
+
+  /** Consumed count equal to the expected count records exactly one match and no mismatch. */
+  @Test
+  public void testVerifyEmitsMatchSensorOnExactCount() throws Exception {
+    AggVersionedIngestionStats ingestionStats = mock(AggVersionedIngestionStats.class);
+    StoreIngestionTask task = buildSit(true, ingestionStats);
+    PartitionConsumptionState state = pcsWithCount(100L);
+    PubSubMessageHeaders headers = headersWithPrc(100L);
+
+    task.verifyBatchPushRecordCount(state, headers);
+
+    verify(ingestionStats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, times(1)).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION);
+  }
+
+  /** A surplus (105 consumed vs. 100 expected) still records a match and no mismatch. */
+  @Test
+  public void testVerifyEmitsMatchSensorOnSurplus() throws Exception {
+    AggVersionedIngestionStats ingestionStats = mock(AggVersionedIngestionStats.class);
+    StoreIngestionTask task = buildSit(true, ingestionStats);
+    PartitionConsumptionState surplusState = pcsWithCount(105L);
+    PubSubMessageHeaders headers = headersWithPrc(100L);
+
+    task.verifyBatchPushRecordCount(surplusState, headers);
+
+    verify(ingestionStats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, times(1)).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION);
+  }
+
+  /**
+   * Empty partition: expected=0 and actual=0 count as a match. As in the other match-path tests,
+   * the mismatch sensor must stay silent so a double emission cannot go unnoticed.
+   */
+  @Test
+  public void testVerifyEmitsMatchSensorOnZeroExpectedAndActual() throws Exception {
+    AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class);
+    StoreIngestionTask sit = buildSit(true, stats);
+    sit.verifyBatchPushRecordCount(pcsWithCount(0L), headersWithPrc(0L));
+    verify(stats, times(1)).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION);
+    verify(stats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+  }
+
+  /**
+   * Strict mode off: a deficit (50 vs. 100) records the mismatch sensor once and returns normally
+   * instead of throwing.
+   */
+  @Test
+  public void testVerifyEmitsMismatchSensorOnDeficitWhenStrictModeDisabled() throws Exception {
+    AggVersionedIngestionStats ingestionStats = mock(AggVersionedIngestionStats.class);
+    StoreIngestionTask lenientTask = buildSit(/* failOnMismatchEnabled */ false, ingestionStats);
+    PartitionConsumptionState deficitState = pcsWithCount(50L);
+    PubSubMessageHeaders headers = headersWithPrc(100L);
+
+    lenientTask.verifyBatchPushRecordCount(deficitState, headers);
+
+    verify(ingestionStats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, times(1)).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+  }
+
+ /** With server-strict-mode enabled (default), mismatch records the metric AND throws. */
+ @Test
+ public void testVerifyThrowsOnDeficitWhenStrictModeEnabled() throws Exception {
+ AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class);
+ StoreIngestionTask sit = buildSit(/* failOnMismatchEnabled */ true, stats);
+ VeniceException ex = expectThrows(
+ VeniceException.class,
+ () -> sit.verifyBatchPushRecordCount(pcsWithCount(50L), headersWithPrc(100L)));
+ verify(stats, times(1)).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+ verify(stats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION);
+
+ String msg = ex.getMessage();
+ assertTrue(msg.contains("RECORD_COUNT_DEFICIT"), "Tagged error class missing in: " + msg);
+ assertTrue(msg.contains("expected=100"), "expected=N missing in: " + msg);
+ assertTrue(msg.contains("actual=50"), "actual=M missing in: " + msg);
+ assertTrue(msg.contains("replica=test_replica"), "replica id missing in: " + msg);
+ assertTrue(msg.contains("topic=" + TEST_TOPIC), "topic missing in: " + msg);
+ // Failed-and-throwing mismatches must also increment the dedicated failure sensor — distinct
+ // from the informational mismatch sensor, which fires regardless of strict-mode state.
+ verify(stats, times(1)).recordRecordCountMismatchFailure(TEST_STORE, TEST_VERSION);
+ }
+
+  /** Strict mode off: a deficit records the mismatch sensor but never the dedicated failure sensor. */
+  @Test
+  public void testVerifyDoesNotEmitFailureSensorWhenStrictModeDisabled() throws Exception {
+    AggVersionedIngestionStats ingestionStats = mock(AggVersionedIngestionStats.class);
+    StoreIngestionTask lenientTask = buildSit(/* failOnMismatchEnabled */ false, ingestionStats);
+    PartitionConsumptionState deficitState = pcsWithCount(50L);
+    PubSubMessageHeaders headers = headersWithPrc(100L);
+
+    lenientTask.verifyBatchPushRecordCount(deficitState, headers);
+
+    verify(ingestionStats, never()).recordRecordCountMismatchFailure(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, times(1)).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+  }
+
+  /**
+   * Both legs pass, so the dual check records a match. Counter leg: 100 ≥ 100. HLL leg: with
+   * expected=100 and {@code HLL_ERROR_TOLERANCE=0.05} the threshold is ceil(100 * 0.05) = 5, and
+   * an estimate of 98 deviates by |98−100| = 2 ≤ 5.
+   */
+  @Test
+  public void testVerifyDualCheckPassesWhenBothLegsPass() throws Exception {
+    AggVersionedIngestionStats ingestionStats = mock(AggVersionedIngestionStats.class);
+    StoreIngestionTask task = buildSit(
+        /* failOnMismatchEnabled */ true,
+        ingestionStats,
+        VersionRole.FUTURE,
+        /* hllEnabled */ true,
+        /* isDaVinciClient */ false);
+    PartitionConsumptionState state = pcsWithCountAndHll(100L, 98L);
+    PubSubMessageHeaders headers = headersWithPrc(100L);
+
+    task.verifyBatchPushRecordCount(state, headers);
+
+    verify(ingestionStats, never()).recordRecordCountMismatchFailure(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, times(1)).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION);
+  }
+
+  /**
+   * The HLL leg under-counts while the counter leg passes, so the dual check records a mismatch.
+   * This is the leg that catches duplicate-key inflation the raw counter cannot see. With
+   * threshold = 5, an estimate of 50 deviates by |50−100| = 50 > 5.
+   */
+  @Test
+  public void testVerifyDualCheckFailsWhenHllUnderCounts() throws Exception {
+    AggVersionedIngestionStats ingestionStats = mock(AggVersionedIngestionStats.class);
+    StoreIngestionTask task = buildSit(
+        /* failOnMismatchEnabled */ false,
+        ingestionStats,
+        VersionRole.FUTURE,
+        /* hllEnabled */ true,
+        /* isDaVinciClient */ false);
+    PartitionConsumptionState state = pcsWithCountAndHll(100L, 50L);
+    PubSubMessageHeaders headers = headersWithPrc(100L);
+
+    task.verifyBatchPushRecordCount(state, headers);
+
+    verify(ingestionStats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, times(1)).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+  }
+
+  /**
+   * Mirror image of the under-count case: an HLL estimate above the tolerance window also records
+   * a mismatch. Counter leg: 120 ≥ 100 passes (a raw over-count is benign — dup replication /
+   * spec-exec). HLL leg: |109−100| = 9 > 5 fails. HLL counts unique keys, and unique keys cannot
+   * exceed raw producer ops, so a >5% over-estimate signals a bug worth flagging.
+   */
+  @Test
+  public void testVerifyDualCheckFailsWhenHllOverCounts() throws Exception {
+    AggVersionedIngestionStats ingestionStats = mock(AggVersionedIngestionStats.class);
+    StoreIngestionTask task = buildSit(
+        /* failOnMismatchEnabled */ false,
+        ingestionStats,
+        VersionRole.FUTURE,
+        /* hllEnabled */ true,
+        /* isDaVinciClient */ false);
+    PartitionConsumptionState state = pcsWithCountAndHll(120L, 109L);
+    PubSubMessageHeaders headers = headersWithPrc(100L);
+
+    task.verifyBatchPushRecordCount(state, headers);
+
+    verify(ingestionStats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, times(1)).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+  }
+
+  /**
+   * Boundary case: an HLL estimate sitting exactly on the upper edge of the tolerance window still
+   * passes. With expected=100 and threshold=5, an estimate of 105 deviates by |105−100| = 5 ≤ 5,
+   * which confirms the window is inclusive on both sides.
+   */
+  @Test
+  public void testVerifyDualCheckPassesAtUpperHllToleranceBoundary() throws Exception {
+    AggVersionedIngestionStats ingestionStats = mock(AggVersionedIngestionStats.class);
+    StoreIngestionTask task = buildSit(
+        /* failOnMismatchEnabled */ true,
+        ingestionStats,
+        VersionRole.FUTURE,
+        /* hllEnabled */ true,
+        /* isDaVinciClient */ false);
+    PartitionConsumptionState state = pcsWithCountAndHll(100L, 105L);
+    PubSubMessageHeaders headers = headersWithPrc(100L);
+
+    task.verifyBatchPushRecordCount(state, headers);
+
+    verify(ingestionStats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, times(1)).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION);
+  }
+
+  /**
+   * The counter leg fails while the HLL leg alone would have passed: counter 50 < 100, HLL estimate
+   * 100 with |100−100| = 0 ≤ 5. A mismatch is recorded, confirming that EITHER leg failing is
+   * enough.
+   */
+  @Test
+  public void testVerifyDualCheckFailsWhenOnlyCounterFails() throws Exception {
+    AggVersionedIngestionStats ingestionStats = mock(AggVersionedIngestionStats.class);
+    StoreIngestionTask task = buildSit(
+        /* failOnMismatchEnabled */ false,
+        ingestionStats,
+        VersionRole.FUTURE,
+        /* hllEnabled */ true,
+        /* isDaVinciClient */ false);
+    PartitionConsumptionState state = pcsWithCountAndHll(50L, 100L);
+    PubSubMessageHeaders headers = headersWithPrc(100L);
+
+    task.verifyBatchPushRecordCount(state, headers);
+
+    verify(ingestionStats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, times(1)).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+  }
+
+  /**
+   * DaVinci with server strict mode on and a counter-leg deficit: the throw and the failure sensor
+   * are both suppressed, because DaVinci failure aggregation happens separately via the DaVinci
+   * push status store. Only the informational {@code _mismatch} sensor, which fires regardless of
+   * strict-mode state, is incremented.
+   */
+  @Test
+  public void testVerifyDaVinciDoesNotThrowOnDeficitWhenStrictModeEnabled() throws Exception {
+    AggVersionedIngestionStats ingestionStats = mock(AggVersionedIngestionStats.class);
+    StoreIngestionTask daVinciTask = buildSit(
+        /* failOnMismatchEnabled */ true,
+        ingestionStats,
+        VersionRole.FUTURE,
+        /* hllEnabled */ false,
+        /* isDaVinciClient */ true);
+    PartitionConsumptionState deficitState = pcsWithCount(50L);
+    PubSubMessageHeaders headers = headersWithPrc(100L);
+
+    // DaVinci skip path: this call must return normally instead of throwing.
+    daVinciTask.verifyBatchPushRecordCount(deficitState, headers);
+
+    verify(ingestionStats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, never()).recordRecordCountMismatchFailure(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, times(1)).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+  }
+
+  /**
+   * DaVinci with server strict mode off and a counter-leg deficit: only the informational mismatch
+   * sensor fires — no failure sensor and no throw. This mirrors the non-DaVinci
+   * strict-mode-disabled case and shows that the DaVinci skip-throw guard leaves the metric-only
+   * path untouched.
+   */
+  @Test
+  public void testVerifyDaVinciEmitsMismatchSensorOnDeficitWhenStrictModeDisabled() throws Exception {
+    AggVersionedIngestionStats ingestionStats = mock(AggVersionedIngestionStats.class);
+    StoreIngestionTask daVinciTask = buildSit(
+        /* failOnMismatchEnabled */ false,
+        ingestionStats,
+        VersionRole.FUTURE,
+        /* hllEnabled */ false,
+        /* isDaVinciClient */ true);
+    PartitionConsumptionState deficitState = pcsWithCount(50L);
+    PubSubMessageHeaders headers = headersWithPrc(100L);
+
+    daVinciTask.verifyBatchPushRecordCount(deficitState, headers);
+
+    verify(ingestionStats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, never()).recordRecordCountMismatchFailure(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, times(1)).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+  }
+
+  /**
+   * DaVinci with server strict mode on and an HLL-leg failure while the counter passes: shows that
+   * the DaVinci skip guard depends on {@code isDaVinciClient} and not on which leg failed. The HLL
+   * deviation exceeds the tolerance, so a mismatch is recorded, while the failure sensor and the
+   * throw are both suppressed.
+   */
+  @Test
+  public void testVerifyDaVinciDoesNotThrowOnHllLegFailureWhenStrictModeEnabled() throws Exception {
+    AggVersionedIngestionStats ingestionStats = mock(AggVersionedIngestionStats.class);
+    StoreIngestionTask daVinciTask = buildSit(
+        /* failOnMismatchEnabled */ true,
+        ingestionStats,
+        VersionRole.FUTURE,
+        /* hllEnabled */ true,
+        /* isDaVinciClient */ true);
+    // counter=100 ≥ 100 (passes); hll=50, |50−100|=50 > 5 (fails) → mismatch.
+    PartitionConsumptionState state = pcsWithCountAndHll(100L, 50L);
+    PubSubMessageHeaders headers = headersWithPrc(100L);
+
+    daVinciTask.verifyBatchPushRecordCount(state, headers);
+
+    verify(ingestionStats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, never()).recordRecordCountMismatchFailure(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, times(1)).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+  }
+
+  /**
+   * DaVinci match path: a clean push still records the match sensor and leaves the mismatch and
+   * failure sensors untouched. Guards against DaVinci accidentally skipping the match recording.
+   */
+  @Test
+  public void testVerifyDaVinciEmitsMatchSensorOnExactCount() throws Exception {
+    AggVersionedIngestionStats ingestionStats = mock(AggVersionedIngestionStats.class);
+    StoreIngestionTask daVinciTask = buildSit(
+        /* failOnMismatchEnabled */ true,
+        ingestionStats,
+        VersionRole.FUTURE,
+        /* hllEnabled */ false,
+        /* isDaVinciClient */ true);
+    PartitionConsumptionState state = pcsWithCount(100L);
+    PubSubMessageHeaders headers = headersWithPrc(100L);
+
+    daVinciTask.verifyBatchPushRecordCount(state, headers);
+
+    verify(ingestionStats, never()).recordRecordCountMismatchFailure(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, times(1)).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION);
+  }
+
+  /**
+   * DaVinci on a version that is not the future version: the future-version gate runs before the
+   * DaVinci branch, so an already-current version skips the whole verification — no metrics and
+   * no throw.
+   */
+  @Test
+  public void testVerifyDaVinciSkipsWhenNotFutureVersion() throws Exception {
+    AggVersionedIngestionStats ingestionStats = mock(AggVersionedIngestionStats.class);
+    StoreIngestionTask daVinciTask = buildSit(
+        /* failOnMismatchEnabled */ true,
+        ingestionStats,
+        VersionRole.CURRENT,
+        /* hllEnabled */ false,
+        /* isDaVinciClient */ true);
+    PartitionConsumptionState deficitState = pcsWithCount(50L);
+    PubSubMessageHeaders headers = headersWithPrc(100L);
+
+    // would otherwise fail
+    daVinciTask.verifyBatchPushRecordCount(deficitState, headers);
+
+    verify(ingestionStats, never()).recordRecordCountMismatchFailure(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION);
+    verify(ingestionStats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION);
+  }
+
+}
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/AggVersionedIngestionStatsTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/AggVersionedIngestionStatsTest.java
index 17ddd6d19fd..ab5de6db97c 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/AggVersionedIngestionStatsTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/AggVersionedIngestionStatsTest.java
@@ -273,6 +273,9 @@ public void testAllRecordingMethodsWorkWithConfig(boolean ingestionOtelStatsEnab
aggStats.recordResubscriptionFailureCount(STORE_NAME, VERSION_1);
aggStats.recordPartialUpdateCacheHitCount(STORE_NAME, VERSION_1);
aggStats.recordChecksumVerificationFailureCount(STORE_NAME, VERSION_1);
+ aggStats.recordBatchPushRecordCountMatch(STORE_NAME, VERSION_1);
+ aggStats.recordBatchPushRecordCountMismatch(STORE_NAME, VERSION_1);
+ aggStats.recordRecordCountMismatchFailure(STORE_NAME, VERSION_1);
aggStats.recordIngestionFailureCount(STORE_NAME, VERSION_1, VeniceIngestionFailureReason.GENERAL);
aggStats.recordDcrLookupCacheHitCount(STORE_NAME, VERSION_1, VeniceRecordType.REPLICATION_METADATA);
aggStats.recordBytesConsumedAsUncompressedSize(STORE_NAME, VERSION_1, 1024);
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ServerMetricEntityTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ServerMetricEntityTest.java
index 23ee50e53a2..8fee846e59d 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ServerMetricEntityTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ServerMetricEntityTest.java
@@ -22,7 +22,7 @@
public class ServerMetricEntityTest {
@Test
public void testServerMetricEntitiesCount() {
- assertEquals(SERVER_METRIC_ENTITIES.size(), 182, "Expected 182 unique metric entities");
+ assertEquals(SERVER_METRIC_ENTITIES.size(), 185, "Expected 185 unique metric entities");
}
/**
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/IngestionOtelMetricEntityTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/IngestionOtelMetricEntityTest.java
index 0fe0107bd56..59be77f4ddf 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/IngestionOtelMetricEntityTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/IngestionOtelMetricEntityTest.java
@@ -425,6 +425,30 @@ private static Map expectedD
MetricUnit.NUMBER,
"Count of checksum verification failures",
storeClusterVersion));
+ map.put(
+ IngestionOtelMetricEntity.BATCH_PUSH_RECORD_COUNT_MATCH_COUNT,
+ new MetricEntityExpectation(
+ "ingestion.batch_push_record_count_match.count",
+ MetricType.COUNTER,
+ MetricUnit.NUMBER,
+ "Count of batch-push EOPs where the consumer-side record count matched the producer's count",
+ storeClusterVersion));
+ map.put(
+ IngestionOtelMetricEntity.BATCH_PUSH_RECORD_COUNT_MISMATCH_COUNT,
+ new MetricEntityExpectation(
+ "ingestion.batch_push_record_count_mismatch.count",
+ MetricType.COUNTER,
+ MetricUnit.NUMBER,
+ "Count of batch-push EOPs where the consumer-side record count did not match the producer's count",
+ storeClusterVersion));
+ map.put(
+ IngestionOtelMetricEntity.RECORD_COUNT_MISMATCH_FAILURE_COUNT,
+ new MetricEntityExpectation(
+ "ingestion.record_count_mismatch_failure.count",
+ MetricType.COUNTER,
+ MetricUnit.NUMBER,
+ "Count of strict-mode record-count mismatches that failed ingestion (servers only; DaVinci is excluded)",
+ storeClusterVersion));
map.put(
IngestionOtelMetricEntity.DCR_LOOKUP_CACHE_HIT_COUNT,
new MetricEntityExpectation(
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/IngestionOtelStatsTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/IngestionOtelStatsTest.java
index 977dad3c098..f64ed2cd520 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/IngestionOtelStatsTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/IngestionOtelStatsTest.java
@@ -6,6 +6,8 @@
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BATCH_PROCESSING_REQUEST_ERROR_COUNT;
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BATCH_PROCESSING_REQUEST_RECORD_COUNT;
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BATCH_PROCESSING_REQUEST_TIME;
+import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BATCH_PUSH_RECORD_COUNT_MATCH_COUNT;
+import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BATCH_PUSH_RECORD_COUNT_MISMATCH_COUNT;
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BYTES_CONSUMED_AS_UNCOMPRESSED_SIZE;
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.CHECKSUM_VERIFICATION_FAILURE_COUNT;
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.CONSUMER_ACTION_TIME;
@@ -36,6 +38,7 @@
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.PRODUCER_SYNCHRONIZE_TIME;
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RECORD_ASSEMBLED_SIZE;
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RECORD_ASSEMBLED_SIZE_RATIO;
+import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RECORD_COUNT_MISMATCH_FAILURE_COUNT;
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RECORD_KEY_SIZE;
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RECORD_VALUE_SIZE;
import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RESUBSCRIPTION_FAILURE_COUNT;
@@ -794,7 +797,13 @@ public static Object[][] simpleCounterMetrics() {
{ (BiConsumer) (s, v) -> s.recordPartialUpdateCacheHitCount(CURRENT_VERSION, v), 1,
PARTIAL_UPDATE_CACHE_HIT_COUNT },
{ (BiConsumer) (s, v) -> s
- .recordChecksumVerificationFailureCount(CURRENT_VERSION, v), 1, CHECKSUM_VERIFICATION_FAILURE_COUNT }, };
+ .recordChecksumVerificationFailureCount(CURRENT_VERSION, v), 1, CHECKSUM_VERIFICATION_FAILURE_COUNT },
+ { (BiConsumer) (s, v) -> s.recordBatchPushRecordCountMatch(CURRENT_VERSION, v), 1,
+ BATCH_PUSH_RECORD_COUNT_MATCH_COUNT },
+ { (BiConsumer) (s, v) -> s.recordBatchPushRecordCountMismatch(CURRENT_VERSION, v),
+ 1, BATCH_PUSH_RECORD_COUNT_MISMATCH_COUNT },
+ { (BiConsumer) (s, v) -> s.recordRecordCountMismatchFailure(CURRENT_VERSION, v), 1,
+ RECORD_COUNT_MISMATCH_FAILURE_COUNT } };
}
@Test(dataProvider = "simpleCounterMetrics")
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
index 0460876a835..2228f2b2f2a 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
@@ -2795,6 +2795,19 @@ private ConfigKeys() {
*/
public static final String SERVER_UNIQUE_INGESTED_KEY_COUNT_HLL_LOG2K = "server.unique.ingested.key.count.hll.log2k";
+ /**
+ * Server-side strict mode for batch-push record-count verification at EOP. When {@code true}
+ * (default), a mismatch detected by {@code verifyBatchPushRecordCount} fails ingestion by
+ * throwing a {@link com.linkedin.venice.exceptions.VeniceException}; the informational
+ * {@code batch_push_record_count_mismatch} metric and the dedicated
+ * {@code record_count_mismatch_failure} metric are both recorded alongside the throw. When
+ * {@code false}, only the informational {@code batch_push_record_count_mismatch} metric fires —
+ * no throw, ingestion continues. On DaVinci replicas the throw and the
+ * {@code record_count_mismatch_failure} metric are suppressed regardless of this flag (DaVinci
+ * failure is aggregated separately via the push status store); the informational mismatch
+ * metric is still recorded there.
+ */
+ public static final String SERVER_BATCH_PUSH_RECORD_COUNT_VERIFICATION_FAIL_ON_MISMATCH_ENABLED =
+ "server.batch.push.record.count.verification.fail.on.mismatch.enabled";
+
/**
* Follower replicas and DavinciClient will only consider heartbeats received within
* this time window to mark themselves as completed. This is to avoid the cases that
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java b/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java
index 368bdbf0b66..9d629b4560b 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java
@@ -393,6 +393,14 @@ public void setActiveKeyCount(long activeKeyCount) {
this.partitionState.activeKeyCount = activeKeyCount;
}
+ public long getBatchPushRecordCount() {
+ return this.partitionState.batchPushRecordCount;
+ }
+
+ public void setBatchPushRecordCount(long batchPushRecordCount) {
+ this.partitionState.batchPushRecordCount = batchPushRecordCount;
+ }
+
public Map getTrackingIncrementalPushStatus() {
return partitionState.trackingIncrementalPushStatus;
}
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java
index 8620e2eac26..fcd0a5001b3 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java
@@ -40,6 +40,14 @@ public class PubSubMessageHeaders implements Measurable, Iterable put(
PubSubProducerCallback callback,
int upstreamPartition,
LeaderMetadataWrapper leaderMetadataWrapper) {
+ return put(
+ kafkaKey,
+ kafkaMessageEnvelope,
+ callback,
+ upstreamPartition,
+ leaderMetadataWrapper,
+ EmptyPubSubMessageHeaders.SINGLETON);
+ }
+
+ /**
+ * Pass-through put that lets the caller forward selected upstream PubSub headers (e.g. the
+ * "prc" partition-record-count header on EOP) to the local VT. Used by the leader's
+ * cross-region re-emit path so remote-fabric followers can run record-count verification at EOP.
+ *
+ * <p>Callers should pass {@link EmptyPubSubMessageHeaders#SINGLETON} when no upstream header
+ * needs to be forwarded — the data path uses this to avoid per-message header allocation. Pass
+ * a fresh {@link PubSubMessageHeaders} containing only the headers that should propagate.
+ * A {@code null} argument is treated as empty.
+ */
+ public Future put(
+ KafkaKey kafkaKey,
+ KafkaMessageEnvelope kafkaMessageEnvelope,
+ PubSubProducerCallback callback,
+ int upstreamPartition,
+ LeaderMetadataWrapper leaderMetadataWrapper,
+ PubSubMessageHeaders pubSubMessageHeaders) {
// Self-adjust the chunking setting in pass-through mode
verifyChunkingSetting(kafkaMessageEnvelope);
@@ -1101,7 +1127,7 @@ public Future put(
upstreamPartition,
callback,
false,
- EmptyPubSubMessageHeaders.SINGLETON);
+ pubSubMessageHeaders != null ? pubSubMessageHeaders : EmptyPubSubMessageHeaders.SINGLETON);
}
/**
diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/offsets/TestOffsetRecord.java b/internal/venice-common/src/test/java/com/linkedin/venice/offsets/TestOffsetRecord.java
index 857e1b944db..63faa633fad 100644
--- a/internal/venice-common/src/test/java/com/linkedin/venice/offsets/TestOffsetRecord.java
+++ b/internal/venice-common/src/test/java/com/linkedin/venice/offsets/TestOffsetRecord.java
@@ -175,4 +175,27 @@ public void testDeserializePositionWithOffsetFallback(
assertEquals(result, PubSubSymbolicPosition.LATEST, name + ": expected LATEST");
}
}
+
+ // -- batchPushRecordCount serialization round-trip (Phase 1) ---------------------------------
+
+ @DataProvider(name = "batchPushRecordCountCases")
+ public static Object[][] batchPushRecordCountCases() {
+ // { description, valueToStampBeforeRoundTrip (null = leave at default), expectedAfterRoundTrip }
+ return new Object[][] { { "default value is 0 (no stamping)", null, 0L },
+ { "non-zero value survives serialize -> deserialize", 123_456L, 123_456L } };
+ }
+
+ @Test(dataProvider = "batchPushRecordCountCases")
+ public void testBatchPushRecordCountSerialization(String description, Long valueToStamp, long expected) {
+ OffsetRecord r1 = TestUtils
+ .getOffsetRecord(ApacheKafkaOffsetPosition.of(100), Optional.empty(), DEFAULT_PUBSUB_CONTEXT_FOR_UNIT_TESTING);
+ if (valueToStamp != null) {
+ r1.setBatchPushRecordCount(valueToStamp);
+ }
+ OffsetRecord r2 = new OffsetRecord(
+ r1.toBytes(),
+ AvroProtocolDefinition.PARTITION_STATE.getSerializer(),
+ DEFAULT_PUBSUB_CONTEXT_FOR_UNIT_TESTING);
+ assertEquals(r2.getBatchPushRecordCount(), expected, description);
+ }
}
diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java
index 35187772d69..535cc417e6d 100644
--- a/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java
+++ b/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java
@@ -51,6 +51,7 @@
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.pubsub.adapter.kafka.common.ApacheKafkaOffsetPosition;
import com.linkedin.venice.pubsub.api.DefaultPubSubMessage;
+import com.linkedin.venice.pubsub.api.EmptyPubSubMessageHeaders;
import com.linkedin.venice.pubsub.api.PubSubMessageHeader;
import com.linkedin.venice.pubsub.api.PubSubMessageHeaders;
import com.linkedin.venice.pubsub.api.PubSubPosition;
@@ -1694,4 +1695,99 @@ public void testDefaultLeaderMetadataWrapperHasSentinelUpstreamMessageTimestamp(
DEFAULT_LEADER_METADATA_WRAPPER.getUpstreamMessageTimestamp(),
LeaderMetadataWrapper.DEFAULT_UPSTREAM_MESSAGE_TIMESTAMP);
}
+
+ /** Symbolic choice of how {@code pubSubMessageHeaders} is supplied to the pass-through put in the parameterized test below. */
+ private enum PassThroughHeadersInput {
+ /** Caller passes a fresh PubSubMessageHeaders with prc=42 (the new 6-arg overload). */
+ PRC_HEADERS,
+ /** Caller passes {@link EmptyPubSubMessageHeaders#SINGLETON} (no headers forwarded). */
+ SINGLETON,
+ /** Caller passes {@code null} (treated as empty — must not NPE). */
+ NULL,
+ /** Caller uses the existing 5-arg overload (no headers parameter at all). */
+ FIVE_ARG_OVERLOAD
+ }
+
+ @DataProvider(name = "passThroughPutHeadersCases")
+ public static Object[][] passThroughPutHeadersCases() {
+ // { description, input, expectedPrcValue (-1L means "no prc expected in captured headers") }
+ return new Object[][] {
+ { "6-arg overload with prc header threads it through", PassThroughHeadersInput.PRC_HEADERS, 42L },
+ { "6-arg overload with SINGLETON forwards no headers", PassThroughHeadersInput.SINGLETON, -1L },
+ { "6-arg overload with null is treated as empty (no NPE)", PassThroughHeadersInput.NULL, -1L },
+ { "Existing 5-arg overload strips headers — no prc", PassThroughHeadersInput.FIVE_ARG_OVERLOAD, -1L } };
+ }
+
+ @Test(dataProvider = "passThroughPutHeadersCases")
+ public void testPassThroughPutHeaderHandling(
+ String description,
+ PassThroughHeadersInput input,
+ long expectedPrcValue) {
+ PubSubProducerAdapter mockedProducer = mock(PubSubProducerAdapter.class);
+ when(mockedProducer.sendMessage(any(), any(), any(), any(), any(), any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ VeniceWriterOptions vwOpts = new VeniceWriterOptions.Builder("test_topic")
+ .setKeyPayloadSerializer(new VeniceAvroKafkaSerializer("\"string\""))
+ .setValuePayloadSerializer(new VeniceAvroKafkaSerializer("\"string\""))
+ .setPartitioner(new DefaultVenicePartitioner())
+ .setPartitionCount(1)
+ .build();
+ VeniceWriter