[server][compat] Activate KME v14 and stamp upstreamMessageTimestamp on leader produces #2780
Open
sushantmane wants to merge 6 commits into linkedin:main
…p on leader produces

Activates KafkaMessageEnvelope schema v14 (introduced in linkedin#2778) and starts populating the new `upstreamMessageTimestamp` field on records produced by the leader from a consumed upstream message.

Today on the fresh-DIV produce path (post-EOP Put/Update/Delete from RT), the leader constructs a fresh `producerMetadata.messageTimestamp` from its own local clock, dropping the upstream RT record's time. Followers reading these records can only measure latency from leader-produce-time, not from RT entry, so per-record nearline latency metrics leave out the leader-side processing/produce-callback delay and underreport the end-to-end value. Carrying the upstream record's timestamp explicitly via `LeaderMetadata.upstreamMessageTimestamp` lets followers compute the same true per-record end-to-end latency the leader uses today.

Changes:
- build.gradle: remove the v13 KME pin from `versionOverrides` so generated Avro classes use v14.
- AvroProtocolDefinition: bump `KAFKA_MESSAGE_ENVELOPE` current protocol version from 13 to 14.
- LeaderMetadataWrapper: add `upstreamMessageTimestamp` field, getter, and constructor overloads. Existing 3-arg / 4-arg constructors default the new field to `DEFAULT_UPSTREAM_MESSAGE_TIMESTAMP` (-1) so all current callers continue to compile and produce records with the field defaulted to the schema sentinel.
- VeniceWriter: stamp `leaderMetadataFooter.upstreamMessageTimestamp` from the wrapper at every site that builds a `LeaderMetadata` (fresh-DIV `getKafkaMessageEnvelope`, pass-through `getKafkaMessageEnvelopeProvider`, `getHeartbeatKME`, and the cached `DefaultLeaderMetadata`). The plumbing is uniform; specific sites carry the value or the default sentinel based on what the wrapper holds.
- LeaderFollowerStoreIngestionTask.produceToLocalKafka: pass `consumerRecord.getPubSubMessageTime()` into the wrapper, which is the broker append time when available (falling back to the upstream producer timestamp). This is the only production stamping site in this PR; heartbeat propagation (`propagateHeartbeatFromUpstreamTopicToLocalVersionTopic`) is intentionally left unchanged because the existing path already preserves the upstream HB time via `producerMetadata.messageTimestamp` (see `getHeartbeatKME`).

Tests:
- New unit tests in `VeniceWriterUnitTest` verifying the field flows through `getKafkaMessageEnvelope` to the produced KME's `leaderMetadataFooter`, and that `DEFAULT_LEADER_METADATA_WRAPPER` carries the sentinel.
- `InstanceSizeEstimatorTest` updated to call the v14 all-args `LeaderMetadata` constructor.

A follow-up PR will introduce the store-config knob to optionally stamp from the upstream KME's `producerMetadata.messageTimestamp` instead of the broker time, and a later PR will switch the existing per-record OTel follower delay metric to use the new field for true RT-to-follower E2E.
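The source-compatibility claim for `LeaderMetadataWrapper` can be illustrated with a minimal sketch. The class name `LeaderMetadataWrapperSketch`, its field set, and the constructor parameter lists are assumptions made for illustration; only the sentinel name and value (-1) come from the commit message, and the real Venice class may differ.

```java
// Hypothetical, simplified stand-in for LeaderMetadataWrapper.
// Field names and constructor arities are assumptions, not the actual Venice class.
final class LeaderMetadataWrapperSketch {
  // Sentinel meaning "no upstream timestamp was recorded".
  static final long DEFAULT_UPSTREAM_MESSAGE_TIMESTAMP = -1L;

  private final long upstreamOffset;
  private final int upstreamKafkaClusterId;
  private final long termId;
  private final long upstreamMessageTimestamp;

  // Pre-existing style constructor: callers unaware of the new field keep
  // compiling and implicitly get the sentinel.
  LeaderMetadataWrapperSketch(long upstreamOffset, int upstreamKafkaClusterId, long termId) {
    this(upstreamOffset, upstreamKafkaClusterId, termId, DEFAULT_UPSTREAM_MESSAGE_TIMESTAMP);
  }

  // New overload: the leader produce path passes the upstream record's time.
  LeaderMetadataWrapperSketch(
      long upstreamOffset,
      int upstreamKafkaClusterId,
      long termId,
      long upstreamMessageTimestamp) {
    this.upstreamOffset = upstreamOffset;
    this.upstreamKafkaClusterId = upstreamKafkaClusterId;
    this.termId = termId;
    this.upstreamMessageTimestamp = upstreamMessageTimestamp;
  }

  long getUpstreamOffset() {
    return upstreamOffset;
  }

  int getUpstreamKafkaClusterId() {
    return upstreamKafkaClusterId;
  }

  long getTermId() {
    return termId;
  }

  long getUpstreamMessageTimestamp() {
    return upstreamMessageTimestamp;
  }
}
```

Delegating the shorter constructor to the longer one keeps the default in a single place, so a change to the sentinel would not need to touch existing call sites.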
Pull request overview
This PR activates KafkaMessageEnvelope schema v14 and plumbs/stamps the new LeaderMetadata.upstreamMessageTimestamp so followers can compute true per-record end-to-end nearline ingestion latency (from upstream message time rather than leader produce time).
Changes:
- Remove the KME v13 Avro pin and bump `AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE` current version to 14.
- Add `upstreamMessageTimestamp` to `LeaderMetadataWrapper` and propagate it into all `LeaderMetadata` construction paths in `VeniceWriter`.
- Stamp `consumerRecord.getPubSubMessageTime()` into leader-produced records and add unit tests covering the plumbing + sentinel behavior.
Reviewed changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| build.gradle | Removes the KME v13 schema override so v14 is used for generated Avro classes. |
| internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java | Bumps KAFKA_MESSAGE_ENVELOPE current protocol version to v14. |
| internal/venice-common/src/main/java/com/linkedin/venice/writer/LeaderMetadataWrapper.java | Adds upstreamMessageTimestamp with sentinel default and constructor overloads/getter. |
| internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java | Copies upstreamMessageTimestamp from wrapper into LeaderMetadata (default + data + heartbeat + passthrough). |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java | Stamps getPubSubMessageTime() into LeaderMetadataWrapper on the leader produce path. |
| internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java | Adds unit tests to validate the new timestamp is propagated and sentinel default is preserved. |
| internal/venice-common/src/test/java/com/linkedin/venice/memory/InstanceSizeEstimatorTest.java | Updates LeaderMetadata constructor usage for the new schema arity. |
Address Copilot review feedback. Per the contract on PubSubMessage.getPubSubMessageTime(), the returned value should always be positive (broker time when available, otherwise the upstream producer timestamp). To make the read-side invariant unambiguous ("upstreamMessageTimestamp > 0 means a real upstream time"), the leader now collapses any non-positive value returned by getPubSubMessageTime() to the sentinel DEFAULT_UPSTREAM_MESSAGE_TIMESTAMP (-1) before stamping the wrapper. This is purely defensive; no observed call site returns 0, but it removes the need for downstream readers to special-case 0 versus -1.
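A minimal sketch of the invariant described above, covering both the writer-side collapse and a reader-side check. The class and method names (`UpstreamTimestampInvariantSketch`, `normalize`, `endToEndLatencyMs`) are hypothetical and do not appear in the PR; only the sentinel value and the "> 0 means a real upstream time" rule come from the commit message.

```java
import java.util.OptionalLong;

// Hypothetical helper illustrating the "> 0 means a real upstream time" invariant.
final class UpstreamTimestampInvariantSketch {
  static final long DEFAULT_UPSTREAM_MESSAGE_TIMESTAMP = -1L;

  private UpstreamTimestampInvariantSketch() {
  }

  // Writer side: collapse 0 and negative values to the single sentinel.
  static long normalize(long rawTimestampMs) {
    return rawTimestampMs > 0 ? rawTimestampMs : DEFAULT_UPSTREAM_MESSAGE_TIMESTAMP;
  }

  // Reader side: report a latency only when the field holds a real upstream time.
  static OptionalLong endToEndLatencyMs(long upstreamMessageTimestamp, long nowMs) {
    if (upstreamMessageTimestamp <= 0) {
      return OptionalLong.empty();
    }
    return OptionalLong.of(nowMs - upstreamMessageTimestamp);
  }
}
```

With the collapse done on the write path, a reader needs only the single `<= 0` check and never has to decide what a stored 0 means.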
PRODUCER)
Adds a server-level config that selects which timestamp the leader stamps
into LeaderMetadata.upstreamMessageTimestamp when producing a record from a
consumed upstream message:
server.nearline.latency.timestamp.source = BROKER | PRODUCER (default: BROKER)
- BROKER (default): the upstream pub-sub broker's append timestamp via
PubSubMessage.getPubSubMessageTime() (which itself falls back to the
upstream producer timestamp when the broker time is unavailable). Matches
the infra-only latency view the leader uses today.
- PRODUCER: the upstream producer's wall clock embedded in the upstream
KafkaMessageEnvelope.producerMetadata.messageTimestamp. Includes the
upstream client's enqueue-to-produce latency, giving the
application-perceived end-to-end view.
Both options yield milliseconds-since-epoch values, matching the time basis
of ProducerMetadata.messageTimestamp. The leader still collapses any
non-positive resolved value to
LeaderMetadataWrapper.DEFAULT_UPSTREAM_MESSAGE_TIMESTAMP (-1) so the
field's invariant ("> 0 means a real upstream time") holds for readers
regardless of source.
Validation in follow-up commits.
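The BROKER-vs-PRODUCER selection described in this commit might look roughly like the sketch below. It is not the PR's `resolveUpstreamMessageTimestamp`: the class name, the nested `Source` enum, and the parameter list are assumptions, and a nullable `Long` stands in for a possibly missing `producerMetadata`.

```java
// Hypothetical sketch of the source selection; the signature is an assumption.
final class UpstreamTimestampResolverSketch {
  enum Source {
    BROKER, PRODUCER
  }

  static final long SENTINEL = -1L;

  private UpstreamTimestampResolverSketch() {
  }

  // pubSubMessageTimeMs models PubSubMessage.getPubSubMessageTime().
  // producerMessageTimestampMs models producerMetadata.messageTimestamp and is
  // null when the upstream envelope carries no producer metadata.
  static long resolve(Source source, long pubSubMessageTimeMs, Long producerMessageTimestampMs) {
    long resolved;
    if (source == Source.PRODUCER) {
      resolved = producerMessageTimestampMs == null ? SENTINEL : producerMessageTimestampMs;
    } else {
      resolved = pubSubMessageTimeMs;
    }
    // Collapse any non-positive value so "> 0" always means a real upstream time.
    return resolved > 0 ? resolved : SENTINEL;
  }
}
```

Applying the non-positive collapse after the branch keeps the invariant identical for both sources.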
Adds unit tests for the BROKER-vs-PRODUCER selection introduced in the previous commit:
- NearlineLatencyTimestampSourceTest covers parsing (exact names, case insensitivity, whitespace trimming, null default to BROKER, rejection of unknown values).
- LeaderFollowerStoreIngestionTask gets three tests on resolveUpstreamMessageTimestamp: BROKER mode picks PubSubMessage.getPubSubMessageTime(), PRODUCER mode picks KafkaMessageEnvelope.producerMetadata.messageTimestamp, and PRODUCER mode returns the sentinel (-1) when producerMetadata is null.

resolveUpstreamMessageTimestamp is refactored to a static package-private method so it can be unit-tested without spinning up a full ingestion task. The production call site now passes the source explicitly from getServerConfig().getNearlineLatencyTimestampSource().

E2E coverage exercising the full leader-produce → follower-apply path under both modes can come in a follow-up.
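The parsing behaviours listed for NearlineLatencyTimestampSourceTest could be satisfied by an enum along these lines. This is a sketch under assumptions: the enum name `NearlineLatencyTimestampSourceSketch` and the `parse` method are hypothetical, and the real class may name and structure its parsing differently.

```java
import java.util.Locale;

// Hypothetical sketch of the config enum; the parse method name is an assumption.
enum NearlineLatencyTimestampSourceSketch {
  BROKER, PRODUCER;

  // Null falls back to BROKER; matching is case-insensitive and ignores
  // surrounding whitespace; anything else is rejected.
  static NearlineLatencyTimestampSourceSketch parse(String raw) {
    if (raw == null) {
      return BROKER;
    }
    String normalized = raw.trim().toUpperCase(Locale.ROOT);
    try {
      return valueOf(normalized);
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException(
          "Unknown nearline latency timestamp source '" + raw + "'; expected BROKER or PRODUCER",
          e);
    }
  }
}
```

Using `Locale.ROOT` for the upper-casing avoids locale-dependent surprises when matching enum names.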
Fix CI failure in testLeaderCanSendValueChunksIntoDrainer caused by the new server-config-driven stamping logic in produceToLocalKafka. The test creates a fully-mocked ActiveActiveStoreIngestionTask without stubbing getServerConfig(). When the configurable BROKER-vs-PRODUCER selection landed, produceToLocalKafka started calling getServerConfig().getNearlineLatencyTimestampSource() and NPE'd. Stubs the mock to return a VeniceServerConfig with BROKER (the default).
…a ctor Address Copilot review feedback: replace the raw `-1L` literals for termId and upstreamMessageTimestamp with the named constants `VeniceWriter.DEFAULT_TERM_ID` and `LeaderMetadataWrapper.DEFAULT_UPSTREAM_MESSAGE_TIMESTAMP` so the test stays correct if the schema sentinels are ever changed.
Problem Statement
Today on the fresh-DIV produce path (post-EOP Put/Update/Delete from RT), the leader constructs a fresh `producerMetadata.messageTimestamp` from its own local clock, dropping the upstream RT record's time. Followers reading these records on the version topic can only measure latency from leader-produce-time, not from RT entry, so per-record nearline latency metrics leave out the leader-side processing and produce-callback delay. The follower-side per-record OTel delay metric (added in #2399) therefore measures a value that is structurally smaller than the true end-to-end replication lag.

For heartbeats, this gap is already closed via a special path (`getHeartbeatKME` overrides `producerMetadata.messageTimestamp` with `max(broker, upstream-producer)`), but heartbeats are sparse (~one per minute) and don't give per-record granularity.

Solution
This PR activates `KafkaMessageEnvelope` schema v14 (introduced in #2778) and starts populating the new `upstreamMessageTimestamp` field on records produced by the leader from a consumed upstream message. Followers can then compute the same true per-record end-to-end nearline ingestion latency the leader uses today, without changing any existing wire-level semantics.

The PR is intentionally narrow:
- Activate KME v14 (remove the v13 pin and bump `AvroProtocolDefinition`).
- Propagate the new field through `LeaderMetadataWrapper` and every `LeaderMetadata`-construction site in `VeniceWriter` (fresh-DIV `getKafkaMessageEnvelope`, pass-through `getKafkaMessageEnvelopeProvider`, `getHeartbeatKME`, the cached `DefaultLeaderMetadata`).
- Stamp `consumerRecord.getPubSubMessageTime()` at the one production site that owns the data-record produce path (`LeaderFollowerStoreIngestionTask.produceToLocalKafka`). `getPubSubMessageTime()` returns the broker append time when available, and falls back to the upstream producer timestamp embedded in the KME. The exact source semantics (broker vs producer) become configurable per store in a follow-up PR.

The heartbeat propagation path (`propagateHeartbeatFromUpstreamTopicToLocalVersionTopic`) is intentionally left unchanged in this PR, because the existing path already preserves the upstream HB time via `producerMetadata.messageTimestamp` and the existing follower HB lag metric is correct.

This change is additive at the wire level: existing readers that don't understand v14 default the new field to `-1` (the schema sentinel) and lose nothing. New readers that compute E2E latency only do so when the field is `> 0`.

File-by-file
| File | Description |
|---|---|
| `build.gradle` | Remove the v13 KME pin from `versionOverrides` so generated Avro classes use v14. |
| `AvroProtocolDefinition.java` | Bump `KAFKA_MESSAGE_ENVELOPE` current protocol version from 13 to 14. |
| `LeaderMetadataWrapper.java` | Add `upstreamMessageTimestamp` field + getter + constructor overloads. Existing constructors default the new field to `DEFAULT_UPSTREAM_MESSAGE_TIMESTAMP` (-1), keeping all current callers source-compatible. |
| `VeniceWriter.java` | Stamp `leaderMetadataFooter.upstreamMessageTimestamp` from the wrapper at every site that builds a `LeaderMetadata`. |
| `LeaderFollowerStoreIngestionTask.java` | `produceToLocalKafka` now passes `consumerRecord.getPubSubMessageTime()` into the wrapper. |
| `VeniceWriterUnitTest.java` | New unit tests verifying the field flows through to the produced KME's `leaderMetadataFooter` and that `DEFAULT_LEADER_METADATA_WRAPPER` carries the sentinel. |
| `InstanceSizeEstimatorTest.java` | Update the `LeaderMetadata` all-args constructor call for the new arity. |

Code changes
Concurrency-Specific Checks
Both reviewer and PR author to verify
- … `LeaderMetadataWrapper` semantics, which is immutable.)

How was this PR tested?
- … `testUpstreamMessageTimestampPlumbedToLeaderMetadataFooter`, `testDefaultLeaderMetadataWrapperHasSentinelUpstreamMessageTimestamp`).
- … `InstanceSizeEstimatorTest` updated for the v14 constructor arity).

Does this PR introduce any user-facing or breaking changes?