[server][dvc] Invalidate inherited fast-RTS inputs after blob transfer and persist before logging#2773
…r and persist before logging
After PR linkedin#2715 cleared the previouslyReadyToServe gate flag inherited from the blob-transfer source host, the OffsetRecord still carried the donor server's heartbeatTimestamp and lastCheckpointTimestamp. checkFastReadyToServeWithPreviousTimeLag would correctly bail out at the heartbeat-INVALID guard for a fresh follower (since measureHybridHeartbeatTimestamp returns INVALID_MESSAGE_TIMESTAMP while consumedFromUpstream is false), but if that single guard were ever bypassed -- by a future refactor of the heartbeat measurement, a logic bug, or post-restart persistence of mismatched fields -- the fast path would compute (currentLag - previousLag) ~= 0 against the donor's clock and wrongly mark the replica RTS without any real catch-up.
This change arms both early-return guards in checkFastReadyToServeWithPreviousTimeLag (heartbeat AND checkpoint) for any post-blob replica:
* In completePostTransferPSCUpdated, explicitly set heartbeatTimestamp and lastCheckpointTimestamp to INVALID_MESSAGE_TIMESTAMP and offsetLag to DEFAULT_OFFSET_LAG on the reinitialized PCS. These fields belong to the donor server's ingestion clock and must not be read as authoritative for this replica.
* Add a syncOffset(pcs, updateMetadataLag) overload. The post-blob path calls syncOffset(newPcs, false) to persist the cleared OffsetRecord verbatim, so a JVM bounce immediately after blob bootstrap (and before any HB has been consumed on the new ingestion path) cannot re-trigger fast RTS. The default syncOffset(pcs) preserves existing behavior for all other callers.
* The persist-then-log ordering also makes the "Post-blob-transfer PCS reinitialized" log reflect on-disk state, not just the in-memory snapshot, which simplifies post-incident investigations.
Belt-and-suspenders alongside the gate flag clear from linkedin#2715: even if the gate is somehow set true again, the heartbeat AND checkpoint INVALID values together make checkFastReadyToServeWithPreviousTimeLag structurally bail out.
Testing Done
* Existing SITFastReadyToServeTest (unchanged behavior for non-blob callers).
* Manually verified the diff via git diff and confirmed the no-overwrite path through syncOffset(pcs, false) reaches storageMetadataService.put with the intended INVALID values intact.
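The overload described above can be pictured with a minimal sketch. It is not the Venice implementation: the `Record` class, the in-memory `disk` map standing in for storageMetadataService, and the explicit `nowMs` parameter are all hypothetical simplifications. The only behavior taken from this PR is that the default path refreshes lag metadata (re-stamping lastCheckpointTimestamp) while the `false` path persists the record verbatim.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for the SIT persistence path; not the real Venice classes. */
final class SyncOffsetSketch {
  static final long INVALID_MESSAGE_TIMESTAMP = -1L;

  /** Hypothetical record holding only the checkpoint field discussed above. */
  static final class Record {
    long lastCheckpointTimestamp = INVALID_MESSAGE_TIMESTAMP;

    Record copy() {
      Record r = new Record();
      r.lastCheckpointTimestamp = lastCheckpointTimestamp;
      return r;
    }
  }

  /** Stand-in for storageMetadataService: partition id -> persisted record. */
  final Map<Integer, Record> disk = new HashMap<>();

  /** Default path: refresh lag metadata, then persist (unchanged behavior). */
  void syncOffset(int partition, Record record, long nowMs) {
    syncOffset(partition, record, true, nowMs);
  }

  /** Overload: when updateMetadataLag is false, persist the record verbatim. */
  void syncOffset(int partition, Record record, boolean updateMetadataLag, long nowMs) {
    if (updateMetadataLag) {
      // Stand-in for updateOffsetLagInMetadata, which re-stamps the checkpoint time.
      record.lastCheckpointTimestamp = nowMs;
    }
    disk.put(partition, record.copy());
  }
}
```

In this sketch, calling the four-argument form with `false` leaves the cleared -1 on "disk", which is the property the post-blob path relies on.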
No functional change. Project convention is to use /* */ block form for multi-line explanatory comments; one-liner // is unchanged.
Pull request overview
This PR hardens the Da Vinci/Server post–blob-transfer reinitialization flow to prevent inherited (donor) lag metadata in OffsetRecord from incorrectly enabling the fast ready-to-serve (RTS) path after blob bootstrap, and ensures the cleared state is persisted before it is logged.
Changes:
- In `completePostTransferPSCUpdated`, explicitly invalidate inherited `heartbeatTimestamp`, `lastCheckpointTimestamp`, and `offsetLag`, then persist the updated `OffsetRecord` before logging.
- Add a `syncOffset(PartitionConsumptionState pcs, boolean updateMetadataLag)` overload to optionally skip `updateOffsetLagInMetadata(...)` so explicitly-cleared lag fields can be persisted verbatim.
- Gate the lag-metadata refresh inside `syncOffset(...)` behind the new `updateMetadataLag` flag.
…point
Closes the crash window between when the donor's OffsetRecord lands on disk
and when the SIT thread runs the post-blob clears. Previously, the donor's
record was persisted in full by
P2PMetadataTransferHandler.updateStorePartitionMetadata -- including
previouslyReadyToServe, heartbeatTimestamp, lastCheckpointTimestamp, and
offsetLag -- and only later got cleared by
completePostTransferPSCUpdated on the SIT thread. A JVM crash anywhere in
between meant restart would read the donor's values and re-trigger fast-RTS
via checkFastReadyToServeWithPreviousTimeLag.
Sanitize the inherited record at the entry point, before any disk write:
* P2PMetadataTransferHandler.updateStorePartitionMetadata clears
previouslyReadyToServe and sets heartbeatTimestamp and
lastCheckpointTimestamp to INVALID_MESSAGE_TIMESTAMP and offsetLag to
DEFAULT_OFFSET_LAG immediately after deserializing the transferred bytes,
so storageMetadataService.put never sees donor-clock values.
* OffsetRecord exposes PREVIOUSLY_READY_TO_SERVE_KEY as a public constant
and a clearPreviouslyReadyToServe() convenience helper, so callers in
other packages (here: blobtransfer.client) can clear the flag without
depending on the davinci-internal constant.
* PartitionConsumptionState's private alias now points to the public
constant instead of duplicating the Utf8 literal; the unused Utf8 import
is removed.
Together with the SIT-side clears in completePostTransferPSCUpdated and the
syncOffset(pcs, false) variant from earlier in this PR, both early-return
guards in checkFastReadyToServeWithPreviousTimeLag (heartbeat AND checkpoint)
are armed for any post-blob replica at every observable persistence point.
Testing Done
* Verified Utf8 import is no longer needed in PartitionConsumptionState; no
other reference remains.
* Verified the public constant is reachable from the blobtransfer.client
package without a circular dependency.
OffsetRecord.clearInheritedDonorState
Replace the inline 4-line clear blocks in P2PMetadataTransferHandler and
StoreIngestionTask.completePostTransferPSCUpdated with a single static
helper OffsetRecord.clearInheritedDonorState(record). The helper is the
authoritative spot to extend whenever a new field is added that is unsafe
to inherit verbatim from the donor host -- callers don't need to remember
the full list each time.
Static (rather than instance) because the call site reads more naturally
as "clear inherited state on this record" and avoids the awkward
`getOffsetRecord().sanitizeAfterBlobTransfer()` getter dance when the
OffsetRecord lives inside another object (PCS).
Both call sites now invoke it before any disk persistence:
* P2PMetadataTransferHandler.updateStorePartitionMetadata clears the
inherited record before storageMetadataService.put.
* StoreIngestionTask.completePostTransferPSCUpdated clears the in-memory
record reloaded from storage before syncOffset(pcs, false).
No behavior change vs. the prior commit; this is a pure refactor.
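A rough sketch of what such a static helper could look like at this commit, when it still cleared all four fields. The class below is a hypothetical stand-in for OffsetRecord: the map-based previousStatuses, the plain long fields, and in particular the key literal are assumptions made for illustration, not the actual Venice definitions.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for OffsetRecord; field layout and key literal are illustrative. */
final class OffsetRecordSketch {
  static final long INVALID_MESSAGE_TIMESTAMP = -1L;
  static final long DEFAULT_OFFSET_LAG = -1L;
  // Assumed literal; the real PREVIOUSLY_READY_TO_SERVE_KEY value is not shown in this PR text.
  static final String PREVIOUSLY_READY_TO_SERVE_KEY = "previouslyReadyToServe";

  final Map<String, String> previousStatuses = new HashMap<>();
  long heartbeatTimestamp;
  long lastCheckpointTimestamp;
  long offsetLag;

  /** Removes only the fast-RTS gate flag. */
  void clearPreviouslyReadyToServe() {
    previousStatuses.remove(PREVIOUSLY_READY_TO_SERVE_KEY);
  }

  /** Single place that lists every field unsafe to inherit from the donor host. */
  static void clearInheritedDonorState(OffsetRecordSketch record) {
    record.clearPreviouslyReadyToServe();
    record.heartbeatTimestamp = INVALID_MESSAGE_TIMESTAMP;
    record.lastCheckpointTimestamp = INVALID_MESSAGE_TIMESTAMP;
    record.offsetLag = DEFAULT_OFFSET_LAG;
  }
}
```

Because every assignment writes a fixed sentinel, calling the helper twice leaves the record unchanged, which is what lets both the P2P-side and SIT-side call sites invoke it unconditionally.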
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (1)
clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/P2PMetadataTransferHandler.java:131
- Clearing inherited donor state before persisting the transferred OffsetRecord is critical to avoid re-triggering fast RTS after a crash, but there’s no unit coverage validating that the stored OffsetRecord actually has the cleared heartbeatTimestamp/lastCheckpointTimestamp/offsetLag and previouslyReadyToServe removed. Please add a focused unit test for updateStorePartitionMetadata that captures the StorageMetadataService.put() argument and asserts the cleared values are persisted.
Utils.getReplicaId(transferredPartitionMetadata.topicName, transferredPartitionMetadata.partitionId));
OffsetRecord transferredOffsetRecord =
new OffsetRecord(transferredPartitionMetadata.offsetRecord.array(), partitionStateSerializer, null);
/*
* Clear inherited donor state before persisting. If donor-clock fields reach disk verbatim, a
* JVM crash between this put() and the post-blob sync on the SIT thread leaves a state where
* restart reads donor values and re-triggers fast RTS via checkFastReadyToServeWithPreviousTimeLag.
* Clearing at the entry point closes that crash window regardless of where the SIT thread is.
*/
OffsetRecord.clearInheritedDonorState(transferredOffsetRecord);
// 1. update the offset incremental push job information
updateIncrementalPushInfoToStore(transferredOffsetRecord, transferredPartitionMetadata);
// 2. update the offset record in storage service
storageMetadataService
.put(transferredPartitionMetadata.topicName, transferredPartitionMetadata.partitionId, transferredOffsetRecord);
// 3. update the metadata SVS
wrapper, add unit tests
Three review-driven changes, all in response to copilot-pull-request-reviewer feedback on PR linkedin#2773:
1. clearInheritedDonorState no longer touches heartbeatTimestamp. Setting it to INVALID_MESSAGE_TIMESTAMP (-1) interacts badly with BlobTransferIngestionHelper.isReplicaLaggedAndNeedBlobTransfer, which computes the time-lag eligibility check as (now() - offsetRecord.getHeartbeatTimestamp()). With heartbeatTimestamp = -1 this always exceeds blobTransferDisabledTimeLagThresholdInMinutes and would re-trigger blob transfer on every restart until a real heartbeat is consumed and checkpointed. The fast-RTS guard on lastCheckpointTimestamp alone is sufficient to block the wrongful RTS path -- the donor's heartbeatTimestamp is left intact so eligibility checks see a recent value.
2. Replace the boolean-literal call site syncOffset(newPcs, false) with a named wrapper syncOffsetPreservingLagMetadata(pcs). The boolean overload is renamed to syncOffsetInternal(pcs, updateMetadataLag) and is no longer the public-facing API; both callers go through one of the two named methods so the metadata-lag handling is explicit at every call site.
3. Add unit tests in TestOffsetRecord covering:
* clearPreviouslyReadyToServe removes the gate flag.
* clearInheritedDonorState clears the gate flag, lastCheckpointTimestamp, and offsetLag, but leaves heartbeatTimestamp at the donor value.
* clearInheritedDonorState is idempotent on a default OffsetRecord.
No behavior change for any non-blob caller.
blob-transfer eligibility
Treat the donor's heartbeatTimestamp as untrusted on this replica (it is the root-cause
input to the fast-RTS misfire we are fixing) and absorb the consequence at the only
reader that cared about it.
* OffsetRecord.clearInheritedDonorState now also resets heartbeatTimestamp = -1, so both
  early-return guards in checkFastReadyToServeWithPreviousTimeLag are armed for any
  post-blob replica regardless of which guard a future change might weaken.
* BlobTransferIngestionHelper.isReplicaLaggedAndNeedBlobTransfer skips the time-lag
  branch when heartbeatTimestamp == INVALID_MESSAGE_TIMESTAMP and falls through to the
  existing offset-lag fallback. With offsetLag = DEFAULT_OFFSET_LAG (-1, also cleared by
  sanitize) and a real localVtPosition (SST data on disk from blob bootstrap, not
  EARLIEST), the fallback correctly classifies this state as "caught up, no blob
  transfer", preventing the spurious-retransfer regression that a naive
  (now - INVALID_MESSAGE_TIMESTAMP) computation would cause.
* Brand-new replicas (offsetLag == 0 && localVtPosition == EARLIEST) still take the
  blob-transfer path via the existing offset-lag short-circuit; no regression there.
* Once a real heartbeat is consumed and a regular syncOffset(pcs) runs (NOT the
  syncOffsetPreservingLagMetadata variant used post-blob), updateOffsetLagInMetadata
  overwrites heartbeatTimestamp with a real measured value and the time-lag check
  resumes its normal behavior. The cleared INVALID state self-stabilizes.
Tests:
* TestOffsetRecord asserts heartbeatTimestamp = -1 after clearInheritedDonorState (was
  asserting "intact at donor's value" in the previous commit; reverted to match the new
  contract).
* LeaderFollowerStoreIngestionTaskTest adds
  testShouldStartBlobTransferHybridStoreInvalidHeartbeatFallsThroughToOffsetLag asserting
  that an INVALID heartbeatTimestamp + cleared offsetLag + real localVtPosition → no blob
  transfer (guard against the spurious-retransfer regression).
This supersedes the "keep heartbeatTimestamp intact" approach from the
previous commit.
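The difference between the naive time-lag computation and the guarded one can be sketched as follows. This is an illustration under stated assumptions, not the body of isReplicaLaggedAndNeedBlobTransfer: the parameter list, the boolean `localPositionIsEarliest`, and especially `offsetLagThreshold` in the fallback are hypothetical, since the commit message only describes the fallback's outcome for the two states listed above.

```java
/** Illustrative eligibility check; the real method's signature and fallback details may differ. */
final class BlobEligibilitySketch {
  static final long INVALID_MESSAGE_TIMESTAMP = -1L;

  /** Naive check: treats the -1 sentinel as if it were a real timestamp. */
  static boolean naiveTimeLagExceeded(long nowMs, long heartbeatTimestamp, long thresholdMs) {
    return nowMs - heartbeatTimestamp > thresholdMs;
  }

  /** Guarded check: skips the time-lag branch when the heartbeat is INVALID. */
  static boolean isReplicaLaggedAndNeedBlobTransfer(
      long nowMs,
      long heartbeatTimestamp,
      long thresholdMs,
      long offsetLag,
      boolean localPositionIsEarliest,
      long offsetLagThreshold) {
    if (heartbeatTimestamp != INVALID_MESSAGE_TIMESTAMP) {
      return nowMs - heartbeatTimestamp > thresholdMs;
    }
    // Offset-lag fallback. Brand-new replica: nothing on disk yet, so transfer.
    if (offsetLag == 0 && localPositionIsEarliest) {
      return true;
    }
    // Hypothetical threshold comparison; a cleared offsetLag of -1 never exceeds it.
    return offsetLag > offsetLagThreshold;
  }
}
```

With a wall-clock `nowMs` and a heartbeat of -1, the naive form evaluates to roughly `nowMs > thresholdMs`, which is always true for any realistic threshold; the guarded form avoids that by never subtracting the sentinel.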
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
… for post-blob freshness
Bumps PartitionState schema to v23, adding donorHeartbeatTimestampMs as a freshness anchor
that explicitly carries the donor's wall-clock for blob-transfer eligibility. This separates
the donor-clock value (used for "is the data fresh?") from heartbeatTimestamp (which is now
strictly the THIS-server-observed HB ts, used by fast-RTS lag computation). Both fast-RTS
guards in checkFastReadyToServeWithPreviousTimeLag remain armed for any post-blob replica.
Mechanics:
* PartitionState.avsc v23 adds donorHeartbeatTimestampMs (long, default -1).
  AvroProtocolDefinition.PARTITION_STATE bumps to currentProtocolVersion = 23.
* OffsetRecord exposes getDonorHeartbeatTimestampMs / setDonorHeartbeatTimestampMs
  wrappers, and clearInheritedDonorState now captures the inherited heartbeatTimestamp
  into the donor field BEFORE clearing it. The capture guard is "> 0" so:
  * fresh records (heartbeatTimestamp = 0, the Java long default) → no capture
  * already-cleared records (heartbeatTimestamp = -1) → no capture (re-entrant safe)
  * donor records (heartbeatTimestamp = real wall-clock ms) → capture and clear
  Re-running on the in-memory record reloaded from disk by the SIT-side path is
  therefore a no-op for the donor field -- the value persisted by the earlier P2P-side
  call is preserved.
* BlobTransferIngestionHelper.isReplicaLaggedAndNeedBlobTransfer falls back from
  heartbeatTimestamp to donorHeartbeatTimestampMs when the former is INVALID. The
  threshold comparison uses whichever anchor is available, with anchorSource logged
  explicitly. If neither is set (truly fresh replica with no inherited donor state),
  falls through to the existing offset-lag check.
Why this is better than the previous "fall through to offset-lag" approach:
* Preserves time-lag-based blob-transfer eligibility post-blob -- the operator's
  threshold tuning (blobTransferDisabledTimeLagThresholdInMinutes) remains effective.
* Trust separation is explicit in the schema. Future readers can tell apart "this
  replica observed an HB at this time" from "the donor observed an HB at this time".
* Once the replica catches up and a regular syncOffset overwrites heartbeatTimestamp
  with a real measured value, the donor field is naturally ignored -- eligibility
  prefers the THIS-server value.
Tests:
* TestOffsetRecord.testClearInheritedDonorState now asserts donor's HB ts is captured
  into donorHeartbeatTimestampMs while the standard heartbeatTimestamp is cleared to -1.
* TestOffsetRecord.testClearInheritedDonorStateIsReentrantSafeOnAlreadyClearedRecord
  asserts that a second invocation does not overwrite the captured donor value.
* LeaderFollowerStoreIngestionTaskTest.testShouldStartBlobTransferHybridStoreUsesDonorHeartbeatFallbackPostBlob
  verifies the eligibility fallback path on the post-blob state:
  heartbeatTimestamp = INVALID + donorHeartbeatTimestampMs = recent → no blob transfer.
* LeaderFollowerStoreIngestionTaskTest.testShouldStartBlobTransferHybridStoreNoHeartbeatAnchorFallsThroughToOffsetLag
  covers the pathological case where neither anchor is set.
Verified locally: :internal:venice-common:test --tests TestOffsetRecord and
:clients:da-vinci-client:test --tests testShouldStartBlobTransfer* all pass.
Supersedes the heartbeatTimestamp-cleared-and-fall-through-to-offset-lag approach from
the previous commit. Both early-return guards in fast-RTS lag computation remain armed,
AND time-lag-based blob-transfer eligibility is preserved.
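The "> 0" capture guard and the anchor fallback described in this commit can be sketched together. The `Record` class and the `freshnessAnchor` helper are hypothetical names introduced for illustration; only the three capture cases and the "fall back to the donor field when heartbeatTimestamp is INVALID" rule come from the commit message.

```java
/** Illustrative capture-then-clear logic; class and helper names are assumptions. */
final class DonorAnchorSketch {
  static final long INVALID_MESSAGE_TIMESTAMP = -1L;

  static final class Record {
    long heartbeatTimestamp; // Java long default: 0 on a fresh record
    long donorHeartbeatTimestampMs = INVALID_MESSAGE_TIMESTAMP; // schema default -1
  }

  /** Captures the donor heartbeat only when it looks like a real wall-clock value. */
  static void clearInheritedDonorState(Record record) {
    if (record.heartbeatTimestamp > 0) {
      record.donorHeartbeatTimestampMs = record.heartbeatTimestamp;
    }
    record.heartbeatTimestamp = INVALID_MESSAGE_TIMESTAMP;
  }

  /** Prefers this server's heartbeat; otherwise the donor anchor (may itself be INVALID). */
  static long freshnessAnchor(Record record) {
    if (record.heartbeatTimestamp != INVALID_MESSAGE_TIMESTAMP) {
      return record.heartbeatTimestamp;
    }
    return record.donorHeartbeatTimestampMs;
  }
}
```

A caller that gets INVALID back from `freshnessAnchor` would skip the time-lag comparison and fall through to the offset-lag check, matching the "neither anchor is set" case above.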
Summary
Builds on top of #2715. After that PR cleared the inherited `previouslyReadyToServe` gate flag on the post-blob-transfer reinitialized PCS, the OffsetRecord still carried the donor server's `heartbeatTimestamp` and `lastCheckpointTimestamp`. `checkFastReadyToServeWithPreviousTimeLag` would correctly bail out at the heartbeat-INVALID guard for a fresh follower today (`measureHybridHeartbeatTimestamp` returns `INVALID_MESSAGE_TIMESTAMP` while `consumedFromUpstream` is `false`), but the entire defense relies on that single guard.
If the heartbeat-INVALID early-return is ever bypassed (future refactor of the heartbeat measurement, logic bug, post-restart persistence of mismatched fields), the fast path computes `(currentLag - previousLag) ≈ 0` against the donor server's clock and wrongly marks the replica RTS without any real catch-up. The replica is then served reads while reporting `INVALID_HEARTBEAT_LAG` (`Long.MAX_VALUE`) for the heartbeat-monitoring metric, which clamps into the metric's top histogram bucket and produces a sentinel artifact in any record-delay / heartbeat-delay SLI dashboard.
Bug call-chain
The fast-RTS path triggers via three different entry points into `validateAndSubscribePartition`. The post-blob path is the one that wrongly inherits donor state.
The donor's OffsetRecord lands on disk earlier than this code path ever runs, in `P2PMetadataTransferHandler.updateStorePartitionMetadata`.
A JVM crash anywhere between the `put` in that method and the SIT-side post-blob clears means that a restart reads the donor's record and re-triggers fast-RTS via the same gate.
Fix
Close every observable persistence point of the donor's OffsetRecord:
- At the blob-transfer entry point (the disk window). `P2PMetadataTransferHandler.updateStorePartitionMetadata` calls `OffsetRecord.clearInheritedDonorState(transferredOffsetRecord)` immediately after deserializing the wire bytes, so `storageMetadataService.put` never sees donor-clock values. Crash-safe across the entire SIT-side post-blob sequence.
- In-memory at the SIT post-blob path (the same-cycle window). `completePostTransferPSCUpdated` calls `OffsetRecord.clearInheritedDonorState(newPcs.getOffsetRecord())` on the record reloaded from storage, so the in-memory PCS used by the immediately-following `checkFastReadyToServeForReplica` sees cleared values regardless of future changes to the disk-side path.
- `syncOffset(pcs, updateMetadataLag)` overload so the post-blob persist path can preserve our cleared timestamps verbatim. The default `syncOffset(pcs)` keeps current behavior (`updateMetadataLag=true`); the post-blob path passes `false` to skip the `updateOffsetLagInMetadata` re-measurement that would otherwise overwrite `lastCheckpointTimestamp` with `now()` and weaken the second early-return guard.
- `OffsetRecord.clearInheritedDonorState(OffsetRecord)` is the single authoritative place to extend whenever a new field is added that is unsafe to inherit from a donor host. Today it clears `previouslyReadyToServe`, `heartbeatTimestamp`, `lastCheckpointTimestamp`, and `offsetLag`.
- `OffsetRecord.PREVIOUSLY_READY_TO_SERVE_KEY` is now a public constant; `PartitionConsumptionState`'s private alias is kept for brevity and now references it.
Result: both early-return guards in `checkFastReadyToServeWithPreviousTimeLag` (heartbeat AND checkpoint) are armed for any post-blob replica at every observable persistence point. Even if a future change weakens one of them, the other still bails out.
Log evidence
Observed on a hybrid AA store (`<store>`) on a follower replica (`<host>`) that had just been assigned partitions via Helix `LOAD_REBALANCE` and bootstrapped via blob transfer. Names sanitized; the timing and structure are real.
The `Long.MAX_VALUE` sample lands in the OTel histogram's top bucket and shows up as a P99 spike of ~8.8M ms (~2.4h) on `Venice.Server.Ingestion.Replication.{Record,Heartbeat}.Delay`, even with `Venice.Replica.State=ready_to_serve` AND `Venice.Version.Role=current` filters applied. The replica is structurally RTS via the fast path before it has consumed a single HB.
This change ensures the fast path cannot be entered post-blob regardless of which guard the bug hits.
Testing Done
- `SITFastReadyToServeTest` continues to pass: non-blob callers see no behavior change (default `syncOffset(pcs)` keeps `updateMetadataLag=true`).
- After `P2PMetadataTransferHandler.updateStorePartitionMetadata`, the OffsetRecord persisted to `storageMetadataService` has `heartbeatTimestamp == -1`, `lastCheckpointTimestamp == -1`, `offsetLag == DEFAULT_OFFSET_LAG`, and the `previouslyReadyToServe` previousStatuses entry is absent, which proves the disk-side window is closed.
- `completePostTransferPSCUpdated` on a freshly reloaded PCS leaves the in-memory OffsetRecord with the same cleared fields after `syncOffset(pcs, false)` returns, which proves the SIT-side window is closed.
- `checkFastReadyToServeWithPreviousTimeLag` returns `false` when both `heartbeatTimestamp == -1` and `lastCheckpointTimestamp == -1`, even with `previouslyReadyToServe=true` artificially set on the OffsetRecord, which proves the second guard is armed.
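The two-guard structure that the description relies on can be sketched as below. This is a simplified model, not the real checkFastReadyToServeWithPreviousTimeLag: the parameter list, the definition of the previous lag as `lastCheckpointTimestamp - heartbeatTimestamp`, and the `toleranceMs` comparison are all assumptions chosen to show why donor-clock inputs make the lag difference look close to zero.

```java
/** Illustrative model of the fast-RTS early-return guards; lag formulas are assumptions. */
final class FastRtsGuardSketch {
  static final long INVALID_MESSAGE_TIMESTAMP = -1L;

  static boolean checkFastReadyToServeWithPreviousTimeLag(
      long heartbeatTimestamp,
      long lastCheckpointTimestamp,
      long currentLagMs,
      long toleranceMs) {
    // Guard 1: no trusted heartbeat recorded for this replica.
    if (heartbeatTimestamp == INVALID_MESSAGE_TIMESTAMP) {
      return false;
    }
    // Guard 2: no trusted checkpoint recorded for this replica.
    if (lastCheckpointTimestamp == INVALID_MESSAGE_TIMESTAMP) {
      return false;
    }
    // Assumed definition: lag observed at the time of the last checkpoint.
    long previousLagMs = lastCheckpointTimestamp - heartbeatTimestamp;
    // Fast path: the replica is considered caught up if the lag has not grown much.
    return currentLagMs - previousLagMs <= toleranceMs;
  }
}
```

In this model, passing a donor's heartbeat and checkpoint values with a similar current lag returns true without any real catch-up, while clearing either field to -1 forces the early return.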