Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -329,17 +329,15 @@ subprojects {
// when actually using the new protocol. Example to pin KME to v12 when introducing v13:
// project(':internal:venice-common').file('src/main/resources/avro/KafkaMessageEnvelope/v12', PathValidation.DIRECTORY)
def versionOverrides = [
// PartitionState v23 introduces `batchPushRecordCount` for server-side batch-push record-count
// verification. Pinned to v22 until the consumer code (PartitionConsumptionState +
// StoreIngestionTask) lands in a follow-up PR.
project(':internal:venice-common').file('src/main/resources/avro/PartitionState/v22', PathValidation.DIRECTORY),
// StoreMetaValue v43 introduces `batchPushRecordCountVerificationEnabled`. Pinned to v42
// until the per-store flag wiring (Store / ZKStore / ReadOnlyStore / SystemStore / StoreInfo /
// UpdateStoreQueryParams) lands in a follow-up PR.
// StoreMetaValue v43 added `batchPushRecordCountVerificationEnabled` for the originally-
// planned per-store flag, but this PR pivoted to a server-side server-config gate
// (see `server.batch.push.record.count.verification.fail.on.mismatch.enabled`) and the
// field is no longer read or written by anyone. Pin to v42 so the field doesn't enter
// the active protocol.
project(':internal:venice-common').file('src/main/resources/avro/StoreMetaValue/v42', PathValidation.DIRECTORY),
// AdminOperation v98 carries `batchPushRecordCountVerificationEnabled` on the SetStore admin op
// for upcoming server-side batch-push record-count verification. Pinned to v97 until the
// controller-side wiring (VeniceParentHelixAdmin / AdminExecutionTask) lands in a follow-up PR.
// AdminOperation v98 carried `batchPushRecordCountVerificationEnabled` on the SetStore
// admin op for the same originally-planned per-store flag. Pin to v97 for the same
// reason as StoreMetaValue v43 above.
project(':services:venice-controller').file('src/main/resources/avro/AdminOperation/v97', PathValidation.DIRECTORY)
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import static com.linkedin.venice.ConfigKeys.SERVER_ADAPTIVE_THROTTLER_SIGNAL_REFRESH_INTERVAL_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_ADAPTIVE_THROTTLER_SINGLE_GET_LATENCY_THRESHOLD;
import static com.linkedin.venice.ConfigKeys.SERVER_ADD_RMD_TO_BATCH_PUSH_FOR_HYBRID_STORES;
import static com.linkedin.venice.ConfigKeys.SERVER_BATCH_PUSH_RECORD_COUNT_VERIFICATION_FAIL_ON_MISMATCH_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_BATCH_REPORT_END_OF_INCREMENTAL_PUSH_STATUS_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_BLOB_TRANSFER_ADAPTIVE_THROTTLER_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_BLOB_TRANSFER_ADAPTIVE_THROTTLER_UPDATE_PERCENTAGE;
Expand Down Expand Up @@ -623,6 +624,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final NearlineLatencyTimestampSource nearlineLatencyTimestampSource;
private final boolean uniqueIngestedKeyCountHllEnabled;
private final int uniqueIngestedKeyCountHllLog2K;
private final boolean batchPushRecordCountVerificationFailOnMismatchEnabled;
private final long leaderCompleteStateCheckInFollowerValidIntervalMs;
private final boolean requireLeaderCompleteForCatchUpVtRts;
private final boolean stuckConsumerRepairEnabled;
Expand Down Expand Up @@ -1084,6 +1086,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
uniqueIngestedKeyCountHllEnabled = serverProperties.getBoolean(SERVER_UNIQUE_INGESTED_KEY_COUNT_HLL_ENABLED, false);
uniqueIngestedKeyCountHllLog2K = serverProperties
.getInt(SERVER_UNIQUE_INGESTED_KEY_COUNT_HLL_LOG2K, PartitionConsumptionState.HLL_DEFAULT_LOG_K);
batchPushRecordCountVerificationFailOnMismatchEnabled =
serverProperties.getBoolean(SERVER_BATCH_PUSH_RECORD_COUNT_VERIFICATION_FAIL_ON_MISMATCH_ENABLED, true);
Comment thread
ymuppala marked this conversation as resolved.
batchReportEOIPEnabled =
serverProperties.getBoolean(SERVER_BATCH_REPORT_END_OF_INCREMENTAL_PUSH_STATUS_ENABLED, false);
incrementalPushStatusWriteMode =
Expand Down Expand Up @@ -1922,6 +1926,10 @@ public int getUniqueIngestedKeyCountHllLog2K() {
return uniqueIngestedKeyCountHllLog2K;
}

public boolean isBatchPushRecordCountVerificationFailOnMismatchEnabled() {
return batchPushRecordCountVerificationFailOnMismatchEnabled;
}

public boolean isPerRecordBatchOtelMetricsEnabled() {
return perRecordBatchOtelMetricsEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import com.linkedin.venice.pubsub.PubSubConstants;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.api.DefaultPubSubMessage;
import com.linkedin.venice.pubsub.api.EmptyPubSubMessageHeaders;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubMessageHeader;
import com.linkedin.venice.pubsub.api.PubSubMessageHeaders;
Expand Down Expand Up @@ -1745,6 +1746,24 @@ protected static boolean isLeader(PartitionConsumptionState partitionConsumption
return Objects.equals(partitionConsumptionState.getLeaderFollowerState(), LEADER);
}

/**
* Extract just the per-partition record count ("prc") header from the consumed record so the
* leader can forward it on its re-emit to the local VT. Returns
* {@link EmptyPubSubMessageHeaders#SINGLETON} when no prc header is present (typical for SOP,
* EOS, and EOPs from producers that don't stamp prc), avoiding any allocation on the hot path.
*/
static PubSubMessageHeaders extractPrcHeaderToForward(DefaultPubSubMessage consumerRecord) {
PubSubMessageHeaders headers = consumerRecord.getPubSubMessageHeaders();
if (headers == null) {
return EmptyPubSubMessageHeaders.SINGLETON;
}
PubSubMessageHeader prc = headers.get(PubSubMessageHeaders.VENICE_PARTITION_RECORD_COUNT_HEADER);
if (prc == null) {
return EmptyPubSubMessageHeaders.SINGLETON;
}
return new PubSubMessageHeaders().add(prc);
}

/**
* Process {@link TopicSwitch} control message at given partition offset for a specific {@link PartitionConsumptionState}.
*/
Expand Down Expand Up @@ -3313,6 +3332,10 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
// issue. e.g. a PUT record belonging to seg:0 can come after the EOS of seg:0 due to view writer delays.
// Since SOP and EOP are rare we can simply wait for the last VT produce future.
checkAndWaitForLastVTProduceFuture(partitionConsumptionState);
/*
* Forward only the "prc" partition-record-count header from the EOP to the local VT.
*/
PubSubMessageHeaders forwardedHeadersForEop = extractPrcHeaderToForward(consumerRecord);
/**
* Simply produce this EOP to local VT. It will be processed in order in the drainer queue later
* after successfully producing to kafka.
Expand All @@ -3328,7 +3351,8 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
consumerRecord.getValue(),
callback,
consumerRecord.getTopicPartition().getPartitionNumber(),
leaderMetadataWrapper),
leaderMetadataWrapper,
forwardedHeadersForEop),
partition,
kafkaUrl,
kafkaClusterId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,20 @@ enum LatchStatus {
private final VeniceChunkingStatus chunkingStatus;
private final String localRegionName;

/**
* Logical-record count for batch-push verification — one increment per user-data record
* ingested in this partition before End-of-Push. For chunked records only the chunk manifest
* is counted (not the individual chunk fragments); regular PUT and DELETE messages each count
* once. Global RT DIV PUTs and other internal/control messages are not counted. Compared at EOP
* against the producer-side count carried on the EOP message's "prc" PubSub header (VPJ's
* {@code messageSent} — one increment per logical {@code writer.put}/{@code writer.delete}
* call; UPDATE is not consumed in batch ingestion so it doesn't contribute on either side). The
* counter uses {@link AtomicLong} for safety, though in practice writes come from the single
* drainer thread per partition. Persisted in {@link com.linkedin.venice.offsets.OffsetRecord}
* so the count survives server restarts mid-push.
*/
private final AtomicLong batchPushRecordCount = new AtomicLong(0);

/** Lazily allocated per-partition detector for partial-update amplification. */
private volatile PartialUpdateAmplificationDetector partialUpdateAmplificationDetector;

Expand Down Expand Up @@ -459,6 +473,7 @@ public PartitionConsumptionState(
this.pendingReportIncPushVersionList = offsetRecord.getPendingReportIncPushVersionList();
this.hasResubscribedAfterBootstrapAsCurrentVersion = false;
this.activeKeyCount.set(offsetRecord.getActiveKeyCount());
this.batchPushRecordCount.set(offsetRecord.getBatchPushRecordCount());
}

/** Create a fresh HLL sketch with {@link #HLL_DEFAULT_LOG_K}. */
Expand Down Expand Up @@ -509,6 +524,14 @@ public void incrementActiveKeyCount() {
activeKeyCount.incrementAndGet();
}

public long getBatchPushRecordCount() {
return batchPushRecordCount.get();
}

public void incrementBatchPushRecordCount() {
batchPushRecordCount.incrementAndGet();
}

/**
* Called at SOP to mark this partition as actively counting. Without this, a mid-batch
* restart with a newly enabled config would start counting partway through, producing
Expand Down
Loading