Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
46b2329
[server] Add SLO classification dimensions to record-level delay metric
sushantmane May 8, 2026
79aef3d
Make HeartbeatKey labels mandatory PCS constructor params
sushantmane May 8, 2026
7a0798e
Address review: bake SLO labels into HMS-created keys; harden test as…
sushantmane May 8, 2026
8a3518b
Add direct test for MetricEntityStateFiveEnums.getAllMetricAttributes…
sushantmane May 8, 2026
fcf70aa
Fix spotbugs DLS_DEAD_LOCAL_STORE in MetricEntityStateFiveEnumsTest
sushantmane May 8, 2026
d676fe9
Address review: rename RecordLagFunction param; fix Javadoc link
sushantmane May 8, 2026
b1a1fec
HMS: resolve SLO labels in updateLagMonitor, plumb to
sushantmane May 8, 2026
d6dfccc
Tighten E2E SLO test: assert non-zero sum/max on matched point
sushantmane May 8, 2026
b91817e
Address review: PCS takes booleans, drop defensive null checks, renam…
sushantmane May 8, 2026
7ae121d
Address review: empty-region locality, exception metric, missing tests,
sushantmane May 8, 2026
905ac3f
Fix CI: register LAG_MONITOR_UPDATE in dimension test, give state-mod…
sushantmane May 8, 2026
6fe46b9
HeartbeatKey: split single-line Javadoc onto multi-line block form
sushantmane May 8, 2026
ec00da1
Address review: locality null→REMOTE default, requireNonNull on stats
sushantmane May 8, 2026
15070ad
[ci] Move TestAdminSparkServerWithMultiServers from shard 28 → 85
sushantmane May 8, 2026
a31c77f
Sync locality-null Javadocs with the emit-path REMOTE default
sushantmane May 8, 2026
17a30cd
Relax per-shard timeout from 12 → 15 minutes
sushantmane May 8, 2026
eb23c20
Normalize empty localRegionName via RegionUtils in non-AA initializeE…
sushantmane May 8, 2026
aef8537
Enable per-record OTel metrics by default in E2E test framework
sushantmane May 8, 2026
f49f49e
Add SLO classification dimensions to heartbeat-delay metric
sushantmane May 8, 2026
2bfd2aa
Update HeartbeatOtelMetricEntityTest expected dim list
sushantmane May 8, 2026
be6335e
Pin NR source fabric in SLO E2E test so dc1 leaders pull cross-region
sushantmane May 8, 2026
70c8a46
Switch SLO E2E to AA + dual-fabric Samza for cross-region locality co…
sushantmane May 9, 2026
408777a
Parameterize SLO E2E across (writeCompute, chunking) + speed up via 1s
sushantmane May 9, 2026
07b622b
Revert global 1s heartbeat reporter override in E2E framework
sushantmane May 9, 2026
05264eb
PCS#getOrCreateCachedHeartbeatKey: normalize region to match HMS path
sushantmane May 9, 2026
06ee7fc
Validate heartbeatReporterIntervalSeconds at config load
sushantmane May 9, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 86 additions & 86 deletions .github/workflows/VeniceCI-E2ETests.yml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
import static com.linkedin.venice.ConfigKeys.SERVER_ENABLE_PARALLEL_BATCH_GET;
import static com.linkedin.venice.ConfigKeys.SERVER_FORKED_PROCESS_JVM_ARGUMENT_LIST;
import static com.linkedin.venice.ConfigKeys.SERVER_GLOBAL_RT_DIV_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_HEARTBEAT_REPORTER_INTERVAL_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_HELIX_JOIN_AS_UNKNOWN;
import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_HEADER_TABLE_SIZE;
import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_INBOUND_ENABLED;
Expand Down Expand Up @@ -616,6 +617,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final boolean recordLevelTimestampEnabled;
private final boolean perRecordOtelMetricsEnabled;
private final boolean perRecordBatchOtelMetricsEnabled;
private final int heartbeatReporterIntervalSeconds;
private final boolean uniqueIngestedKeyCountHllEnabled;
private final int uniqueIngestedKeyCountHllLog2K;
private final long leaderCompleteStateCheckInFollowerValidIntervalMs;
Expand Down Expand Up @@ -1066,6 +1068,12 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
recordLevelTimestampEnabled = serverProperties.getBoolean(SERVER_RECORD_LEVEL_TIMESTAMP_ENABLED, false);
perRecordOtelMetricsEnabled = serverProperties.getBoolean(SERVER_PER_RECORD_OTEL_METRICS_ENABLED, false);
perRecordBatchOtelMetricsEnabled = serverProperties.getBoolean(SERVER_PER_RECORD_BATCH_OTEL_METRICS_ENABLED, false);
heartbeatReporterIntervalSeconds = serverProperties.getInt(SERVER_HEARTBEAT_REPORTER_INTERVAL_SECONDS, 60);
Comment thread
sushantmane marked this conversation as resolved.
Comment thread
sushantmane marked this conversation as resolved.
if (heartbeatReporterIntervalSeconds < 1) {
throw new VeniceException(
SERVER_HEARTBEAT_REPORTER_INTERVAL_SECONDS + " must be at least 1 second; got "
+ heartbeatReporterIntervalSeconds);
}
uniqueIngestedKeyCountHllEnabled = serverProperties.getBoolean(SERVER_UNIQUE_INGESTED_KEY_COUNT_HLL_ENABLED, false);
uniqueIngestedKeyCountHllLog2K = serverProperties
.getInt(SERVER_UNIQUE_INGESTED_KEY_COUNT_HLL_LOG2K, PartitionConsumptionState.HLL_DEFAULT_LOG_K);
Expand Down Expand Up @@ -1881,6 +1889,10 @@ public boolean isPerRecordOtelMetricsEnabled() {
return perRecordOtelMetricsEnabled;
}

