Skip to content

[server][compat] Activate KME v14 and stamp upstreamMessageTimestamp on leader produces#2780

Open
sushantmane wants to merge 6 commits intolinkedin:mainfrom
sushantmane:sumane/kme-v14-stamp-upstream-timestamp
Open

[server][compat] Activate KME v14 and stamp upstreamMessageTimestamp on leader produces#2780
sushantmane wants to merge 6 commits intolinkedin:mainfrom
sushantmane:sumane/kme-v14-stamp-upstream-timestamp

Conversation

@sushantmane
Copy link
Copy Markdown
Contributor

Problem Statement

Today on the fresh-DIV produce path (post-EOP Put/Update/Delete from RT), the leader constructs a fresh producerMetadata.messageTimestamp from its own local clock, dropping the upstream RT record's time. Followers reading these records on the version topic can only measure latency from leader-produce-time, not from RT entry — so per-record nearline latency metrics underreport leader-side processing and produce-callback delay. The follower-side per-record OTel delay metric (added in #2399) measures a value that is structurally smaller than the true end-to-end replication lag.

For heartbeats, this gap is already closed via a special path (getHeartbeatKME overrides producerMetadata.messageTimestamp with max(broker, upstream-producer)), but heartbeats are sparse (~one per minute) and don't give per-record granularity.

Solution

