feat(l1): add eth_subscribe/eth_unsubscribe WebSocket subscription support #6496
avilagaston9 merged 31 commits into main
Conversation
Implement eth_subscribe("newHeads") in both the L1 and L2 RPC WebSocket
servers. When a new block is produced, all subscribers receive the block
header as a JSON-RPC notification envelope.
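A sketch of that envelope shape, std-only and with made-up id and header values (the `notification` helper name is illustrative, not the PR's API):

```rust
// Illustrative sketch of the eth_subscription notification envelope.
// The subscription id and result payload here are made-up values.
fn notification(sub_id: &str, result_json: &str) -> String {
    format!(
        r#"{{"jsonrpc":"2.0","method":"eth_subscription","params":{{"subscription":"{sub_id}","result":{result_json}}}}}"#
    )
}

fn main() {
    // A subscriber would receive a message shaped like this:
    println!("{}", notification("0xabc1", r#"{"number":"0x1"}"#));
}
```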
L1 RPC (ethrex-rpc):
- Upgrade handle_websocket from a simple request-response loop to a
select!-based loop that multiplexes incoming messages, subscription
notification draining, and outbound message sending.
- Add handle_eth_subscribe, handle_eth_unsubscribe, generate_subscription_id,
drain_subscriptions, build_subscription_notification as public functions
so L2 can delegate to L1's implementation.
- Add new_heads_sender: Option<broadcast::Sender<Value>> field to
RpcApiContext; None when WS is disabled or subscriptions not needed.
- Add NEW_HEADS_CHANNEL_CAPACITY constant (128) for the broadcast channel.
- Export all new symbols from lib.rs including tokio::sync::broadcast.
L2 RPC (ethrex-l2-rpc):
- Add optional WS server startup in start_api when ws_addr is Some.
- handle_websocket delegates eth_subscribe/eth_unsubscribe to L1's
implementations via context.l1_ctx, eliminating duplicate code.
- Add ws_addr: Option<SocketAddr> and new_heads_sender parameters to
start_api; pass new_heads_sender through to the embedded l1_ctx.
- Re-export NEW_HEADS_CHANNEL_CAPACITY and broadcast from ethrex_rpc.
- Add axum ws feature to Cargo.toml.
L2 CLI:
- Add --l2.ws-enabled, --l2.ws-addr, --l2.ws-port flags to Options.
- Create broadcast channel in init_l2 when WS is enabled; pass sender
clone to both init_rpc_api and start_l2 (for the block producer).
L2 block producer:
- Add new_heads_sender: Option<broadcast::Sender<Value>> field.
- After sealing and storing each block, serialize the header, inject
the computed block hash, and broadcast to all WS subscribers.
- Add new_heads_sender parameter to BlockProducer::new and spawn.
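The hash-injection step above can be sketched std-only; a string map stands in for the `serde_json::Value` the real code mutates, and all field values are made up:

```rust
use std::collections::BTreeMap;

// Sketch: the block hash is not a field of the RLP-encoded header, so after
// sealing it is computed separately and injected into the serialized header
// object before broadcasting. Values are illustrative, not real header data.
fn sealed_header_json() -> BTreeMap<&'static str, String> {
    let mut header = BTreeMap::new();
    header.insert("number", "0x10".to_string());
    header.insert("parentHash", "0xaaaa".to_string());
    header.insert("hash", "0xbeef".to_string()); // injected computed hash
    header
}

fn main() {
    println!("hash={}", sealed_header_json()["hash"]);
}
```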
Tests:
- 11 unit tests in ethrex-l2-rpc covering subscription ID generation,
broadcast channel delivery, notification format, and unsubscribe
behavior (happy path, missing subscription, missing params).
it as a parameter. Follows the same pattern as get_http_socket_addr(opts).
creating two separate variables (new_heads_sender + new_heads_sender_for_block_producer).
… doc improvements
- Remove duplicate L2 WS CLI flags (l2.ws-enabled/addr/port) — L2 now uses L1's --ws.enabled / --ws.addr / --ws.port flags
- Broadcast newHeads from engine fork_choice after successful canonical head advancement
- Create broadcast channel in L1 initializer when WS is enabled
- Update channel capacity from 128 to 10 (matching Geth's chainEvChanSize)
sender together. Remove separate ws_addr and new_heads_sender parameters from start_api — now a single Option<WebSocketConfig>. Remove duplicate L2 WS CLI flags (l2.ws-enabled/addr/port); L2 inherits L1 flags.
paths, remove L2-specific comments and log prefixes, drop unnecessary doc comment about L1 crate delegation.
…t subscriptions
Introduce a SubscriptionManager actor (spawned-concurrency GenServer) that owns
all eth_subscribe("newHeads") subscription state. Each WebSocket connection gets
an mpsc channel and a subscription ID when it connects; the actor fans out new
block headers to all registered channels on new_head messages.
This removes the tokio broadcast channel approach, which had a "lagged subscriber"
problem and required each WS handler to maintain a HashMap of broadcast receivers.
The actor approach is simpler: two branches in the select! loop instead of three,
and dead subscribers are cleaned up automatically when their mpsc channel closes.
Changes:
- Add crates/networking/rpc/subscription_manager.rs with the SubscriptionManager
actor, SubscriptionManagerProtocol trait (#[protocol] macro), and
build_subscription_notification helper.
- WebSocketConfig now holds ActorRef<SubscriptionManager> instead of
broadcast::Sender<Value>. NEW_HEADS_CHANNEL_CAPACITY is removed.
- handle_websocket simplified to 2 select! branches; subscribe on connect,
unsubscribe on disconnect.
- handle_eth_subscribe/unsubscribe updated to use the actor; broadcast-based
drain_subscriptions and generate_subscription_id are removed.
- fork_choice.rs sends new heads via actor.new_head() instead of sender.send().
- block_producer.rs replaces broadcast::Sender with ActorRef<SubscriptionManager>.
- Both initializers.rs files spawn the actor instead of creating a broadcast channel.
- L2 sequencer mod.rs and block_producer.rs updated to pass ActorRef through.
- L2 networking rpc updated similarly; tests rewritten against the actor API.
new_head() now accepts BlockHeader directly instead of serde_json::Value. The actor handles JSON serialization and hash injection, so callers just pass the raw header.
Previously, every new WebSocket connection was auto-subscribed to
newHeads. Now subscription only happens when the client explicitly
sends eth_subscribe("newHeads"). handle_eth_subscribe calls
subscription_manager.subscribe() and stores the ID. The ID is
cleared on eth_unsubscribe or connection close.
Make handle_websocket public so L2 calls it with context.l1_ctx instead of duplicating the entire select loop, subscribe/unsubscribe routing, and cleanup logic. L2-specific RPC methods are not available over WS for now (only standard Ethereum methods). Remove all WS unit tests added by this PR.
of unbounded. When a slow client's channel is full, the notification is dropped via try_send rather than accumulating unbounded memory. The actor is never blocked.
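The drop-instead-of-block policy can be illustrated with std's bounded channel; the real code uses tokio::sync::mpsc and a larger capacity, so everything here is a scaled-down sketch:

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Sketch of the fan-out policy: a bounded per-subscriber channel where a
// full buffer means "drop the notification", never "block the actor".
// Capacity and message contents are arbitrary illustration values.
fn fan_out(capacity: usize, heads: usize) -> (usize, usize) {
    let (tx, rx) = sync_channel::<String>(capacity);
    let mut dropped = 0usize;
    for i in 0..heads {
        match tx.try_send(format!("head-{i}")) {
            Ok(()) => {}
            Err(TrySendError::Full(_)) => dropped += 1, // slow subscriber: drop
            Err(TrySendError::Disconnected(_)) => {}    // dead subscriber: clean up
        }
    }
    (rx.try_iter().count(), dropped)
}

fn main() {
    // 4 new heads into a capacity-2 channel: 2 delivered, 2 dropped.
    println!("{:?}", fan_out(2, 4));
}
```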
…ptionManager tests
Change subscription tracking from Option<String> to Vec<String> so each
eth_subscribe call creates a new subscription with its own ID, matching
the Ethereum JSON-RPC spec. Previously a duplicate eth_subscribe("newHeads")
on the same connection returned the existing ID instead of creating a new one.
Also add 8 tests for the SubscriptionManager actor covering subscribe/unsubscribe,
fan-out, dead subscriber cleanup, and notification format.
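The Vec<String> bookkeeping can be sketched as follows; the registry map and its u32 "slot" values are illustrative stand-ins, not the actor's real types:

```rust
use std::collections::HashMap;

// Sketch of per-connection subscription tracking as a Vec<String>: every
// eth_subscribe pushes a fresh id, so a duplicate subscribe creates a new
// subscription instead of returning the existing id.
fn run() -> (usize, bool) {
    let mut conn_subs: Vec<String> = Vec::new();        // ids owned by one connection
    let mut registry: HashMap<String, u32> = HashMap::new(); // id -> subscriber slot

    for i in 0..2u32 {
        let id = format!("0x{i:032x}");
        registry.insert(id.clone(), i);
        conn_subs.push(id); // second eth_subscribe("newHeads") gets its own id
    }
    // eth_unsubscribe removes exactly one id; connection close drains the rest.
    let ok = conn_subs
        .pop()
        .map(|id| registry.remove(&id).is_some())
        .unwrap_or(false);
    (conn_subs.len(), ok)
}

fn main() {
    println!("{:?}", run());
}
```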
-32700 Parse error with null id for unparseable requests (per JSON-RPC 2.0 spec), and use BadParams instead of Internal for unsupported subscription types since it is a client error.
and optimize fan-out serialization in SubscriptionManager. Sequential IDs leaked server activity information; replaced with 128-bit random hex IDs. Added MAX_SUBSCRIPTIONS_PER_CONNECTION (128) check in handle_eth_subscribe to prevent subscription spam. Optimized new_head fan-out to serialize the header result JSON once and build each subscriber's notification envelope via string formatting instead of repeated serde_json serialization.
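The id format (0x-prefixed, 128-bit hex) can be sketched without the rand and hex crates; the fixed byte array below stands in for the random bytes the real code draws:

```rust
// Encode 16 bytes as a 0x-prefixed hex subscription id (std-only sketch;
// the real implementation fills the bytes from the rand crate's RNG).
fn to_sub_id(bytes: &[u8; 16]) -> String {
    let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
    format!("0x{hex}")
}

fn main() {
    let fixed = [0u8; 16]; // placeholder instead of random bytes
    println!("{}", to_sub_id(&fixed)); // "0x" + 32 hex chars
}
```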
can route L2-specific RPC methods (ethrexL2_*, sponsored txs, etc.) through its own map_http_requests when serving WebSocket connections. Previously the L2 WS handler delegated to the L1 context, meaning only L1 methods were reachable over WebSocket. Added Clone to RpcRequest and RpcRequestId to support passing requests by value into the routing closure.
9d040d9 to abc812f
🤖 Claude Code Review: PR #6496 —
🤖 Codex Code Review — Findings:
No other correctness/security problems stood out in the diff itself. I could not run
Automated review by OpenAI Codex · gpt-5.4 · custom prompt
Pull request overview
Adds Ethereum JSON-RPC WebSocket subscription support (eth_subscribe / eth_unsubscribe) for "newHeads" by introducing an actor-owned subscription registry and wiring canonical-head updates to WS fan-out for both L1 and L2.
Changes:
- Introduce an actor-based SubscriptionManager with bounded per-subscriber channels and ID-based subscribe/unsubscribe.
- Extend the WS RPC handler to support eth_subscribe("newHeads") / eth_unsubscribe alongside normal request/response routing, and plumb WS config via WebSocketConfig.
- Emit newHeads notifications from L1 fork-choice canonical head updates and from the L2 block producer after sealing.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| test/tests/rpc/subscription_manager_tests.rs | Adds unit tests for SubscriptionManager subscribe/unsubscribe and fan-out/cleanup behavior. |
| test/tests/rpc/mod.rs | Registers the new subscription manager test module. |
| crates/networking/rpc/utils.rs | Derives Clone for RpcRequestId and RpcRequest to support WS routing flow. |
| crates/networking/rpc/test_utils.rs | Updates test API startup/context helpers for the new ws: Option<WebSocketConfig> plumbing. |
| crates/networking/rpc/subscription_manager.rs | New actor implementing subscription state and newHeads notification fan-out. |
| crates/networking/rpc/rpc.rs | Introduces WebSocketConfig, makes handle_websocket public/generic, and implements WS subscribe/unsubscribe handling. |
| crates/networking/rpc/lib.rs | Exposes subscription manager module and re-exports WS-related APIs/types. |
| crates/networking/rpc/engine/fork_choice.rs | Sends canonical head updates to the subscription manager on successful fork choice updates. |
| crates/l2/sequencer/mod.rs | Threads optional subscription manager handle into L2 startup for block producer notifications. |
| crates/l2/sequencer/block_producer.rs | Broadcasts newHeads after sealing/applying fork choice, using the optional subscription manager. |
| crates/l2/networking/rpc/rpc.rs | Starts an optional WS server in L2 and reuses L1’s handle_websocket with L2 request routing. |
| crates/l2/networking/rpc/Cargo.toml | Enables Axum’s ws feature for L2 RPC crate. |
| cmd/ethrex/l2/initializers.rs | Builds WebSocketConfig/SubscriptionManager when WS is enabled and passes it to both RPC and sequencer. |
| cmd/ethrex/initializers.rs | Builds WebSocketConfig/SubscriptionManager for L1 when WS is enabled and passes it into start_api. |
```rust
match req.method.as_str() {
    "eth_subscribe" => {
        let result = handle_eth_subscribe(&req, context, out_tx, subscription_ids).await;
        let resp = rpc_response(req.id, result).ok()?;
        Some(resp.to_string())
```
New WebSocket behavior (handle_ws_request routing + eth_subscribe/eth_unsubscribe) isn’t covered by tests in this crate. Adding targeted unit/integration tests would help prevent regressions (e.g., batch requests over WS, invalid-request vs parse-error responses, subscription limit enforcement, and unsubscribe scoping to a single connection).
```rust
let req: RpcRequest = match serde_json::from_str(body) {
    Ok(r) => r,
    Err(_) => {
        // JSON-RPC 2.0 spec: parse error responses must have "id": null.
        let resp = serde_json::json!({
            "jsonrpc": "2.0",
            "id": null,
            "error": {
                "code": -32700,
                "message": "Parse error"
            }
        });
        return Some(resp.to_string());
    }
```
handle_ws_request deserializes directly into RpcRequest and treats any deserialization error as a JSON parse error (-32700). This will incorrectly classify valid JSON that isn’t a valid request object (e.g., missing required fields), and it also regresses prior WS support for batch requests (arrays), which used to work via handle_http_request/RpcRequestWrapper. Consider parsing into RpcRequestWrapper (or serde_json::Value first) so you can: (1) return -32700 only for invalid JSON, (2) return -32600 (Invalid Request) for structurally-invalid JSON-RPC objects, and (3) preserve batch handling over WebSocket.
Addressed in 80647af. handle_ws_request now parses the body into serde_json::Value first (→ -32700 for malformed JSON), then into RpcRequestWrapper (→ -32600 for structurally-invalid requests). Batch support is restored: each entry is routed through process_ws_request, which still intercepts eth_subscribe / eth_unsubscribe per-connection. Empty batches respond with -32600 per the JSON-RPC 2.0 spec.
```rust
#[allow(clippy::too_many_arguments)]
pub async fn start_api(
    http_addr: SocketAddr,
    ws_addr: Option<SocketAddr>, // removed in this PR
    ws: Option<WebSocketConfig>, // added in this PR
    authrpc_addr: SocketAddr,
    storage: Store,
```
The start_api doc comment (and argument docs) still refer to ws_addr even though the function now accepts ws: Option<WebSocketConfig>. Updating the docs here will prevent confusion for callers and keep generated docs accurate (e.g., describe WebSocketConfig.addr + subscription_manager instead of a bare socket addr).
Addressed in 3f9195e — updated the doc comment to describe ws: Option<WebSocketConfig> and the subscription methods it serves.
```rust
    msg: subscription_manager_protocol::Subscribe,
    _ctx: &Context<Self>,
) -> String {
    let id = generate_subscription_id();
```
handle_subscribe inserts a freshly-generated random ID into self.subscribers without checking whether that ID already exists. While collisions are unlikely, a collision would overwrite an existing subscription sender and effectively drop a subscriber. Consider looping until an unused ID is generated (or switch to a monotonic/UUID-based ID source) before inserting.
```rust
let id = loop {
    let id = generate_subscription_id();
    if !self.subscribers.contains_key(&id) {
        break id;
    }
};
```
Greptile Summary
Confidence Score: 4/5
Safe to merge once CI passes; all findings are P2 style/robustness suggestions with no functional blockers. All identified issues are P2: JSON string interpolation, deprecated thread_rng, async{}/async move{} inconsistency, and the silent non-response edge case. The core logic — actor lifecycle, bounded channels, dead-subscriber cleanup, cleanup on disconnect, L2 reuse — is sound. Score is 4 rather than 5 only because CI is still pending.
crates/networking/rpc/subscription_manager.rs (JSON building and RNG), crates/networking/rpc/rpc.rs (async capture inconsistency)
| Filename | Overview |
|---|---|
| crates/networking/rpc/subscription_manager.rs | New actor managing all eth_subscribe state; clean design with bounded channels and dead-subscriber cleanup, but JSON building uses string interpolation and thread_rng is deprecated in newer rand |
| crates/networking/rpc/rpc.rs | Core WS loop refactored to support eth_subscribe/eth_unsubscribe; L1 handler uses async {} while L2 counterpart uses async move {} — inconsistent but functional; silent non-response on rpc_response failure |
| crates/networking/rpc/engine/fork_choice.rs | Broadcasts newHead after canonical update via fire-and-forget actor send; correctly placed after the chain is updated |
| crates/l2/sequencer/block_producer.rs | Clones header before block is consumed, notifies subscription manager after apply_fork_choice succeeds; correct ordering |
| cmd/ethrex/l2/initializers.rs | Single SubscriptionManager actor cloned and shared between RPC server and block producer correctly; ws_config.clone() is safe since ActorRef is a cheap reference |
| test/tests/rpc/subscription_manager_tests.rs | New unit tests cover subscribe uniqueness, unsubscribe round-trips, dead-subscriber cleanup, fan-out, and hash injection; good coverage of the actor's core paths |
| crates/l2/networking/rpc/rpc.rs | L2 correctly reuses L1's handle_websocket via ctx.l1_ctx; ws config is set on the embedded L1 context so subscriptions are wired up properly |
| crates/networking/rpc/utils.rs | RpcRequest and RpcRequestId gain Clone derives; needed so req.id can be kept after req is moved into route_request |
Sequence Diagram
```mermaid
sequenceDiagram
    participant CL as Consensus / BlockProducer
    participant FC as fork_choice / block_producer
    participant SM as SubscriptionManager (actor)
    participant WS as handle_websocket loop
    participant Client as WS Client
    Client->>WS: eth_subscribe("newHeads")
    WS->>SM: subscribe(out_tx)
    SM-->>WS: subscription_id
    WS-->>Client: {"result":"0x..."}
    CL->>FC: engine_forkchoiceUpdated / seal_block
    FC->>SM: new_head(BlockHeader) [fire-and-forget]
    SM->>SM: serialize header + inject hash
    SM->>WS: try_send(notification JSON)
    WS-->>Client: eth_subscription notification
    Client->>WS: eth_unsubscribe("0x...")
    WS->>SM: unsubscribe(id)
    SM-->>WS: true/false
    WS-->>Client: {"result":true}
    Note over WS,SM: On disconnect: cleanup loop unsubscribes all owned IDs
```
Comments Outside Diff (1)
- crates/networking/rpc/rpc.rs, line 579-582: Silent non-response on rpc_response serialization failure. In handle_ws_request, all three match arms use rpc_response(...).ok()?. If rpc_response fails to serialize (extremely unlikely but possible for pathological payloads), the function returns None, and the calling loop sends no reply for the request. The WS client will hang waiting for a response. A fallback error envelope would be more robust:

```rust
let resp = rpc_response(req.id, result).unwrap_or_else(|_| {
    serde_json::json!({"jsonrpc":"2.0","id":null,"error":{"code":-32603,"message":"Internal error"}})
});
Some(resp.to_string())
```
Reviews (1): Last reviewed commit: "Make build_subscription_notification pri..."
```rust
fn build_subscription_notification(sub_id: &str, result_json: &str) -> String {
    format!(
        r#"{{"jsonrpc":"2.0","method":"eth_subscription","params":{{"subscription":"{sub_id}","result":{result_json}}}}}"#
    )
}
```
JSON built via string interpolation
build_subscription_notification embeds result_json verbatim into a hand-rolled format string. This works today because sub_id is hex-only and result_json is serde_json output, but any future change that makes either value contain a " or } in an unexpected place would silently produce malformed JSON. Prefer proper serialization:
```rust
fn build_subscription_notification(sub_id: &str, result_json: &str) -> String {
    serde_json::json!({
        "jsonrpc": "2.0",
        "method": "eth_subscription",
        "params": {
            "subscription": sub_id,
            "result": serde_json::from_str::<serde_json::Value>(result_json).unwrap_or(serde_json::Value::Null)
        }
    })
    .to_string()
}
```

Alternatively, keep the pre-serialisation optimisation by holding header_value (a serde_json::Value) and passing it directly to serde_json::json! instead of converting to a string and back.
Addressed in c4d662e. build_subscription_notification now uses serde_json::json! and takes the header as &serde_json::Value directly, so escaping is delegated to serde_json and the envelope stays correct regardless of what sub_id or the result contain.
```rust
fn generate_subscription_id() -> String {
    let mut bytes = [0u8; 16];
    rand::thread_rng().fill_bytes(&mut bytes);
    format!("0x{}", hex::encode(bytes))
```
rand::thread_rng() in an actor's async handler
rand::thread_rng() is thread-local. The actor runs under spawned_concurrency's Backend::Blocking, which pools threads, so each invocation could draw from a different thread's RNG state. This is fine for randomness quality but thread_rng is deprecated in rand 0.9+. Prefer rand::rng() (rand 0.9 API) or OsRng to stay forward-compatible and thread-agnostic:
```rust
fn generate_subscription_id() -> String {
    let mut bytes = [0u8; 16];
    rand::rng().fill_bytes(&mut bytes);
    format!("0x{}", hex::encode(bytes))
}
```
```rust
let ws_handler = |ws: WebSocketUpgrade, State(ctx): State<RpcApiContext>| async {
    ws.on_upgrade(|mut socket| async move {
        handle_websocket(&mut socket, &ctx, |req| {
            let c = ctx.clone();
            async move { map_http_requests(&req, c).await }
        })
        .await;
    })
};
```
async {} vs async move {} inconsistency between L1 and L2
The L1 handler uses async {} (capture by reference) while the equivalent L2 handler at crates/l2/networking/rpc/rpc.rs:164 uses async move {}. Both appear to compile because ctx ends up being moved into the inner on_upgrade callback regardless. However, the inconsistency is confusing and the non-move form relies on implicit capture semantics. For clarity and alignment with the L2 version:
```rust
let ws_handler = |ws: WebSocketUpgrade, State(ctx): State<RpcApiContext>| async move {
    ws.on_upgrade(|mut socket| async move {
        handle_websocket(&mut socket, &ctx, |req| {
            let c = ctx.clone();
            async move { map_http_requests(&req, c).await }
        })
        .await;
    })
};
```
Addressed in 6cdaa3a — L1 handler now uses async move to match L2.
No longer true (we don't take just an address, there are now ws-specific endpoints)
```rust
    Fut: std::future::Future<Output = Result<Value, E>>,
    E: Into<RpcErrorMetadata>,
{
    let req: RpcRequest = match serde_json::from_str(body) {
```
This doesn't use RpcRequestWrapper like handle_http_request, meaning this breaks support for batch requests
Addressed in 80647af — WS now parses via RpcRequestWrapper, so single and batched requests both work, matching the HTTP handler.
…ish parse errors (-32700) from structurally-invalid requests (-32600). On main the WS handler delegated to handle_http_request, which parsed bodies via RpcRequestWrapper and therefore accepted both single and batched requests. The new handle_ws_request introduced in this branch deserialized directly into RpcRequest, silently regressing batch support over WebSocket and collapsing every non-parse error into a generic -32700. handle_ws_request now parses the body into serde_json::Value first (returning -32700 only for malformed JSON), then into RpcRequestWrapper (returning -32600 for valid JSON that is not a JSON-RPC request or batch). Per-request routing — including eth_subscribe / eth_unsubscribe interception against the per-connection subscription set — is extracted into process_ws_request so it can run uniformly for single requests and each entry of a batch. Empty batches respond with -32600 as required by the JSON-RPC 2.0 spec, and batches respond with a JSON array of responses (notifications, which produce None, are filtered out).
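The two-stage classification described above can be sketched with hypothetical stand-in types (the real code parses serde_json::Value and then RpcRequestWrapper; the Parsed enum and error_code helper below are illustrative only, while the error codes come from the JSON-RPC 2.0 spec):

```rust
// Hypothetical stand-ins for the parse pipeline.
#[derive(Debug)]
enum Parsed {
    Malformed,   // body is not valid JSON           -> -32700, id: null
    NotARequest, // valid JSON, not a request object -> -32600
    Single,      // one well-formed request
    Batch(usize), // array of requests, with its length
}

fn error_code(p: &Parsed) -> Option<i64> {
    match p {
        Parsed::Malformed => Some(-32700),
        Parsed::NotARequest => Some(-32600),
        Parsed::Batch(0) => Some(-32600), // empty batch is invalid per spec
        _ => None,                        // route through normal handling
    }
}

fn main() {
    for p in [Parsed::Malformed, Parsed::NotARequest, Parsed::Batch(0), Parsed::Batch(2), Parsed::Single] {
        println!("{p:?} -> {:?}", error_code(&p));
    }
}
```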
…onfig> parameter. The previous doc described a bare ws_addr and did not mention that the WebSocket server also serves eth_subscribe / eth_unsubscribe in addition to the regular JSON-RPC methods.
… so it matches the L2 WebSocket handler in crates/l2/networking/rpc/rpc.rs. The ctx ends up being moved into the inner on_upgrade callback either way, but the inconsistency between the two identical-shaped closures was confusing when reading them side by side.
…cation with serde_json::json! and pass the result header as a serde_json::Value instead of a pre-serialized string. The previous implementation embedded result_json verbatim into a format string. That was safe today — sub_id was hex-only and result_json was always serde_json output — but any future change that made either value contain unescaped quotes or braces would silently produce malformed JSON notifications. Using serde_json::json! delegates escaping to serde_json, so the envelope is correct by construction regardless of what the id or result carry. handle_new_head now keeps header_value as a Value and the per-subscriber clone inside json! replaces the previous once-per-fan-out to_string() call; allocations are comparable in size and the code is harder to break.
ElFantasma left a comment
This LGTM, but I added some comments that may be ignored, fixed or trigger new issues. None seem blocking
Also:
Tests (per Copilot): 676 lines added, zero new tests in the diff (other than subscription_manager_tests.rs which wasn't in the file list I saw — apologies if it actually exists). The actor + WS integration is exactly where protocol-level bugs slip in: what does the WS write loop do when its mpsc receiver returns None? does the connection close cleanly? does unsubscribe of a non-existent ID return the right error? Worth at minimum:
- An actor unit test (subscribe → NewHead → assert notification shape).
- An integration test driving handle_websocket with a mock socket through subscribe → NewHead → unsubscribe.
- A regression test for the RpcRequestWrapper batch handling iovoid flagged (now addressed in 80647af).
rand::thread_rng() in the actor (greptile) is correct — thread_rng() is per-OS-thread and re-seeds lazily, so even if the actor moves between threads the IDs stay random. Acceptable as-is.
WebSocketConfig L2-only deployment: PR says "L2 uses L1's existing flags" — worth confirming an L2-only node with --ws.enabled works. If something silently binds to L1-default address, that's a foot-gun. Add to test plan: L2-only mode + WS enabled + verify subscribe works.
```rust
pub const SUBSCRIBER_CHANNEL_CAPACITY: usize = 512;

/// Maximum number of active subscriptions allowed per WebSocket connection.
pub const MAX_SUBSCRIPTIONS_PER_CONNECTION: usize = 128;
```
No global subscription cap (DoS surface): MAX_SUBSCRIPTIONS_PER_CONNECTION = 128 is enforced per-connection, but no cap on total subscribers. A malicious client opening 1000 connections × 128 subscriptions = 128k entries in self.subscribers, each holding a 512-cap mpsc::Sender<String>. Memory cost grows linearly with no upper bound. For a public RPC endpoint, that's a DoS vector. Two mitigations:
- Add a `MAX_TOTAL_SUBSCRIPTIONS` cap inside the actor (return `Err` from `subscribe`).
- Cap concurrent WS connections at the accept layer (probably belongs upstream, but coordinate with this cap).
Not blocking unless this is exposed to the public internet at deploy time, but worth pinning down.
```rust
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
    dead_ids.push(sub_id.clone());
}
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
```
Silent drops: try_send on Full leaves the subscriber's view of the chain non-contiguous without any indication. Some Credible-Layer-shaped consumers will quietly miss a block and not realize. Two protocol-level options:
- On `Full`, close the connection (forces resubscribe — at least visible).
- Emit a synthetic `subscription_gap` notification (also via `try_send` so it stays bounded) — preserves the connection but tells the client "you missed something."
Geth chose silent-drop, so matching that is defensible — just want to make sure the choice is deliberate.
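A std-only sketch of the fan-out under discussion, with `std::sync::mpsc`'s bounded channel standing in for tokio's (`Disconnected` plays the role of `Closed`). Instead of dropping silently on `Full`, the caller gets back a `gapped` list and could deliver a synthetic `subscription_gap` notification once the buffer drains — that is the hypothetical option (2) from the review, not what the PR currently does.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{SyncSender, TrySendError};

// Fan a serialized payload out to bounded per-subscriber channels.
// Returns (dead, gapped): `dead` ids whose receiver is gone (caller removes
// them from the map), `gapped` ids whose buffer was full (caller may later
// send a subscription_gap notification instead of silently dropping).
fn fan_out(
    subscribers: &HashMap<String, SyncSender<String>>,
    payload: &str,
) -> (Vec<String>, Vec<String>) {
    let (mut dead, mut gapped) = (Vec::new(), Vec::new());
    for (sub_id, sender) in subscribers {
        match sender.try_send(payload.to_string()) {
            Ok(()) => {}
            // Receiver dropped: the connection is gone, clean up.
            Err(TrySendError::Disconnected(_)) => dead.push(sub_id.clone()),
            // Buffer full: the client is slow; record the gap, never block.
            Err(TrySendError::Full(_)) => gapped.push(sub_id.clone()),
        }
    }
    (dead, gapped)
}
```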
```rust
    _ctx: &Context<Self>,
) -> String {
    let id = generate_subscription_id();
    self.subscribers.insert(id.clone(), msg.sender);
```
(Copilot collision concern) Mathematically negligible at any realistic scale (~2^64 IDs for ~50% chance), but the fix is trivially cheap:
```rust
loop {
    let id = generate_subscription_id();
    if let std::collections::hash_map::Entry::Vacant(e) = self.subscribers.entry(id.clone()) {
        e.insert(msg.sender);
        return id;
    }
}
```
Not worth blocking; easy slot-in if you touch this line.
```rust
let mut dead_ids: Vec<String> = Vec::new();

for (sub_id, sender) in &self.subscribers {
    let notification = build_subscription_notification(sub_id, &header_value);
```
Per-subscriber clone: this calls serde_json::json!({...result: result, ...}).to_string() once per subscriber, cloning the Value. For 100 subscribers × ~2 KB header, ~200 KB allocation per block — cheap, but if subscriber count climbs, a faster shape would be to serialize the result once into a string, then format! only the "subscription":"<id>" field per subscriber. Not blocking; mark for later if you ever profile a hot newHeads cycle.
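The serialize-once shape suggested here can be sketched with plain string splicing (`notifications_for` and `result_json` are hypothetical names; `result_json` stands in for the already-serialized block header). Note the trade-off: this reintroduces the `format!`-splicing fragility that the earlier `serde_json::json!` change removed, so it is only safe while subscription ids stay hex-only and the result is guaranteed-valid JSON.

```rust
// Render the result payload once, then splice only the per-subscriber id
// into a prebuilt envelope. `result_json` must already be valid, escaped
// JSON (e.g. serde_json output) — nothing here re-escapes it.
fn notifications_for<'a>(
    sub_ids: impl Iterator<Item = &'a str>,
    result_json: &str,
) -> Vec<String> {
    sub_ids
        .map(|id| {
            format!(
                "{{\"jsonrpc\":\"2.0\",\"method\":\"eth_subscription\",\
                 \"params\":{{\"subscription\":\"{id}\",\"result\":{result_json}}}}}"
            )
        })
        .collect()
}
```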
worst-case memory of the SubscriptionManager actor. The per-connection cap of
128 alone does not bound total state when many connections are open, so a
client opening many connections could grow the actor's bounded mpsc buffers
without limit. Change SubscriptionManagerProtocol::subscribe to return
Option<String> and have the actor return None once self.subscribers.len()
reaches MAX_TOTAL_SUBSCRIPTIONS. handle_eth_subscribe surfaces None as
-32603 Internal error ("Global subscription cap reached") so the client sees
the failure rather than being silently rejected. Add a unit test that fills
the actor to the cap and asserts the next subscribe is refused.
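A minimal sketch of the cap behavior this commit describes, with a plain `HashMap` standing in for the actor state and `()` for the subscriber's sender. The `Option<String>` return mirrors the description above; the tiny `MAX_TOTAL_SUBSCRIPTIONS` value and counter-based id are illustration only, not the real constant or `generate_subscription_id`.

```rust
use std::collections::HashMap;

const MAX_TOTAL_SUBSCRIPTIONS: usize = 4; // tiny value for the sketch

struct Manager {
    subscribers: HashMap<String, ()>, // () stands in for the mpsc sender
    next_id: u64,
}

impl Manager {
    // Returns None once the global cap is reached; handle_eth_subscribe
    // would surface that as -32603 Internal error to the client.
    fn subscribe(&mut self) -> Option<String> {
        if self.subscribers.len() >= MAX_TOTAL_SUBSCRIPTIONS {
            return None;
        }
        self.next_id += 1;
        let id = format!("0x{:x}", self.next_id);
        self.subscribers.insert(id.clone(), ());
        Some(id)
    }
}
```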
Force-pushed from f10e5c6 to 9f0757c.
Bring this branch up to date with main. The branch's WebSocket subscription work landed on main via #6496; conflicts in subscription_manager.rs, subscription_manager_tests.rs, and rpc.rs were resolved by taking main's canonical version, which adds a global subscription cap, JSON-RPC 2.0 batch support over WebSocket, and stricter parse/invalid-request error codes. The prestateTracer implementation is unaffected.
…ug_traceBlockByNumber (lambdaclass#6501)

> **Depends on:** lambdaclass#6496 (eth_subscribe/eth_unsubscribe WS support) — must be merged first.

**Motivation**

The `prestateTracer` is a standard Ethereum debug tracer that captures pre-execution account state (balance, nonce, code, storage) and optionally computes a post-execution diff. It's needed by external tools like the Credible Layer sidecar, which uses `debug_traceBlockByNumber` with `prestateTracer` in diff mode to build its local state database.

**Description**

Implement the `prestateTracer` (with optional `diffMode`) across the tracing stack:
- Prestate types in `ethrex-common` with serde formatting to match Geth's output format.
- `trace_tx_prestate` in `ethrex-vm` snapshots the `GeneralizedDatabase` cache before executing a transaction, then compares against the post-execution cache to identify touched accounts. For already-cached accounts whose new storage slots were loaded during the tx, it merges values from `initial_accounts_state` to capture original slot values.
- `trace_transaction_prestate` / `trace_block_prestate` in `ethrex-blockchain` rebuild parent state and orchestrate per-tx tracing with timeouts.
- `PrestateTracer` variant wired into the RPC `debug_traceTransaction` and `debug_traceBlockByNumber` handlers with `PrestateTracerConfig { diffMode }`.

Also extracts the duplicated `TestDatabase` into a shared `test/tests/levm/test_db.rs` module reused by both `l2_hook_tests` and the new `prestate_tracer_tests`.
**How to Test**

```bash
# Trace a transaction with prestateTracer
curl -X POST http://localhost:8545 -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"debug_traceTransaction","params":["<TX_HASH>",{"tracer":"prestateTracer"}],"id":1}'

# Trace with diff mode
curl -X POST http://localhost:8545 -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"debug_traceTransaction","params":["<TX_HASH>",{"tracer":"prestateTracer","tracerConfig":{"diffMode":true}}],"id":1}'

# Trace a block
curl -X POST http://localhost:8545 -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"debug_traceBlockByNumber","params":["latest",{"tracer":"prestateTracer","tracerConfig":{"diffMode":true}}],"id":1}'
```
Motivation

Implement `eth_subscribe("newHeads")` WebSocket subscription support. This is standard Ethereum JSON-RPC functionality needed by external tools (e.g., the Credible Layer sidecar) that track new blocks in real time.

Description

Add a `SubscriptionManager` GenServer actor that owns all subscription state. When a new block becomes canonical (via fork choice update on L1, or block production on L2), the head `BlockHeader` is sent to the actor, which serializes it and fans out notifications to all subscribed WebSocket clients through bounded per-connection channels.
- `SubscriptionManager` actor (`crates/networking/rpc/subscription_manager.rs`): handles `Subscribe`, `Unsubscribe`, and `NewHead` messages. Dead subscribers are cleaned up automatically on the next `NewHead` fan-out.
- A `WebSocketConfig` struct bundles the WS socket address and the actor handle.
- `handle_websocket` is public so L2 reuses it directly — zero duplicate WS code in L2.
- Subscriptions are created on `eth_subscribe("newHeads")`, not on connection open.
- Slow clients are handled with `try_send` instead of accumulating unbounded memory. The actor is never blocked.
- L1 emits `newHeads` from `engine/fork_choice.rs` after successful canonical head advancement.
- L2 reuses the `--ws.enabled` / `--ws.addr` / `--ws.port` CLI flags — no L2-specific WS flags.

How to Test

Checklist

- `cargo check -p ethrex --features l2`