/**
 * Returns the heartbeat reporter interval, in seconds, as loaded from
 * {@code SERVER_HEARTBEAT_REPORTER_INTERVAL_SECONDS} when this config was constructed.
 *
 * @return the configured interval in seconds
 */
public int getHeartbeatReporterIntervalSeconds() {
  return this.heartbeatReporterIntervalSeconds;
}

/**
 * Reports the value loaded from {@code SERVER_UNIQUE_INGESTED_KEY_COUNT_HLL_ENABLED}
 * when this config was constructed.
 *
 * @return {@code true} if the unique-ingested-key-count HLL flag is set
 */
public boolean isUniqueIngestedKeyCountHllEnabled() {
  return this.uniqueIngestedKeyCountHllEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.stats.dimensions.VeniceChunkingStatus;
import com.linkedin.venice.stats.dimensions.VeniceRegionLocality;
import com.linkedin.venice.stats.dimensions.VeniceStoreWriteType;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
import com.linkedin.venice.utils.ArrayUtils;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.RegionUtils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.utils.lazy.Lazy;
import com.linkedin.venice.writer.LeaderCompleteState;
Expand Down Expand Up @@ -367,18 +371,31 @@ enum LatchStatus {
* (no batch baseline); RT signals are skipped when not tracked. AtomicLong for AA/WC parallel processing.
*/
private final AtomicLong activeKeyCount = new AtomicLong(OffsetRecord.ACTIVE_KEY_COUNT_NOT_TRACKED);
/**
* Last PUT key for batch dedup. Not persisted; used by {@link #incrementActiveKeyCountForBatchRecord}.
* Single-threaded: only accessed from the drainer thread during batch (pre-EOP).
*/
private byte[] lastBatchKeyForDedup;

/**
* SLO classification labels resolved once at PCS construction by the SIT (which knows the
* {@link Version} and the server's local region). Used by {@link #getOrCreateCachedHeartbeatKey(String)}
* so each cached HeartbeatKey carries pre-resolved enum references — no per-record string allocation.
*/
Comment thread
sushantmane marked this conversation as resolved.
private final VeniceStoreWriteType writeType;
private final VeniceChunkingStatus chunkingStatus;
private final String localRegionName;

/** Lazily allocated per-partition detector for partial-update amplification. */
private volatile PartialUpdateAmplificationDetector partialUpdateAmplificationDetector;

public PartitionConsumptionState(
PubSubTopicPartition partitionReplica,
OffsetRecord offsetRecord,
PubSubContext pubSubContext,
boolean hybrid) {
boolean hybrid,
boolean isWriteComputationEnabled,
boolean isChunked,
String localRegionName) {
Comment thread
sushantmane marked this conversation as resolved.
LOGGER.info("Creating PCS for replica: {}", partitionReplica);

this.partitionReplica = Objects.requireNonNull(partitionReplica, "TopicPartition cannot be null when creating PCS");
Expand Down Expand Up @@ -427,6 +444,9 @@ public PartitionConsumptionState(
trackingIncrementalPushStatus = Collections.emptyMap();
}
cachedHeartbeatKeys = new VeniceConcurrentHashMap<>(3);
this.writeType = isWriteComputationEnabled ? VeniceStoreWriteType.WRITE_COMPUTE : VeniceStoreWriteType.REGULAR;
this.chunkingStatus = isChunked ? VeniceChunkingStatus.CHUNKED : VeniceChunkingStatus.UNCHUNKED;
this.localRegionName = localRegionName;
// Restore in-memory latest consumed version topic position and leader info from the checkpoint version topic
// position
this.latestProcessedVtPosition = offsetRecord.getCheckpointedLocalVtPosition();
Expand Down Expand Up @@ -1470,14 +1490,33 @@ public void setHasResubscribedAfterBootstrapAsCurrentVersion(boolean hasResubscr

/**
* Get or create a cached HeartbeatKey for the given region.
* Derives storeName/version from the partition replica topic name.
* Derives storeName/version from the partition replica topic name. SLO labels (write type,
* chunking, locality) come from constructor args and are baked into the cached key so per-record
* OTel emission can read them without re-derivation.
*
* <p>Locality is derived per region by comparing the cached key's region to the local region
* supplied at construction: equal → LOCAL, otherwise REMOTE. When the local region is null or
* empty (lookup-only paths in tests, or unconfigured server region), locality is left null on
* the cached key — defaulting it here would mislabel every region as REMOTE. The OTel emit
* path coerces null → REMOTE at emission time so the metric still ships a concrete label.
*/
public HeartbeatKey getOrCreateCachedHeartbeatKey(String region) {
return cachedHeartbeatKeys.computeIfAbsent(region, r -> {
/*
* Normalize via RegionUtils so the per-record path produces the same region string as the
* HMS-side initializeEntry path. Both paths feed the same heartbeat-timestamps map and must
* agree on HeartbeatKey identity — otherwise computeIfPresent silently no-ops and the
* heartbeat-lag-driven ready-to-serve check never trips.
*/
String normalizedRegion = RegionUtils.normalizeRegionName(region);
return cachedHeartbeatKeys.computeIfAbsent(normalizedRegion, r -> {
String topicName = partitionReplica.getTopicName();
String storeName = Version.parseStoreFromKafkaTopicName(topicName);
int version = Version.parseVersionFromKafkaTopicName(topicName);
return new HeartbeatKey(storeName, version, getPartition(), r);
VeniceRegionLocality locality = null;
if (localRegionName != null && !localRegionName.isEmpty()) {
locality = r.equals(localRegionName) ? VeniceRegionLocality.LOCAL : VeniceRegionLocality.REMOTE;
}
return new HeartbeatKey(storeName, version, getPartition(), r, writeType, chunkingStatus, locality);
Comment thread
sushantmane marked this conversation as resolved.
Comment thread
sushantmane marked this conversation as resolved.
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2766,8 +2766,20 @@ private PartitionConsumptionState createAndInstallPartitionConsumptionState(PubS
storageMetadataService.getLastOffset(topicPartition.getTopicName(), partition, pubSubContext);
LOGGER.info("Creating PCS for replica: {} with offsetRecord: {}", topicPartition, offsetRecord);

PartitionConsumptionState freshPcs =
new PartitionConsumptionState(topicPartition, offsetRecord, pubSubContext, hybridStoreConfig.isPresent());
/*
* SLO labels are resolved once per PCS at construction. They ride along on every cached
* HeartbeatKey so the per-record OTel emit path reads pre-computed enum references (no string
* allocation, no per-call store/version lookup). Chunking is per-version
* (Version.isChunkingEnabled). Write-compute is store-level today (Store.isWriteComputationEnabled).
*/
PartitionConsumptionState freshPcs = new PartitionConsumptionState(
topicPartition,
offsetRecord,
pubSubContext,
hybridStoreConfig.isPresent(),
isWriteComputationEnabled,
isChunked,
serverConfig.getRegionName());
if (uniqueIngestedKeyCountHllEnabled) {
int lgK = serverConfig.getUniqueIngestedKeyCountHllLog2K();
boolean isNewSubscription = PubSubSymbolicPosition.EARLIEST.equals(offsetRecord.getCheckpointedLocalVtPosition());
Expand Down Expand Up @@ -2814,8 +2826,14 @@ private PartitionConsumptionState createPlaceholderPartitionConsumptionState(
int partition) {
OffsetRecord placeholderOffset = new OffsetRecord(partitionStateSerializer, pubSubContext);

PartitionConsumptionState pcs =
new PartitionConsumptionState(topicPartition, placeholderOffset, pubSubContext, hybridStoreConfig.isPresent());
PartitionConsumptionState pcs = new PartitionConsumptionState(
topicPartition,
placeholderOffset,
pubSubContext,
hybridStoreConfig.isPresent(),
isWriteComputationEnabled,
isChunked,
serverConfig.getRegionName());
pcs.setCurrentVersionSupplier(isCurrentVersion);

boolean isFutureVersionReady = isFutureVersionReady(kafkaVersionTopic, storeRepository);
Expand Down Expand Up @@ -3078,7 +3096,10 @@ private void resetOffset(int partition, PubSubTopicPartition topicPartition, boo
new PubSubTopicPartitionImpl(versionTopic, partition),
new OffsetRecord(partitionStateSerializer, pubSubContext),
pubSubContext,
hybridStoreConfig.isPresent());
hybridStoreConfig.isPresent(),
isWriteComputationEnabled,
isChunked,
serverConfig.getRegionName());
if (uniqueIngestedKeyCountHllEnabled) {
consumptionState.initializeUniqueKeyCountHll(serverConfig.getUniqueIngestedKeyCountHllLog2K());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,59 @@
package com.linkedin.davinci.stats.ingestion.heartbeat;

import com.linkedin.venice.meta.Version;
import com.linkedin.venice.stats.dimensions.VeniceChunkingStatus;
import com.linkedin.venice.stats.dimensions.VeniceRegionLocality;
import com.linkedin.venice.stats.dimensions.VeniceStoreWriteType;
import com.linkedin.venice.utils.Utils;


/**
* Composite key for the flattened heartbeat timestamp map.
*
* <p>Identity is {@code (storeName, version, partition, region)}. The SLO classification
* labels {@link #writeType}, {@link #chunkingStatus}, {@link #locality} are passenger fields:
* resolved once when the caller (SIT) builds the key, carried along the per-record hot path
* to avoid repeated derivation, and intentionally excluded from {@link #equals}/{@link #hashCode}.
*/
public final class HeartbeatKey {
final String storeName;
final int version;
final int partition;
final String region;
/*
* Passenger labels: not part of identity. Used by per-record OTel emission so each record
* carries pre-resolved enum references (zero allocation on the hot path).
*/
final VeniceStoreWriteType writeType;
final VeniceChunkingStatus chunkingStatus;
final VeniceRegionLocality locality;
private final int hashCode;

/**
 * Builds an identity-only key: all three SLO label fields are set to {@code null}.
 *
 * <p>Intended for callers that need the key purely for map lookup or equality. Keys that flow
 * to per-record OTel emission should be built with the 7-argument constructor so they carry
 * pre-resolved labels.
 */
public HeartbeatKey(String storeName, int version, int partition, String region) {
  this(
      storeName,
      version,
      partition,
      region,
      /* writeType */ null,
      /* chunkingStatus */ null,
      /* locality */ null);
}

public HeartbeatKey(
String storeName,
int version,
int partition,
String region,
VeniceStoreWriteType writeType,
VeniceChunkingStatus chunkingStatus,
VeniceRegionLocality locality) {
Comment thread
sushantmane marked this conversation as resolved.
this.storeName = storeName;
this.version = version;
this.partition = partition;
this.region = region;
this.writeType = writeType;
this.chunkingStatus = chunkingStatus;
this.locality = locality;
// Manual hash computation avoids Objects.hash() varargs Object[] allocation and Integer autoboxing
int h = storeName.hashCode();
h = 31 * h + version;
Expand All @@ -31,6 +66,27 @@ String getReplicaId() {
return Utils.getReplicaId(Version.composeKafkaTopic(storeName, version), partition);
}

/**
 * Returns the store write-type label carried by this key. Public so tests in other packages
 * can read it; code in this package reads the package-private field directly.
 *
 * @return the write-type label; {@code null} when the key was built with the 4-argument constructor
 */
public VeniceStoreWriteType getWriteType() {
  return this.writeType;
}

/**
 * Returns the chunking-status label carried by this key. Public so tests in other packages
 * can read it; code in this package reads the package-private field directly.
 *
 * @return the chunking label; {@code null} when the key was built with the 4-argument constructor
 */
public VeniceChunkingStatus getChunkingStatus() {
  return this.chunkingStatus;
}

/**
 * Returns the region-locality label carried by this key. Public so tests in other packages
 * can read it; code in this package reads the package-private field directly.
 *
 * @return the locality label; {@code null} when the key was built with the 4-argument constructor
 */
public VeniceRegionLocality getLocality() {
  return this.locality;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Loading