This PR activates KafkaMessageEnvelope schema v14 (introduced in #2778) and starts populating the new upstreamMessageTimestamp field on records produced by the leader from a consumed upstream message. Followers can then compute the same true per-record end-to-end nearline ingestion latency the leader uses today, without changing any existing wire-level semantics.

The PR is intentionally narrow:

  • Activates v14 (removes the build.gradle pin, bumps AvroProtocolDefinition).
  • Plumbs the new field through LeaderMetadataWrapper and every LeaderMetadata-construction site in VeniceWriter (fresh-DIV getKafkaMessageEnvelope, pass-through getKafkaMessageEnvelopeProvider, getHeartbeatKME, the cached DefaultLeaderMetadata).
  • Stamps consumerRecord.getPubSubMessageTime() at the one production site that owns the data-record produce path (LeaderFollowerStoreIngestionTask.produceToLocalKafka).

getPubSubMessageTime() returns the broker append time when available, and falls back to the upstream producer timestamp embedded in the KME. The exact source semantics (broker vs producer) become configurable per store in a follow-up PR.

The heartbeat propagation path (propagateHeartbeatFromUpstreamTopicToLocalVersionTopic) is intentionally left unchanged in this PR, because the existing path already preserves the upstream HB time via producerMetadata.messageTimestamp and the existing follower HB lag metric is correct.

This change is additive at the wire level — existing readers that don't understand v14 default the new field to -1 (the schema sentinel) and lose nothing. New readers that compute E2E latency only do so when the field is > 0.

File-by-file

File Change
build.gradle Remove the v13 KME pin from versionOverrides so generated Avro classes use v14.
AvroProtocolDefinition.java Bump KAFKA_MESSAGE_ENVELOPE current protocol version from 13 to 14.
LeaderMetadataWrapper.java Add upstreamMessageTimestamp field + getter + constructor overloads. Existing constructors default the new field to DEFAULT_UPSTREAM_MESSAGE_TIMESTAMP (-1), keeping all current callers source-compatible.
VeniceWriter.java Stamp leaderMetadataFooter.upstreamMessageTimestamp from the wrapper at every site that builds a LeaderMetadata.
LeaderFollowerStoreIngestionTask.java One call site updated: produceToLocalKafka now passes consumerRecord.getPubSubMessageTime() into the wrapper.
VeniceWriterUnitTest.java New unit tests verifying the field flows through to leaderMetadataFooter and that DEFAULT_LEADER_METADATA_WRAPPER carries the sentinel.
InstanceSizeEstimatorTest.java Updated LeaderMetadata all-args constructor call for the new arity.

Code changes

  • Added new code behind a config.
  • Introduced new log lines.
    • Confirmed if logs need to be rate limited to avoid excessive logging.

Concurrency-Specific Checks

Both reviewer and PR author to verify

  • Code has no race conditions or thread safety issues. (No new shared state introduced; the field rides existing LeaderMetadataWrapper semantics, which is immutable.)
  • Proper synchronization mechanisms are used where needed.
  • No blocking calls inside critical sections.
  • Verified thread-safe collections are used.
  • Validated proper exception handling in multi-threaded code.

How was this PR tested?

  • New unit tests added (testUpstreamMessageTimestampPlumbedToLeaderMetadataFooter, testDefaultLeaderMetadataWrapperHasSentinelUpstreamMessageTimestamp).
  • New integration tests added.
  • Modified or extended existing tests (InstanceSizeEstimatorTest updated for the v14 constructor arity).
  • Verified backward compatibility — existing readers see the default sentinel (-1) on records without the field, and new constructors are added as overloads so all current call sites continue to compile.

Does this PR introduce any user-facing or breaking changes?

  • No. You can skip the rest of this section.
  • Yes.

…p on

leader produces

Activates KafkaMessageEnvelope schema v14 (introduced in linkedin#2778) and starts
populating
the new `upstreamMessageTimestamp` field on records produced by leader from a
consumed
upstream message.

Today on the fresh-DIV produce path (post-EOP Put/Update/Delete from RT), the
leader
constructs a fresh `producerMetadata.messageTimestamp` from its own local
clock, dropping
the upstream RT record's time. Followers reading these records can only
measure latency
from leader-produce-time, not from RT entry — so per-record nearline la
tency metrics
underreport the leader-side processing/produce-callback delay. Carrying the
upstream
record's timestamp explicitly via `LeaderMetadata.upstreamMessageTimestamp`
lets followers
compute the same true per-record end-to-end latency the leader uses today.

Changes:

- build.gradle: remove the v13 KME pin from `versionOverrides` so generated
Avro classes
  use v14.
- AvroProtocolDefinition: bump `KAFKA_MESSAGE_ENVELOPE` current protocol
version from 13
  to 14.
- LeaderMetadataWrapper: add `upstreamMessageTimestamp` field, getter, and
constructor
  overloads. Existing 3-arg / 4-arg constructors default the new field to
  `DEFAULT_UPSTREAM_MESSAGE_TIMESTAMP` (-1) so all current callers continue
to compile and
  produce records with the field defaulted to the schema sentinel.
- VeniceWriter: stamp `leaderMetadataFooter.upstreamMessageTimestamp` from
the wrapper at
  every site that builds a `LeaderMetadata` (fresh-DIV
`getKafkaMessageEnvelope`,
  pass-through `getKafkaMessageEnvelopeProvider`, `getHeartbeatKME`, and the
cached
  `DefaultLeaderMetadata`). The plumbing is uniform; specific sites carry the
value or the
  default sentinel based on what the wrapper holds.
- LeaderFollowerStoreIngestionTask.produceToLocalKafka: pass
  `consumerRecord.getPubSubMessageTime()` into the wrapper, which is the
broker
  append time when available (falling back to the upstream producer
timestamp). This is
  the only production stamping site in this PR — heartbeat propagation
  (`propagateHeartbeatFromUpstreamTopicToLocalVersionTopic`) is intentionally
left
  unchanged because the existing path already preserves the upstream HB time
via
  `producerMetadata.messageTimestamp` (see `getHeartbeatKME`).

Tests:

- New unit tests in `VeniceWriterUnitTest` verifying the field flows through
  `getKafkaMessageEnvelope` to the produced KME's `leaderMetadataFooter`, and
that
  `DEFAULT_LEADER_METADATA_WRAPPER` carries the sentinel.
- `InstanceSizeEstimatorTest` updated to call the v14 all-args
`LeaderMetadata`
  constructor.

A follow-up PR will introduce the store-config knob to optionally stamp from
the
upstream KME's `producerMetadata.messageTimestamp` instead of the broker
time, and a
later PR will switch the existing per-record OTel follower delay metric to
use the new
field for true RT-to-follower E2E.
Copilot AI review requested due to automatic review settings May 7, 2026 20:57
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR activates KafkaMessageEnvelope schema v14 and plumbs/stamps the new LeaderMetadata.upstreamMessageTimestamp so followers can compute true per-record end-to-end nearline ingestion latency (from upstream message time rather than leader produce time).

Changes:

  • Remove the KME v13 Avro pin and bump AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE current version to 14.
  • Add upstreamMessageTimestamp to LeaderMetadataWrapper and propagate it into all LeaderMetadata construction paths in VeniceWriter.
  • Stamp consumerRecord.getPubSubMessageTime() into leader-produced records and add unit tests covering the plumbing + sentinel behavior.

Reviewed changes

Copilot reviewed 6 out of 7 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
build.gradle Removes the KME v13 schema override so v14 is used for generated Avro classes.
internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/AvroProtocolDefinition.java Bumps KAFKA_MESSAGE_ENVELOPE current protocol version to v14.
internal/venice-common/src/main/java/com/linkedin/venice/writer/LeaderMetadataWrapper.java Adds upstreamMessageTimestamp with sentinel default and constructor overloads/getter.
internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java Copies upstreamMessageTimestamp from wrapper into LeaderMetadata (default + data + heartbeat + passthrough).
clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java Stamps getPubSubMessageTime() into LeaderMetadataWrapper on the leader produce path.
internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java Adds unit tests to validate the new timestamp is propagated and sentinel default is preserved.
internal/venice-common/src/test/java/com/linkedin/venice/memory/InstanceSizeEstimatorTest.java Updates LeaderMetadata constructor usage for the new schema arity.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@sushantmane sushantmane changed the title [server][protocol] Activate KME v14 and stamp upstreamMessageTimestamp on leader produces [server][compat] Activate KME v14 and stamp upstreamMessageTimestamp on leader produces May 7, 2026
Address Copilot review feedback. Per the contract on
PubSubMessage.getPubSubMessageTime(), the returned value should always be
positive (broker time when available, otherwise the upstream producer
timestamp). To make the read-side invariant unambiguous — "u
pstreamMessageTimestamp > 0 means a real upstream time" — the leader now co
llapses any
non-positive value returned by getPubSubMessageTime() to the sentinel
DEFAULT_UPSTREAM_MESSAGE_TIMESTAMP (-1) before stamping the wrapper.

This is purely defensive; no observed call site returns 0, but it removes
the need for downstream readers to special-case 0 versus -1.
PRODUCER)

Adds a server-level config that selects which timestamp the leader stamps
into LeaderMetadata.upstreamMessageTimestamp when producing a record from a
consumed upstream message:

  server.nearline.latency.timestamp.source = BROKER | PRODUCER  (default:
BROKER)

  - BROKER (default): the upstream pub-sub broker's append timestamp via
    PubSubMessage.getPubSubMessageTime() (which itself falls back to the
    upstream producer timestamp when the broker time is unavailable). Matches
    the infra-only latency view the leader uses today.

  - PRODUCER: the upstream producer's wall clock embedded in the upstream
    KafkaMessageEnvelope.producerMetadata.messageTimestamp. Includes the
    upstream client's enqueue-to-produce latency, giving the
    application-perceived end-to-end view.

Both options yield millisecond-since-epoch values, matching the time basis
of ProducerMetadata.messageTimestamp. The leader still collapses any
non-positive resolved value to
LeaderMetadataWrapper.DEFAULT_UPSTREAM_MESSAGE_TIMESTAMP
(-1) so the field's invariant ("> 0 means a real upstream time") holds for
readers regardless of source.

Validation in follow-up commits.
Adds unit tests for the BROKER-vs-PRODUCER selection introduced in the
previous commit:

- NearlineLatencyTimestampSourceTest covers parsing (exact names, case
  insensitivity, whitespace trimming, null default to BROKER, rejection
  of unknown values).
- LeaderFollowerStoreIngestionTask gets three tests on
  resolveUpstreamMessageTimestamp: BROKER mode picks
  PubSubMessage.getPubSubMessageTime(), PRODUCER mode picks
  KafkaMessageEnvelope.producerMetadata.messageTimestamp, and PRODUCER
  mode returns the sentinel (-1) when producerMetadata is null.

resolveUpstreamMessageTimestamp is refactored to a static
package-private method so it can be unit-tested without spinning up a
full ingestion task. The production call site now passes the source
explicitly from getServerConfig().getNearlineLatencyTimestampSource().

E2E coverage exercising the full leader-produce → follower-apply path
under both modes can come in a follow-up.
Copilot AI review requested due to automatic review settings May 7, 2026 23:04
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 11 out of 12 changed files in this pull request and generated 4 comments.

Fix CI failure in testLeaderCanSendValueChunksIntoDrainer caused by the
new server-config-driven stamping logic in produceToLocalKafka.

The test creates a fully-mocked ActiveActiveStoreIngestionTask without
stubbing getServerConfig(). When the configurable BROKER-vs-PRODUCER
selection landed, produceToLocalKafka started calling
getServerConfig().getNearlineLatencyTimestampSource() and NPE'd.

Stubs the mock to return a VeniceServerConfig with BROKER (the default).
Copilot AI review requested due to automatic review settings May 8, 2026 00:30
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 12 out of 13 changed files in this pull request and generated no new comments.

…a ctor

Address Copilot review feedback: replace the raw `-1L` literals for
termId and upstreamMessageTimestamp with the named constants
`VeniceWriter.DEFAULT_TERM_ID` and
`LeaderMetadataWrapper.DEFAULT_UPSTREAM_MESSAGE_TIMESTAMP` so the test
stays correct if the schema sentinels are ever changed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants