diff --git a/build.gradle b/build.gradle index 73710610972..9cf526949d3 100644 --- a/build.gradle +++ b/build.gradle @@ -329,17 +329,15 @@ subprojects { // when actually using the new protocol. Example to pin KME to v12 when introducing v13: // project(':internal:venice-common').file('src/main/resources/avro/KafkaMessageEnvelope/v12', PathValidation.DIRECTORY) def versionOverrides = [ - // PartitionState v23 introduces `batchPushRecordCount` for server-side batch-push record-count - // verification. Pinned to v22 until the consumer code (PartitionConsumptionState + - // StoreIngestionTask) lands in a follow-up PR. - project(':internal:venice-common').file('src/main/resources/avro/PartitionState/v22', PathValidation.DIRECTORY), - // StoreMetaValue v43 introduces `batchPushRecordCountVerificationEnabled`. Pinned to v42 - // until the per-store flag wiring (Store / ZKStore / ReadOnlyStore / SystemStore / StoreInfo / - // UpdateStoreQueryParams) lands in a follow-up PR. + // StoreMetaValue v43 added `batchPushRecordCountVerificationEnabled` for the originally- + // planned per-store flag, but this PR pivoted to a server-side server-config gate + // (see `server.batch.push.record.count.verification.fail.on.mismatch.enabled`) and the + // field is no longer read or written by anyone. Pin to v42 so the field doesn't enter + // the active protocol. project(':internal:venice-common').file('src/main/resources/avro/StoreMetaValue/v42', PathValidation.DIRECTORY), - // AdminOperation v98 carries `batchPushRecordCountVerificationEnabled` on the SetStore admin op - // for upcoming server-side batch-push record-count verification. Pinned to v97 until the - // controller-side wiring (VeniceParentHelixAdmin / AdminExecutionTask) lands in a follow-up PR. + // AdminOperation v98 carried `batchPushRecordCountVerificationEnabled` on the SetStore + // admin op for the same originally-planned per-store flag. Pin to v97 for the same + // reason as StoreMetaValue v43 above. project(':services:venice-controller').file('src/main/resources/avro/AdminOperation/v97', PathValidation.DIRECTORY) ] diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java index 2888a1e4769..4a714a07417 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java @@ -78,6 +78,7 @@ import static com.linkedin.venice.ConfigKeys.SERVER_ADAPTIVE_THROTTLER_SIGNAL_REFRESH_INTERVAL_IN_SECONDS; import static com.linkedin.venice.ConfigKeys.SERVER_ADAPTIVE_THROTTLER_SINGLE_GET_LATENCY_THRESHOLD; import static com.linkedin.venice.ConfigKeys.SERVER_ADD_RMD_TO_BATCH_PUSH_FOR_HYBRID_STORES; +import static com.linkedin.venice.ConfigKeys.SERVER_BATCH_PUSH_RECORD_COUNT_VERIFICATION_FAIL_ON_MISMATCH_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_BATCH_REPORT_END_OF_INCREMENTAL_PUSH_STATUS_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_BLOB_TRANSFER_ADAPTIVE_THROTTLER_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_BLOB_TRANSFER_ADAPTIVE_THROTTLER_UPDATE_PERCENTAGE; @@ -623,6 +624,7 @@ public class VeniceServerConfig extends VeniceClusterConfig { private final NearlineLatencyTimestampSource nearlineLatencyTimestampSource; private final boolean uniqueIngestedKeyCountHllEnabled; private final int uniqueIngestedKeyCountHllLog2K; + private final boolean batchPushRecordCountVerificationFailOnMismatchEnabled; private final long leaderCompleteStateCheckInFollowerValidIntervalMs; private final boolean requireLeaderCompleteForCatchUpVtRts; private final boolean stuckConsumerRepairEnabled; @@ -1084,6 +1086,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, MapHLL at Venice's default precision (lgK=13) has a standard error of ~1.15%. The 3-sigma tail + * is ~3.45% — i.e. random HLL noise will deviate by more than 3.45% in only ~0.27% of cases (two- + * sided). We use 5% (slightly above 3-sigma) for a clean safety margin: random HLL noise alone + * cannot trigger a spurious mismatch, while real deviations larger than 5% of the producer's + * count — either side — are still flagged.

+ */ + static final double HLL_ERROR_TOLERANCE = 0.05; + /** Chunk fragment — not a logical key (skipped for key counting). */ protected static boolean isChunkFragment(int schemaId) { return schemaId == CHUNK_SCHEMA_ID; @@ -3570,6 +3583,7 @@ private void syncOffset(PartitionConsumptionState pcs) { // Update the push job info offsetRecord.setTrackingIncrementalPushStatus(pcs.getTrackingIncrementalPushStatus()); offsetRecord.setActiveKeyCount(pcs.getActiveKeyCount()); + offsetRecord.setBatchPushRecordCount(pcs.getBatchPushRecordCount()); // Serialize HLL sketch for unique key count persistence if (uniqueIngestedKeyCountHllEnabled && pcs.hasUniqueIngestedKeyCountHll()) { byte[] hllBytes = pcs.serializeUniqueIngestedKeyCountHll(); @@ -3918,7 +3932,8 @@ protected void processEndOfPush( KafkaMessageEnvelope endOfPushKME, PubSubPosition offset, PartitionConsumptionState partitionConsumptionState, - EndOfPush endOfPush) { + EndOfPush endOfPush, + PubSubMessageHeaders headers) { // Do not process duplication EOP messages. if (partitionConsumptionState.getOffsetRecord().isEndOfPushReceived()) { @@ -3982,6 +3997,120 @@ protected void processEndOfPush( partitionConsumptionState.setDataRecoveryCompleted(true); ingestionNotificationDispatcher.reportDataRecoveryCompleted(partitionConsumptionState); } + + verifyBatchPushRecordCount(partitionConsumptionState, headers); + } + + /** + * Compare the partition's locally-counted batchPushRecordCount AND its HLL-estimated unique key + * count against the producer's count carried on the EOP message's "prc" PubSub header. + * + *

The check has two legs and BOTH must pass for the push to be considered match:

+ *
    + *
  1. {@code counter >= expected} — exact PUT/DELETE count is at least the producer's count. + * Asymmetric (over-count tolerated) because raw counts legitimately inflate from at-least- + * once delivery, spec-exec dups, and cross-colo re-replication.
  2. + *
  3. {@code |hll_estimate - expected| <= (expected * HLL_ERROR_TOLERANCE)} — the HLL-estimated + * unique-key count is within ±HLL_ERROR_TOLERANCE of the producer's count. Symmetric: an HLL estimate + * materially above the producer count is also flagged, since structurally HLL counts unique + * keys and unique keys ≤ raw ops; a large over-estimate signals a bug (wrong data, count + * mis-stamping, etc.) rather than benign duplicate inflation. Skipped if HLL tracking is + * disabled.
  4. + *
+ * + *

If either leg fails: increments {@code batch_push_record_count_mismatch} (informational — + * fires regardless of strict-mode state) and logs a tagged error string. On a non-DaVinci + * replica, if the server-level config + * {@code server.batch.push.record.count.verification.fail.on.mismatch.enabled} is {@code true} + * (default), also increments {@code record_count_mismatch_failure} and throws + * {@link VeniceException} (failing ingestion). DaVinci replicas skip both the failure sensor + * and the throw

+ * + *

Skip cases (no-op, no metric):

+ * + */ + void verifyBatchPushRecordCount(PartitionConsumptionState pcs, PubSubMessageHeaders headers) { + if (headers == null) { + return; + } + // View-topic ingestion paths re-emit records via separate writers; their counts won't match + // the base store's prc. Defensive — currently a StoreIngestionTask should always be on a VT. + if (versionTopic != null && versionTopic.isViewTopic()) { + return; + } + PubSubMessageHeader prcHeader = headers.get(PubSubMessageHeaders.VENICE_PARTITION_RECORD_COUNT_HEADER); + if (prcHeader == null || prcHeader.value() == null || prcHeader.value().length != Long.BYTES) { + return; + } + long expectedCount = ByteBuffer.wrap(prcHeader.value()).getLong(); + if (expectedCount == PubSubMessageHeaders.PRC_HEADER_UNAVAILABLE_SENTINEL) { + return; + } + /* + * Only verify while the push is in progress (future-version state). Once the version has been + * promoted to current or demoted to backup, the count has already been judged in this server's + * prior lifecycle and shouldn't be re-judged on any re-emit / re-ingestion path. + */ + if (versionRole != VersionRole.FUTURE) { + return; + } + + long actualCount = pcs.getBatchPushRecordCount(); + boolean counterOk = actualCount >= expectedCount; + + /* + * Second leg: HLL-based unique-key check with a symmetric ±5% tolerance. Random HLL noise at + * default precision (lgK=13) lives well inside this band; a deviation larger than 5% in either + * direction is a real signal — under-count is data loss, over-count means HLL saw materially + * more unique keys than the producer claims to have written (structurally impossible without + * a bug). If HLL tracking is disabled on this server (rare), fall back to the counter alone. + */ + boolean hllOk; + long hllEstimate; + long hllThreshold = (long) Math.ceil(expectedCount * HLL_ERROR_TOLERANCE); + if (uniqueIngestedKeyCountHllEnabled) { + hllEstimate = pcs.getEstimatedUniqueIngestedKeyCount(); + hllOk = Math.abs(hllEstimate - expectedCount) <= hllThreshold; + } else { + hllEstimate = -1; // not tracked + hllOk = true; + } + + if (!counterOk || !hllOk) { + String taggedMsg = "RECORD_COUNT_DEFICIT:counterOk=" + counterOk + ":hllOk=" + hllOk + ":expected=" + + expectedCount + ":actual=" + actualCount + ":hll=" + hllEstimate + ":hllThreshold=" + hllThreshold + + ":replica=" + pcs.getReplicaId() + ":topic=" + kafkaVersionTopic; + LOGGER.error(taggedMsg); + versionedIngestionStats.recordBatchPushRecordCountMismatch(storeName, versionNumber); + // Server-side strict-mode is controlled by the cluster-wide config + // `server.batch.push.record.count.verification.fail.on.mismatch.enabled` (default: true). + // DaVinci replicas unconditionally skip the throw — DVC failure aggregation is handled + // separately via the push status store, and throwing here would convert a single noisy + // subscriber into a hard local replica ERROR. + if (serverConfig.isBatchPushRecordCountVerificationFailOnMismatchEnabled() && !isDaVinciClient) { + versionedIngestionStats.recordRecordCountMismatchFailure(storeName, versionNumber); + throw new VeniceException(taggedMsg); + } + } else { + versionedIngestionStats.recordBatchPushRecordCountMatch(storeName, versionNumber); + LOGGER.debug( + "Record count verification passed for replica: {}. Expected: {}, Actual: {}, HLL: {}", + pcs.getReplicaId(), + expectedCount, + actualCount, + hllEstimate); + } } protected void processStartOfIncrementalPush( @@ -4056,7 +4185,8 @@ private void processControlMessage( int partition, PubSubPosition offset, long pubSubMessageTime, - PartitionConsumptionState partitionConsumptionState) { + PartitionConsumptionState partitionConsumptionState, + PubSubMessageHeaders headers) { /** * If leader consumes control messages from topics other than version topic, it should produce * them to version topic; however, START_OF_SEGMENT and END_OF_SEGMENT should not be forwarded @@ -4081,7 +4211,7 @@ private void processControlMessage( break; case END_OF_PUSH: EndOfPush endOfPush = (EndOfPush) controlMessage.controlMessageUnion; - processEndOfPush(kafkaMessageEnvelope, offset, partitionConsumptionState, endOfPush); + processEndOfPush(kafkaMessageEnvelope, offset, partitionConsumptionState, endOfPush, headers); if (recordTransformer != null) { recordTransformer.onControlMessage(partition, offset, controlMessage, pubSubMessageTime); } @@ -4211,7 +4341,8 @@ private int internalProcessConsumerRecord( consumerRecord.getTopicPartition().getPartitionNumber(), consumerRecord.getPosition(), consumerRecord.getPubSubMessageTime(), - partitionConsumptionState); + partitionConsumptionState, + consumerRecord.getPubSubMessageHeaders()); try { if (controlMessage.controlMessageType == START_OF_SEGMENT.getValue()) { if (Arrays.equals(consumerRecord.getKey().getKey(), KafkaKey.HEART_BEAT.getKey())) { @@ -4860,6 +4991,37 @@ private void invalidateActiveKeyCountAndLog( } } + /** + * Increment {@link PartitionConsumptionState#batchPushRecordCount} for one consumed user-data + * record during batch ingestion. Counted on every replica so the count is comparable, at EOP, + * against the "prc" PubSub header VPJ stamps onto the EOP message. + * + * Filters: + * - Only PUT and DELETE message types — control messages are excluded. + * - Skip after EOP — RT replay records do not contribute to the batch count. + * - Skip chunk fragments (CHUNK_SCHEMA_ID); chunk manifests count as one logical record. + * - Skip Global RT DIV PUTs (key-level filter; produced metadata for the DIV, not user data). + */ + private void trackBatchPushRecordCount( + DefaultPubSubMessage consumerRecord, + PartitionConsumptionState partitionConsumptionState, + MessageType messageType, + int writerSchemaId) { + if (partitionConsumptionState.isEndOfPushReceived()) { + return; + } + if (consumerRecord.getKey().isGlobalRtDiv()) { + return; + } + if (messageType == MessageType.PUT && isChunkFragment(writerSchemaId)) { + return; + } + if (messageType != MessageType.PUT && messageType != MessageType.DELETE) { + return; + } + partitionConsumptionState.incrementBatchPushRecordCount(); + } + /** Records active-key-count invalidation on both the OTel (per-version) and Tehuti (host-level) paths. */ protected final void recordActiveKeyCountInvalidation() { versionedIngestionStats.recordActiveKeyCountInvalidation(storeName, versionNumber); @@ -5152,6 +5314,8 @@ private int processKafkaDataMessage( messageType, writerSchemaId); + trackBatchPushRecordCount(consumerRecord, partitionConsumptionState, messageType, writerSchemaId); + // Track key in HLL for unique key count estimation. // Only count user data operations (PUT/DELETE), skip chunk fragments, internal metadata, etc. if (uniqueIngestedKeyCountHllEnabled && keyLen > 0 && !isChunkFragment(writerSchemaId) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java index 50a4cb12ee3..d299ef58233 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggVersionedIngestionStats.java @@ -538,6 +538,18 @@ public void recordChecksumVerificationFailureCount(String storeName, int version getIngestionOtelStats(storeName).recordChecksumVerificationFailureCount(version, 1); } + public void recordBatchPushRecordCountMatch(String storeName, int version) { + getIngestionOtelStats(storeName).recordBatchPushRecordCountMatch(version, 1); + } + + public void recordBatchPushRecordCountMismatch(String storeName, int version) { + getIngestionOtelStats(storeName).recordBatchPushRecordCountMismatch(version, 1); + } + + public void recordRecordCountMismatchFailure(String storeName, int version) { + getIngestionOtelStats(storeName).recordRecordCountMismatchFailure(version, 1); + } + // Count methods with 2nd enum dimension public void recordIngestionFailureCount(String storeName, int version, VeniceIngestionFailureReason reason) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelMetricEntity.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelMetricEntity.java index 685318fb38a..b37aa352126 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelMetricEntity.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelMetricEntity.java @@ -290,6 +290,41 @@ public enum IngestionOtelMetricEntity implements ModuleMetricEntityInterface { "Count of checksum verification failures", setOf(VENICE_STORE_NAME, VENICE_CLUSTER_NAME, VENICE_VERSION_ROLE) ), + /** + * Server-side batch-push record-count verification: the consumer-counted PUT/DELETE total at + * EOP matched the producer's "prc" count (and the HLL leg also passed, if HLL tracking is on). + * Informational — fires whether or not the per-server fail-on-mismatch flag is enabled. + */ + BATCH_PUSH_RECORD_COUNT_MATCH_COUNT( + "ingestion.batch_push_record_count_match.count", MetricType.COUNTER, MetricUnit.NUMBER, + "Count of batch-push EOPs where the consumer-side record count matched the producer's count", + setOf(VENICE_STORE_NAME, VENICE_CLUSTER_NAME, VENICE_VERSION_ROLE) + ), + + /** + * Server-side batch-push record-count verification: either leg (counter or HLL) failed at EOP. + * Informational — fires whether or not the per-server fail-on-mismatch flag is enabled. Use + * this for visibility / dashboards; use {@link #RECORD_COUNT_MISMATCH_FAILURE_COUNT} to alert + * on strict-mode failed-ingestion events. + */ + BATCH_PUSH_RECORD_COUNT_MISMATCH_COUNT( + "ingestion.batch_push_record_count_mismatch.count", MetricType.COUNTER, MetricUnit.NUMBER, + "Count of batch-push EOPs where the consumer-side record count did not match the producer's count", + setOf(VENICE_STORE_NAME, VENICE_CLUSTER_NAME, VENICE_VERSION_ROLE) + ), + + /** + * Strict-mode record-count mismatch — fires whenever a mismatch is detected AND the server's + * fail-on-mismatch flag is enabled. On Venice servers this is paired with a thrown + * VeniceException (ingestion fails). On DaVinci replicas the throw is suppressed (DVC failure + * is aggregated separately via the push status store) and this counter does not fire. + */ + RECORD_COUNT_MISMATCH_FAILURE_COUNT( + "ingestion.record_count_mismatch_failure.count", MetricType.COUNTER, MetricUnit.NUMBER, + "Count of strict-mode record-count mismatches that failed ingestion (servers only; DaVinci is excluded)", + setOf(VENICE_STORE_NAME, VENICE_CLUSTER_NAME, VENICE_VERSION_ROLE) + ), + DCR_LOOKUP_CACHE_HIT_COUNT( "ingestion.dcr.lookup.cache.hit_count", MetricType.COUNTER, MetricUnit.NUMBER, "Count of cache hits when looking up existing value bytes or replication metadata before conflict resolution", diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelStats.java index 48bff951f16..8796c2fd7fa 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/IngestionOtelStats.java @@ -6,6 +6,8 @@ import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BATCH_PROCESSING_REQUEST_ERROR_COUNT; import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BATCH_PROCESSING_REQUEST_RECORD_COUNT; import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BATCH_PROCESSING_REQUEST_TIME; +import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BATCH_PUSH_RECORD_COUNT_MATCH_COUNT; +import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BATCH_PUSH_RECORD_COUNT_MISMATCH_COUNT; import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BYTES_CONSUMED_AS_UNCOMPRESSED_SIZE; import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.CHECKSUM_VERIFICATION_FAILURE_COUNT; import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.CONSUMER_ACTION_TIME; @@ -42,6 +44,7 @@ import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.PRODUCER_SYNCHRONIZE_TIME; import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RECORD_ASSEMBLED_SIZE; import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RECORD_ASSEMBLED_SIZE_RATIO; +import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RECORD_COUNT_MISMATCH_FAILURE_COUNT; import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RECORD_KEY_SIZE; import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RECORD_VALUE_SIZE; import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RESUBSCRIPTION_FAILURE_COUNT; @@ -183,6 +186,9 @@ public class IngestionOtelStats { private final MetricEntityStateOneEnum checksumVerificationFailureCountMetric; private final MetricEntityStateOneEnum partialUpdateAmplificationAlertCountMetric; private final MetricEntityStateOneEnum activeKeyCountInvalidationMetric; + private final MetricEntityStateOneEnum batchPushRecordCountMatchMetric; + private final MetricEntityStateOneEnum batchPushRecordCountMismatchMetric; + private final MetricEntityStateOneEnum recordCountMismatchFailureMetric; // Counter metrics with 2nd enum dimension private final MetricEntityStateTwoEnums ingestionFailureCountMetric; @@ -257,6 +263,9 @@ public class IngestionOtelStats { this.checksumVerificationFailureCountMetric = null; this.partialUpdateAmplificationAlertCountMetric = null; this.activeKeyCountInvalidationMetric = null; + this.batchPushRecordCountMatchMetric = null; + this.batchPushRecordCountMismatchMetric = null; + this.recordCountMismatchFailureMetric = null; this.ingestionFailureCountMetric = null; this.dcrLookupCacheHitCountMetric = null; this.bytesConsumedAsUncompressedSizeMetric = null; @@ -386,6 +395,9 @@ public IngestionOtelStats( createOneEnumMetric(PARTIAL_UPDATE_AMPLIFICATION_ALERT_COUNT.getMetricEntity()); activeKeyCountInvalidationMetric = activeKeyCountEnabled ? createOneEnumMetric(ACTIVE_KEY_COUNT_INVALIDATION.getMetricEntity()) : null; + batchPushRecordCountMatchMetric = createOneEnumMetric(BATCH_PUSH_RECORD_COUNT_MATCH_COUNT.getMetricEntity()); + batchPushRecordCountMismatchMetric = createOneEnumMetric(BATCH_PUSH_RECORD_COUNT_MISMATCH_COUNT.getMetricEntity()); + recordCountMismatchFailureMetric = createOneEnumMetric(RECORD_COUNT_MISMATCH_FAILURE_COUNT.getMetricEntity()); // Initialize HostLevelIngestionStats OTel metrics - counters with 2nd enum dimension ingestionFailureCountMetric = @@ -770,6 +782,18 @@ public void recordChecksumVerificationFailureCount(int version, long value) { checksumVerificationFailureCountMetric.record(value, classifyVersion(version, versionInfo)); } + public void recordBatchPushRecordCountMatch(int version, long value) { + batchPushRecordCountMatchMetric.record(value, classifyVersion(version, versionInfo)); + } + + public void recordBatchPushRecordCountMismatch(int version, long value) { + batchPushRecordCountMismatchMetric.record(value, classifyVersion(version, versionInfo)); + } + + public void recordRecordCountMismatchFailure(int version, long value) { + recordCountMismatchFailureMetric.record(value, classifyVersion(version, versionInfo)); + } + // Count methods with 2nd enum dimension public void recordIngestionFailureCount(int version, VeniceIngestionFailureReason reason, long value) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/NoOpIngestionOtelStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/NoOpIngestionOtelStats.java index 112fa6f7da2..16dead1d870 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/NoOpIngestionOtelStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/NoOpIngestionOtelStats.java @@ -217,6 +217,18 @@ public void recordPartialUpdateCacheHitCount(int version, long value) { public void recordChecksumVerificationFailureCount(int version, long value) { } + @Override + public void recordBatchPushRecordCountMatch(int version, long value) { + } + + @Override + public void recordBatchPushRecordCountMismatch(int version, long value) { + } + + @Override + public void recordRecordCountMismatchFailure(int version, long value) { + } + @Override public void recordIngestionFailureCount(int version, VeniceIngestionFailureReason reason, long value) { } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveKeyCountTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveKeyCountTest.java index dea1af7bfdf..fe42fd0ab96 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveKeyCountTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveKeyCountTest.java @@ -262,7 +262,7 @@ private PubSubMessageHeaders createSignalHeaders(byte signalValue) { private StoreIngestionTask setupProcessEndOfPush(boolean activeKeyCountEnabled) throws Exception { StoreIngestionTask sitMock = mock(StoreIngestionTask.class); - doCallRealMethod().when(sitMock).processEndOfPush(any(), any(), any(), any()); + doCallRealMethod().when(sitMock).processEndOfPush(any(), any(), any(), any(), any()); VeniceServerConfig mockServerConfig = mock(VeniceServerConfig.class); doReturn(activeKeyCountEnabled).when(mockServerConfig).isActiveKeyCountForAllBatchPushEnabled(); doReturn(activeKeyCountEnabled).when(mockServerConfig).isActiveKeyCountForHybridStoreEnabled(); @@ -940,7 +940,7 @@ public void testProcessEndOfPush() throws Exception { KafkaMessageEnvelope kme = new KafkaMessageEnvelope(); kme.producerMetadata = new ProducerMetadata(); kme.producerMetadata.messageTimestamp = System.currentTimeMillis(); - sitMock.processEndOfPush(kme, mock(PubSubPosition.class), mockPcs, new EndOfPush()); + sitMock.processEndOfPush(kme, mock(PubSubPosition.class), mockPcs, new EndOfPush(), null); if (enabled) { verify(mockPcs).cleanupBatchKeyCountState(); } else { diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java index 09ad77ec091..f6ea4a9b702 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTaskTest.java @@ -27,6 +27,9 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNotSame; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -92,6 +95,9 @@ import com.linkedin.venice.pubsub.PubSubTopicRepository; import com.linkedin.venice.pubsub.adapter.kafka.common.ApacheKafkaOffsetPosition; import com.linkedin.venice.pubsub.api.DefaultPubSubMessage; +import com.linkedin.venice.pubsub.api.EmptyPubSubMessageHeaders; +import com.linkedin.venice.pubsub.api.PubSubMessageHeader; +import com.linkedin.venice.pubsub.api.PubSubMessageHeaders; import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.pubsub.api.PubSubProducerCallback; @@ -3867,4 +3873,60 @@ public void testResolveFollowerSourceRegion( expected, description); } + + @DataProvider(name = "extractPrcHeaderToForwardCases") + public static Object[][] extractPrcHeaderToForwardCases() { + PubSubMessageHeaders prcOnly = new PubSubMessageHeaders().add( + PubSubMessageHeaders.VENICE_PARTITION_RECORD_COUNT_HEADER, + ByteBuffer.allocate(Long.BYTES).putLong(1234L).array()); + PubSubMessageHeaders prcWithOthers = new PubSubMessageHeaders() + .add( + PubSubMessageHeaders.VENICE_PARTITION_RECORD_COUNT_HEADER, + ByteBuffer.allocate(Long.BYTES).putLong(7L).array()) + .add(PubSubMessageHeaders.VENICE_TRANSPORT_PROTOCOL_HEADER, "vtp-bytes".getBytes()) + .add(PubSubMessageHeaders.VENICE_LEADER_COMPLETION_STATE_HEADER, new byte[] { (byte) 1 }); + PubSubMessageHeaders othersOnly = + new PubSubMessageHeaders().add(PubSubMessageHeaders.VENICE_TRANSPORT_PROTOCOL_HEADER, "vtp-bytes".getBytes()) + .add(PubSubMessageHeaders.VENICE_LEADER_COMPLETION_STATE_HEADER, new byte[] { (byte) 1 }); + // { description, inputHeaders, expectedPrcValueOrNull (null = expect SINGLETON returned) } + return new Object[][] { { "null headers returns SINGLETON without NPE", null, null }, + { "empty headers (no prc) returns SINGLETON", new PubSubMessageHeaders(), null }, + { "vtp/lcs present but no prc returns SINGLETON (no allocation)", othersOnly, null }, + { "prc alone returns fresh headers with prc value", prcOnly, 1234L }, + { "prc among other headers — only prc is forwarded; vtp/lcs are stripped", prcWithOthers, 7L } }; + } + + /** + * The leader's helper must forward only the "prc" partition-record-count header when present so + * remote-fabric followers can run record-count verification at EOP. Non-prc headers + * (vtp/lcs/etc.) are leader-regenerated downstream and must NOT be carried over from upstream. + * When no prc is present, the helper returns {@link EmptyPubSubMessageHeaders#SINGLETON} to + * avoid hot-path allocation. + */ + @Test(dataProvider = "extractPrcHeaderToForwardCases") + public void testExtractPrcHeaderToForward( + String description, + PubSubMessageHeaders inputHeaders, + Long expectedPrcValue) { + DefaultPubSubMessage consumerRecord = mock(DefaultPubSubMessage.class); + when(consumerRecord.getPubSubMessageHeaders()).thenReturn(inputHeaders); + + PubSubMessageHeaders forwarded = LeaderFollowerStoreIngestionTask.extractPrcHeaderToForward(consumerRecord); + + if (expectedPrcValue == null) { + assertSame(forwarded, EmptyPubSubMessageHeaders.SINGLETON, description); + } else { + assertNotSame(forwarded, EmptyPubSubMessageHeaders.SINGLETON, description); + PubSubMessageHeader prc = forwarded.get(PubSubMessageHeaders.VENICE_PARTITION_RECORD_COUNT_HEADER); + assertNotNull(prc, "prc header must be in the forwarded set for case: " + description); + assertEquals(ByteBuffer.wrap(prc.value()).getLong(), expectedPrcValue.longValue(), description); + // Non-prc headers are leader-regenerated and must NOT propagate. + assertNull( + forwarded.get(PubSubMessageHeaders.VENICE_TRANSPORT_PROTOCOL_HEADER), + "vtp must not propagate: " + description); + assertNull( + forwarded.get(PubSubMessageHeaders.VENICE_LEADER_COMPLETION_STATE_HEADER), + "lcs must not propagate: " + description); + } + } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java index 45d45fcf1bb..1c197040338 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java @@ -35,6 +35,7 @@ import org.apache.avro.generic.GenericRecord; import org.testng.Assert; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -607,4 +608,30 @@ public void testGetOrCreateCachedHeartbeatKeyResolvesRegularUnchunked() { assertEquals(k.getChunkingStatus(), VeniceChunkingStatus.UNCHUNKED); assertEquals(k.getLocality(), VeniceRegionLocality.LOCAL); } + + @DataProvider(name = "batchPushRecordCountCases") + public static Object[][] batchPushRecordCountCases() { + // { description, priorCountInOffsetRecord, incrementsAfterConstruction, expectedFinalCount } + return new Object[][] { { "default count is 0", 0L, 0, 0L }, { "increments are monotonic", 0L, 10, 10L }, + { "count restored from OffsetRecord on construction (restart safety)", 57L, 0, 57L }, + { "restored count continues to accumulate further increments", 57L, 5, 62L } }; + } + + @Test(dataProvider = "batchPushRecordCountCases") + public void testBatchPushRecordCountLifecycle( + String description, + long priorCountInOffsetRecord, + int incrementsAfterConstruction, + long expectedFinalCount) { + OffsetRecord offsetRecord = new OffsetRecord(AvroProtocolDefinition.PARTITION_STATE.getSerializer(), pubSubContext); + if (priorCountInOffsetRecord != 0L) { + offsetRecord.setBatchPushRecordCount(priorCountInOffsetRecord); + } + PartitionConsumptionState pcs = + new PartitionConsumptionState(TOPIC_PARTITION, offsetRecord, pubSubContext, false, false, false, null); + for (int i = 0; i < incrementsAfterConstruction; i++) { + pcs.incrementBatchPushRecordCount(); + } + assertEquals(pcs.getBatchPushRecordCount(), expectedFinalCount, description); + } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskRecordCountTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskRecordCountTest.java new file mode 100644 index 00000000000..d2a145220cc --- /dev/null +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskRecordCountTest.java @@ -0,0 +1,445 @@ +package com.linkedin.davinci.kafka.consumer; + +import static com.linkedin.davinci.kafka.consumer.ActiveKeyCountTestUtils.setField; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; + +import com.linkedin.davinci.config.VeniceServerConfig; +import com.linkedin.davinci.stats.AggVersionedIngestionStats; +import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.pubsub.api.EmptyPubSubMessageHeaders; +import com.linkedin.venice.pubsub.api.PubSubMessageHeaders; +import com.linkedin.venice.pubsub.api.PubSubTopic; +import com.linkedin.venice.server.VersionRole; +import java.nio.ByteBuffer; +import org.testng.annotations.Test; + + +public class StoreIngestionTaskRecordCountTest { + private static final String TEST_TOPIC = "test_store_v1"; + private static final String TEST_STORE = "test_store"; + private static final int TEST_VERSION = 1; + + private static StoreIngestionTask buildSit(boolean failOnMismatchEnabled, AggVersionedIngestionStats statsMock) + throws Exception { + return buildSit(failOnMismatchEnabled, statsMock, VersionRole.FUTURE, false, false); + } + + /** + * @param failOnMismatchEnabled server-level config + * {@code server.batch.push.record.count.verification.fail.on.mismatch.enabled} (default + * {@code true} in production). + * @param versionRole role of the SIT's version: only {@link VersionRole#FUTURE} runs the + * verification — current/backup skip the entire check (re-emit-after-promotion safety). + * @param hllEnabled toggles {@code uniqueIngestedKeyCountHllEnabled} on the SIT. When false the + * HLL leg of the dual check is bypassed (matches the existing-test path). + * @param isDaVinciClient toggles the DaVinci skip-throw branch on the failure path. + */ + private static StoreIngestionTask buildSit( + boolean failOnMismatchEnabled, + AggVersionedIngestionStats statsMock, + VersionRole versionRole, + boolean hllEnabled, + boolean isDaVinciClient) throws Exception { + StoreIngestionTask sit = mock(StoreIngestionTask.class); + setField(sit, "versionedIngestionStats", statsMock); + setField(sit, "kafkaVersionTopic", TEST_TOPIC); + setField(sit, "storeName", TEST_STORE); + setField(sit, "versionNumber", TEST_VERSION); + setField(sit, "uniqueIngestedKeyCountHllEnabled", hllEnabled); + setField(sit, "isDaVinciClient", isDaVinciClient); + setField(sit, "versionRole", versionRole); + + VeniceServerConfig serverConfigMock = mock(VeniceServerConfig.class); + doReturn(failOnMismatchEnabled).when(serverConfigMock).isBatchPushRecordCountVerificationFailOnMismatchEnabled(); + setField(sit, "serverConfig", serverConfigMock); + + PubSubTopic vt = mock(PubSubTopic.class); + doReturn(false).when(vt).isViewTopic(); + setField(sit, "versionTopic", vt); + + doCallRealMethod().when(sit).verifyBatchPushRecordCount(any(), any()); + return sit; + } + + private static StoreIngestionTask buildSitOnViewTopic(AggVersionedIngestionStats statsMock) throws Exception { + StoreIngestionTask sit = buildSit(true, statsMock); + PubSubTopic vt = mock(PubSubTopic.class); + doReturn(true).when(vt).isViewTopic(); + setField(sit, "versionTopic", vt); + return sit; + } + + private static PartitionConsumptionState pcsWithCount(long count) { + PartitionConsumptionState pcs = mock(PartitionConsumptionState.class); + doReturn(count).when(pcs).getBatchPushRecordCount(); + doReturn("test_replica").when(pcs).getReplicaId(); + return pcs; + } + + private static PartitionConsumptionState pcsWithCountAndHll(long count, long hllEstimate) { + PartitionConsumptionState pcs = pcsWithCount(count); + doReturn(hllEstimate).when(pcs).getEstimatedUniqueIngestedKeyCount(); + return pcs; + } + + private static PubSubMessageHeaders headersWithPrc(long expectedCount) { + return new PubSubMessageHeaders().add( + PubSubMessageHeaders.VENICE_PARTITION_RECORD_COUNT_HEADER, + ByteBuffer.allocate(Long.BYTES).putLong(expectedCount).array()); + } + + @Test + public void testVerifySkipsOnNullHeaders() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSit(true, stats); + sit.verifyBatchPushRecordCount(pcsWithCount(100L), null); + verify(stats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION); + } + + @Test + public void testVerifySkipsOnMissingPrcHeader() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSit(true, stats); + sit.verifyBatchPushRecordCount(pcsWithCount(100L), new PubSubMessageHeaders()); + verify(stats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION); + } + + @Test + public void testVerifySkipsOnMalformedPrcHeader() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSit(true, stats); + PubSubMessageHeaders headers = new PubSubMessageHeaders() + .add(PubSubMessageHeaders.VENICE_PARTITION_RECORD_COUNT_HEADER, new byte[] { 1, 2, 3 }); // not 8 bytes + sit.verifyBatchPushRecordCount(pcsWithCount(100L), headers); + verify(stats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION); + } + + @Test + public void testVerifySkipsOnSentinelExpectedCount() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSit(true, stats); + sit.verifyBatchPushRecordCount( + pcsWithCount(100L), + headersWithPrc(PubSubMessageHeaders.PRC_HEADER_UNAVAILABLE_SENTINEL)); + verify(stats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION); + } + + @Test + public void testVerifySkipsOnViewTopic() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSitOnViewTopic(stats); + sit.verifyBatchPushRecordCount(pcsWithCount(50L), headersWithPrc(100L)); // would otherwise fail + verify(stats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION); + } + + @Test + public void testVerifySkipsOnEmptyPubSubMessageHeadersSingleton() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSit(true, stats); + sit.verifyBatchPushRecordCount(pcsWithCount(100L), EmptyPubSubMessageHeaders.SINGLETON); + verify(stats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION); + } + + /** + * Verification only runs while the push is in progress. Once the version is current (or backup), + * any re-emit of EOP should not re-fire the check. No metrics, no throw, even on a clear deficit. + */ + @Test + public void testVerifySkipsWhenNotFutureVersion() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSit( + /* failOnMismatchEnabled */ true, + stats, + VersionRole.CURRENT, // not FUTURE — verification should skip + /* hllEnabled */ false, + /* isDaVinciClient */ false); + sit.verifyBatchPushRecordCount(pcsWithCount(50L), headersWithPrc(100L)); // would otherwise fail + verify(stats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordRecordCountMismatchFailure(TEST_STORE, TEST_VERSION); + } + + @Test + public void testVerifyEmitsMatchSensorOnExactCount() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSit(true, stats); + sit.verifyBatchPushRecordCount(pcsWithCount(100L), headersWithPrc(100L)); + verify(stats, times(1)).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION); + } + + @Test + public void testVerifyEmitsMatchSensorOnSurplus() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSit(true, stats); + sit.verifyBatchPushRecordCount(pcsWithCount(105L), headersWithPrc(100L)); + verify(stats, times(1)).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION); + } + + @Test + public void testVerifyEmitsMatchSensorOnZeroExpectedAndActual() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSit(true, stats); + sit.verifyBatchPushRecordCount(pcsWithCount(0L), headersWithPrc(0L)); + verify(stats, times(1)).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION); + } + + /** With server-strict-mode disabled, mismatch records the metric and logs but does NOT throw. */ + @Test + public void testVerifyEmitsMismatchSensorOnDeficitWhenStrictModeDisabled() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSit(/* failOnMismatchEnabled */ false, stats); + sit.verifyBatchPushRecordCount(pcsWithCount(50L), headersWithPrc(100L)); + verify(stats, times(1)).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION); + } + + /** With server-strict-mode enabled (default), mismatch records the metric AND throws. */ + @Test + public void testVerifyThrowsOnDeficitWhenStrictModeEnabled() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSit(/* failOnMismatchEnabled */ true, stats); + VeniceException ex = expectThrows( + VeniceException.class, + () -> sit.verifyBatchPushRecordCount(pcsWithCount(50L), headersWithPrc(100L))); + verify(stats, times(1)).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION); + + String msg = ex.getMessage(); + assertTrue(msg.contains("RECORD_COUNT_DEFICIT"), "Tagged error class missing in: " + msg); + assertTrue(msg.contains("expected=100"), "expected=N missing in: " + msg); + assertTrue(msg.contains("actual=50"), "actual=M missing in: " + msg); + assertTrue(msg.contains("replica=test_replica"), "replica id missing in: " + msg); + assertTrue(msg.contains("topic=" + TEST_TOPIC), "topic missing in: " + msg); + // Failed-and-throwing mismatches must also increment the dedicated failure sensor — distinct + // from the informational mismatch sensor, which fires regardless of strict-mode state. + verify(stats, times(1)).recordRecordCountMismatchFailure(TEST_STORE, TEST_VERSION); + } + + /** When server strict-mode is disabled, the dedicated failure sensor must NOT fire. */ + @Test + public void testVerifyDoesNotEmitFailureSensorWhenStrictModeDisabled() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSit(/* failOnMismatchEnabled */ false, stats); + sit.verifyBatchPushRecordCount(pcsWithCount(50L), headersWithPrc(100L)); + verify(stats, times(1)).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordRecordCountMismatchFailure(TEST_STORE, TEST_VERSION); + } + + /** + * Dual-check passes when both legs pass: counter ≥ expected AND |hll − expected| ≤ tolerance. + * With expected=100 and {@code HLL_ERROR_TOLERANCE=0.05}, threshold = ceil(100 * 0.05) = 5, so + * an HLL estimate of 98 sits |98−100|=2 ≤ 5 → HLL leg passes. Counter at 100 ≥ 100 → counter + * leg passes. Match. + */ + @Test + public void testVerifyDualCheckPassesWhenBothLegsPass() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSit( + /* failOnMismatchEnabled */ true, + stats, + VersionRole.FUTURE, + /* hllEnabled */ true, + /* isDaVinciClient */ false); + sit.verifyBatchPushRecordCount(pcsWithCountAndHll(100L, 98L), headersWithPrc(100L)); + verify(stats, times(1)).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordRecordCountMismatchFailure(TEST_STORE, TEST_VERSION); + } + + /** + * Dual-check fails when the HLL leg fails (under-count) even though counter passes. The HLL leg + * catches duplicate-key inflation that the counter alone would miss. threshold = 5, |50−100|=50 + * > 5 → HLL leg fails → mismatch. + */ + @Test + public void testVerifyDualCheckFailsWhenHllUnderCounts() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSit( + /* failOnMismatchEnabled */ false, + stats, + VersionRole.FUTURE, + /* hllEnabled */ true, + /* isDaVinciClient */ false); + sit.verifyBatchPushRecordCount(pcsWithCountAndHll(100L, 50L), headersWithPrc(100L)); + verify(stats, times(1)).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION); + } + + /** + * Symmetric: dual-check also fails when the HLL leg over-counts beyond tolerance. counter=120 + * ≥ 100 → counter passes (raw over-count is benign — dup replication / spec-exec). hll=109, + * |109−100|=9 > 5 → HLL leg fails. Structurally HLL counts unique keys and unique keys cannot + * exceed raw producer ops, so a >5% over-estimate signals a bug worth flagging. + */ + @Test + public void testVerifyDualCheckFailsWhenHllOverCounts() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSit( + /* failOnMismatchEnabled */ false, + stats, + VersionRole.FUTURE, + /* hllEnabled */ true, + /* isDaVinciClient */ false); + sit.verifyBatchPushRecordCount(pcsWithCountAndHll(120L, 109L), headersWithPrc(100L)); + verify(stats, times(1)).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION); + } + + /** + * Boundary: HLL exactly at the upper edge of the tolerance window still passes. expected=100, + * threshold=5, hll=105 → |105−100|=5 ≤ 5 → HLL leg passes. Confirms the window is inclusive on + * both sides. + */ + @Test + public void testVerifyDualCheckPassesAtUpperHllToleranceBoundary() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSit( + /* failOnMismatchEnabled */ true, + stats, + VersionRole.FUTURE, + /* hllEnabled */ true, + /* isDaVinciClient */ false); + sit.verifyBatchPushRecordCount(pcsWithCountAndHll(100L, 105L), headersWithPrc(100L)); + verify(stats, times(1)).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION); + } + + /** + * Dual-check fails when the counter leg fails even though HLL passes. counter=50 < 100 → counter + * fails; hll=100 sits |100−100|=0 ≤ 5 → HLL alone would have passed. Confirms that EITHER leg + * failing is sufficient to trigger mismatch. + */ + @Test + public void testVerifyDualCheckFailsWhenOnlyCounterFails() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSit( + /* failOnMismatchEnabled */ false, + stats, + VersionRole.FUTURE, + /* hllEnabled */ true, + /* isDaVinciClient */ false); + sit.verifyBatchPushRecordCount(pcsWithCountAndHll(50L, 100L), headersWithPrc(100L)); + verify(stats, times(1)).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION); + } + + /** + * DaVinci, server strict-mode enabled, counter-leg deficit: both the failure sensor and the + * throw are suppressed — DaVinci failure aggregation happens separately via the DaVinci push + * status store. Only the informational {@code _mismatch} sensor (which fires regardless of + * strict-mode state) is incremented. + */ + @Test + public void testVerifyDaVinciDoesNotThrowOnDeficitWhenStrictModeEnabled() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSit( + /* failOnMismatchEnabled */ true, + stats, + VersionRole.FUTURE, + /* hllEnabled */ false, + /* isDaVinciClient */ true); + // Should NOT throw — DaVinci skip path. Failure sensor and throw both suppressed; only the + // informational mismatch sensor fires. + sit.verifyBatchPushRecordCount(pcsWithCount(50L), headersWithPrc(100L)); + verify(stats, times(1)).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordRecordCountMismatchFailure(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION); + } + + /** + * DaVinci, server strict-mode disabled, counter-leg deficit: only the informational mismatch + * sensor fires — no failure sensor, no throw. Mirrors the non-DaVinci strict-mode-disabled case + * and confirms the DaVinci skip-throw guard does not perturb the metric-only path. + */ + @Test + public void testVerifyDaVinciEmitsMismatchSensorOnDeficitWhenStrictModeDisabled() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSit( + /* failOnMismatchEnabled */ false, + stats, + VersionRole.FUTURE, + /* hllEnabled */ false, + /* isDaVinciClient */ true); + sit.verifyBatchPushRecordCount(pcsWithCount(50L), headersWithPrc(100L)); + verify(stats, times(1)).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordRecordCountMismatchFailure(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION); + } + + /** + * DaVinci, server strict-mode enabled, HLL-leg failure (counter passes): confirms the DaVinci + * skip guard is keyed to {@code isDaVinciClient}, not to which leg failed. HLL deviation > + * tolerance → mismatch detected; failure sensor and throw are both suppressed. + */ + @Test + public void testVerifyDaVinciDoesNotThrowOnHllLegFailureWhenStrictModeEnabled() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSit( + /* failOnMismatchEnabled */ true, + stats, + VersionRole.FUTURE, + /* hllEnabled */ true, + /* isDaVinciClient */ true); + // counter=100 ≥ 100 (passes); hll=50, |50−100|=50 > 5 (fails) → mismatch. + sit.verifyBatchPushRecordCount(pcsWithCountAndHll(100L, 50L), headersWithPrc(100L)); + verify(stats, times(1)).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordRecordCountMismatchFailure(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION); + } + + /** + * DaVinci, match path: a clean push still records the match sensor and does not trip the + * mismatch/failure sensors. Sanity check that DaVinci doesn't accidentally skip the match + * recording. + */ + @Test + public void testVerifyDaVinciEmitsMatchSensorOnExactCount() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSit( + /* failOnMismatchEnabled */ true, + stats, + VersionRole.FUTURE, + /* hllEnabled */ false, + /* isDaVinciClient */ true); + sit.verifyBatchPushRecordCount(pcsWithCount(100L), headersWithPrc(100L)); + verify(stats, times(1)).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordRecordCountMismatchFailure(TEST_STORE, TEST_VERSION); + } + + /** + * DaVinci, not-future-version: the future-version gate runs before the DaVinci branch, so an + * already-current version on DaVinci skips the entire verification — no metrics, no throw. + */ + @Test + public void testVerifyDaVinciSkipsWhenNotFutureVersion() throws Exception { + AggVersionedIngestionStats stats = mock(AggVersionedIngestionStats.class); + StoreIngestionTask sit = buildSit( + /* failOnMismatchEnabled */ true, + stats, + VersionRole.CURRENT, + /* hllEnabled */ false, + /* isDaVinciClient */ true); + sit.verifyBatchPushRecordCount(pcsWithCount(50L), headersWithPrc(100L)); // would otherwise fail + verify(stats, never()).recordBatchPushRecordCountMatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordBatchPushRecordCountMismatch(TEST_STORE, TEST_VERSION); + verify(stats, never()).recordRecordCountMismatchFailure(TEST_STORE, TEST_VERSION); + } + +} diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/AggVersionedIngestionStatsTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/AggVersionedIngestionStatsTest.java index 17ddd6d19fd..ab5de6db97c 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/AggVersionedIngestionStatsTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/AggVersionedIngestionStatsTest.java @@ -273,6 +273,9 @@ public void testAllRecordingMethodsWorkWithConfig(boolean ingestionOtelStatsEnab aggStats.recordResubscriptionFailureCount(STORE_NAME, VERSION_1); aggStats.recordPartialUpdateCacheHitCount(STORE_NAME, VERSION_1); aggStats.recordChecksumVerificationFailureCount(STORE_NAME, VERSION_1); + aggStats.recordBatchPushRecordCountMatch(STORE_NAME, VERSION_1); + aggStats.recordBatchPushRecordCountMismatch(STORE_NAME, VERSION_1); + aggStats.recordRecordCountMismatchFailure(STORE_NAME, VERSION_1); aggStats.recordIngestionFailureCount(STORE_NAME, VERSION_1, VeniceIngestionFailureReason.GENERAL); aggStats.recordDcrLookupCacheHitCount(STORE_NAME, VERSION_1, VeniceRecordType.REPLICATION_METADATA); aggStats.recordBytesConsumedAsUncompressedSize(STORE_NAME, VERSION_1, 1024); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ServerMetricEntityTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ServerMetricEntityTest.java index 23ee50e53a2..8fee846e59d 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ServerMetricEntityTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ServerMetricEntityTest.java @@ -22,7 +22,7 @@ public class ServerMetricEntityTest { @Test public void testServerMetricEntitiesCount() { - assertEquals(SERVER_METRIC_ENTITIES.size(), 182, "Expected 182 unique metric entities"); + assertEquals(SERVER_METRIC_ENTITIES.size(), 185, "Expected 185 unique metric entities"); } /** diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/IngestionOtelMetricEntityTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/IngestionOtelMetricEntityTest.java index 0fe0107bd56..59be77f4ddf 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/IngestionOtelMetricEntityTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/IngestionOtelMetricEntityTest.java @@ -425,6 +425,30 @@ private static Map expectedD MetricUnit.NUMBER, "Count of checksum verification failures", storeClusterVersion)); + map.put( + IngestionOtelMetricEntity.BATCH_PUSH_RECORD_COUNT_MATCH_COUNT, + new MetricEntityExpectation( + "ingestion.batch_push_record_count_match.count", + MetricType.COUNTER, + MetricUnit.NUMBER, + "Count of batch-push EOPs where the consumer-side record count matched the producer's count", + storeClusterVersion)); + map.put( + IngestionOtelMetricEntity.BATCH_PUSH_RECORD_COUNT_MISMATCH_COUNT, + new MetricEntityExpectation( + "ingestion.batch_push_record_count_mismatch.count", + MetricType.COUNTER, + MetricUnit.NUMBER, + "Count of batch-push EOPs where the consumer-side record count did not match the producer's count", + storeClusterVersion)); + map.put( + IngestionOtelMetricEntity.RECORD_COUNT_MISMATCH_FAILURE_COUNT, + new MetricEntityExpectation( + "ingestion.record_count_mismatch_failure.count", + MetricType.COUNTER, + MetricUnit.NUMBER, + "Count of strict-mode record-count mismatches that failed ingestion (servers only; DaVinci is excluded)", + storeClusterVersion)); map.put( IngestionOtelMetricEntity.DCR_LOOKUP_CACHE_HIT_COUNT, new MetricEntityExpectation( diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/IngestionOtelStatsTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/IngestionOtelStatsTest.java index 977dad3c098..f64ed2cd520 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/IngestionOtelStatsTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/IngestionOtelStatsTest.java @@ -6,6 +6,8 @@ import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BATCH_PROCESSING_REQUEST_ERROR_COUNT; import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BATCH_PROCESSING_REQUEST_RECORD_COUNT; import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BATCH_PROCESSING_REQUEST_TIME; +import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BATCH_PUSH_RECORD_COUNT_MATCH_COUNT; +import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BATCH_PUSH_RECORD_COUNT_MISMATCH_COUNT; import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.BYTES_CONSUMED_AS_UNCOMPRESSED_SIZE; import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.CHECKSUM_VERIFICATION_FAILURE_COUNT; import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.CONSUMER_ACTION_TIME; @@ -36,6 +38,7 @@ import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.PRODUCER_SYNCHRONIZE_TIME; import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RECORD_ASSEMBLED_SIZE; import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RECORD_ASSEMBLED_SIZE_RATIO; +import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RECORD_COUNT_MISMATCH_FAILURE_COUNT; import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RECORD_KEY_SIZE; import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RECORD_VALUE_SIZE; import static com.linkedin.davinci.stats.ingestion.IngestionOtelMetricEntity.RESUBSCRIPTION_FAILURE_COUNT; @@ -794,7 +797,13 @@ public static Object[][] simpleCounterMetrics() { { (BiConsumer) (s, v) -> s.recordPartialUpdateCacheHitCount(CURRENT_VERSION, v), 1, PARTIAL_UPDATE_CACHE_HIT_COUNT }, { (BiConsumer) (s, v) -> s - .recordChecksumVerificationFailureCount(CURRENT_VERSION, v), 1, CHECKSUM_VERIFICATION_FAILURE_COUNT }, }; + .recordChecksumVerificationFailureCount(CURRENT_VERSION, v), 1, CHECKSUM_VERIFICATION_FAILURE_COUNT }, + { (BiConsumer) (s, v) -> s.recordBatchPushRecordCountMatch(CURRENT_VERSION, v), 1, + BATCH_PUSH_RECORD_COUNT_MATCH_COUNT }, + { (BiConsumer) (s, v) -> s.recordBatchPushRecordCountMismatch(CURRENT_VERSION, v), + 1, BATCH_PUSH_RECORD_COUNT_MISMATCH_COUNT }, + { (BiConsumer) (s, v) -> s.recordRecordCountMismatchFailure(CURRENT_VERSION, v), 1, + RECORD_COUNT_MISMATCH_FAILURE_COUNT } }; } @Test(dataProvider = "simpleCounterMetrics") diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index 0460876a835..2228f2b2f2a 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -2795,6 +2795,19 @@ private ConfigKeys() { */ public static final String SERVER_UNIQUE_INGESTED_KEY_COUNT_HLL_LOG2K = "server.unique.ingested.key.count.hll.log2k"; + /** + * Server-side strict mode for batch-push record-count verification at EOP. When {@code true} + * (default), a deficit detected by {@code verifyBatchPushRecordCount} fails ingestion via + * {@link com.linkedin.venice.exceptions.VeniceException}; the informational + * {@code batch_push_record_count_mismatch} + {@code record_count_mismatch_failure} OTel + * metrics fire alongside the throw. When {@code false}, only the informational + * {@code batch_push_record_count_mismatch} metric fires — no throw, ingestion continues. The + * throw is unconditionally suppressed on DaVinci replicas regardless of this flag (DaVinci + * failure is aggregated separately via the push status store). + */ + public static final String SERVER_BATCH_PUSH_RECORD_COUNT_VERIFICATION_FAIL_ON_MISMATCH_ENABLED = + "server.batch.push.record.count.verification.fail.on.mismatch.enabled"; + /** * Follower replicas and DavinciClient will only consider heartbeats received within * this time window to mark themselves as completed. This is to avoid the cases that diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java b/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java index 368bdbf0b66..9d629b4560b 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/offsets/OffsetRecord.java @@ -393,6 +393,14 @@ public void setActiveKeyCount(long activeKeyCount) { this.partitionState.activeKeyCount = activeKeyCount; } + public long getBatchPushRecordCount() { + return this.partitionState.batchPushRecordCount; + } + + public void setBatchPushRecordCount(long batchPushRecordCount) { + this.partitionState.batchPushRecordCount = batchPushRecordCount; + } + public Map getTrackingIncrementalPushStatus() { return partitionState.trackingIncrementalPushStatus; } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java index 8620e2eac26..fcd0a5001b3 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java @@ -40,6 +40,14 @@ public class PubSubMessageHeaders implements Measurable, Iterable put( PubSubProducerCallback callback, int upstreamPartition, LeaderMetadataWrapper leaderMetadataWrapper) { + return put( + kafkaKey, + kafkaMessageEnvelope, + callback, + upstreamPartition, + leaderMetadataWrapper, + EmptyPubSubMessageHeaders.SINGLETON); + } + + /** + * Pass-through put that lets the caller forward selected upstream PubSub headers (e.g. the + * "prc" partition-record-count header on EOP) to the local VT. Used by the leader's + * cross-region re-emit path so remote-fabric followers can run record-count verification at EOP. + * + *

Callers should pass {@link EmptyPubSubMessageHeaders#SINGLETON} when no upstream header + * needs to be forwarded — the data path uses this to avoid per-message header allocation. Pass + * a fresh {@link PubSubMessageHeaders} containing only the headers that should propagate. + * A {@code null} argument is treated as empty.

+ */ + public Future put( + KafkaKey kafkaKey, + KafkaMessageEnvelope kafkaMessageEnvelope, + PubSubProducerCallback callback, + int upstreamPartition, + LeaderMetadataWrapper leaderMetadataWrapper, + PubSubMessageHeaders pubSubMessageHeaders) { // Self-adjust the chunking setting in pass-through mode verifyChunkingSetting(kafkaMessageEnvelope); @@ -1101,7 +1127,7 @@ public Future put( upstreamPartition, callback, false, - EmptyPubSubMessageHeaders.SINGLETON); + pubSubMessageHeaders != null ? pubSubMessageHeaders : EmptyPubSubMessageHeaders.SINGLETON); } /** diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/offsets/TestOffsetRecord.java b/internal/venice-common/src/test/java/com/linkedin/venice/offsets/TestOffsetRecord.java index 857e1b944db..63faa633fad 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/offsets/TestOffsetRecord.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/offsets/TestOffsetRecord.java @@ -175,4 +175,27 @@ public void testDeserializePositionWithOffsetFallback( assertEquals(result, PubSubSymbolicPosition.LATEST, name + ": expected LATEST"); } } + + // -- batchPushRecordCount serialization round-trip (Phase 1) --------------------------------- + + @DataProvider(name = "batchPushRecordCountCases") + public static Object[][] batchPushRecordCountCases() { + // { description, valueToStampBeforeRoundTrip (null = leave at default), expectedAfterRoundTrip } + return new Object[][] { { "default value is 0 (no stamping)", null, 0L }, + { "non-zero value survives serialize -> deserialize", 123_456L, 123_456L } }; + } + + @Test(dataProvider = "batchPushRecordCountCases") + public void testBatchPushRecordCountSerialization(String description, Long valueToStamp, long expected) { + OffsetRecord r1 = TestUtils + .getOffsetRecord(ApacheKafkaOffsetPosition.of(100), Optional.empty(), DEFAULT_PUBSUB_CONTEXT_FOR_UNIT_TESTING); + if (valueToStamp != null) { + r1.setBatchPushRecordCount(valueToStamp); + } + OffsetRecord r2 = new OffsetRecord( + r1.toBytes(), + AvroProtocolDefinition.PARTITION_STATE.getSerializer(), + DEFAULT_PUBSUB_CONTEXT_FOR_UNIT_TESTING); + assertEquals(r2.getBatchPushRecordCount(), expected, description); + } } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java index 35187772d69..535cc417e6d 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java @@ -51,6 +51,7 @@ import com.linkedin.venice.partitioner.DefaultVenicePartitioner; import com.linkedin.venice.pubsub.adapter.kafka.common.ApacheKafkaOffsetPosition; import com.linkedin.venice.pubsub.api.DefaultPubSubMessage; +import com.linkedin.venice.pubsub.api.EmptyPubSubMessageHeaders; import com.linkedin.venice.pubsub.api.PubSubMessageHeader; import com.linkedin.venice.pubsub.api.PubSubMessageHeaders; import com.linkedin.venice.pubsub.api.PubSubPosition; @@ -1694,4 +1695,99 @@ public void testDefaultLeaderMetadataWrapperHasSentinelUpstreamMessageTimestamp( DEFAULT_LEADER_METADATA_WRAPPER.getUpstreamMessageTimestamp(), LeaderMetadataWrapper.DEFAULT_UPSTREAM_MESSAGE_TIMESTAMP); } + + /** Symbolic input for the {@code passThroughHeaders} parameter in the dedupe'd test below. */ + private enum PassThroughHeadersInput { + /** Caller passes a fresh PubSubMessageHeaders with prc=42 (the new 6-arg overload). */ + PRC_HEADERS, + /** Caller passes {@link EmptyPubSubMessageHeaders#SINGLETON} (no headers forwarded). */ + SINGLETON, + /** Caller passes {@code null} (treated as empty — must not NPE). */ + NULL, + /** Caller uses the existing 5-arg overload (no headers parameter at all). */ + FIVE_ARG_OVERLOAD + } + + @DataProvider(name = "passThroughPutHeadersCases") + public static Object[][] passThroughPutHeadersCases() { + // { description, input, expectedPrcValue (-1L means "no prc expected in captured headers") } + return new Object[][] { + { "6-arg overload with prc header threads it through", PassThroughHeadersInput.PRC_HEADERS, 42L }, + { "6-arg overload with SINGLETON forwards no headers", PassThroughHeadersInput.SINGLETON, -1L }, + { "6-arg overload with null is treated as empty (no NPE)", PassThroughHeadersInput.NULL, -1L }, + { "Existing 5-arg overload strips headers — no prc", PassThroughHeadersInput.FIVE_ARG_OVERLOAD, -1L } }; + } + + @Test(dataProvider = "passThroughPutHeadersCases") + public void testPassThroughPutHeaderHandling( + String description, + PassThroughHeadersInput input, + long expectedPrcValue) { + PubSubProducerAdapter mockedProducer = mock(PubSubProducerAdapter.class); + when(mockedProducer.sendMessage(any(), any(), any(), any(), any(), any())) + .thenReturn(CompletableFuture.completedFuture(null)); + VeniceWriterOptions vwOpts = new VeniceWriterOptions.Builder("test_topic") + .setKeyPayloadSerializer(new VeniceAvroKafkaSerializer("\"string\"")) + .setValuePayloadSerializer(new VeniceAvroKafkaSerializer("\"string\"")) + .setPartitioner(new DefaultVenicePartitioner()) + .setPartitionCount(1) + .build(); + VeniceWriter writer = new VeniceWriter(vwOpts, VeniceProperties.empty(), mockedProducer); + KafkaKey key = new KafkaKey(MessageType.CONTROL_MESSAGE, "k".getBytes()); + KafkaMessageEnvelope kme = buildEopEnvelope(); + + switch (input) { + case PRC_HEADERS: + byte[] prcBytes = java.nio.ByteBuffer.allocate(Long.BYTES).putLong(42L).array(); + PubSubMessageHeaders forwarded = + new PubSubMessageHeaders().add(PubSubMessageHeaders.VENICE_PARTITION_RECORD_COUNT_HEADER, prcBytes); + writer.put(key, kme, null, 0, DEFAULT_LEADER_METADATA_WRAPPER, forwarded); + break; + case SINGLETON: + writer.put(key, kme, null, 0, DEFAULT_LEADER_METADATA_WRAPPER, EmptyPubSubMessageHeaders.SINGLETON); + break; + case NULL: + writer.put(key, kme, null, 0, DEFAULT_LEADER_METADATA_WRAPPER, null); + break; + case FIVE_ARG_OVERLOAD: + writer.put(key, kme, null, 0, DEFAULT_LEADER_METADATA_WRAPPER); + break; + } + + ArgumentCaptor headersCaptor = ArgumentCaptor.forClass(PubSubMessageHeaders.class); + verify(mockedProducer, atLeast(1)).sendMessage(anyString(), eq(0), any(), any(), headersCaptor.capture(), any()); + if (expectedPrcValue == -1L) { + // No prc should be present in any captured headers. + for (PubSubMessageHeaders captured: headersCaptor.getAllValues()) { + assertNull(captured.get(PubSubMessageHeaders.VENICE_PARTITION_RECORD_COUNT_HEADER), description); + } + } else { + // prc should be present with the expected value in at least one captured invocation. + boolean found = false; + for (PubSubMessageHeaders captured: headersCaptor.getAllValues()) { + PubSubMessageHeader h = captured.get(PubSubMessageHeaders.VENICE_PARTITION_RECORD_COUNT_HEADER); + if (h != null) { + assertEquals(java.nio.ByteBuffer.wrap(h.value()).getLong(), expectedPrcValue, description); + found = true; + } + } + assertTrue(found, description); + } + } + + private static KafkaMessageEnvelope buildEopEnvelope() { + KafkaMessageEnvelope kme = new KafkaMessageEnvelope(); + kme.messageType = MessageType.CONTROL_MESSAGE.getValue(); + kme.producerMetadata = new ProducerMetadata(); + kme.producerMetadata.producerGUID = new com.linkedin.venice.kafka.protocol.GUID(); + kme.producerMetadata.segmentNumber = 0; + kme.producerMetadata.messageSequenceNumber = 0; + kme.producerMetadata.messageTimestamp = 0L; + ControlMessage cm = new ControlMessage(); + cm.controlMessageType = ControlMessageType.END_OF_PUSH.getValue(); + cm.controlMessageUnion = new com.linkedin.venice.kafka.protocol.EndOfPush(); + cm.debugInfo = Collections.emptyMap(); + kme.payloadUnion = cm; + return kme; + } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/AbstractTestRepush.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/AbstractTestRepush.java index 457be476c27..659db0c389d 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/AbstractTestRepush.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/AbstractTestRepush.java @@ -20,6 +20,7 @@ import com.linkedin.venice.client.store.ClientFactory; import com.linkedin.venice.controllerapi.ControllerClient; import com.linkedin.venice.integration.utils.VeniceClusterWrapper; +import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper; import com.linkedin.venice.integration.utils.VeniceServerWrapper; import com.linkedin.venice.jobs.StageMetricsSnapshot; import com.linkedin.venice.jobs.StageMetricsSnapshot.StageSummary; @@ -28,7 +29,9 @@ import com.linkedin.venice.utils.TestUtils; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; @@ -154,6 +157,26 @@ protected void verifyBatchData(String storeName, int recordCount, int dcIndex) { }); } + /** + * Asserts the {@code ingestion.batch_push_record_count_match.count} / + * {@code ingestion.batch_push_record_count_mismatch.count} OTel counters fired (or stayed at 0) + * across servers in all child DCs. Validates that the EOP prc header propagates leader → + * remote local VT and that the server-side counter at EOP agrees with the producer's count. + * + *

Thin wrapper around {@link IntegrationTestPushUtils#assertBatchPushRecordCountSensors} that + * collects servers across all child DCs in this multi-region fixture.

+ */ + protected void assertBatchPushRecordCountSensorsAllDcs( + String storeName, + boolean expectMatch, + boolean expectMismatch) { + List allServers = new ArrayList<>(); + for (VeniceMultiClusterWrapper dc: childDatacenters) { + allServers.addAll(dc.getClusters().get(CLUSTER_NAME).getVeniceServers()); + } + IntegrationTestPushUtils.assertBatchPushRecordCountSensors(allServers, storeName, expectMatch, expectMismatch); + } + protected GenericRecord readValue(AvroGenericStoreClient storeReader, String key) throws ExecutionException, InterruptedException { return (GenericRecord) storeReader.get(key).get(); @@ -186,19 +209,38 @@ protected void validateRmdData( /** * Verifies that EOP prc headers match the expected per-partition record distribution * for the given keys. Reads from dc-0's broker. + * + *

Also verifies that the {@code prc} header reaches the local VT in every remote + * region — exercising the leader's cross-region EOP re-emit header-propagation path. Without + * propagation, remote-region followers would consume an EOP with no prc header and be unable + * to run record-count verification.

*/ protected void verifyEopPartitionRecordCounts( String storeName, int version, int partitionCount, Collection keys) { - Map actualCounts = IntegrationTestPushUtils.getEopPartitionRecordCounts( + // Source region (dc-0): assert prc matches the expected partitioner-derived distribution. + Map sourceCounts = IntegrationTestPushUtils.getEopPartitionRecordCounts( childDatacenters.get(0).getClusters().get(CLUSTER_NAME).getPubSubBrokerWrapper(), storeName, version, partitionCount); + IntegrationTestPushUtils.verifyPerPartitionCounts(sourceCounts, keys, "\"string\"", partitionCount); - IntegrationTestPushUtils.verifyPerPartitionCounts(actualCounts, keys, "\"string\"", partitionCount); + // Remote regions: assert prc was preserved through the leader's pass-through put on local VT. + for (int dcIndex = 1; dcIndex < childDatacenters.size(); dcIndex++) { + Map remoteLocalVtCounts = IntegrationTestPushUtils.getEopPartitionRecordCounts( + childDatacenters.get(dcIndex).getClusters().get(CLUSTER_NAME).getPubSubBrokerWrapper(), + storeName, + version, + partitionCount); + assertEquals( + remoteLocalVtCounts, + sourceCounts, + "prc header on EOP must be preserved by the leader's re-emit to local VT in remote region dc-" + dcIndex + + " — without propagation, remote-region followers cannot run record-count verification"); + } } protected byte[] serializeStringKeyToByteArray(String key) { diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatch.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatch.java index 9ca5c15645d..9cda85d71b7 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatch.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatch.java @@ -705,6 +705,12 @@ public void testKafkaInputBatchJobWithZstdCompression(boolean sendDirectControlM Assert.assertEquals(avroClient.get(Integer.toString(i)).get().toString(), "test_name_" + i); } }; + /* + * Enable per-store batch-push record count verification: the server is expected to count the + * exact same set of data records as VPJ, so the match sensor must fire on every partition and + * the mismatch sensor must remain at 0. Validates the Store flag wiring end-to-end: + * UpdateStore -> ZK -> server-side StoreRepository read in verifyBatchPushRecordCount. + */ String storeName = testBatchStore( inputDir -> new KeyAndValueSchemas(writeSimpleAvroFileWithStringToStringSchema(inputDir)), properties -> { @@ -716,6 +722,12 @@ public void testKafkaInputBatchJobWithZstdCompression(boolean sendDirectControlM if (sendDirectControlMessage) { // Verify EOP messages carry per-partition record count headers verifyEopPartitionRecordCounts(storeName, numRecords); + // Verify server-side: match OTel counter fires; mismatch counter stays at 0. + IntegrationTestPushUtils.assertBatchPushRecordCountSensors( + veniceCluster.getVeniceServers(), + storeName, + /* expectMatch */ true, + /* expectMismatch */ false); } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatchPushRecordCountVerification.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatchPushRecordCountVerification.java new file mode 100644 index 00000000000..3342207700e --- /dev/null +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestBatchPushRecordCountVerification.java @@ -0,0 +1,150 @@ +package com.linkedin.venice.endToEnd; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; + +import com.linkedin.venice.client.exceptions.VeniceClientException; +import com.linkedin.venice.controllerapi.ControllerClient; +import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; +import com.linkedin.venice.controllerapi.VersionCreationResponse; +import com.linkedin.venice.helix.HelixReadOnlySchemaRepository; +import com.linkedin.venice.integration.utils.ServiceFactory; +import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions; +import com.linkedin.venice.integration.utils.VeniceClusterWrapper; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.pushmonitor.ExecutionStatus; +import com.linkedin.venice.utils.IntegrationTestPushUtils; +import com.linkedin.venice.utils.TestUtils; +import com.linkedin.venice.utils.Utils; +import com.linkedin.venice.writer.VeniceWriter; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.commons.io.IOUtils; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +/** + * Integration test for server-side batch-push record count verification end-to-end. + * + *

Producer (VPJ / Spark) and consumer (server) count records the same way by construction, so + * a "natural" mismatch never occurs in normal pushes. To exercise the server's deficit-detection + * path, this test bypasses VPJ and uses {@link VeniceWriter} directly to:

+ *
    + *
  1. Push a known number of records.
  2. + *
  3. Broadcast EOP with an inflated {@code prc} (per-partition record count) header — i.e. claim + * the producer wrote more records than were actually sent.
  4. + *
+ * + *

The server runs with the cluster default {@code + * server.batch.push.record.count.verification.fail.on.mismatch.enabled = true}, so it must throw + * {@link com.linkedin.venice.exceptions.VeniceException} on the EOP control message, the replica + * goes to ERROR, push status transitions to {@link ExecutionStatus#ERROR}, and the version never + * becomes current. This is the regression that would have caught the March 2026 incident-10468 + * shape (VPJ wrote N, server got 0).

+ * + *

Metric-only mode (server flag {@code false}) and per-leg / DaVinci-skip / not-future-version + * behavior are covered by unit tests in {@code StoreIngestionTaskRecordCountTest} and the OTel + * stats tests, not here.

+ */ +public class TestBatchPushRecordCountVerification { + private static final int RECORDS_PRODUCED = 50; + private static final long INFLATED_PRC = 200L; // > RECORDS_PRODUCED — guaranteed deficit on every partition + private static final int PARTITION_COUNT = 2; + private static final int VALUE_SCHEMA_ID = HelixReadOnlySchemaRepository.VALUE_SCHEMA_STARTING_ID; + + private VeniceClusterWrapper veniceCluster; + private ControllerClient controllerClient; + + @BeforeClass(alwaysRun = true) + public void setUp() throws VeniceClientException { + Utils.thisIsLocalhost(); + VeniceClusterCreateOptions options = new VeniceClusterCreateOptions.Builder().numberOfControllers(1) + .numberOfServers(1) + .numberOfRouters(1) + .replicationFactor(1) + .partitionSize(100) + .sslToStorageNodes(false) + .sslToKafka(false) + .build(); + veniceCluster = ServiceFactory.getVeniceCluster(options); + controllerClient = new ControllerClient(veniceCluster.getClusterName(), veniceCluster.getAllControllersURLs()); + } + + @AfterClass(alwaysRun = true) + public void cleanUp() { + Utils.closeQuietlyWithErrorLogged(controllerClient); + IOUtils.closeQuietly(veniceCluster); + } + + /** + * Strict mode (server default): the server throws {@code VeniceException} when the locally- + * counted record count is less than the producer-side {@code prc} count. The exception + * propagates {@code processEndOfPush} → SIT consumer loop → {@code reportError}. The replica + * is marked ERROR, push status transitions to {@link ExecutionStatus#ERROR}, and the version + * never becomes current. This is the path that would fail a real VPJ push job in incident-10468 + * shape. Metric-only mode (server flag {@code false}) and per-leg / DaVinci-skip behavior are + * covered by {@code StoreIngestionTaskRecordCountTest} at the unit level. + */ + @Test(timeOut = 120_000) + public void testDeficitFailsPushWhenServerStrictModeDefault() throws Exception { + String storeName = Utils.getUniqueString("test_prc_mismatch_throw"); + veniceCluster.getNewStore(storeName); + veniceCluster.useControllerClient(client -> { + TestUtils.assertCommand( + client.updateStore(storeName, new UpdateStoreQueryParams().setPartitionCount(PARTITION_COUNT))); + }); + + String storeVersionName; + try (VeniceWriter writer = createWriterAndPushDeficit(storeName)) { + storeVersionName = writer.getTopicName(); + } + final int pushVersion = Version.parseVersionFromKafkaTopicName(storeVersionName); + + // Behavioral assertion: push status must transition to ERROR and version must never become current. + TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, true, () -> { + ExecutionStatus status = ExecutionStatus.valueOf(controllerClient.queryJobStatus(storeVersionName).getStatus()); + assertEquals( + status, + ExecutionStatus.ERROR, + "Push status should be ERROR — server should throw on prc deficit in default strict mode, " + "but observed: " + + status); + int currentVersion = controllerClient.getStore(storeName).getStore().getCurrentVersion(); + assertNotEquals( + currentVersion, + pushVersion, + "Version " + pushVersion + " must NOT become current after a record-count deficit failure"); + }); + + // Metric assertion: mismatch OTel counter fires; match counter stays at 0 (every partition's + // EOP claimed an inflated count, so no match path executed). + IntegrationTestPushUtils.assertBatchPushRecordCountSensors( + veniceCluster.getVeniceServers(), + storeName, + /* expectMatch */ false, + /* expectMismatch */ true); + } + + /** + * Creates a new version, opens a {@link VeniceWriter}, pushes {@link #RECORDS_PRODUCED} records, + * then broadcasts EOP with an inflated prc header on every partition (guaranteed deficit). The + * caller is responsible for closing the returned writer. + */ + private VeniceWriter createWriterAndPushDeficit(String storeName) throws Exception { + VersionCreationResponse creationResponse = veniceCluster.getNewVersion(storeName); + VeniceWriter writer = veniceCluster.getVeniceWriter(creationResponse.getKafkaTopic()); + + writer.broadcastStartOfPush(new HashMap<>()); + for (int i = 0; i < RECORDS_PRODUCED; i++) { + writer.put("key_" + i, "value_" + i, VALUE_SCHEMA_ID).get(); + } + Map badPrc = new HashMap<>(); + for (int p = 0; p < PARTITION_COUNT; p++) { + badPrc.put(p, INFLATED_PRC); + } + writer.broadcastEndOfPush(new HashMap<>(), badPrc); + return writer; + } +} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestRepushDiagnostics.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestRepushDiagnostics.java index 3b53cc6c96f..f61b695a7ab 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestRepushDiagnostics.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestRepushDiagnostics.java @@ -71,6 +71,11 @@ public void testKafkaInputBatchJob() throws Exception { IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPath, storeName); batchProps.put(SEND_CONTROL_MESSAGES_DIRECTLY, true); batchProps.setProperty(DATA_WRITER_COMPUTE_JOB_CLASS, DataWriterSparkJob.class.getCanonicalName()); + // Enable per-store batch-push record count verification: validates the parent controller forwards + // the field via the admin topic SetStore op, child controllers persist it, and remote-region + // servers read it before counting. Producer and consumer counts agree by construction, so the + // match sensor is expected to fire on every partition in every DC and the mismatch sensor must + // remain at 0. createStoreForJob( CLUSTER_NAME, keySchemaStr, @@ -93,8 +98,10 @@ public void testKafkaInputBatchJob() throws Exception { keys.add(Integer.toString(i)); } - // Verify EOP on the initial batch push has per-partition record counts + // Verify EOP on the initial batch push has per-partition record counts (source + remote local VTs) verifyEopPartitionRecordCounts(storeName, 1, 2, keys); + // Verify server-side: match OTel counter fires across all DCs; mismatch counter stays at 0. + assertBatchPushRecordCountSensorsAllDcs(storeName, /* expectMatch */ true, /* expectMismatch */ false); // Repush twice: combiner=true then combiner=false int repushVersion = 1; @@ -118,6 +125,8 @@ public void testKafkaInputBatchJob() throws Exception { // Verify EOP on repush version also has per-partition record counts verifyEopPartitionRecordCounts(storeName, expectedVersion, 2, keys); } + // After both repushes, mismatch counter must still be 0 cluster-wide. + assertBatchPushRecordCountSensorsAllDcs(storeName, /* expectMatch */ true, /* expectMismatch */ false); verifyBatchData(storeName, 100, 0); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/IntegrationTestPushUtils.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/IntegrationTestPushUtils.java index 91595538421..4ab816ca3fe 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/IntegrationTestPushUtils.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/utils/IntegrationTestPushUtils.java @@ -81,7 +81,13 @@ import com.linkedin.venice.samza.VeniceSystemFactory; import com.linkedin.venice.samza.VeniceSystemProducer; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; +import com.linkedin.venice.stats.VeniceMetricsRepository; import com.linkedin.venice.writer.VeniceWriterFactory; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import io.tehuti.metrics.MetricsRepository; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; @@ -771,6 +777,80 @@ public static void verifyPerPartitionCounts( } } + /** + * Shared assertion helper for the per-store batch-push record-count match/mismatch OTel counters. + * Reads from each server's {@link InMemoryMetricReader} (attached by + * {@code VeniceServerWrapper}), filters counter points by the + * {@code venice.store.name} attribute, and sums across the supplied servers. + * + *

OTel metric names match {@code IngestionOtelMetricEntity}: + * {@code ingestion.batch_push_record_count_match.count} and + * {@code ingestion.batch_push_record_count_mismatch.count}.

+ * + *

{@code expectMatch=true} requires at least one match increment somewhere in the supplied + * set (every partition's EOP triggers one increment); {@code expectMismatch=false} requires the + * mismatch counter to remain at 0 across the supplied set.

+ * + *

Wrapped in {@link TestUtils#waitForNonDeterministicAssertion} because the OTel SDK can lag + * the test thread's collection sample by a small margin after the SIT consumer thread records.

+ */ + public static void assertBatchPushRecordCountSensors( + Collection servers, + String storeName, + boolean expectMatch, + boolean expectMismatch) { + String matchMetricName = "ingestion.batch_push_record_count_match.count"; + String mismatchMetricName = "ingestion.batch_push_record_count_mismatch.count"; + AttributeKey storeKey = AttributeKey.stringKey("venice.store.name"); + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, () -> { + long matchSum = 0L; + long mismatchSum = 0L; + for (VeniceServerWrapper server: servers) { + MetricsRepository repo = server.getMetricsRepository(); + if (!(repo instanceof VeniceMetricsRepository)) { + continue; + } + InMemoryMetricReader reader = (InMemoryMetricReader) ((VeniceMetricsRepository) repo).getVeniceMetricsConfig() + .getOtelAdditionalMetricsReader(); + if (reader == null) { + continue; + } + for (MetricData md: reader.collectAllMetrics()) { + boolean isMatch = md.getName().endsWith(matchMetricName); + boolean isMismatch = md.getName().endsWith(mismatchMetricName); + if (!isMatch && !isMismatch) { + continue; + } + long sum = md.getLongSumData() + .getPoints() + .stream() + .filter(p -> storeName.equals(p.getAttributes().get(storeKey))) + .mapToLong(LongPointData::getValue) + .sum(); + if (isMatch) { + matchSum += sum; + } else { + mismatchSum += sum; + } + } + } + if (expectMatch) { + Assert.assertTrue( + matchSum > 0, + "Expected " + matchMetricName + " > 0 for store " + storeName + " but got " + matchSum); + } else { + Assert.assertEquals(matchSum, 0L, matchMetricName + " should be 0 for store " + storeName); + } + if (expectMismatch) { + Assert.assertTrue( + mismatchSum > 0, + "Expected " + mismatchMetricName + " > 0 for store " + storeName + " but got " + mismatchSum); + } else { + Assert.assertEquals(mismatchSum, 0L, mismatchMetricName + " should be 0 for store " + storeName); + } + }); + } + private static void assertStoreHealth(ControllerClient controllerClient, String systemStoreName, String regionName) { StoreResponse storeResponse = assertCommand(controllerClient.getStore(systemStoreName)); Assert.assertTrue(