
[server][da-vinci] Add config-gated Netty backpressure to blob transfer client #2750

Draft
sixpluszero wants to merge 7 commits into linkedin:main from sixpluszero:jlliu/blob-transfer-client-backpressure

Conversation

@sixpluszero
Contributor

Problem Statement

During high-concurrency bootstrap on a Venice server host (e.g. Helix triggering OFFLINE→STANDBY for many replicas simultaneously while many stores are also transferring blobs in parallel), the blob-transfer client exhausts JVM direct memory and emits a burst of OOMs:

java.lang.OutOfMemoryError: Cannot reserve 4194304 bytes of direct buffer memory
  (allocated: 1072603013, limit: 1073741824)
  at P2PFileTransferClientHandler ...

Root cause lives in the client bootstrap in NettyFileTransferClient:

  • AUTO_READ is left at its default (true), so Netty chains socket reads back-to-back without waiting for downstream (FileChannel.transferFrom) to drain.
  • Writes to disk execute inline on the NIO event loop, blocking other channels sharing the worker group and lengthening the window in which retained direct ByteBufs sit in the pipeline.
  • There is no WRITE_BUFFER_WATER_MARK and no memory-aware admission, so under a slow disk + fast network the pooled direct-buffer allocator grows past -XX:MaxDirectMemorySize=1g and trips host-wide OOMs.

The OOMs cascade into aborted transfers, per-peer timeouts, and eventually SIT's 60-minute "Version state is not available" backstop kills the transfers entirely.

Solution

Add a single host-level config flag — blob.transfer.client.backpressure.enabled (default false) — that, when turned on, rewires the blob-transfer client's receive path to apply Netty-level backpressure (a bootstrap sketch follows the list):

  1. AUTO_READ=false on the client bootstrap. The handler drives socket reads explicitly (ctx.read() in channelActive, and again after each write unit completes).
  2. WRITE_BUFFER_WATER_MARK set to 256 KB / 1 MB so outbound writes respect a backpressure window as well.
  3. Dedicated disk-write executor (blob.transfer.client.disk.write.thread.pool.size, default max(2, cores)). File writes are offloaded off the NIO event loop.
  4. Single-flight FIFO queue per channel. Because one ctx.read() can dispatch multiple HttpContent messages through HttpClientCodec, all HttpContent chunks — and the end-of-transfer marker — flow through a per-handler deque drained one task at a time. This preserves file-content ordering and ensures handleEndOfTransfer observes the complete checksumValidationFutureList.
  5. Config-gated: the flag is threaded through VeniceServerConfig → P2PBlobTransferConfig → BlobTransferManagerBuilder → NettyFileTransferClient → P2PFileTransferClientHandler with backward-compatible delegating constructors, so existing callers are unaffected. When the flag is false, the code paths are bitwise equivalent to pre-change behavior. A server restart is required to toggle (the bootstrap options are fixed at channel creation), which is fine for the intended staged-rollout workflow.
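
For illustration, a minimal sketch of how the flag could gate the two bootstrap options described in items 1 and 2 (the builder class and method names here are hypothetical, not the actual NettyFileTransferClient code; the 256 KB / 1 MB values come from item 2):

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.nio.NioSocketChannel;

/** Sketch only: how the config flag could gate the two client bootstrap options above. */
public final class BackpressureBootstrapSketch {
  public static Bootstrap buildClientBootstrap(EventLoopGroup workerGroup, boolean backpressureEnabled) {
    Bootstrap bootstrap = new Bootstrap().group(workerGroup).channel(NioSocketChannel.class);
    if (backpressureEnabled) {
      // Item 1: the handler drives reads explicitly via ctx.read(); Netty stops chaining socket reads on its own.
      bootstrap.option(ChannelOption.AUTO_READ, false);
      // Item 2: 256 KB low / 1 MB high outbound write-buffer water marks.
      bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(256 * 1024, 1024 * 1024));
    }
    return bootstrap;
  }
}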

Expected impact

  • Per-channel in-flight direct buffer: before (legacy) unbounded (pipeline-queue depth × RCVBUF_max); after (backpressure on) ≤ 1 × RCVBUF_allocator max (~1 MB)
  • NIO event loop blocked by FileChannel.transferFrom: before yes; after no (writes run on disk executor)
  • Read rate when disk saturates: before continues reading → memory pile-up; after TCP window closes → sender backs off

Code changes

  • Added new code behind a config: blob.transfer.client.backpressure.enabled (default false) and companion blob.transfer.client.disk.write.thread.pool.size (default max(2, cores)).
  • Introduced new log lines.
    • Confirmed whether logs need to be rate-limited to avoid excessive logging.

Concurrency-Specific Checks

Both reviewer and PR author to verify

  • Code has no race conditions or thread safety issues. The pending-task deque and single-flight AtomicBoolean are only accessed from the channel's event loop. Cross-thread handoffs (event loop → disk executor → back to event loop) use ExecutorService.execute / EventLoop.execute, which provide happens-before edges for the mutable handler state. (A drain-loop sketch follows this checklist.)
  • Proper synchronization mechanisms (e.g., synchronized, RWLock) are used where needed. AtomicBoolean for the single-flight flag; no locks needed because all queue operations stay on the event loop.
  • No blocking calls inside critical sections that could lead to deadlocks or performance degradation. The legacy path's FileChannel.transferFrom inline on the event loop is moved off the event loop by this change when backpressure is on.
  • Verified thread-safe collections are used (e.g., ConcurrentHashMap, CopyOnWriteArrayList). Queue is a plain ArrayDeque because it's confined to the event-loop thread.
  • Validated proper exception handling in multi-threaded code to avoid silent thread termination. Failures thrown from the offloaded task are captured and routed back through the handler's own exceptionCaught (mirroring SimpleChannelInboundHandler's inline-throw semantics). handleExceptionGracefully completes the transfer future exceptionally before draining pending tasks, so drained runnables short-circuit via handleMessage's isDone() guard and only release retained ByteBufs instead of writing to a closed file channel.
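
To make the single-flight queue concrete, here is a simplified, self-contained sketch of the enqueue/drain pattern described in the checklist above (field and method names are illustrative and may not match the real handler):

import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch of a per-channel single-flight FIFO drain loop (simplified; not the real handler). */
final class DiskWriteQueueSketch {
  private final Queue<Runnable> pendingDiskTasks = new ArrayDeque<>(); // confined to the event loop
  private final AtomicBoolean diskWriteTaskInFlight = new AtomicBoolean(false);
  private final ExecutorService diskWriteExecutor;

  DiskWriteQueueSketch(ExecutorService diskWriteExecutor) {
    this.diskWriteExecutor = diskWriteExecutor;
  }

  /** Called on the event loop for every queued unit (file chunk, end-of-transfer marker). */
  void enqueueDiskTask(ChannelHandlerContext ctx, Runnable task) {
    pendingDiskTasks.add(task);
    if (diskWriteTaskInFlight.compareAndSet(false, true)) {
      submitNextDiskTask(ctx); // bootstrap the drain loop if it was idle
    }
  }

  /** Pops one task, runs it on the disk executor, then chains back to the event loop. */
  private void submitNextDiskTask(ChannelHandlerContext ctx) {
    Runnable task = pendingDiskTasks.poll();
    if (task == null) {
      diskWriteTaskInFlight.set(false);
      ctx.read(); // queue drained: request the next socket read, since AUTO_READ is off
      return;
    }
    diskWriteExecutor.execute(() -> {
      try {
        task.run();
      } finally {
        // Hop back to the event loop so all queue state stays single-threaded.
        ctx.channel().eventLoop().execute(() -> submitNextDiskTask(ctx));
      }
    });
  }
}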

How was this PR tested?

  • New unit tests added.
    • TestP2PFileTransferClientHandler.testSingleFileTransferUnderBothBackpressureModes (DataProvider-parameterized, covers happy path both modes)
    • TestP2PFileTransferClientHandler.testFileSizeMismatchUnderBothBackpressureModes (DataProvider-parameterized, covers error-propagation through the offload path)
    • TestP2PFileTransferClientHandler.testChannelActiveTriggersReadOnlyWhenBackpressureEnabled (pins the AUTO_READ priming contract)
    • TestP2PFileTransferClientHandler.testBackpressureWithoutDiskExecutorRejected (constructor guard)
    • TestP2PFileTransferClientHandler.testPendingContentReleasedOnChannelCloseUnderBackpressure (ref-count leak regression: channel close with pending queue must release retained ByteBufs)
    • TestNettyP2PBlobTransferManager.testLocalFileTransferInBatchStoreWithBackpressureEnabled (end-to-end with real NIO + real disk I/O, exercises the multi-HttpContent-per-socket-read path)
  • New integration tests added.
    • BlobP2PTransferAmongServersTest.testBlobP2PTransferAmongServersForBatchStore is now parameterized via DataProviderUtils.True-and-False to run the full end-to-end P2P scenario (server restart, blob transfer, offset equality between two servers) with and without backpressure enabled on both servers.
  • Modified or extended existing tests.
    • BlobP2PTransferAmongServersTest.initializeVeniceCluster gained a two-argument overload so tests can override server 1's properties independently from server 2's.
  • Verified backward compatibility (if applicable). Legacy constructors of NettyFileTransferClient, P2PFileTransferClientHandler, and P2PBlobTransferConfig are preserved and delegate to the new overloads with backpressureEnabled=false, so existing callers are binary-compatible and exercise the unchanged legacy path.

All 40+ tests in com.linkedin.davinci.blobtransfer.* pass, as do both data-provider branches of the parameterized integration test (legacy ~31s, backpressure ~28s on a local laptop).

Does this PR introduce any user-facing or breaking changes?

  • No. You can skip the rest of this section.
  • Yes. Clearly explain the behavior change and its impact.

The new config defaults to false so the deploy is a no-op until an operator explicitly flips blob.transfer.client.backpressure.enabled=true and restarts the affected host(s).

…er client

Adds blob.transfer.client.backpressure.enabled (default false). When true,
the P2P blob transfer client bootstrap uses AUTO_READ=false and
WRITE_BUFFER_WATER_MARK, and HttpContent writes are offloaded to a dedicated
disk-write executor via a single-flight FIFO queue per channel. When false,
behavior is bitwise identical to the previous code path so the change can be
rolled out by deploying the code first and flipping the flag per-host later.

Why: during high-concurrency bootstrap (many replicas transitioning at once
+ many stores in parallel), the default AUTO_READ=on path lets Netty keep
reading from the socket faster than the disk can absorb the data. In-flight
direct ByteBufs accumulate past the JVM MaxDirectMemorySize cap, producing
"Cannot reserve N bytes of direct buffer memory" OOMs that cascade through
the transfer layer and eventually trip SIT's 60-minute kill backstop.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings April 18, 2026 08:27
Copilot AI left a comment

Pull request overview

Adds a config-gated Netty backpressure mode to the blob-transfer client to prevent direct-memory OOMs during high-concurrency bootstrap by disabling AUTO_READ and offloading disk writes off the NIO event loop.

Changes:

  • Thread new server config flags for client backpressure enablement and disk-write executor sizing through server config → blob transfer config → manager/client/handler.
  • Rework P2PFileTransferClientHandler to support an AUTO_READ=false, explicit-read, FIFO queued/offloaded disk write path.
  • Expand unit + integration tests to exercise both legacy and backpressure-enabled modes.

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 4 comments.

Summary per file:

  • services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java: Wires new blob-transfer client backpressure settings into blob transfer config construction.
  • clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java: Passes new backpressure + disk executor sizing settings into P2PBlobTransferConfig.
  • clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/P2PBlobTransferConfig.java: Adds config fields + ctor overloads for client backpressure and disk-write pool sizing.
  • clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManagerBuilder.java: Plumbs new config fields into NettyFileTransferClient construction.
  • clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java: Introduces new server-level config parsing + getters for backpressure enablement and disk-write pool sizing.
  • internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java: Adds config keys and docs for backpressure enablement + disk-write pool size.
  • clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java: Applies AUTO_READ=false and WRITE_BUFFER_WATER_MARK under flag; creates disk-write executor; passes settings to handler.
  • clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/P2PFileTransferClientHandler.java: Implements backpressure path with queued/offloaded disk writes and explicit read driving.
  • clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferClientHandler.java: Adds/parameterizes tests for both backpressure modes and queue cleanup behavior.
  • clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java: Adds an end-to-end local NIO test that runs with backpressure enabled.
  • internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/BlobP2PTransferAmongServersTest.java: Parameterizes the end-to-end multi-server integration test to run with/without backpressure.
Comments suppressed due to low confidence (2)

clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java:401

  • With AUTO_READ disabled (backpressureEnabled=true), the channel will not begin reading until someone calls Channel.read(). This handler is added to the pipeline after connectToHost() returns an already-active Channel, so relying on P2PFileTransferClientHandler.channelActive() to prime ctx.read() is brittle (channelActive may have already fired before the handler was added). Consider explicitly calling ch.read() once after adding the handlers (only when backpressureEnabled), or moving the priming logic to handlerAdded()/channelRegistered() where it will run even when the handler is added late.
      // Request to get the blob file and metadata
      // Attach the file handler to the pipeline
      // Attach the metadata handler to the pipeline
      ch.pipeline()
          .addLast(new IdleStateHandler(blobReceiveReaderIdleTimeInSeconds, 0, 0))
          .addLast(new MetadataAggregator(MAX_METADATA_CONTENT_LENGTH))
          .addLast(
              new P2PFileTransferClientHandler(
                  baseDir,
                  perHostTransferFuture,
                  storeName,
                  version,
                  partition,
                  requestedTableFormat,
                  aggBlobTransferStats,
                  checksumValidationExecutorService,
                  backpressureEnabled,
                  diskWriteExecutorService))
          .addLast(
              new P2PMetadataTransferHandler(
                  storageMetadataService,
                  baseDir,
                  storeName,
                  version,
                  partition,
                  requestedTableFormat,
                  notifierSupplier));
      // Send a GET request
      ChannelFuture requestFuture =
          ch.writeAndFlush(prepareRequest(storeName, version, partition, requestedTableFormat));

internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/BlobP2PTransferAmongServersTest.java:377

  • initializeVeniceCluster(Map firstOverride, Map secondOverride) reuses the same Properties instance for both servers. Any keys set in configOverrideForFirstServer will be inherited by server 2 unless configOverrideForSecondServer explicitly overwrites them, which is easy to miss and can lead to surprising test behavior. Consider creating a fresh Properties copy for the second server (or explicitly removing first-server-only override keys) to keep per-server overrides isolated.
    // Apply first-server-specific overrides (e.g. backpressure opt-in) before the server is stood up. The same
    // Properties instance is reused for the second server further below, so any keys set here that the second
    // server should NOT inherit would need to be cleared — in practice the second-server override map takes care
    // of that by explicitly re-setting what differs for server 2.
    serverProperties.putAll(configOverrideForFirstServer);

    server1Port = veniceClusterWrapper.addVeniceServer(serverFeatureProperties, serverProperties).getPort();

    // add second server
    serverProperties.setProperty(ConfigKeys.DATA_BASE_PATH, path2);
    serverProperties.setProperty(ConfigKeys.ENABLE_BLOB_TRANSFER, "true");
    serverProperties.setProperty(ConfigKeys.BLOB_TRANSFER_MANAGER_ENABLED, "true");
    serverProperties.setProperty(ConfigKeys.DAVINCI_P2P_BLOB_TRANSFER_SERVER_PORT, String.valueOf(port2));
    serverProperties.setProperty(ConfigKeys.DAVINCI_P2P_BLOB_TRANSFER_CLIENT_PORT, String.valueOf(port1));

    // Add additional information for 2nd server.
    serverProperties.putAll(configOverrideForSecondServer);

    server2Port = veniceClusterWrapper.addVeniceServer(serverFeatureProperties, serverProperties).getPort();


backpressure

Five follow-up fixes to PR linkedin#2750 covering the review comments:

1. NettyFileTransferClient.get(): explicitly call ch.read() after attaching
   the P2P handlers under backpressureEnabled. channelActive fires once during
   connectToHost (BEFORE the late-added P2PFileTransferClientHandler is in
   the pipeline), so relying on the handler's channelActive to prime the read
   pump under AUTO_READ=false was brittle.
2. P2PFileTransferClientHandler.channelRead0: retain non-HttpContent messages
   that are routed through the disk-task queue (e.g. DefaultFullHttpResponse
   metadata). SimpleChannelInboundHandler auto-releases the msg once
   channelRead0 returns, so the disk-executor task was dereferencing a freed
   buffer. Retain before enqueue, release in the task's finally.
3. P2PFileTransferClientHandler.submitNextDiskTask: wrap
   diskWriteExecutorService.execute() in try/catch. On
   RejectedExecutionException (e.g. NettyFileTransferClient.close() called
   mid-transfer), put the polled task back at the head of pendingDiskTasks and
   route the failure through exceptionCaught so the transfer fails
   deterministically and every retained ByteBuf is released by the drain path
   (sketched after this list).
4. TestP2PFileTransferClientHandler: the backpressure ref-count regression
   test uses a no-op disk executor that drops the first submitted task. Wrap
   the test in try/finally that releases any still-retained buffers to avoid
   tripping Netty leak detection at suite teardown.
5. BlobP2PTransferAmongServersTest.initializeVeniceCluster: build server 1's
   properties on its own Properties copy so first-server-only overrides (e.g.
   backpressure opt-in when only one server should enable it) don't silently
   leak into server 2 via the shared Properties instance.
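
A rough sketch of the put-back-on-rejection behavior described in item 3, under the assumption that failure routing is reduced to a simple callback (the real code routes through exceptionCaught, and the field names are illustrative):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;

/** Sketch: fail the transfer deterministically when the disk executor was shut down mid-transfer. */
final class RejectedSubmitSketch {
  private final Deque<Runnable> pendingDiskTasks = new ArrayDeque<>();
  private final ExecutorService diskWriteExecutor;
  private final Consumer<Throwable> failTransfer; // stands in for routing the error to exceptionCaught

  RejectedSubmitSketch(ExecutorService diskWriteExecutor, Consumer<Throwable> failTransfer) {
    this.diskWriteExecutor = diskWriteExecutor;
    this.failTransfer = failTransfer;
  }

  void submitNextDiskTask() {
    Runnable task = pendingDiskTasks.poll();
    if (task == null) {
      return;
    }
    try {
      diskWriteExecutor.execute(task);
    } catch (RejectedExecutionException e) {
      // Executor closed mid-transfer: put the task back so the drain/cleanup path can still release
      // every retained ByteBuf, then fail the transfer instead of dropping work silently.
      pendingDiskTasks.addFirst(task);
      failTransfer.accept(e);
    }
  }
}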

Also documents the intentional design choice to keep cleanupResources() inline
on the event loop (instead of routing through the disk queue): the existing
isDone() short-circuit in handleMessage already swallows the narrow-window
ClosedChannelException that can surface from a mid-write disk task, and
routing cleanup through the queue regresses the ref-count test when the
executor is dead.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@sixpluszero
Contributor Author

Thanks @copilot-pull-request-reviewer — pushed 4d9ef60 addressing all 6 flagged concerns (4 inline + 2 in the review body). Summary:

  1. NettyFileTransferClient.java:401 — brittle channelActive priming. ✅ Fixed. P2PFileTransferClientHandler is attached to the pipeline AFTER connectToHost() returns an already-active channel, so the handler's channelActive never fires. Added an explicit ch.read() in get() after the handlers are attached (gated on backpressureEnabled) so the GET response is actually pulled from the TCP buffer.

  2. BlobP2PTransferAmongServersTest.java:377 — shared Properties between servers. ✅ Fixed. initializeVeniceCluster now builds server 1's properties on its own copy, so first-server-only overrides don't silently leak into server 2.

  3. Non-HttpContent messages not retained before enqueue (inline #1) — ✅ fixed, retain/release pair around the disk-task wrapper.

  4. diskWriteExecutorService.execute can throw (inline #2) — ✅ fixed, try/catch with polled task put back at head of the queue before routing the failure through exceptionCaught.

  5. State races between event loop and disk executor (inline #3) — ⚠️ documented as known behavior. The race exists but is handled deterministically by the isDone() short-circuit in handleMessage; the secondary ClosedChannelException is caught and skipped cleanly. I tried the stronger fix (route cleanup through the queue) but it regressed testPendingContentReleasedOnChannelCloseUnderBackpressure when the executor is dead. A follow-up that offloads only FileChannel.transferFrom while keeping state transitions on the event loop is the right long-term structure; left an in-line comment tracking it.

  6. Test buffer leak (inline #4) — ✅ fixed, test wrapped in try/finally that releases any retained refs.

All 88 blob-transfer unit tests + both branches of the parameterized integration test (testBlobP2PTransferAmongServersForBatchStore[0](false) 31.7s, [2](true) 28.4s) pass locally.

Four follow-ups to PR linkedin#2750 driven by @jingy-li's review:

1. Restore cleanupResources/completeExceptionally ordering in
   handleExceptionGracefully (regression from my earlier reorder). The
   outer failover loop in NettyFileTransferClient.get() relies on the
   invariant from linkedin#2071 that "future complete ⇒ previous host fully
   cleaned up"; completing the future first allowed the next-host handler
   to start constructing files at the same tempPartitionDir before this
   handler's cleanup (file close + partial-file delete + state reset) had
   run. Reverted to: cleanupResources → completeExceptionally →
   releasePendingContents. The trailing releasePendingContents is new
   (specific to backpressure mode) and its drained runnables short-circuit
   via handleMessage's isDone() guard, only releasing retained ByteBufs.
2. Make handleEndOfTransfer non-blocking. In backpressure mode it runs on
   the disk-write executor; a sync .join() on the checksum-validation
   futures pinned a pool thread for the entire checksum window, and two
   simultaneous end-of-transfers on a 2-thread pool would stall every
   other channel's content writes. Switched to whenComplete chaining,
   finalising on the event loop via finalizeTransferAfterChecksumValidation
   (see the sketch after this list).
3. Route metadata fireChannelRead through the event loop when we're on
   the disk-write executor. DefaultFullHttpResponse satisfies instanceof
   HttpContent, so under backpressure metadata was enqueued as a disk
   task and fireChannelRead ran P2PMetadataTransferHandler on the
   disk-pool thread. Now we dispatch on the event loop and block the
   disk-pool thread on a short-lived CompletableFuture barrier so the
   next queued task (typically EndOfTransfer) still observes metadata
   already persisted in storageMetadataService.
4. Extract DEFAULT_BLOB_TRANSFER_DISK_WRITE_THREAD_POOL_SIZE=2 constant
   shared across NettyFileTransferClient, P2PBlobTransferConfig, and
   VeniceServerConfig. Previously the default was "Math.max(2, cores)"
   in three places while the per-instance floor was "Math.max(1, configured)";
   aligned the floor to the default so a misconfigured "1" is still usable.
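
A simplified sketch of the non-blocking chaining described in item 2; the future list, finalize step, and error routing below are placeholders for the handler's real members, not the actual code:

import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Sketch: finish checksum validation without pinning a disk-write pool thread. */
final class AsyncFinalizeSketch {
  static void handleEndOfTransfer(
      ChannelHandlerContext ctx,
      List<CompletableFuture<Void>> checksumValidationFutures,
      Runnable finalizeTransfer) {
    CompletableFuture
        .allOf(checksumValidationFutures.toArray(new CompletableFuture[0]))
        .whenComplete((ignored, error) -> {
          // Hop back to the event loop for pipeline mutations (e.g. ctx.close()) and final state changes.
          ctx.channel().eventLoop().execute(() -> {
            if (error != null) {
              ctx.fireExceptionCaught(error); // simplified: the real code uses its own failure path
            } else {
              finalizeTransfer.run();
            }
          });
        });
  }
}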

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings April 20, 2026 22:22
@sixpluszero
Contributor Author

Thanks @jingy-li for the thorough review — pushed 060efa4 addressing all points:

  1. Ordering regression vs #2071 (cleanupResources must run before completeExceptionally so the outer failover loop's invariant holds). Status: ✅ Reverted to original ordering; releasePendingContents is now trailing and safe because drained runnables short-circuit via isDone()
  2. handleEndOfTransfer .join() pins a disk-pool thread during checksum validation. Status: ✅ Refactored to CompletableFuture.allOf(...).whenComplete(...) → event loop; disk-pool thread returns immediately
  3. Metadata fireChannelRead runs P2PMetadataTransferHandler on disk-pool thread. Status: ✅ Dispatched on event loop via eventLoop().execute(...) + short CompletableFuture barrier so the ordering guarantee with EndOfTransfer still holds
  4. Extract 2 into a named constant; Math.max(1, …) inconsistent with Math.max(2, …). Status: ✅ DEFAULT_BLOB_TRANSFER_DISK_WRITE_THREAD_POOL_SIZE=2 on NettyFileTransferClient, reused across P2PBlobTransferConfig, VeniceServerConfig, and the pool-size floor
  5. outputFileChannel == null as short-circuit. Status: Acknowledged — current isDone() guard works, happy to switch if you prefer

CI status on previous push: all 100+ integration test shards passed except a single CANCELLED shard (IntegrationTests_38), which was triggered by TestStoreMigrationMultiRegion.setUp timing out at 120s — that's unrelated multi-region store migration infra, not blob transfer. E2ETestsFailureAlert just aggregates that.

All 88 blob-transfer unit tests + both branches of the parameterized BlobP2PTransferAmongServersForBatchStore integration test pass locally after this push.

Copilot AI left a comment

Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.



sixpluszero and others added 2 commits April 21, 2026 14:03
…DLE races

Fixes a regression introduced by the handleEndOfTransfer refactor in 060efa4.
After switching to asynchronous checksum validation via whenComplete, the
transfer future stays incomplete for a brief window between:
  - the end-of-transfer marker being processed on the disk-write executor
  - the checksum+rename chain finalizing on the event loop

If the peer closes the TCP connection (server-side IdleStateHandler or
graceful FIN) during that window, channelInactive / userEventTriggered
currently route through fastFailoverIncompleteTransfer and flip the future
into an exception — even though every byte is already on disk.

Fix: introduce an endOfTransferReceived AtomicBoolean that handleEndOfTransfer
sets BEFORE kicking off the async checksum chain. channelInactive and
userEventTriggered check the flag first; when set, they log and return
without triggering the fast-failover path, leaving finalization to the
async chain.

Also clean up a stale comment in fastFailoverIncompleteTransfer that still
referenced the reverted "completeExceptionally-first" ordering.

Tests: two new regression tests in TestP2PFileTransferClientHandler drive
the finalizing state with (1) a peer-close after EndOfTransfer and (2) a
READER_IDLE after EndOfTransfer, asserting the transfer future completes
successfully instead of exceptionally.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
backpressure

The batch-store variant already gates on blob.transfer.client.backpressure.enabled
via a single True-and-False DataProvider. Hybrid has more moving parts (streaming
RT writes, rewind, offset-lag completion) and was only parameterized over
enableAdaptiveThrottling. Broaden it to a 2x2 matrix (enableAdaptiveThrottling,
enableClientBackpressure) so every combination of the two orthogonal server-side
opt-ins is exercised end-to-end against real NIO + real disk I/O on a hybrid
store under streaming traffic.

Applies the backpressure override to both servers via the two-arg
initializeVeniceCluster overload, matching how the batch test does it.

Locally each variant runs ~30s in isolation; CI fans out to a fresh JVM per
shard so the 2x2 costs ~4x one test's latency, not cumulative resource
pressure.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings April 22, 2026 19:39
Copilot AI left a comment

Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 9 comments.



async-finalization code path

Eight fixes covering the new Copilot comments on 4192c46, plus one comment
rewording. Categorised:

Real bugs:
  1. LOGGER.error in handleExceptionGracefully consumed the Throwable via a {}
     placeholder, so Log4j2 never printed the stack trace. Now passed as the
     trailing throwable argument.
  2. Metadata-dispatch barrier (backpressure path) called CompletableFuture.get()
     with no timeout, which could pin a disk-write pool thread indefinitely if
     the event loop stalled. Added a 30s bounded wait; timeout fails the
     transfer explicitly.
  3. channelRead0 did not short-circuit on inputStreamFuture.isDone(), so
     messages arriving after failure/completion were retained + enqueued and
     could leak buffers behind a stuck diskWriteTaskInFlight flag. Now guarded
     at the entry point before any retain.
  4. When a disk task failed, the in-flight flag was not reset before
     exceptionCaught fired. Any late-arriving inbound message could then
     enqueue a retained ByteBuf behind a permanently-stuck flag and leak.
     Flag is now cleared before routing to exceptionCaught.

Log-noise cleanup:
  5/6. The "letting async checksum/rename finalize" INFO logs fired on every
     successful transfer because finalizeTransferAfterChecksumValidation always
     calls ctx.close() after completing the future. Gated on !isDone() and
     dropped to DEBUG.

Documentation:
  7. Updated NettyFileTransferClient's pool-size-floor rationale — it still
     referenced the pre-async handleEndOfTransfer "await" behaviour.
  8. Corrected endOfTransferReceived's Javadoc: the flag is flipped on whichever
     thread handleEndOfTransfer runs on (event loop in legacy mode, disk-write
     executor in backpressure mode); AtomicBoolean makes the cross-thread set
     safe.

All 88 blob-transfer unit tests still pass. spotlessCheck passes.

The remaining Copilot nit ("single blank line between imports and class") was
dismissed: Venice's Spotless config
(gradle/spotless/eclipse-java-formatter.xml)
explicitly sets blank_lines_after_imports=2 and the check is green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Comment on lines +173 to +176
// Early-out BEFORE any retain/enqueue so that late-arriving messages (e.g. after completeExceptionally on a
// failure path, or after the async checksum chain has already completed the future) don't uselessly retain
// buffers nor pile additional tasks behind a potentially stuck single-flight flag. SimpleChannelInboundHandler
// will auto-release msg when this method returns.
Contributor

Let's simply comment as: "Drop late messages before retain/enqueue to avoid pinning buffers behind a stuck single-flight flag."

Contributor Author

Fixed in 979a1a8. Shortened to: // Drop late messages before retain/enqueue to avoid pinning buffers behind a stuck single-flight flag.

// Retain and enqueue the chunk; the FIFO queue ensures sequential writes to the same FileChannel even when
// one ctx.read() synchronously dispatches multiple HttpContent objects via HttpClientCodec.
HttpContent content = (HttpContent) msg;
content.retain();
Contributor

qq: HttpContent uses content.retain(), while the following end-of-transfer path uses ReferenceCountUtil.retain(msg) — is that intended?

Contributor Author

Not intentional — fixed in 979a1a8. Unified on ReferenceCountUtil.retain/release throughout (the extracted enqueueMessage helper handles it once). Previously the HttpContent branch called content.retain() directly (works, but asymmetric with the HttpResponse branch). Now consistent.

} finally {
ReferenceCountUtil.release(msg);
}
});
Contributor

Lines 216-222 are the same as lines 192-198; can we extract them into a helper method:

  private void enqueueMessage(ChannelHandlerContext ctx, HttpObject msg) {                                                                                                         
    enqueueDiskTask(ctx, () -> {                                                                              
      try {
        handleMessage(ctx, msg);                                                                              
      } finally { 
        ReferenceCountUtil.release(msg);
      }                                                                                                       
    });
  }  

Contributor Author

Great suggestion — done in 979a1a8. Extracted enqueueMessage(ctx, msg) helper that does retain → enqueue → try/finally release. Both the HttpContent branch and the HttpResponse-queued branch now just call it.

if (ctx.channel().isActive() && !inputStreamFuture.toCompletableFuture().isDone()) {
ctx.read();
}
}
Contributor

For the channelRead0 method, my understanding is:
Type 1: HttpContent: always queue
Type 2: HttpResponse: queue only if ordering demands it
2.1 isEndOfTransferResponse — the EOT marker calls
2.2 !pendingDiskTasks.isEmpty()
2.3 diskWriteTaskInFlight.get()
Type3: fast path: inline on the event loop

I had concern that:
(1) The filename used for writing is changed by the HTTP response file handler. For example, when file1 starts, it uses the name file1 and begins writing to disk (in FIFO order now). However, when file2 arrives, the filename is updated to file2, so the ongoing write for file1 could end up using the name file2. But in this case, does the 2.2 condition prevent it? Since a file write is still pending, the filename won't change?
(2) For the fast path, what kind of message goes to Type 3?

Contributor Author

Your understanding is spot on. Answering your two questions:

(1) The filename race — yes, condition 2.2 (!pendingDiskTasks.isEmpty()) is exactly what prevents it. Concrete trace:

  • file1 header arrives → fast-path (queue empty) → inline handleMessage opens outputFileChannel for file1, sets fileName='file1'
  • file1 chunks arrive → always enqueued as Type 1: the HttpContent branch comes BEFORE the HttpResponse fast-path check, so chunks go through the queue even when it is empty
  • by the time the file2 header could arrive inline, pendingDiskTasks is therefore non-empty → condition 2.2 kicks in → the file2 header also queues → it waits behind file1's LastHttpContent, which does resetState()

(2) What lands in the fast-path (Type 3)? Only HttpResponse that is:

  • not EndOfTransfer
  • with empty pending queue AND no in-flight disk task
    So in practice: the very first file-header HttpResponse of a transfer, plus error responses (404, 500). Everything else (chunks, EndOfTransfer, and any subsequent file-header once content is flowing) goes through the queue. A condensed sketch of this dispatch decision follows.
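
For readers following along, a condensed sketch of the dispatch decision traced above (helper methods are stubbed out; this mirrors the described branch order and condition names, not the literal handler source):

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponse;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch of the channelRead0 dispatch decision (helper bodies intentionally abstract). */
abstract class DispatchDecisionSketch {
  protected final Deque<Runnable> pendingDiskTasks = new ArrayDeque<>();
  protected final AtomicBoolean diskWriteTaskInFlight = new AtomicBoolean(false);

  void dispatch(ChannelHandlerContext ctx, HttpObject msg) {
    if (msg instanceof HttpContent) {
      // Type 1: chunks (and full responses such as metadata) always queue, even when the queue is empty.
      enqueueMessage(ctx, msg);
      return;
    }
    if (msg instanceof HttpResponse) {
      boolean mustQueue = isEndOfTransferResponse((HttpResponse) msg) // 2.1
          || !pendingDiskTasks.isEmpty()                              // 2.2: a file is still being written
          || diskWriteTaskInFlight.get();                             // 2.3
      if (mustQueue) {
        enqueueMessage(ctx, msg); // Type 2: ordering demands queuing
        return;
      }
    }
    // Type 3 fast path: first file-header response of a transfer, or error responses (404/500).
    handleMessage(ctx, msg);
  }

  abstract void enqueueMessage(ChannelHandlerContext ctx, HttpObject msg);

  abstract void handleMessage(ChannelHandlerContext ctx, HttpObject msg);

  abstract boolean isEndOfTransferResponse(HttpResponse response);
}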

/**
* Pops the next pending unit off {@link #pendingDiskTasks} and runs it on {@link #diskWriteExecutorService},
* chaining to itself on completion to drain the queue in FIFO order. When the queue is empty, releases the
* single-flight flag and issues {@code ctx.read()} so the next socket read can bring more data into the pipeline.
Contributor

There is actually 2 entry points for queued task running, can we add comment like:

======================================================
Two entry points drive the drain loop:

  1. {@code enqueueDiskTask} — called from the event loop when a message arrives. Appends the task and, if the queue was previously idle, bootstraps the drain loop via {@link #submitNextDiskTask}. The bootstrap is required because the chain-continuation (entry point 2) only fires after a task completes, so the very first task has nothing to pump it.
  2. {@link #submitNextDiskTask} self-chain — after each task finishes, its completion callback re-invokes {@code submitNextDiskTask} on the event loop to pop the next task in FIFO order.
======================================================

Contributor Author

Added in 979a1a8 — dropped your suggested Javadoc block onto enqueueDiskTask documenting the two entry points (initial bootstrap call and self-chain continuation). Cross-referenced from submitNextDiskTask so future readers see the loop from either end.

@@ -112,7 +333,44 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex
boolean isMetadataMessage = BlobTransferUtils.isMetadataMessage(response);
if (isMetadataMessage) {
Contributor

We might not need this separate handling for metadata between line 336-373.

Current order:

  1. Metadata arrives from the wire as a DefaultFullHttpResponse.                                                                                                                                               
  2. channelRead0 sees instanceof HttpContent (because DefaultFullHttpResponse extends LastHttpContent), matches the first branch, retains it, enqueues onto pendingDiskTasks. Already in the queue.            
  3. The queue is single-flight FIFO. All file chunks enqueued before metadata drain first.                                                                                                                     
  4. When the metadata task's turn comes, every preceding file write has completed synchronously on the disk thread. Guaranteed by the queue.                                                                   
  5. The metadata task runs handleMessage on the disk thread → matches isMetadataMessage → calls ctx.fireChannelRead(msg).                                                                                      
  6. P2PMetadataTransferHandler.channelRead0 runs synchronously on the disk thread, storageMetadataService.put + computeStoreVersionState complete before return.                                               
  7. Disk task returns. submitNextDiskTask polls the next item (EndOfTransfer). EndOfTransfer runs with metadata already persisted.   

So as long as we make sure the metadata response is in the queue, then by the time metadata is polled out we can already ensure the file-write part is done. The only "not-good" part is that metadata is handled by the disk-write executor — or we could rename it to something like "blobReceiveSequencer".

Contributor

WDYT?

Contributor Author

Excellent suggestion — done in 979a1a8. You are exactly right: given the FIFO queue already serialises metadata behind every file-chunk write, there is no ordering problem with running P2PMetadataTransferHandler on the disk-write executor. Its only side-effects are storageMetadataService.put / computeStoreVersionState / veniceNotifier.* which are thread-safe backend services, and it never mutates pipeline state. Removed the whole CompletableFuture metadataDone barrier + METADATA_DISPATCH_TIMEOUT_SECONDS machinery. Far simpler now — just retain + fireChannelRead unconditionally in both modes.

Contributor Author

Addressed together with the metadata-hop removal above — WDYT on the simpler version in 979a1a8? The fireChannelRead is now identical in both legacy and backpressure modes, and the thread is whichever one the queue happens to be draining on (disk executor in backpressure, event loop in legacy).

// notice while finalization is still genuinely in flight — logging on every completion is INFO-level noise.
if (!inputStreamFuture.toCompletableFuture().isDone()) {
LOGGER.debug(
"Channel became inactive for replica {} while the asynchronous checksum/rename path is still "
Contributor

Can we change this line to warn?

Contributor Author

Done in 979a1a8. Both this channelInactive branch and the mirror in userEventTriggered now log at WARN, with a short justification comment (connection drop or reader-idle during finalization is operator-relevant and infrequent).

Comment on lines +574 to +589
// Ordering matters: cleanupResources() runs BEFORE completeExceptionally(), preserving the invariant introduced
// in #2071 that the outer failover loop in NettyFileTransferClient relies on — namely, "future complete ⇒
// previous host fully cleaned up". If we completed the future first, the outer loop could advance to the next
// host and start constructing files at the same tempPartitionDir while this handler's cleanup (file close +
// partial-file delete + state reset) is still executing, producing overlapping state access.
//
// releasePendingContents() runs AFTER completeExceptionally() so that each drained runnable's handleMessage
// call short-circuits via the isDone() guard (resetState has already nulled outputFileChannel anyway), and the
// retained ByteBuf is released by each runnable's own finally block.
//
// Backpressure-mode race: if a disk task on diskWriteExecutorService is executing FileChannel.transferFrom at
// the exact moment this event-loop thread calls cleanupResources, the disk task may observe a
// ClosedChannelException. It is caught by submitNextDiskTask's wrapper, routed back through exceptionCaught,
// and skipped at the isDone() guard (set just below) without re-logging or re-completing. A cleaner design
// that offloads only the transferFrom call while keeping state management on the event loop is tracked as a
// follow-up.
Contributor

Can we move this to Javadoc?

Contributor Author

Moved in 979a1a8. The ordering contract + backpressure-race commentary is now a proper Javadoc block on handleExceptionGracefully — it shows up on IDE hover instead of being buried inline. The method body is down to just the four operations.

// Always finalize on the Netty event loop so pipeline mutations (ctx.close()) stay on the expected thread.
if (ctx.channel().eventLoop().inEventLoop()) {
finalizeOnEventLoop.run();
} else {
Contributor

Because of this async logic change, the channel may become idle or close before the checksum completes and the future is waiting to be completed. I noticed that both userEventTriggered and channelInactive already reflect this behavior, they wait for the checksum to finish even after the connection has been closed, which is good.

But one issue is:

exceptionCaught is missing the endOfTransferReceived guard (P2PFileTransferClientHandler.java:562). A TCP RST or SSL error during the async checksum/rename window silently deletes fully-transferred partition data via the outer loop's cleanupBothPartitionDirAndTempTransferredDir.

Some recommendation fix:

  • (A) Route checksum failures in finalizeTransferAfterChecksumValidation directly through handleExceptionGracefully instead of re-entering exceptionCaught, otherwise a naive guard would swallow checksum failures.
  • (B) Add endOfTransferReceived && !isDone() guard to exceptionCaught.
  • (C) Regression test testExceptionAfterEndOfTransferDoesNotFastFailover mirroring the two existing post-EoT tests.

Contributor Author

Great catch — this is a real data-loss bug. Fixed in 979a1a8:

  • (B) exceptionCaught now has the endOfTransferReceived && !isDone() guard matching channelInactive/userEventTriggered. Added a WARN log so operators still see the event. (Guard sketched after this list.)
  • (A) finalizeTransferAfterChecksumValidation no longer routes checksum failures through exceptionCaught (which would now be swallowed by the new guard). It calls handleExceptionGracefully + ctx.close() directly. Checksum failure is a legitimate data-integrity failure (bytes could be corrupt even though all received), unlike the transient pipeline errors the guard suppresses.
  • (C) New regression test testExceptionCaughtAfterEndOfTransferDoesNotFastFailover drives the exact scenario (TCP RST via fireExceptionCaught during the checksum window) and asserts (1) the future completes successfully and (2) the transferred file survives on disk — not deleted by cleanupResources.
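
A rough sketch of the guard described in (B), with simplified logging and cleanup (the field and method names follow the discussion; the real handler's handleExceptionGracefully does more, including resource cleanup before failing the future):

import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch: ignore late channel exceptions once the end-of-transfer marker has been observed. */
final class PostEotExceptionGuardSketch {
  private final AtomicBoolean endOfTransferReceived = new AtomicBoolean(false);
  private final CompletableFuture<Void> transferFuture = new CompletableFuture<>();

  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    if (endOfTransferReceived.get() && !transferFuture.isDone()) {
      // All bytes are already on disk; the async checksum/rename chain will finish the future.
      // Failing here would let the outer failover loop delete a fully transferred partition.
      return;
    }
    // Pre-EoT failures still clean up and fail the transfer.
    handleExceptionGracefully(ctx, cause);
  }

  private void handleExceptionGracefully(ChannelHandlerContext ctx, Throwable cause) {
    transferFuture.completeExceptionally(cause); // simplified: real code cleans up resources first
    ctx.close();
  }
}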

public void channelActive(ChannelHandlerContext ctx) throws Exception {
// When AUTO_READ is disabled, Netty will not initiate the first socket read automatically.
// Request the first read once the pipeline is ready so the GET response can be received.
if (backpressureEnabled) {
Contributor

Let's add && !endOfTransferReceived.get() to the two ctx.read() guards to close the spurious-read race window.

One real bug fix, one substantial simplification, one refactor, plus log/doc
polish driven by @jingy-li's review of 145abfb:

Real bug:
  - exceptionCaught was missing the endOfTransferReceived guard that
    channelInactive and userEventTriggered already had. A TCP RST or SSL
    decode error arriving during the async checksum/rename window would
    fire this path, hit handleExceptionGracefully, and delete the fully
    received temp partition directory — destroying data the async chain
    was about to finalize. Added the same guard (warn log + early return).

  - As a corollary, finalizeTransferAfterChecksumValidation no longer
    routes checksum failures through exceptionCaught (which would now
    be silently swallowed by the new guard). It calls
    handleExceptionGracefully + ctx.close() directly. Checksum failure is
    a legitimate data-integrity reason to fail the transfer, unlike the
    transient pipeline errors the guard intentionally suppresses.

  - New regression test testExceptionCaughtAfterEndOfTransferDoesNotFastFailover
    drives the exact TCP-RST-during-checksum-window scenario and asserts
    (a) the future completes successfully and (b) the transferred file
    survives on disk.

Simplification:
  - Dropped the event-loop hop + CompletableFuture barrier that backpressure-
    mode metadata used to do. Because the single-flight FIFO queue already
    guarantees metadata runs after every preceding file-chunk write,
    P2PMetadataTransferHandler is safe to run on the disk-write executor —
    it only calls thread-safe backend services (storageMetadataService,
    veniceNotifier) and never mutates pipeline state. Removed
    METADATA_DISPATCH_TIMEOUT_SECONDS and the TimeUnit import.

Refactor:
  - Extracted enqueueMessage helper. The HttpContent branch and the
    HttpResponse-queued branch were identical retain/try-finally/release
    blocks; consolidated. Unified on ReferenceCountUtil.retain/release so
    any HttpObject subtype works the same way.

Log/doc:
  - channelInactive/userEventTriggered finalizing-window logs escalated from
    DEBUG to WARN: a connection drop or reader-idle during finalization is
    infrequent and operator-relevant.
  - Added javadoc on enqueueDiskTask explaining the two entry points that
    drive the drain loop (initial bootstrap and self-chain).
  - Moved the ordering contract + backpressure-race commentary from an
    inline comment block to handleExceptionGracefully's javadoc so it
    shows up in IDE hover.
  - Shortened the channelRead0 early-out comment per reviewer.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings April 23, 2026 16:31
Copilot AI left a comment

Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 1 comment.



Comment on lines +343 to 352
// In both legacy and backpressure mode, simply fire through the pipeline. The single-flight FIFO queue
// (see enqueueDiskTask) already guarantees metadata runs AFTER every preceding file-chunk write, so
// P2PMetadataTransferHandler can safely run on whichever thread is dispatching this message. The
// downstream metadata handler only calls into thread-safe backend services
// (storageMetadataService.put / computeStoreVersionState, veniceNotifier.*) — it never mutates pipeline
// state. Running it on the disk-write executor is therefore a Netty convention departure, not a
// correctness issue, and avoids the complexity of an event-loop hop + barrier that would otherwise pin
// this disk-write pool thread while waiting for metadata processing to complete.
ReferenceCountUtil.retain(msg);
ctx.fireChannelRead(msg);
Copilot AI Apr 23, 2026

In backpressure mode, handleMessage() can run on diskWriteExecutorService threads, but this branch calls ctx.fireChannelRead(msg) directly. Netty’s ChannelPipeline is not thread-safe for inbound event propagation, and P2PMetadataTransferHandler (a SimpleChannelInboundHandler) is expected to run on the channel’s event loop. Please dispatch the metadata fireChannelRead onto ctx.channel().eventLoop() (and preserve ordering relative to queued disk tasks), rather than invoking it from the disk-write thread.

Suggested change
// In both legacy and backpressure mode, simply fire through the pipeline. The single-flight FIFO queue
// (see enqueueDiskTask) already guarantees metadata runs AFTER every preceding file-chunk write, so
// P2PMetadataTransferHandler can safely run on whichever thread is dispatching this message. The
// downstream metadata handler only calls into thread-safe backend services
// (storageMetadataService.put / computeStoreVersionState, veniceNotifier.*) — it never mutates pipeline
// state. Running it on the disk-write executor is therefore a Netty convention departure, not a
// correctness issue, and avoids the complexity of an event-loop hop + barrier that would otherwise pin
// this disk-write pool thread while waiting for metadata processing to complete.
ReferenceCountUtil.retain(msg);
ctx.fireChannelRead(msg);
// Metadata processing must be propagated on the channel's event loop because Netty inbound pipeline
// traversal is not thread-safe. In backpressure mode this handler can run on a disk-write executor
// thread, so retain the message and hop back to the event loop before firing it to the next handler.
// This still preserves ordering relative to the already-serialized disk task queue because this branch
// is reached only after all earlier queued disk writes for this message stream have completed.
if (ctx.channel().eventLoop().inEventLoop()) {
ctx.fireChannelRead(msg);
} else {
ReferenceCountUtil.retain(msg);
ctx.channel().eventLoop().execute(() -> ctx.fireChannelRead(msg));
}

@sixpluszero sixpluszero marked this pull request as draft April 24, 2026 02:47