[server][da-vinci] Add config-gated Netty backpressure to blob transfer client #2750
sixpluszero wants to merge 7 commits into linkedin:main
Conversation
…er client Adds blob.transfer.client.backpressure.enabled (default false). When true, the P2P blob transfer client bootstrap uses AUTO_READ=false and WRITE_BUFFER_WATER_MARK, and HttpContent writes are offloaded to a dedicated disk-write executor via a single-flight FIFO queue per channel. When false, behavior is bitwise identical to the previous code path so the change can be rolled out by deploying the code first and flipping the flag per-host later. Why: during high-concurrency bootstrap (many replicas transitioning at once + many stores in parallel), the default AUTO_READ=on path lets Netty keep reading from the socket faster than the disk can absorb the data. In-flight direct ByteBufs accumulate past the JVM MaxDirectMemorySize cap, producing "Cannot reserve N bytes of direct buffer memory" OOMs that cascade through the transfer layer and eventually trip SIT's 60-minute kill backstop. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pull request overview
Adds a config-gated Netty backpressure mode to the blob-transfer client to prevent direct-memory OOMs during high-concurrency bootstrap by disabling AUTO_READ and offloading disk writes off the NIO event loop.
Changes:
- Thread new server config flags for client backpressure enablement and disk-write executor sizing through server config → blob transfer config → manager/client/handler.
- Rework P2PFileTransferClientHandler to support an AUTO_READ=false, explicit-read, FIFO queued/offloaded disk write path.
- Expand unit + integration tests to exercise both legacy and backpressure-enabled modes.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java | Wires new blob-transfer client backpressure settings into blob transfer config construction. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java | Passes new backpressure + disk executor sizing settings into P2PBlobTransferConfig. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/P2PBlobTransferConfig.java | Adds config fields + ctor overloads for client backpressure and disk-write pool sizing. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManagerBuilder.java | Plumbs new config fields into NettyFileTransferClient construction. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java | Introduces new server-level config parsing + getters for backpressure enablement and disk-write pool sizing. |
| internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java | Adds config keys and docs for backpressure enablement + disk-write pool size. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java | Applies AUTO_READ=false and WRITE_BUFFER_WATER_MARK under flag; creates disk-write executor; passes settings to handler. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/P2PFileTransferClientHandler.java | Implements backpressure path with queued/offloaded disk writes and explicit read driving. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferClientHandler.java | Adds/parameterizes tests for both backpressure modes and queue cleanup behavior. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java | Adds an end-to-end local NIO test that runs with backpressure enabled. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/BlobP2PTransferAmongServersTest.java | Parameterizes the end-to-end multi-server integration test to run with/without backpressure. |
Comments suppressed due to low confidence (2)
clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java:401
- With AUTO_READ disabled (backpressureEnabled=true), the channel will not begin reading until someone calls Channel.read(). This handler is added to the pipeline after connectToHost() returns an already-active Channel, so relying on P2PFileTransferClientHandler.channelActive() to prime ctx.read() is brittle (channelActive may have already fired before the handler was added). Consider explicitly calling ch.read() once after adding the handlers (only when backpressureEnabled), or moving the priming logic to handlerAdded()/channelRegistered() where it will run even when the handler is added late.
// Request to get the blob file and metadata
// Attach the file handler to the pipeline
// Attach the metadata handler to the pipeline
ch.pipeline()
.addLast(new IdleStateHandler(blobReceiveReaderIdleTimeInSeconds, 0, 0))
.addLast(new MetadataAggregator(MAX_METADATA_CONTENT_LENGTH))
.addLast(
new P2PFileTransferClientHandler(
baseDir,
perHostTransferFuture,
storeName,
version,
partition,
requestedTableFormat,
aggBlobTransferStats,
checksumValidationExecutorService,
backpressureEnabled,
diskWriteExecutorService))
.addLast(
new P2PMetadataTransferHandler(
storageMetadataService,
baseDir,
storeName,
version,
partition,
requestedTableFormat,
notifierSupplier));
// Send a GET request
ChannelFuture requestFuture =
ch.writeAndFlush(prepareRequest(storeName, version, partition, requestedTableFormat));
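For illustration, a minimal sketch of that suggestion (not the merged diff; it reuses the names from the snippet above and assumes backpressureEnabled is in scope at this point in get()):
// Prime the read pump explicitly after the handlers are attached. channelActive may already have
// fired on the pre-connected channel, so the late-added handler cannot be relied on to do it.
if (backpressureEnabled) {
  ch.read();
}
// Send the GET request as before.
ChannelFuture requestFuture =
    ch.writeAndFlush(prepareRequest(storeName, version, partition, requestedTableFormat));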
internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/BlobP2PTransferAmongServersTest.java:377
- initializeVeniceCluster(Map firstOverride, Map secondOverride) reuses the same Properties instance for both servers. Any keys set in configOverrideForFirstServer will be inherited by server 2 unless configOverrideForSecondServer explicitly overwrites them, which is easy to miss and can lead to surprising test behavior. Consider creating a fresh Properties copy for the second server (or explicitly removing first-server-only override keys) to keep per-server overrides isolated.
// Apply first-server-specific overrides (e.g. backpressure opt-in) before the server is stood up. The same
// Properties instance is reused for the second server further below, so any keys set here that the second
// server should NOT inherit would need to be cleared — in practice the second-server override map takes care
// of that by explicitly re-setting what differs for server 2.
serverProperties.putAll(configOverrideForFirstServer);
server1Port = veniceClusterWrapper.addVeniceServer(serverFeatureProperties, serverProperties).getPort();
// add second server
serverProperties.setProperty(ConfigKeys.DATA_BASE_PATH, path2);
serverProperties.setProperty(ConfigKeys.ENABLE_BLOB_TRANSFER, "true");
serverProperties.setProperty(ConfigKeys.BLOB_TRANSFER_MANAGER_ENABLED, "true");
serverProperties.setProperty(ConfigKeys.DAVINCI_P2P_BLOB_TRANSFER_SERVER_PORT, String.valueOf(port2));
serverProperties.setProperty(ConfigKeys.DAVINCI_P2P_BLOB_TRANSFER_CLIENT_PORT, String.valueOf(port1));
// Add additional information for 2nd server.
serverProperties.putAll(configOverrideForSecondServer);
server2Port = veniceClusterWrapper.addVeniceServer(serverFeatureProperties, serverProperties).getPort();
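A sketch of the isolation the comment asks for, assuming the same variable names as the snippet above; each server gets its own Properties copy so first-server-only keys cannot leak into server 2:
Properties server1Properties = new Properties();
server1Properties.putAll(serverProperties);              // shared base config
server1Properties.putAll(configOverrideForFirstServer);  // server-1-only overrides stay here
server1Port = veniceClusterWrapper.addVeniceServer(serverFeatureProperties, server1Properties).getPort();

Properties server2Properties = new Properties();
server2Properties.putAll(serverProperties);              // fresh copy, no server-1 overrides
server2Properties.setProperty(ConfigKeys.DATA_BASE_PATH, path2);
server2Properties.putAll(configOverrideForSecondServer);
server2Port = veniceClusterWrapper.addVeniceServer(serverFeatureProperties, server2Properties).getPort();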
backpressure
Five follow-up fixes to PR linkedin#2750 covering the review comments:
1. NettyFileTransferClient.get(): explicitly call ch.read() after attaching the P2P handlers under backpressureEnabled. channelActive fires once during connectToHost (BEFORE the late-added P2PFileTransferClientHandler is in the pipeline), so relying on the handler's channelActive to prime the read pump under AUTO_READ=false was brittle.
2. P2PFileTransferClientHandler.channelRead0: retain non-HttpContent messages that are routed through the disk-task queue (e.g. DefaultFullHttpResponse metadata). SimpleChannelInboundHandler auto-releases the msg once channelRead0 returns, so the disk-executor task was dereferencing a freed buffer. Retain before enqueue, release in the task's finally.
3. P2PFileTransferClientHandler.submitNextDiskTask: wrap diskWriteExecutorService.execute() in try/catch. On RejectedExecutionException (e.g. NettyFileTransferClient.close() called mid-transfer), put the polled task back at the head of pendingDiskTasks and route the failure through exceptionCaught so the transfer fails deterministically and every retained ByteBuf is released by the drain path (see the sketch below).
4. TestP2PFileTransferClientHandler: the backpressure ref-count regression test uses a no-op disk executor that drops the first submitted task. Wrap the test in try/finally that releases any still-retained buffers to avoid tripping Netty leak detection at suite teardown.
5. BlobP2PTransferAmongServersTest.initializeVeniceCluster: build server 1's properties on its own Properties copy so first-server-only overrides (e.g. backpressure opt-in when only one server should enable it) don't silently leak into server 2 via the shared Properties instance.

Also documents the intentional design choice to keep cleanupResources() inline on the event loop (instead of routing through the disk queue): the existing isDone() short-circuit in handleMessage already swallows the narrow-window ClosedChannelException that can surface from a mid-write disk task, and routing cleanup through the queue regresses the ref-count test when the executor is dead.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
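A rough sketch of the rejection handling in item 3, using the field names quoted elsewhere in this PR (pendingDiskTasks, diskWriteTaskInFlight, diskWriteExecutorService); the surrounding class scaffolding is omitted and details may differ from the merged code:
private void submitNextDiskTask(ChannelHandlerContext ctx) {
  Runnable task = pendingDiskTasks.poll();
  if (task == null) {
    diskWriteTaskInFlight.set(false);
    ctx.read(); // queue drained: ask the socket for the next chunk
    return;
  }
  try {
    diskWriteExecutorService.execute(() -> {
      try {
        task.run();
      } catch (Throwable t) {
        ctx.fireExceptionCaught(t); // disk failure surfaces through the normal pipeline error path
      }
      // Self-chain on the event loop to pop the next task in FIFO order.
      ctx.channel().eventLoop().execute(() -> submitNextDiskTask(ctx));
    });
  } catch (RejectedExecutionException e) {
    pendingDiskTasks.addFirst(task); // keep FIFO order so the drain path can release every retained buffer
    ctx.fireExceptionCaught(e);      // fail the transfer deterministically instead of hanging
  }
}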
Thanks @copilot-pull-request-reviewer — pushed 4d9ef60 addressing all 6 flagged concerns (4 inline + 2 in the review body). Summary:
All 88 blob-transfer unit tests + both branches of the parameterized integration test pass.
Four follow-ups to PR linkedin#2750 driven by @jingy-li's review:
1. Restore cleanupResources/completeExceptionally ordering in handleExceptionGracefully (regression from my earlier reorder). The outer failover loop in NettyFileTransferClient.get() relies on the invariant from linkedin#2071 that "future complete ⇒ previous host fully cleaned up"; completing the future first allowed the next-host handler to start constructing files at the same tempPartitionDir before this handler's cleanup (file close + partial-file delete + state reset) had run. Reverted to: cleanupResources → completeExceptionally → releasePendingContents. The trailing releasePendingContents is new (specific to backpressure mode) and its drained runnables short-circuit via handleMessage's isDone() guard, only releasing retained ByteBufs.
2. Make handleEndOfTransfer non-blocking. In backpressure mode it runs on the disk-write executor; a sync .join() on the checksum-validation futures pinned a pool thread for the entire checksum window, and two simultaneous end-of-transfers on a 2-thread pool would stall every other channel's content writes. Switched to whenComplete chaining, finalising on the event loop via finalizeTransferAfterChecksumValidation (see the sketch below).
3. Route metadata fireChannelRead through the event loop when we're on the disk-write executor. DefaultFullHttpResponse satisfies instanceof HttpContent, so under backpressure metadata was enqueued as a disk task and fireChannelRead ran P2PMetadataTransferHandler on the disk-pool thread. Now we dispatch on the event loop and block the disk-pool thread on a short-lived CompletableFuture barrier so the next queued task (typically EndOfTransfer) still observes metadata already persisted in storageMetadataService.
4. Extract DEFAULT_BLOB_TRANSFER_DISK_WRITE_THREAD_POOL_SIZE=2 constant shared across NettyFileTransferClient, P2PBlobTransferConfig, and VeniceServerConfig. Previously the default was "Math.max(2, cores)" in three places while the per-instance floor was "Math.max(1, configured)"; aligned the floor to the default so a misconfigured "1" is still usable.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
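Item 2 in miniature, assuming the names quoted elsewhere in this PR (checksumValidationFutureList, finalizeTransferAfterChecksumValidation); the exact signatures in the merged code may differ:
// Do not block a disk-write pool thread on the checksum futures; chain instead and hop back to the
// event loop for finalization (rename + future completion + ctx.close()).
CompletableFuture.allOf(checksumValidationFutureList.toArray(new CompletableFuture[0]))
    .whenComplete((ignored, checksumError) -> ctx.channel()
        .eventLoop()
        .execute(() -> finalizeTransferAfterChecksumValidation(ctx, checksumError)));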
Thanks @jingy-li for the thorough review — pushed 060efa4 addressing all points:
CI status on previous push: all 100+ integration test shards passed. All 88 blob-transfer unit tests + both branches of the parameterized integration test pass.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.
…DLE races
Fixes a regression introduced by the handleEndOfTransfer refactor in 060efa4. After switching to asynchronous checksum validation via whenComplete, the transfer future stays incomplete for a brief window between:
- the end-of-transfer marker being processed on the disk-write executor
- the checksum+rename chain finalizing on the event loop

If the peer closes the TCP connection (server-side IdleStateHandler or graceful FIN) during that window, channelInactive / userEventTriggered currently route through fastFailoverIncompleteTransfer and flip the future into an exception — even though every byte is already on disk.

Fix: introduce an endOfTransferReceived AtomicBoolean that handleEndOfTransfer sets BEFORE kicking off the async checksum chain. channelInactive and userEventTriggered check the flag first; when set, they log and return without triggering the fast-failover path, leaving finalization to the async chain. Also clean up a stale comment in fastFailoverIncompleteTransfer that still referenced the reverted "completeExceptionally-first" ordering.

Tests: two new regression tests in TestP2PFileTransferClientHandler drive the finalizing state with (1) a peer-close after EndOfTransfer and (2) a READER_IDLE after EndOfTransfer, asserting the transfer future completes successfully instead of exceptionally.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
backpressure
The batch-store variant already gates on blob.transfer.client.backpressure.enabled via a single True-and-False DataProvider. Hybrid has more moving parts (streaming RT writes, rewind, offset-lag completion) and was only parameterized over enableAdaptiveThrottling. Broaden it to a 2x2 matrix (enableAdaptiveThrottling, enableClientBackpressure) so every combination of the two orthogonal server-side opt-ins is exercised end-to-end against real NIO + real disk I/O on a hybrid store under streaming traffic.

Applies the backpressure override to both servers via the two-arg initializeVeniceCluster overload, matching how the batch test does it. Locally each variant runs ~30s in isolation; CI fans out to a fresh JVM per shard so the 2x2 costs ~4x one test's latency, not cumulative resource pressure.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 9 comments.
async-finalization code path
Eight fixes covering the new Copilot comments on 4192c46, plus one comment rewording. Categorised:

Real bugs:
1. LOGGER.error in handleExceptionGracefully consumed the Throwable via a {} placeholder, so Log4j2 never printed the stack trace. Now passed as the trailing throwable argument (see the sketch below).
2. Metadata-dispatch barrier (backpressure path) called CompletableFuture.get() with no timeout, which could pin a disk-write pool thread indefinitely if the event loop stalled. Added a 30s bounded wait; timeout fails the transfer explicitly.
3. channelRead0 did not short-circuit on inputStreamFuture.isDone(), so messages arriving after failure/completion were retained + enqueued and could leak buffers behind a stuck diskWriteTaskInFlight flag. Now guarded at the entry point before any retain.
4. When a disk task failed, the in-flight flag was not reset before exceptionCaught fired. Any late-arriving inbound message could then enqueue a retained ByteBuf behind a permanently-stuck flag and leak. Flag is now cleared before routing to exceptionCaught.

Log-noise cleanup:
5/6. The "letting async checksum/rename finalize" INFO logs fired on every successful transfer because finalizeTransferAfterChecksumValidation always calls ctx.close() after completing the future. Gated on !isDone() and dropped to DEBUG.

Documentation:
7. Updated NettyFileTransferClient's pool-size-floor rationale — it still referenced the pre-async handleEndOfTransfer "await" behaviour.
8. Corrected endOfTransferReceived's Javadoc: the flag is flipped on whichever thread handleEndOfTransfer runs on (event loop in legacy mode, disk-write executor in backpressure mode); AtomicBoolean makes the cross-thread set safe.

All 88 blob-transfer unit tests still pass. spotlessCheck passes. The remaining Copilot nit ("single blank line between imports and class") was dismissed: Venice's Spotless config (gradle/spotless/eclipse-java-formatter.xml) explicitly sets blank_lines_after_imports=2 and the check is green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
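Item 1 is the classic Log4j2 pitfall; a minimal illustration (replicaId and throwable are placeholder names, not the real variables):
// Buggy: the Throwable is consumed by the second {} placeholder, so no stack trace is logged.
LOGGER.error("Blob transfer failed for replica {}: {}", replicaId, throwable);
// Fixed: with the Throwable as the trailing argument, Log4j2 prints the full stack trace.
LOGGER.error("Blob transfer failed for replica {}", replicaId, throwable);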
// Early-out BEFORE any retain/enqueue so that late-arriving messages (e.g. after completeExceptionally on a
// failure path, or after the async checksum chain has already completed the future) don't uselessly retain
// buffers nor pile additional tasks behind a potentially stuck single-flight flag. SimpleChannelInboundHandler
// will auto-release msg when this method returns.
Let's simply comment as: "Drop late messages before retain/enqueue to avoid pinning buffers behind a stuck single-flight flag."
Fixed in 979a1a8. Shortened to: // Drop late messages before retain/enqueue to avoid pinning buffers behind a stuck single-flight flag.
// Retain and enqueue the chunk; the FIFO queue ensures sequential writes to the same FileChannel even when
// one ctx.read() synchronously dispatches multiple HttpContent objects via HttpClientCodec.
HttpContent content = (HttpContent) msg;
content.retain();
Quick question: the HttpContent branch uses content.retain(), while the end-of-transfer path that follows uses ReferenceCountUtil.retain(msg).
Is that intended?
Not intentional — fixed in 979a1a8. Unified on ReferenceCountUtil.retain/release throughout (the extracted enqueueMessage helper handles it once). Previously the HttpContent branch called content.retain() directly (works, but asymmetric with the HttpResponse branch). Now consistent.
  } finally {
    ReferenceCountUtil.release(msg);
  }
});
Lines 216-222 are the same as lines 192-198; can we extract a helper method:
private void enqueueMessage(ChannelHandlerContext ctx, HttpObject msg) {
enqueueDiskTask(ctx, () -> {
try {
handleMessage(ctx, msg);
} finally {
ReferenceCountUtil.release(msg);
}
});
}
Great suggestion — done in 979a1a8. Extracted enqueueMessage(ctx, msg) helper that does retain → enqueue → try/finally release. Both the HttpContent branch and the HttpResponse-queued branch now just call it.
if (ctx.channel().isActive() && !inputStreamFuture.toCompletableFuture().isDone()) {
  ctx.read();
}
}
For the channelRead0 method, my understanding is:
Type 1: HttpContent: always queue
Type 2: HttpResponse: queue only if ordering demands it
2.1 isEndOfTransferResponse — the EOT marker calls
2.2 !pendingDiskTasks.isEmpty()
2.3 diskWriteTaskInFlight.get()
Type 3: fast path: inline on the event loop
My concerns:
(1) The filename used for writing is changed by the HTTP response file handler. For example, when file1 starts, it uses the name file1 and begins writing to disk (now in FIFO fashion). However, when file2 arrives, the filename is updated to file2. As a result, the ongoing write for file1 ends up using the name file2.
But given this case, would the 2.2 condition prevent it? Since a file write is still in progress, the file name won't change?
(2) For the fast path, what kind of message goes to Type 3?
Your understanding is spot on. Answering your two questions:
(1) The filename race — yes, condition 2.2 (!pendingDiskTasks.isEmpty()) is exactly what prevents it. Concrete trace:
- file1 header arrives → fast-path (queue empty) → inline handleMessage opens outputFileChannel for file1, sets fileName='file1'
- file1 chunks arrive → enqueued. Chunks ALWAYS queue as Type 1: even when the queue is empty they hit the HttpContent branch first, which unconditionally enqueues.
- That branch ordering is exactly what guarantees this: the HttpContent branch comes BEFORE the HttpResponse fast-path check. So chunks always enqueue, meaning by the time the file2 header could arrive inline, pendingDiskTasks is non-empty → condition 2.2 kicks in → the file2 header also queues → waits behind file1's LastHttpContent, which does resetState().
(2) What lands in the fast-path (Type 3)? Only HttpResponse that is:
- not EndOfTransfer
- with empty pending queue AND no in-flight disk task
So in practice: the very first file-header HttpResponse of a transfer, plus error responses (404, 500). Everything else (chunks, EndOfTransfer, and any subsequent file-header once content is flowing) goes through the queue.
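Putting the three types together, a condensed sketch of the dispatch described above (simplified; identifiers follow the ones quoted in this PR, and isEndOfTransferResponse stands in for however the EOT marker is detected):
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
  if (!backpressureEnabled) {
    handleMessage(ctx, msg); // legacy path: everything inline on the event loop
    return;
  }
  if (msg instanceof HttpContent) {
    // Type 1: chunks (and full responses such as metadata) always go through the FIFO queue
    enqueueMessage(ctx, msg);
    return;
  }
  HttpResponse response = (HttpResponse) msg;
  boolean mustQueue = isEndOfTransferResponse(response)  // 2.1: EOT drains behind all pending writes
      || !pendingDiskTasks.isEmpty()                      // 2.2: a later file header waits its turn
      || diskWriteTaskInFlight.get();                     // 2.3: a write is currently in flight
  if (mustQueue) {
    enqueueMessage(ctx, msg);                             // Type 2: queue to preserve ordering
  } else {
    handleMessage(ctx, msg);                              // Type 3: first file header / error response
    if (ctx.channel().isActive() && !inputStreamFuture.toCompletableFuture().isDone()) {
      ctx.read();
    }
  }
}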
/**
 * Pops the next pending unit off {@link #pendingDiskTasks} and runs it on {@link #diskWriteExecutorService},
 * chaining to itself on completion to drain the queue in FIFO order. When the queue is empty, releases the
 * single-flight flag and issues {@code ctx.read()} so the next socket read can bring more data into the pipeline.
There are actually 2 entry points for queued task running; can we add a comment like:
======================================================
Two entry points drive the drain loop:
- {@code enqueueDiskTask} — called from the event loop when a message arrives. Appends the task and, if the queue was previously idle, bootstraps the drain loop via {@link #submitNextDiskTask}. The bootstrap is required because the chain-continuation (entry point 2) only fires after a task completes, so the very first task has nothing to pump it.
- {@link #submitNextDiskTask} self-chain — after each task finishes, its completion callback re-invokes {@code submitNextDiskTask} on the event loop to pop the next task in FIFO order.
Added in 979a1a8 — dropped your suggested Javadoc block onto enqueueDiskTask documenting the two entry points (initial bootstrap call and self-chain continuation). Cross-referenced from submitNextDiskTask so future readers see the loop from either end.
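For readers following along, a sketch of the enqueue side under the names used in this thread (submitNextDiskTask is the pop/self-chain half; details may differ from the merged code):
private void enqueueMessage(ChannelHandlerContext ctx, HttpObject msg) {
  ReferenceCountUtil.retain(msg); // keep the buffer alive across the hop to the disk-write thread
  enqueueDiskTask(ctx, () -> {
    try {
      handleMessage(ctx, msg);
    } finally {
      ReferenceCountUtil.release(msg); // released exactly once, success or failure
    }
  });
}

/**
 * Entry point 1 (bootstrap): appends the task and, if the queue was idle, kicks off the drain loop.
 * Entry point 2 (self-chain) lives in submitNextDiskTask, which re-invokes itself after each task completes.
 */
private void enqueueDiskTask(ChannelHandlerContext ctx, Runnable task) {
  pendingDiskTasks.offer(task);
  if (diskWriteTaskInFlight.compareAndSet(false, true)) {
    submitNextDiskTask(ctx);
  }
}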
@@ -112,7 +333,44 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex
boolean isMetadataMessage = BlobTransferUtils.isMetadataMessage(response);
if (isMetadataMessage) {
We might not need this separate handling for metadata between lines 336-373.
Current order:
1. Metadata arrives from the wire as a DefaultFullHttpResponse.
2. channelRead0 sees instanceof HttpContent (because DefaultFullHttpResponse extends LastHttpContent), matches the first branch, retains it, enqueues onto pendingDiskTasks. Already in the queue.
3. The queue is single-flight FIFO. All file chunks enqueued before metadata drain first.
4. When the metadata task's turn comes, every preceding file write has completed synchronously on the disk thread. Guaranteed by the queue.
5. The metadata task runs handleMessage on the disk thread → matches isMetadataMessage → calls ctx.fireChannelRead(msg).
6. P2PMetadataTransferHandler.channelRead0 runs synchronously on the disk thread, storageMetadataService.put + computeStoreVersionState complete before return.
7. Disk task returns. submitNextDiskTask polls the next item (EndOfTransfer). EndOfTransfer runs with metadata already persisted.
So as long as we make sure the metadata response is in the queue, then by the time metadata is polled out we can already be sure the file-write part is done. The only "not-good" part is that metadata is handled by the disk-write executor; or we rename it to something else like "blobReceiveSequencer".
Excellent suggestion — done in 979a1a8. You are exactly right: given the FIFO queue already serialises metadata behind every file-chunk write, there is no ordering problem with running P2PMetadataTransferHandler on the disk-write executor. Its only side-effects are storageMetadataService.put / computeStoreVersionState / veniceNotifier.* which are thread-safe backend services, and it never mutates pipeline state. Removed the whole CompletableFuture metadataDone barrier + METADATA_DISPATCH_TIMEOUT_SECONDS machinery. Far simpler now — just retain + fireChannelRead unconditionally in both modes.
Addressed together with the metadata-hop removal above — WDYT on the simpler version in 979a1a8? The fireChannelRead is now identical in both legacy and backpressure modes, and the thread is whichever one the queue happens to be draining on (disk executor in backpressure, event loop in legacy).
// notice while finalization is still genuinely in flight — logging on every completion is INFO-level noise.
if (!inputStreamFuture.toCompletableFuture().isDone()) {
  LOGGER.debug(
      "Channel became inactive for replica {} while the asynchronous checksum/rename path is still "
Can we change this line to warn?
Done in 979a1a8. Both this channelInactive branch and the mirror in userEventTriggered now log at WARN, with a short justification comment (connection drop or reader-idle during finalization is operator-relevant and infrequent).
// Ordering matters: cleanupResources() runs BEFORE completeExceptionally(), preserving the invariant introduced
// in #2071 that the outer failover loop in NettyFileTransferClient relies on — namely, "future complete ⇒
// previous host fully cleaned up". If we completed the future first, the outer loop could advance to the next
// host and start constructing files at the same tempPartitionDir while this handler's cleanup (file close +
// partial-file delete + state reset) is still executing, producing overlapping state access.
//
// releasePendingContents() runs AFTER completeExceptionally() so that each drained runnable's handleMessage
// call short-circuits via the isDone() guard (resetState has already nulled outputFileChannel anyway), and the
// retained ByteBuf is released by each runnable's own finally block.
//
// Backpressure-mode race: if a disk task on diskWriteExecutorService is executing FileChannel.transferFrom at
// the exact moment this event-loop thread calls cleanupResources, the disk task may observe a
// ClosedChannelException. It is caught by submitNextDiskTask's wrapper, routed back through exceptionCaught,
// and skipped at the isDone() guard (set just below) without re-logging or re-completing. A cleaner design
// that offloads only the transferFrom call while keeping state management on the event loop is tracked as a
// follow-up.
Can we move this to Javadoc?
Moved in 979a1a8. The ordering contract + backpressure-race commentary is now a proper Javadoc block on handleExceptionGracefully — it shows up on IDE hover instead of being buried inline. The method body is down to just the four operations.
// Always finalize on the Netty event loop so pipeline mutations (ctx.close()) stay on the expected thread.
if (ctx.channel().eventLoop().inEventLoop()) {
  finalizeOnEventLoop.run();
} else {
Because of this async logic change, the channel may become idle or close before the checksum completes while the future is still waiting to be completed. I noticed that both userEventTriggered and channelInactive already reflect this behavior: they wait for the checksum to finish even after the connection has been closed, which is good.
But one issue is:
exceptionCaught is missing the endOfTransferReceived guard (P2PFileTransferClientHandler.java:562). A TCP RST or SSL error during the async checksum/rename window silently deletes fully-transferred partition data via the outer loop's cleanupBothPartitionDirAndTempTransferredDir.
Some recommendation fix:
- (A) Route checksum failures in finalizeTransferAfterChecksumValidation directly through handleExceptionGracefully instead of re-entering exceptionCaught, otherwise a naive guard would swallow checksum failures.
- (B) Add endOfTransferReceived && !isDone() guard to exceptionCaught.
- (C) Regression test testExceptionAfterEndOfTransferDoesNotFastFailover mirroring the two existing post-EoT tests.
Great catch — this is a real data-loss bug. Fixed in 979a1a8:
- (B) exceptionCaught now has the endOfTransferReceived && !isDone() guard matching channelInactive/userEventTriggered. Added a WARN log so operators still see the event.
- (A) finalizeTransferAfterChecksumValidation no longer routes checksum failures through exceptionCaught (which would now be swallowed by the new guard). It calls handleExceptionGracefully + ctx.close() directly. Checksum failure is a legitimate data-integrity failure (bytes could be corrupt even though all received), unlike the transient pipeline errors the guard suppresses.
- (C) New regression test testExceptionCaughtAfterEndOfTransferDoesNotFastFailover drives the exact scenario (TCP RST via fireExceptionCaught during the checksum window) and asserts (1) the future completes successfully and (2) the transferred file survives on disk — not deleted by cleanupResources.
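A condensed sketch of the resulting guard, assuming the field names quoted in this thread (replicaId is a placeholder and the helper signatures are illustrative):
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  if (endOfTransferReceived.get() && !inputStreamFuture.toCompletableFuture().isDone()) {
    // All bytes are already on disk; let the async checksum/rename chain finish rather than
    // fast-failing over and deleting the fully transferred temp partition directory.
    LOGGER.warn("Ignoring pipeline error during finalization for replica {}", replicaId, cause);
    return;
  }
  handleExceptionGracefully(ctx, cause);
}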
public void channelActive(ChannelHandlerContext ctx) throws Exception {
  // When AUTO_READ is disabled, Netty will not initiate the first socket read automatically.
  // Request the first read once the pipeline is ready so the GET response can be received.
  if (backpressureEnabled) {
Let's add && !endOfTransferReceived.get() to the two ctx.read() guards to close the spurious-read race window.
One real bug fix, one substantial simplification, one refactor, plus log/doc polish driven by @jingy-li's review of 145abfb:

Real bug:
- exceptionCaught was missing the endOfTransferReceived guard that channelInactive and userEventTriggered already had. A TCP RST or SSL decode error arriving during the async checksum/rename window would fire this path, hit handleExceptionGracefully, and delete the fully received temp partition directory — destroying data the async chain was about to finalize. Added the same guard (warn log + early return).
- As a corollary, finalizeTransferAfterChecksumValidation no longer routes checksum failures through exceptionCaught (which would now be silently swallowed by the new guard). It calls handleExceptionGracefully + ctx.close() directly. Checksum failure is a legitimate data-integrity reason to fail the transfer, unlike the transient pipeline errors the guard intentionally suppresses.
- New regression test testExceptionCaughtAfterEndOfTransferDoesNotFastFailover drives the exact TCP-RST-during-checksum-window scenario and asserts (a) the future completes successfully and (b) the transferred file survives on disk.

Simplification:
- Dropped the event-loop hop + CompletableFuture barrier that backpressure-mode metadata used to do. Because the single-flight FIFO queue already guarantees metadata runs after every preceding file-chunk write, P2PMetadataTransferHandler is safe to run on the disk-write executor — it only calls thread-safe backend services (storageMetadataService, veniceNotifier) and never mutates pipeline state. Removed METADATA_DISPATCH_TIMEOUT_SECONDS and the TimeUnit import.

Refactor:
- Extracted enqueueMessage helper. The HttpContent branch and the HttpResponse-queued branch were identical retain/try-finally/release blocks; consolidated. Unified on ReferenceCountUtil.retain/release so any HttpObject subtype works the same way.

Log/doc:
- channelInactive/userEventTriggered finalizing-window logs escalated from DEBUG to WARN: a connection drop or reader-idle during finalization is infrequent and operator-relevant.
- Added javadoc on enqueueDiskTask explaining the two entry points that drive the drain loop (initial bootstrap and self-chain).
- Moved the ordering contract + backpressure-race commentary from an inline comment block to handleExceptionGracefully's javadoc so it shows up in IDE hover.
- Shortened the channelRead0 early-out comment per reviewer.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 1 comment.
// In both legacy and backpressure mode, simply fire through the pipeline. The single-flight FIFO queue
// (see enqueueDiskTask) already guarantees metadata runs AFTER every preceding file-chunk write, so
// P2PMetadataTransferHandler can safely run on whichever thread is dispatching this message. The
// downstream metadata handler only calls into thread-safe backend services
// (storageMetadataService.put / computeStoreVersionState, veniceNotifier.*) — it never mutates pipeline
// state. Running it on the disk-write executor is therefore a Netty convention departure, not a
// correctness issue, and avoids the complexity of an event-loop hop + barrier that would otherwise pin
// this disk-write pool thread while waiting for metadata processing to complete.
ReferenceCountUtil.retain(msg);
ctx.fireChannelRead(msg);
In backpressure mode, handleMessage() can run on diskWriteExecutorService threads, but this branch calls ctx.fireChannelRead(msg) directly. Netty’s ChannelPipeline is not thread-safe for inbound event propagation, and P2PMetadataTransferHandler (a SimpleChannelInboundHandler) is expected to run on the channel’s event loop. Please dispatch the metadata fireChannelRead onto ctx.channel().eventLoop() (and preserve ordering relative to queued disk tasks), rather than invoking it from the disk-write thread.
Suggested change:
- // In both legacy and backpressure mode, simply fire through the pipeline. The single-flight FIFO queue
- // (see enqueueDiskTask) already guarantees metadata runs AFTER every preceding file-chunk write, so
- // P2PMetadataTransferHandler can safely run on whichever thread is dispatching this message. The
- // downstream metadata handler only calls into thread-safe backend services
- // (storageMetadataService.put / computeStoreVersionState, veniceNotifier.*) — it never mutates pipeline
- // state. Running it on the disk-write executor is therefore a Netty convention departure, not a
- // correctness issue, and avoids the complexity of an event-loop hop + barrier that would otherwise pin
- // this disk-write pool thread while waiting for metadata processing to complete.
- ReferenceCountUtil.retain(msg);
- ctx.fireChannelRead(msg);
+ // Metadata processing must be propagated on the channel's event loop because Netty inbound pipeline
+ // traversal is not thread-safe. In backpressure mode this handler can run on a disk-write executor
+ // thread, so retain the message and hop back to the event loop before firing it to the next handler.
+ // This still preserves ordering relative to the already-serialized disk task queue because this branch
+ // is reached only after all earlier queued disk writes for this message stream have completed.
+ if (ctx.channel().eventLoop().inEventLoop()) {
+   ctx.fireChannelRead(msg);
+ } else {
+   ReferenceCountUtil.retain(msg);
+   ctx.channel().eventLoop().execute(() -> ctx.fireChannelRead(msg));
+ }
Problem Statement
During high-concurrency bootstrap on a Venice server host (e.g. Helix triggering OFFLINE→STANDBY for many replicas simultaneously while many stores are also transferring blobs in parallel), the blob-transfer client exhausts JVM direct memory and emits a burst of OOMs:
Root cause lives in the client bootstrap in NettyFileTransferClient:
- AUTO_READ is left at its default (true), so Netty chains socket reads back-to-back without waiting for downstream (FileChannel.transferFrom) to drain. In-flight ByteBufs sit in the pipeline.
- There is no WRITE_BUFFER_WATER_MARK and no memory-aware admission, so under a slow disk + fast network the pooled direct-buffer allocator grows past -XX:MaxDirectMemorySize=1g and trips host-wide OOMs.

The OOMs cascade into aborted transfers, per-peer timeouts, and eventually SIT's 60-minute "Version state is not available" backstop kills the transfers entirely.
Solution
Add a single host-level config flag — blob.transfer.client.backpressure.enabled (default false) — that, when turned on, rewires the blob-transfer client's receive path to apply Netty-level backpressure (see the bootstrap sketch after this list):
- AUTO_READ=false on the client bootstrap. The handler drives socket reads explicitly (ctx.read() in channelActive, and again after each write unit completes).
- WRITE_BUFFER_WATER_MARK set to 256 KB / 1 MB so outbound writes respect a backpressure window as well.
- A dedicated disk-write executor (blob.transfer.client.disk.write.thread.pool.size, default max(2, cores)). File writes are offloaded off the NIO event loop.
- A single-flight FIFO queue per channel: because one ctx.read() can dispatch multiple HttpContent messages through HttpClientCodec, all HttpContent chunks — and the end-of-transfer marker — flow through a per-handler deque drained one task at a time. This preserves file-content ordering and ensures handleEndOfTransfer observes the complete checksumValidationFutureList.
- Config plumbing: VeniceServerConfig → P2PBlobTransferConfig → BlobTransferManagerBuilder → NettyFileTransferClient → P2PFileTransferClientHandler with backward-compatible delegating constructors, so existing callers are unaffected.
- When the flag is false, the code paths are bitwise equivalent to pre-change behavior. A server restart is required to toggle (the bootstrap options are fixed at channel creation), which is fine for the intended staged-rollout workflow.
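A minimal sketch of the gated bootstrap options (names such as clientBootstrap and workerGroup are illustrative; the real wiring lives in NettyFileTransferClient):
Bootstrap clientBootstrap = new Bootstrap()
    .group(workerGroup)
    .channel(NioSocketChannel.class);
if (backpressureEnabled) {
  clientBootstrap
      .option(ChannelOption.AUTO_READ, false) // handler drives reads explicitly via ctx.read()
      .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(256 * 1024, 1024 * 1024));
}
// With the flag off, neither option is set and the legacy code path is untouched.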
Expected impact
FileChannel.transferFrom

Code changes
blob.transfer.client.backpressure.enabled (default false) and companion blob.transfer.client.disk.write.thread.pool.size (default max(2, cores)).

Concurrency-Specific Checks
Both reviewer and PR author to verify
- … AtomicBoolean are only accessed from the channel's event loop. Cross-thread handoffs (event loop → disk executor → back to event loop) use ExecutorService.execute / EventLoop.execute, which provide happens-before edges for the mutable handler state.
- … (synchronized, RWLock) are used where needed. AtomicBoolean for the single-flight flag; no locks needed because all queue operations stay on the event loop.
- … FileChannel.transferFrom inline on the event loop is moved off the event loop by this change when backpressure is on.
- … (ConcurrentHashMap, CopyOnWriteArrayList). The queue is a plain ArrayDeque because it's confined to the event-loop thread.
- … exceptionCaught (mirroring SimpleChannelInboundHandler's inline-throw semantics).
- handleExceptionGracefully completes the transfer future exceptionally before draining pending tasks, so drained runnables short-circuit via handleMessage's isDone() guard and only release retained ByteBufs instead of writing to a closed file channel.

How was this PR tested?
- TestP2PFileTransferClientHandler.testSingleFileTransferUnderBothBackpressureModes (DataProvider-parameterized, covers the happy path in both modes)
- TestP2PFileTransferClientHandler.testFileSizeMismatchUnderBothBackpressureModes (DataProvider-parameterized, covers error propagation through the offload path)
- TestP2PFileTransferClientHandler.testChannelActiveTriggersReadOnlyWhenBackpressureEnabled (pins the AUTO_READ priming contract)
- TestP2PFileTransferClientHandler.testBackpressureWithoutDiskExecutorRejected (constructor guard)
- TestP2PFileTransferClientHandler.testPendingContentReleasedOnChannelCloseUnderBackpressure (ref-count leak regression: channel close with a pending queue must release retained ByteBufs)
- TestNettyP2PBlobTransferManager.testLocalFileTransferInBatchStoreWithBackpressureEnabled (end-to-end with real NIO + real disk I/O, exercises the multi-HttpContent-per-socket-read path)
- BlobP2PTransferAmongServersTest.testBlobP2PTransferAmongServersForBatchStore is now parameterized via DataProviderUtils.True-and-False to run the full end-to-end P2P scenario (server restart, blob transfer, offset equality between two servers) with and without backpressure enabled on both servers.
- BlobP2PTransferAmongServersTest.initializeVeniceCluster gained a two-argument overload so tests can override server 1's properties independently from server 2's.
- The pre-existing constructors of NettyFileTransferClient, P2PFileTransferClientHandler, and P2PBlobTransferConfig are preserved and delegate to the new overloads with backpressureEnabled=false, so existing callers are binary-compatible and exercise the unchanged legacy path.

All 40+ tests in com.linkedin.davinci.blobtransfer.* pass, as do both data-provider branches of the parameterized integration test (legacy ~31s, backpressure ~28s on a local laptop).

Does this PR introduce any user-facing or breaking changes?
The new config defaults to false, so the deploy is a no-op until an operator explicitly flips blob.transfer.client.backpressure.enabled=true and restarts the affected host(s).
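For the staged rollout, the per-host opt-in is just the two keys above; a sketch in the same style as the integration test (values are illustrative):
// Enable backpressure on one canary host's server properties, then restart that host.
serverProperties.setProperty("blob.transfer.client.backpressure.enabled", "true");
// Optional tuning; defaults to max(2, cores) when unset.
serverProperties.setProperty("blob.transfer.client.disk.write.thread.pool.size", "4");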