diff --git a/Cargo.lock b/Cargo.lock index c9323e79..b128241b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14302,17 +14302,20 @@ version = "0.0.6" dependencies = [ "alloy-consensus", "alloy-contract", + "alloy-eip7928", "alloy-eips 2.0.4", "alloy-evm", "alloy-network", "alloy-op-evm", "alloy-primitives", "alloy-provider", + "alloy-rlp", "alloy-rpc-types-engine", "alloy-rpc-types-eth", "alloy-sol-types", "anyhow", "async-trait", + "brotli", "chrono", "clap", "ctor", diff --git a/bin/node/src/args.rs b/bin/node/src/args.rs index 5d6be000..d2ca69cb 100644 --- a/bin/node/src/args.rs +++ b/bin/node/src/args.rs @@ -259,6 +259,14 @@ pub struct FlashblocksRpcArgs { )] pub flashblocks_subscription_max_addresses: usize, + /// Disable EIP-7928 block access list usage on the flashblocks RPC node. + #[arg( + long = "xlayer.flashblocks-disable-access-list", + help = "Disable EIP-7928 access list usage on the flashblocks RPC node", + default_value = "false" + )] + pub flashblocks_disable_access_list: bool, + /// Enable flashblocks RPC state comparison debug mode #[arg( long = "xlayer.flashblocks-debug-state-comparison", diff --git a/bin/node/src/main.rs b/bin/node/src/main.rs index 52bc7805..bbd94293 100644 --- a/bin/node/src/main.rs +++ b/bin/node/src/main.rs @@ -208,6 +208,10 @@ fn main() { .xlayer_args .flashblocks_rpc .flashblocks_debug_state_comparison, + disable_access_list: args + .xlayer_args + .flashblocks_rpc + .flashblocks_disable_access_list, }, FlashblocksPersistCtx { datadir, diff --git a/crates/builder/Cargo.toml b/crates/builder/Cargo.toml index 52df2fc3..9536914f 100644 --- a/crates/builder/Cargo.toml +++ b/crates/builder/Cargo.toml @@ -54,16 +54,18 @@ reth-tasks.workspace = true reth-ipc = { workspace = true, optional = true } # alloy +alloy-eip7928 = { workspace = true, features = ["serde", "rlp"] } alloy-primitives.workspace = true -alloy-consensus.workspace = true +alloy-consensus = { workspace = true, features = ["std"] } alloy-contract.workspace = true 
alloy-eips.workspace = true alloy-evm.workspace = true +alloy-rlp = { workspace = true, features = ["derive", "std"] } alloy-rpc-types-engine.workspace = true alloy-rpc-types-eth.workspace = true alloy-network.workspace = true alloy-provider.workspace = true -alloy-sol-types = { workspace = true, features = ["json"] } +alloy-sol-types = { workspace = true, features = ["std", "json"] } # op alloy-op-evm.workspace = true @@ -100,6 +102,7 @@ multiaddr = { workspace = true } # misc anyhow = "1" +brotli = { workspace = true, features = ["std"] } clap.workspace = true dashmap.workspace = true derive_more.workspace = true diff --git a/crates/builder/src/broadcast/compress.rs b/crates/builder/src/broadcast/compress.rs new file mode 100644 index 00000000..bbd7c816 --- /dev/null +++ b/crates/builder/src/broadcast/compress.rs @@ -0,0 +1,108 @@ +//! Shared compression helpers for the flashblocks wire protocol. +//! +//! The wire format is auto-detecting: a payload is treated as legacy uncompressed +//! JSON if its first non-whitespace byte is `{`, otherwise it is decoded as a +//! brotli-compressed frame. This lets us upgrade the producer to emit compressed +//! frames without breaking older subscribers, and lets us replay legacy +//! persistence files unchanged. + +use std::{borrow::Cow, io}; + +use alloy_primitives::Bytes; +use brotli::enc::BrotliEncoderParams; + +/// Brotli quality level. Q5 is the empirical sweet spot for JSON-shaped +/// flashblock payloads — sub-10ms encode at ~5x size reduction on 1.2MB inputs. +pub const BROTLI_QUALITY: u32 = 5; + +/// Brotli sliding-window size in log2 bytes. 22 is the brotli default. +pub const BROTLI_LGWIN: u32 = 22; + +/// Compress `json` bytes with brotli at [`BROTLI_QUALITY`]. 
+pub fn brotli_encode(json: &[u8]) -> io::Result { + let mut compressed = Vec::with_capacity(json.len() / 4); + let params = BrotliEncoderParams { + quality: BROTLI_QUALITY as i32, + lgwin: BROTLI_LGWIN as i32, + ..Default::default() + }; + let mut input = json; + brotli::BrotliCompress(&mut input, &mut compressed, ¶ms)?; + Ok(Bytes::from(compressed)) +} + +/// Decode a wire frame, auto-detecting between legacy uncompressed JSON and +/// brotli-compressed bytes. +/// +/// Returns the input borrowed if it is already JSON (leading `{` after +/// optional whitespace), otherwise allocates a decompressed buffer. +pub fn try_decompress(bytes: &[u8]) -> io::Result> { + if bytes.trim_ascii_start().starts_with(b"{") { + return Ok(Cow::Borrowed(bytes)); + } + + let mut decompressor = brotli::Decompressor::new(bytes, 4096); + let mut decompressed = Vec::new(); + io::copy(&mut decompressor, &mut decompressed)?; + Ok(Cow::Owned(decompressed)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn brotli_round_trip() { + let original = br#"{"hello":"world","numbers":[1,2,3],"nested":{"a":"b"}}"#; + let compressed = brotli_encode(original).expect("encode succeeds"); + assert!(!compressed.is_empty(), "compressed output is non-empty"); + let decoded = try_decompress(&compressed).expect("decompress succeeds"); + assert_eq!(decoded.as_ref(), original); + } + + #[test] + fn brotli_compresses_repetitive_input() { + // EIP-7928 access list JSON is highly repetitive; expect significant compression. 
+ let original: Vec = + br#"{"address":"0x0000000000000000000000000000000000000042","balance_changes":[{"block_access_index":0,"post_balance":"0x100"}]}"# + .repeat(100); + let compressed = brotli_encode(&original).expect("encode succeeds"); + assert!(compressed.len() < original.len() / 5, "expected >5x compression ratio"); + } + + #[test] + fn auto_detect_legacy_json_passthrough() { + let json = br#"{"key":"value"}"#; + let result = try_decompress(json).expect("passthrough succeeds"); + assert_eq!(result.as_ref(), json); + assert!(matches!(result, Cow::Borrowed(_)), "legacy JSON must be borrowed, not copied"); + } + + #[test] + fn auto_detect_legacy_json_with_leading_whitespace() { + let json = b" \n\t {\"key\":\"value\"}"; + let result = try_decompress(json).expect("passthrough succeeds"); + assert_eq!(result.as_ref(), json); + } + + #[test] + fn auto_detect_brotli_decompresses() { + let original = br#"{"key":"value","data":"some larger payload for brotli to compress"}"#; + let compressed = brotli_encode(original).expect("encode"); + assert!( + !compressed.starts_with(b"{"), + "compressed output must not start with `{{` (would defeat auto-detect)" + ); + let result = try_decompress(&compressed).expect("decompress succeeds"); + assert_eq!(result.as_ref(), original); + assert!(matches!(result, Cow::Owned(_)), "compressed input must allocate decoded buffer"); + } + + #[test] + fn malformed_brotli_returns_err() { + // Bytes that don't start with `{` and aren't valid brotli. + let garbage = &[0xff_u8, 0xff, 0xff, 0xff, 0xff]; + let result = try_decompress(garbage); + assert!(result.is_err(), "malformed brotli must error, not silently pass through"); + } +} diff --git a/crates/builder/src/broadcast/frame.rs b/crates/builder/src/broadcast/frame.rs new file mode 100644 index 00000000..a0c933c0 --- /dev/null +++ b/crates/builder/src/broadcast/frame.rs @@ -0,0 +1,118 @@ +//! Encode-once carrier types for the flashblocks broadcast pipeline. +//! +//! 
When the sequencer produces a flashblock, we serialize+compress it exactly +//! once and pass the resulting bytes through both fan-out paths (P2P + WS). +//! When a relay node receives a flashblock, it decodes once for internal +//! consumers (validator, persistence) but forwards the original bytes to its +//! downstream subscribers without re-encoding. + +use std::{io, sync::Arc}; + +use alloy_primitives::Bytes; +use serde::{de::DeserializeOwned, Serialize}; + +use crate::broadcast::{compress, types::Message}; + +/// Wire-ready bytes paired with their decoded form. +#[derive(Clone, Debug)] +pub struct BroadcastFrame { + /// Brotli-compressed JSON, ready to send on the WS pipe or libp2p stream. + pub bytes: Bytes, + /// Already-decoded structured form, for in-process consumers. + pub decoded: Arc, +} + +impl BroadcastFrame { + /// Decode wire bytes into a complete frame. + /// + /// Auto-detects between brotli-compressed and legacy uncompressed JSON via + /// [`compress::try_decompress`]. The original `bytes` are retained on the + /// frame so relay paths can forward them downstream without re-encoding. + pub fn from_bytes(bytes: Bytes) -> io::Result { + let decoded: Message = decode(&bytes)?; + Ok(Self { bytes, decoded: Arc::new(decoded) }) + } +} + +/// Serialize `value` to JSON and brotli-compress the result. +/// +/// Single encode path for any wire envelope on the broadcast layer (P2P +/// [`Message`], WS `XLayerFlashblockMessage`, etc). +pub fn encode(value: &T) -> io::Result { + let json = serde_json::to_vec(value)?; + compress::brotli_encode(&json) +} + +/// Decode wire bytes into `T`, auto-detecting compressed vs. legacy JSON. +/// +/// Single decode path for any wire envelope. The auto-detection (leading-`{` +/// heuristic) lives in [`compress::try_decompress`], so a producer that emits +/// compressed bytes and a producer that emits legacy uncompressed JSON can +/// both be consumed without coordination. 
+pub fn decode(bytes: &[u8]) -> io::Result { + let json = compress::try_decompress(bytes)?; + serde_json::from_slice::(&json) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, format!("decode: {e}"))) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::broadcast::{XLayerFlashblockMessage, XLayerFlashblockPayload}; + + fn sample_message() -> Message { + Message::from_flashblock_payload(XLayerFlashblockMessage::from_flashblock_payload( + XLayerFlashblockPayload::default(), + )) + } + + #[test] + fn encode_decode_round_trip() { + let original = sample_message(); + let bytes = encode(&original).expect("encode"); + let decoded: Message = decode(&bytes).expect("decode"); + assert_eq!(decoded, original); + } + + #[test] + fn encode_decode_round_trip_xlayer_flashblock_message() { + // The same encode/decode path works for any Serialize/DeserializeOwned type — + // the WS-side stream uses this with XLayerFlashblockMessage. + let original = + XLayerFlashblockMessage::from_flashblock_payload(XLayerFlashblockPayload::default()); + let bytes = encode(&original).expect("encode"); + let decoded: XLayerFlashblockMessage = decode(&bytes).expect("decode"); + assert_eq!(decoded, original); + } + + #[test] + fn from_bytes_round_trip() { + let original = sample_message(); + let bytes = encode(&original).expect("encode"); + let frame = BroadcastFrame::from_bytes(bytes.clone()).expect("from_bytes"); + assert_eq!(*frame.decoded, original); + // Frame retains the original bytes byte-for-byte for relay. + assert_eq!(frame.bytes, bytes); + } + + #[test] + fn from_bytes_accepts_legacy_uncompressed_json() { + let original = sample_message(); + let json = Bytes::from(serde_json::to_vec(&original).expect("json")); + // Legacy callers might send uncompressed JSON — must still decode. 
+ let frame = BroadcastFrame::from_bytes(json).expect("decode legacy"); + assert_eq!(*frame.decoded, original); + } + + #[test] + fn encoded_bytes_are_not_legacy_json() { + // The encoded bytes must not start with `{` so that the auto-detect heuristic + // routes them through brotli decompression on the receive side. + let original = sample_message(); + let bytes = encode(&original).expect("encode"); + assert!( + !bytes.starts_with(b"{"), + "compressed frame must not look like legacy JSON to the auto-detect heuristic" + ); + } +} diff --git a/crates/builder/src/broadcast/mod.rs b/crates/builder/src/broadcast/mod.rs index 02ba2cd5..42e8ee17 100644 --- a/crates/builder/src/broadcast/mod.rs +++ b/crates/builder/src/broadcast/mod.rs @@ -1,4 +1,6 @@ mod behaviour; +pub mod compress; +pub mod frame; mod outgoing; pub(crate) mod payload; pub mod peer_status; @@ -6,6 +8,7 @@ pub(crate) mod types; pub(crate) mod wspub; use behaviour::Behaviour; +pub use frame::{decode, encode, BroadcastFrame}; pub use libp2p::{Multiaddr, StreamProtocol}; pub use payload::{XLayerFlashblockEnd, XLayerFlashblockMessage, XLayerFlashblockPayload}; pub use peer_status::PeerStatusTracker; @@ -234,6 +237,17 @@ impl Node { let protocol = message.protocol(); debug!(target: "payload_builder::broadcast", "received message to broadcast on protocol {protocol}"); + // Encode once (serde_json + brotli). The resulting bytes are reused by + // both the P2P broadcast path and the local WS publish path below — + // avoiding the prior pattern of encoding the same message twice. + let bytes = match frame::encode(&message) { + Ok(bytes) => bytes, + Err(e) => { + warn!(target: "payload_builder::broadcast", "failed to encode outgoing message on protocol {protocol}: {e:?}"); + continue; + } + }; + // NOTE on broadcast ordering and failure semantics: // `broadcast_message` sends to all connected peers concurrently and // awaits until every peer's TCP send completes (or fails). 
This is a @@ -249,7 +263,7 @@ impl Node { // for lower latency - since failures/switches in builder are very small. // The less strict (no ack of message deliveries) allow a lower latency // in the gossiping of new flashblock payloads to websocket subscribers. - match outgoing_streams_handler.broadcast_message(message.clone()).await { + match outgoing_streams_handler.broadcast_message(protocol.clone(), bytes.clone()).await { Ok(failed_peers) => { peer_status.on_broadcast_result(&failed_peers); for &peer_id in &failed_peers { @@ -267,7 +281,7 @@ impl Node { } } if let Message::OpFlashblockPayload(ref fb_payload) = message { - match ws_pub.publish(fb_payload) { + match ws_pub.publish(bytes, fb_payload) { Ok(flashblock_byte_size) => { metrics.flashblock_byte_size_histogram.record(flashblock_byte_size as f64); } @@ -324,7 +338,7 @@ impl Node { pub struct NodeBuildResult { pub node: Node, pub outgoing_message_tx: mpsc::Sender, - pub incoming_message_rxs: HashMap>, + pub incoming_message_rxs: HashMap>, /// Shared peer status tracker — clone into the RPC layer for /// `eth_flashblocksPeerStatus`. 
pub peer_status: PeerStatusTracker, @@ -521,7 +535,7 @@ impl NodeBuilder { struct IncomingStreamsHandler { protocol: StreamProtocol, incoming: IncomingStreams, - tx: mpsc::Sender, + tx: mpsc::Sender, cancellation_token: CancellationToken, } @@ -530,7 +544,7 @@ impl IncomingStreamsHandler { protocol: StreamProtocol, incoming: IncomingStreams, cancellation_token: CancellationToken, - ) -> (Self, mpsc::Receiver) { + ) -> (Self, mpsc::Receiver) { const CHANNEL_SIZE: usize = 100; let (tx, rx) = mpsc::channel(CHANNEL_SIZE); (Self { protocol, incoming, tx, cancellation_token }, rx) @@ -573,24 +587,25 @@ impl IncomingStreamsHandler { async fn handle_incoming_stream( peer_id: PeerId, stream: Stream, - payload_tx: mpsc::Sender, + payload_tx: mpsc::Sender, ) -> eyre::Result { use futures::StreamExt as _; use tokio_util::{ - codec::{FramedRead, LinesCodec}, + codec::{FramedRead, LengthDelimitedCodec}, compat::FuturesAsyncReadCompatExt as _, }; - let codec = LinesCodec::new(); + let codec = LengthDelimitedCodec::new(); let mut reader = FramedRead::new(stream.compat(), codec); while let Some(res) = reader.next().await { match res { - Ok(str) => { - let payload = serde_json::from_str::(&str) + Ok(raw_frame) => { + let bytes = alloy_primitives::Bytes::from(raw_frame.freeze()); + let frame = BroadcastFrame::from_bytes(bytes) .wrap_err("failed to decode stream message")?; - debug!(target: "payload_builder::broadcast", "got message from peer {peer_id}: {payload:?}"); - let _ = payload_tx.send(payload).await; + debug!(target: "payload_builder::broadcast", "got message from peer {peer_id}: {:?}", frame.decoded); + let _ = payload_tx.send(frame).await; } Err(e) => { return Err(e).wrap_err(format!("failed to read from stream of peer {peer_id}")); @@ -696,7 +711,7 @@ mod test { XLayerFlashblockMessage::from_flashblock_payload(XLayerFlashblockPayload::default()), ); let mut rx = rx1.remove(&types::FLASHBLOCKS_STREAM_PROTOCOL).unwrap(); - let recv_message = 
tokio::time::timeout(Duration::from_secs(10), async { + let recv_frame = tokio::time::timeout(Duration::from_secs(10), async { loop { // Use try_send to avoid panicking if the channel is full // (e.g. connection not yet established and messages are buffered). @@ -710,7 +725,7 @@ mod test { }) .await .expect("message receive timed out"); - assert_eq!(recv_message, message); + assert_eq!(*recv_frame.decoded, message); } /// Creates two minimal libp2p swarms (using only `libp2p_stream::Behaviour`), @@ -822,7 +837,11 @@ mod test { let msg = Message::from_flashblock_payload( XLayerFlashblockMessage::from_flashblock_payload(XLayerFlashblockPayload::default()), ); - let failed = handler.broadcast_message(msg).await.expect("serialization must not fail"); + let bytes = frame::encode(&msg).expect("encode must not fail"); + let failed = handler + .broadcast_message(msg.protocol(), bytes) + .await + .expect("broadcast must not fail"); assert_eq!(failed, vec![peer_a], "peer_a must be returned as a failed peer"); assert!(!handler.has_peer(&peer_a), "peer_a must be evicted from the handler"); @@ -860,7 +879,11 @@ mod test { let msg = Message::from_flashblock_payload( XLayerFlashblockMessage::from_flashblock_payload(XLayerFlashblockPayload::default()), ); - let failed = handler.broadcast_message(msg).await.expect("serialization must not fail"); + let bytes = frame::encode(&msg).expect("encode must not fail"); + let failed = handler + .broadcast_message(msg.protocol(), bytes) + .await + .expect("broadcast must not fail"); assert!(failed.is_empty(), "no peers should fail on a healthy stream"); assert!( diff --git a/crates/builder/src/broadcast/outgoing.rs b/crates/builder/src/broadcast/outgoing.rs index 1ece75ba..d3423972 100644 --- a/crates/builder/src/broadcast/outgoing.rs +++ b/crates/builder/src/broadcast/outgoing.rs @@ -1,5 +1,4 @@ -use super::types; -use eyre::Context; +use alloy_primitives::Bytes; use futures::stream::FuturesUnordered; use libp2p::{swarm::Stream, PeerId, 
StreamProtocol}; use std::collections::HashMap; @@ -33,17 +32,15 @@ impl StreamsHandler { pub(crate) async fn broadcast_message( &mut self, - message: types::Message, + protocol: StreamProtocol, + bytes: Bytes, ) -> eyre::Result> { use futures::{SinkExt as _, StreamExt as _}; use tokio_util::{ - codec::{FramedWrite, LinesCodec}, + codec::{FramedWrite, LengthDelimitedCodec}, compat::FuturesAsyncReadCompatExt as _, }; - let protocol = message.protocol(); - let payload = serde_json::to_string(&message).wrap_err("failed to serialize payload")?; - let peers = self.peers_to_stream.keys().cloned().collect::>(); let mut futures = FuturesUnordered::new(); for peer in peers { @@ -54,10 +51,10 @@ impl StreamsHandler { continue; }; let stream = stream.compat(); - let payload = payload.clone(); + let bytes = bytes.clone(); let fut = async move { - let mut writer = FramedWrite::new(stream, LinesCodec::new()); - match writer.send(payload).await { + let mut writer = FramedWrite::new(stream, LengthDelimitedCodec::new()); + match writer.send(bytes.into()).await { Ok(()) => Ok((peer, writer.into_inner().into_inner())), Err(e) => Err((peer, eyre::eyre!(e))), } diff --git a/crates/builder/src/broadcast/wspub.rs b/crates/builder/src/broadcast/wspub.rs index ac7759a3..769b1693 100644 --- a/crates/builder/src/broadcast/wspub.rs +++ b/crates/builder/src/broadcast/wspub.rs @@ -1,6 +1,7 @@ use crate::{ broadcast::XLayerFlashblockMessage, metrics::tokio::MonitoredTask, metrics::BuilderMetrics, }; +use alloy_primitives::Bytes; use core::{ fmt::{Debug, Formatter}, net::SocketAddr, @@ -20,7 +21,7 @@ use tokio_tungstenite::{ accept_async, tungstenite::{ protocol::frame::{coding::CloseCode, CloseFrame}, - Message, Utf8Bytes, + Message, }, WebSocketStream, }; @@ -29,12 +30,12 @@ use tracing::{debug, info, trace, warn}; /// A WebSockets publisher that accepts connections from client websockets and broadcasts to them /// updates about new flashblocks. 
It maintains a count of sent messages and active subscriptions. /// -/// This is modelled as a `futures::Sink` that can be used to send `OpFlashblockPayload` messages. +/// This is modelled as a `futures::Sink` that can be used to send `Bytes` wire bytes messages. pub struct WebSocketPublisher { sent: Arc, subs: Arc, term: watch::Sender, - pipe: broadcast::Sender, + pipe: broadcast::Sender, subscriber_limit: Option, } @@ -65,10 +66,7 @@ impl WebSocketPublisher { Ok(Self { sent, subs, term, pipe, subscriber_limit }) } - pub fn publish(&self, payload: &XLayerFlashblockMessage) -> io::Result { - // Serialize the payload to a UTF-8 string - // serialize only once, then just copy around only a pointer - // to the serialized data for each subscription. + pub fn publish(&self, bytes: Bytes, payload: &XLayerFlashblockMessage) -> io::Result { match payload { XLayerFlashblockMessage::Payload(payload) => { info!( @@ -90,14 +88,8 @@ impl WebSocketPublisher { ); } } - - let serialized = serde_json::to_string(payload)?; - let utf8_bytes = Utf8Bytes::from(serialized); - let size = utf8_bytes.len(); - // Send the serialized payload to all subscribers - self.pipe - .send(utf8_bytes) - .map_err(|e| io::Error::new(io::ErrorKind::ConnectionAborted, e))?; + let size = bytes.len(); + self.pipe.send(bytes).map_err(|e| io::Error::new(io::ErrorKind::ConnectionAborted, e))?; Ok(size) } } @@ -113,7 +105,7 @@ impl Drop for WebSocketPublisher { async fn listener_loop( listener: TcpListener, metrics: Arc, - receiver: Receiver, + receiver: Receiver, term: watch::Receiver, sent: Arc, subs: Arc, @@ -188,7 +180,7 @@ async fn broadcast_loop( stream: WebSocketStream, metrics: Arc, term: watch::Receiver, - blocks: broadcast::Receiver, + blocks: broadcast::Receiver, sent: Arc, ) { let mut term = term; @@ -218,8 +210,15 @@ async fn broadcast_loop( sent.fetch_add(1, Ordering::Relaxed); metrics.messages_sent_count.increment(1); - trace!(target: "payload_builder::broadcast", "Broadcasted payload: {:?}", 
payload); - if let Err(e) = stream.send(Message::Text(payload)).await { + trace!( + target: "payload_builder::broadcast", + size = payload.len(), + "Broadcasted payload" + ); + if let Err(e) = stream + .send(Message::Binary(tokio_tungstenite::tungstenite::Bytes::from(payload))) + .await + { debug!(target: "payload_builder::broadcast", "Send payload error for flashblocks subscription {peer_addr}: {e}"); break; // Exit the loop if sending fails } diff --git a/crates/builder/src/default/handler.rs b/crates/builder/src/default/handler.rs index 150a608c..ddb91a4a 100644 --- a/crates/builder/src/default/handler.rs +++ b/crates/builder/src/default/handler.rs @@ -5,7 +5,7 @@ //! relays them to downstream WebSocket subscribers. use crate::{ - broadcast::{Message, WebSocketPublisher, XLayerFlashblockMessage}, + broadcast::{BroadcastFrame, Message, WebSocketPublisher, XLayerFlashblockMessage}, flashblocks::utils::cache::FlashblockPayloadsCache, }; use std::sync::Arc; @@ -20,7 +20,7 @@ use tracing::warn; /// flashblocks). pub(crate) struct DefaultPayloadHandler { /// Receives incoming P2P messages from peers. - p2p_rx: mpsc::Receiver, + p2p_rx: mpsc::Receiver, /// Cache for externally received pending flashblock transactions. p2p_cache: FlashblockPayloadsCache, /// WebSocket publisher for relaying flashblocks to downstream subscribers. @@ -31,7 +31,7 @@ pub(crate) struct DefaultPayloadHandler { impl DefaultPayloadHandler { pub(crate) fn new( - p2p_rx: mpsc::Receiver, + p2p_rx: mpsc::Receiver, p2p_cache: FlashblockPayloadsCache, ws_pub: Arc, cancel: tokio_util::sync::CancellationToken, @@ -45,13 +45,14 @@ impl DefaultPayloadHandler { loop { tokio::select! 
{ - Some(message) = self.p2p_rx.recv() => { - match message { + Some(frame) = self.p2p_rx.recv() => { + match frame.decoded.as_ref() { Message::OpFlashblockPayload(fb_payload) => { - if let XLayerFlashblockMessage::Payload(payload) = &fb_payload { + if let XLayerFlashblockMessage::Payload(payload) = fb_payload { self.p2p_cache.add_flashblock_payload(payload.inner.clone()); } - if let Err(e) = self.ws_pub.publish(&fb_payload) { + // Relay using the original wire bytes — no re-encode. + if let Err(e) = self.ws_pub.publish(frame.bytes.clone(), fb_payload) { warn!(target: "payload_builder", e = ?e, "failed to publish flashblock to websocket publisher"); } } diff --git a/crates/builder/src/flashblocks/builder.rs b/crates/builder/src/flashblocks/builder.rs index 70cb0324..96bb05a4 100644 --- a/crates/builder/src/flashblocks/builder.rs +++ b/crates/builder/src/flashblocks/builder.rs @@ -14,7 +14,7 @@ use crate::{ traits::{ClientBounds, PoolBounds}, }; use eyre::WrapErr as _; -use std::{collections::BTreeMap, sync::Arc, time::Instant}; +use std::{sync::Arc, time::Instant}; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; @@ -24,7 +24,7 @@ use alloy_consensus::{ }; use alloy_eips::{eip7685::EMPTY_REQUESTS_HASH, merge::BEACON_NONCE, Encodable2718}; use alloy_evm::block::BlockExecutionResult; -use alloy_primitives::{Address, BlockHash, B256, U256}; +use alloy_primitives::{BlockHash, B256, U256}; use op_alloy_rpc_types_engine::{ OpFlashblockPayload, OpFlashblockPayloadBase, OpFlashblockPayloadDelta, OpFlashblockPayloadMetadata, @@ -38,8 +38,7 @@ use reth_optimism_consensus::{calculate_receipt_root_no_memo_optimism, isthmus}; use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes}; use reth_optimism_forks::OpHardforks; use reth_optimism_node::{OpBuiltPayload, OpPayloadBuilderAttributes}; -use reth_optimism_primitives::{OpReceipt, OpTransactionSigned}; - +use reth_optimism_primitives::OpTransactionSigned; use 
reth_payload_primitives::BuiltPayload; use reth_payload_util::BestPayloadTransactions; use reth_primitives_traits::RecoveredBlock; @@ -49,6 +48,7 @@ use reth_provider::{ use reth_revm::{ database::StateProviderDatabase, db::{states::bundle_state::BundleRetention, BundleState}, + state::bal::Bal, State, }; use reth_tasks::TaskExecutor; @@ -56,25 +56,6 @@ use reth_transaction_pool::TransactionPool; use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput}; use revm::Database; -/// Converts a reth OpReceipt to an op-alloy OpReceipt -/// TODO: remove this once reth updates to use the op-alloy defined type as well. -fn convert_receipt(receipt: &OpReceipt) -> op_alloy_consensus::OpReceipt { - match receipt { - OpReceipt::Legacy(r) => op_alloy_consensus::OpReceipt::Legacy(r.clone()), - OpReceipt::Eip2930(r) => op_alloy_consensus::OpReceipt::Eip2930(r.clone()), - OpReceipt::Eip1559(r) => op_alloy_consensus::OpReceipt::Eip1559(r.clone()), - OpReceipt::Eip7702(r) => op_alloy_consensus::OpReceipt::Eip7702(r.clone()), - OpReceipt::Deposit(r) => { - op_alloy_consensus::OpReceipt::Deposit(op_alloy_consensus::OpDepositReceipt { - inner: r.inner.clone(), - deposit_nonce: r.deposit_nonce, - deposit_receipt_version: r.deposit_receipt_version, - }) - } - OpReceipt::PostExec(r) => op_alloy_consensus::OpReceipt::PostExec(r.clone()), - } -} - type NextBestFlashblocksTxs = BestFlashblocksTxs< ::Transaction, Box< @@ -362,8 +343,11 @@ where let db = StateProviderDatabase::new(&state_provider); // 1. 
execute the pre steps and seal an early block with that let sequencer_tx_start_time = Instant::now(); - let mut state = - State::builder().with_database(cached_reads.as_db_mut(db)).with_bundle_update().build(); + let mut state = State::builder() + .with_database(cached_reads.as_db_mut(db)) + .with_bundle_update() + .with_bal_builder() + .build(); let mut info = execute_pre_steps(&mut state, &ctx)?; let sequencer_tx_time = sequencer_tx_start_time.elapsed(); @@ -960,7 +944,10 @@ where .map_err(PayloadBuilderError::other)? .apply_pre_execution_changes()?; - // 2. execute sequencer transactions + // 2. bump fbal index after pre-execution state (index 0) + state.bump_bal_index(); + + // 3. execute sequencer transactions let info = ctx.execute_sequencer_transactions(state)?; Ok(info) @@ -1099,14 +1086,6 @@ where let (excess_blob_gas, blob_gas_used) = ctx.blob_fields(info); let extra_data = ctx.extra_data()?; - // need to read balances before take_bundle() below - let new_account_balances = state - .bundle_state - .state - .iter() - .filter_map(|(address, account)| account.info.as_ref().map(|info| (*address, info.balance))) - .collect::>(); - let bundle_state = state.take_bundle(); let execution_output = BlockExecutionOutput { state: bundle_state.clone(), @@ -1118,6 +1097,13 @@ where }, }; + // Take the current flashblock incremental bal and merge it into the accumulator + // on the `ExecutionInfo`, and reset the bal builder for the next flashblock. 
+ let flashblock_bal = state.take_built_bal(); + state.bal_state.bal_builder = Some(Bal::new()); + let fbal = flashblock_bal.clone().map(|bal| bal.into_alloy_bal()); + info.merge_access_list(flashblock_bal); + let header = Header { parent_hash: ctx.parent().hash(), ommers_hash: EMPTY_OMMER_ROOT_HASH, @@ -1192,24 +1178,14 @@ where // For X Layer, monitoring logs let new_tx_hashes = new_transactions.iter().map(|tx| tx.tx_hash()).collect::>(); - let new_receipts = info.receipts[last_idx..].to_vec(); if let Some(fb) = fb_state { if let Some(updates) = trie_updates_to_cache.take() { fb.prev_trie_updates = Some(updates); } fb.set_last_flashblock_tx_index(info.executed_transactions.len()); } - let receipts_with_hash = new_transactions - .iter() - .zip(new_receipts.iter()) - .map(|(tx, receipt)| (tx.tx_hash(), convert_receipt(receipt))) - .collect::>(); - let metadata = OpFlashblockPayloadMetadata { - receipts: Some(receipts_with_hash), - new_account_balances: Some(new_account_balances), - block_number: ctx.parent().number + 1, - access_list: None, - }; + + let metadata = OpFlashblockPayloadMetadata::new(ctx.parent().number + 1, None, None, fbal); let (_, blob_gas_used) = ctx.blob_fields(info); diff --git a/crates/builder/src/flashblocks/context.rs b/crates/builder/src/flashblocks/context.rs index 71e38299..a0578d7f 100644 --- a/crates/builder/src/flashblocks/context.rs +++ b/crates/builder/src/flashblocks/context.rs @@ -259,6 +259,11 @@ impl FlashblocksBuilderCtx { ) -> Result { let mut info = ExecutionInfo::with_capacity(self.attributes().transactions.len()); + // EIP-7928: tx K (zero-indexed in the block) records at `bal_index = K + 1` + // (pre-exec occupies index 0). Compute the index for the first tx in this + // batch from the running tx count. 
+ let next_bal_index = info.executed_transactions.len() as u64 + 1; + db.set_bal_index(next_bal_index); let mut evm = self.evm_config.evm_with_env(&mut *db, self.evm_env.clone()); for sequencer_tx in &self.attributes().transactions { @@ -331,8 +336,10 @@ impl FlashblocksBuilderCtx { evm.db_mut().commit(state); // append sender and transaction to the respective lists + // and increment the next txn index for the access list info.executed_senders.push(sequencer_tx.signer()); info.executed_transactions.push(sequencer_tx.into_inner()); + evm.db_mut().bump_bal_index(); } let da_footprint_gas_scalar = self @@ -371,7 +378,13 @@ impl FlashblocksBuilderCtx { block_gas_limit = ?block_gas_limit, ); + // EIP-7928: tx K (zero-indexed in the block) records at `bal_index = K + 1` + // (pre-exec occupies index 0). Compute the index for the first tx in this + // batch from the running tx count. + let next_bal_index = info.executed_transactions.len() as u64 + 1; + db.set_bal_index(next_bal_index); let mut evm = self.evm_config.evm_with_env(&mut *db, self.evm_env.clone()); + for with_encoded_tx in cached_txs { let (encoded_bytes, recovered_tx) = with_encoded_tx.split(); let sender = recovered_tx.signer(); @@ -447,8 +460,10 @@ impl FlashblocksBuilderCtx { info.total_fees += U256::from(miner_fee) * U256::from(gas_used); // Append sender and transaction to the respective lists + // and increment the next txn index for the access list info.executed_senders.push(sender); info.executed_transactions.push(recovered_tx.into_inner()); + evm.db_mut().bump_bal_index(); } Ok(()) @@ -475,6 +490,11 @@ impl FlashblocksBuilderCtx { let base_fee = self.base_fee(); let tx_da_limit = self.da_config.max_da_tx_size(); + // EIP-7928: tx K (zero-indexed in the block) records at `bal_index = K + 1` + // (pre-exec occupies index 0). Compute the index for the first tx in this + // batch from the running tx count. 
+ let next_bal_index = info.executed_transactions.len() as u64 + 1; + db.set_bal_index(next_bal_index); let mut evm = self.evm_config.evm_with_env(&mut *db, self.evm_env.clone()); debug!( @@ -651,8 +671,10 @@ impl FlashblocksBuilderCtx { info.total_fees += U256::from(miner_fee) * U256::from(gas_used); // append sender and transaction to the respective lists + // and increment the next txn index for the access list info.executed_senders.push(tx.signer()); info.executed_transactions.push(tx.into_inner()); + evm.db_mut().bump_bal_index(); } let payload_transaction_simulation_time = execute_txs_start_time.elapsed(); diff --git a/crates/builder/src/flashblocks/handler.rs b/crates/builder/src/flashblocks/handler.rs index cdd51d5a..92eef975 100644 --- a/crates/builder/src/flashblocks/handler.rs +++ b/crates/builder/src/flashblocks/handler.rs @@ -1,5 +1,5 @@ use crate::{ - broadcast::{Message, WebSocketPublisher, XLayerFlashblockMessage}, + broadcast::{BroadcastFrame, Message, WebSocketPublisher, XLayerFlashblockMessage}, flashblocks::{ handler_ctx::FlashblockHandlerContext, utils::{cache::FlashblockPayloadsCache, execution::ExecutionInfo}, @@ -41,7 +41,7 @@ pub(crate) struct FlashblocksPayloadHandler { // receives new full block payloads built by this builder. built_payload_rx: mpsc::Receiver, // receives incoming p2p messages from peers. - p2p_rx: mpsc::Receiver, + p2p_rx: mpsc::Receiver, // outgoing p2p channel to broadcast new payloads to peers. p2p_tx: mpsc::Sender, // sends a `Events::BuiltPayload` to the reth payload builder when a new payload is received. 
@@ -68,7 +68,7 @@ where ctx: FlashblockHandlerContext, built_fb_payload_rx: mpsc::Receiver, built_payload_rx: mpsc::Receiver, - p2p_rx: mpsc::Receiver, + p2p_rx: mpsc::Receiver, p2p_tx: mpsc::Sender, payload_events_handle: tokio::sync::broadcast::Sender>, p2p_cache: FlashblockPayloadsCache, @@ -131,14 +131,14 @@ where let _ = p2p_tx.send(Message::from_built_payload(payload)).await; } } - Some(message) = p2p_rx.recv() => { - match message { + Some(frame) = p2p_rx.recv() => { + match frame.decoded.as_ref() { Message::OpBuiltPayload(payload) => { if !p2p_process_full_payload_flag { continue; } - let payload: OpBuiltPayload = (*payload).into(); + let payload: OpBuiltPayload = (**payload).clone().into(); let block_hash = payload.block().hash(); // Check if this block is already the pending block in canonical state if let Ok(Some(pending)) = client.pending_block() @@ -181,10 +181,10 @@ where })); } Message::OpFlashblockPayload(fb_payload) => { - if let XLayerFlashblockMessage::Payload(payload) = &fb_payload { + if let XLayerFlashblockMessage::Payload(payload) = fb_payload { p2p_cache.add_flashblock_payload(payload.inner.clone()); } - if let Err(e) = ws_pub.publish(&fb_payload) { + if let Err(e) = ws_pub.publish(frame.bytes.clone(), fb_payload) { warn!(target: "payload_builder", e = ?e, "failed to publish flashblock to websocket publisher"); } } diff --git a/crates/builder/src/flashblocks/utils/cache.rs b/crates/builder/src/flashblocks/utils/cache.rs index 2db06230..620a1454 100644 --- a/crates/builder/src/flashblocks/utils/cache.rs +++ b/crates/builder/src/flashblocks/utils/cache.rs @@ -183,7 +183,6 @@ mod tests { OpFlashblockPayloadBase, OpFlashblockPayloadDelta, OpFlashblockPayloadMetadata, }; use reth_optimism_primitives::OpTransactionSigned; - use std::collections::BTreeMap; /// RAII guard for a temporary directory that cleans up on drop (success or failure). 
struct TempDir(PathBuf); @@ -223,12 +222,12 @@ mod tests { ..Default::default() }), diff: OpFlashblockPayloadDelta::default(), - metadata: OpFlashblockPayloadMetadata { + metadata: OpFlashblockPayloadMetadata::new( block_number, - new_account_balances: Some(BTreeMap::new()), - receipts: Some(BTreeMap::new()), - access_list: None, - }, + Some(Default::default()), + Some(Default::default()), + Some(vec![]), + ), } } diff --git a/crates/builder/src/flashblocks/utils/execution.rs b/crates/builder/src/flashblocks/utils/execution.rs index 77cdd0b7..b607d29e 100644 --- a/crates/builder/src/flashblocks/utils/execution.rs +++ b/crates/builder/src/flashblocks/utils/execution.rs @@ -3,6 +3,7 @@ use alloy_primitives::{Address, U256}; use derive_more::Display; use op_revm::OpTransactionError; use reth_optimism_primitives::{OpReceipt, OpTransactionSigned}; +use reth_revm::state::bal::Bal; #[derive(Debug, Display)] pub enum TxnExecutionResult { @@ -41,6 +42,8 @@ pub struct ExecutionInfo { pub da_footprint_scalar: Option, /// Optional blob fields for payload validation pub optional_blob_fields: Option<(Option, Option)>, + /// Accumulated block access list + pub cumulative_bal: Bal, } impl ExecutionInfo { @@ -55,6 +58,7 @@ impl ExecutionInfo { total_fees: U256::ZERO, da_footprint_scalar: None, optional_blob_fields: None, + cumulative_bal: Bal::new(), } } @@ -109,4 +113,567 @@ impl ExecutionInfo { } Ok(()) } + + /// Merges a single flashblock's access list into the accumulated block access list. 
+ pub fn merge_access_list(&mut self, fbal: Option) { + let Some(bal) = fbal else { return }; + for (addr, incoming) in bal.accounts { + match self.cumulative_bal.accounts.get_mut(&addr) { + Some(existing) => { + existing.account_info.extend(incoming.account_info); + existing.storage.extend(incoming.storage); + } + None => { + self.cumulative_bal.accounts.insert(addr, incoming); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_eip7928::{ + AccountChanges, BalanceChange, BlockAccessList, CodeChange, NonceChange, SlotChanges, + StorageChange, + }; + use alloy_primitives::{address, Bytes}; + use reth_revm::state::bal::AccountBal; + + const ADDR_A: Address = address!("0000000000000000000000000000000000000001"); + const ADDR_B: Address = address!("0000000000000000000000000000000000000002"); + + type SlotChangeSpec = (U256, Vec<(u64, U256)>); + + fn make_account_changes( + addr: Address, + balance_changes: Vec<(u64, u64)>, + storage_reads: Vec, + ) -> AccountChanges { + let mut bc: Vec = balance_changes + .into_iter() + .map(|(tx_idx, bal)| BalanceChange { + block_access_index: tx_idx, + post_balance: U256::from(bal), + }) + .collect(); + bc.sort_unstable_by_key(|c| c.block_access_index); + + AccountChanges { + address: addr, + balance_changes: bc, + storage_reads: storage_reads.into_iter().map(U256::from).collect(), + ..Default::default() + } + } + + fn make_account_changes_full( + addr: Address, + balance_changes: Vec<(u64, U256)>, + nonce_changes: Vec<(u64, u64)>, + storage_changes: Vec, + ) -> AccountChanges { + let mut bc: Vec = balance_changes + .into_iter() + .map(|(tx_idx, val)| BalanceChange { block_access_index: tx_idx, post_balance: val }) + .collect(); + bc.sort_unstable_by_key(|c| c.block_access_index); + + let mut nc: Vec = nonce_changes + .into_iter() + .map(|(tx_idx, val)| NonceChange { block_access_index: tx_idx, new_nonce: val }) + .collect(); + nc.sort_unstable_by_key(|c| c.block_access_index); + + let storage_changes: Vec = 
storage_changes + .into_iter() + .map(|(slot, changes)| { + let mut changes: Vec = changes + .into_iter() + .map(|(tx_idx, val)| StorageChange { + block_access_index: tx_idx, + new_value: val, + }) + .collect(); + changes.sort_unstable_by_key(|c| c.block_access_index); + SlotChanges { slot, changes } + }) + .collect(); + + AccountChanges { + address: addr, + balance_changes: bc, + nonce_changes: nc, + storage_changes, + ..Default::default() + } + } + + fn alloy_to_revm(list: BlockAccessList) -> Bal { + list.into_iter() + .map(|ac| AccountBal::try_from_alloy(ac).expect("valid alloy account changes")) + .collect() + } + + fn merge_alloy(info: &mut ExecutionInfo, list: BlockAccessList) { + info.merge_access_list(Some(alloy_to_revm(list))); + } + + fn account_in(list: &BlockAccessList, addr: Address) -> &AccountChanges { + list.iter().find(|ac| ac.address == addr).expect("address present") + } + + #[test] + fn merge_none_is_noop() { + let mut info = ExecutionInfo::with_capacity(0); + info.merge_access_list(None); + assert!(info.cumulative_bal.accounts.is_empty()); + } + + #[test] + fn merge_empty_bal_is_noop() { + let mut info = ExecutionInfo::with_capacity(0); + info.merge_access_list(Some(Bal::new())); + assert!(info.cumulative_bal.accounts.is_empty()); + } + + #[test] + fn merge_single_flashblock_populates_cumulative() { + let mut info = ExecutionInfo::with_capacity(0); + let ac = make_account_changes(ADDR_A, vec![(0, 100)], vec![]); + merge_alloy(&mut info, vec![ac]); + + let out = info.cumulative_bal.into_alloy_bal(); + assert_eq!(out.len(), 1); + assert_eq!(out[0].address, ADDR_A); + assert_eq!(out[0].balance_changes.len(), 1); + assert_eq!(out[0].balance_changes[0].block_access_index, 0); + assert_eq!(out[0].balance_changes[0].post_balance, U256::from(100)); + } + + #[test] + fn merge_across_flashblocks_same_address_extends_balance() { + let mut info = ExecutionInfo::with_capacity(0); + merge_alloy(&mut info, vec![make_account_changes(ADDR_A, vec![(0, 100)], 
vec![])]); + merge_alloy( + &mut info, + vec![ + make_account_changes(ADDR_A, vec![(5, 200)], vec![]), + make_account_changes(ADDR_B, vec![(6, 300)], vec![]), + ], + ); + + let out = info.cumulative_bal.into_alloy_bal(); + assert_eq!(out.len(), 2, "ADDR_A merged into one entry, ADDR_B separate"); + assert_eq!(out[0].address, ADDR_A); + assert_eq!(out[1].address, ADDR_B); + + let a_changes: Vec<(u64, U256)> = + out[0].balance_changes.iter().map(|c| (c.block_access_index, c.post_balance)).collect(); + assert_eq!(a_changes.len(), 2); + assert!(a_changes.contains(&(0, U256::from(100)))); + assert!(a_changes.contains(&(5, U256::from(200)))); + + assert_eq!(out[1].balance_changes.len(), 1); + assert_eq!(out[1].balance_changes[0].block_access_index, 6); + assert_eq!(out[1].balance_changes[0].post_balance, U256::from(300)); + } + + #[test] + fn merge_dedupes_storage_reads() { + let mut info = ExecutionInfo::with_capacity(0); + merge_alloy(&mut info, vec![make_account_changes(ADDR_A, vec![], vec![1, 2])]); + merge_alloy(&mut info, vec![make_account_changes(ADDR_A, vec![], vec![1, 3])]); + + let out = info.cumulative_bal.into_alloy_bal(); + assert_eq!(out.len(), 1); + assert_eq!( + out[0].storage_reads, + vec![U256::from(1), U256::from(2), U256::from(3)], + "duplicate slot 1 collapsed; result sorted ascending", + ); + } + + #[test] + fn merge_last_balance_change_resolves_across_three_flashblocks() { + let mut info = ExecutionInfo::with_capacity(0); + let mk = |bal: Vec<(u64, U256)>| make_account_changes_full(ADDR_A, bal, vec![], vec![]); + merge_alloy(&mut info, vec![mk(vec![(2, U256::from(20)), (5, U256::from(50))])]); + merge_alloy(&mut info, vec![mk(vec![(8, U256::from(80)), (11, U256::from(110))])]); + merge_alloy(&mut info, vec![mk(vec![(15, U256::from(150)), (20, U256::from(200))])]); + + let out = info.cumulative_bal.into_alloy_bal(); + let a = account_in(&out, ADDR_A); + assert_eq!(a.balance_changes.last().unwrap().post_balance, U256::from(200)); + } + + #[test] + fn 
merge_last_nonce_change_resolves_across_three_flashblocks() { + let mut info = ExecutionInfo::with_capacity(0); + let mk = |n: Vec<(u64, u64)>| make_account_changes_full(ADDR_A, vec![], n, vec![]); + merge_alloy(&mut info, vec![mk(vec![(1, 1), (3, 2)])]); + merge_alloy(&mut info, vec![mk(vec![(7, 3), (9, 4)])]); + merge_alloy(&mut info, vec![mk(vec![(12, 5), (18, 6)])]); + + let out = info.cumulative_bal.into_alloy_bal(); + let a = account_in(&out, ADDR_A); + assert_eq!(a.nonce_changes.last().unwrap().new_nonce, 6); + } + + #[test] + fn merge_same_slot_modified_in_every_flashblock_resolves_to_latest_write() { + let mut info = ExecutionInfo::with_capacity(0); + let slot = U256::from(0x1); + let mk = |c: Vec<(u64, U256)>| { + make_account_changes_full(ADDR_A, vec![], vec![], vec![(slot, c)]) + }; + merge_alloy(&mut info, vec![mk(vec![(2, U256::from(200)), (4, U256::from(400))])]); + merge_alloy(&mut info, vec![mk(vec![(8, U256::from(800)), (10, U256::from(1000))])]); + merge_alloy(&mut info, vec![mk(vec![(15, U256::from(1500)), (20, U256::from(2000))])]); + + let out = info.cumulative_bal.into_alloy_bal(); + let a = account_in(&out, ADDR_A); + assert_eq!(a.storage_changes.len(), 1, "duplicate slots are deduped on aggregation"); + let sc = &a.storage_changes[0]; + assert_eq!(sc.slot, slot); + assert_eq!(sc.changes.len(), 6, "all 6 writes flattened into one sorted vec"); + assert_eq!(sc.changes.last().unwrap().new_value, U256::from(2000)); + } + + #[test] + fn merge_slot_modified_with_gap_resolves_to_latest_touching_flashblock() { + let mut info = ExecutionInfo::with_capacity(0); + let slot = U256::from(0x1); + merge_alloy( + &mut info, + vec![make_account_changes_full( + ADDR_A, + vec![], + vec![], + vec![(slot, vec![(3, U256::from(100))])], + )], + ); + merge_alloy( + &mut info, + vec![make_account_changes_full(ADDR_A, vec![(7, U256::from(7777))], vec![], vec![])], + ); + merge_alloy( + &mut info, + vec![make_account_changes_full( + ADDR_A, + vec![], + vec![], + 
vec![(slot, vec![(13, U256::from(999))])], + )], + ); + + let out = info.cumulative_bal.into_alloy_bal(); + let a = account_in(&out, ADDR_A); + let sc = a.storage_changes.iter().find(|sc| sc.slot == slot).expect("slot present"); + assert_eq!(sc.changes.last().unwrap().new_value, U256::from(999)); + assert_eq!(a.balance_changes.last().unwrap().post_balance, U256::from(7777)); + } + + #[test] + fn merge_multi_slot_each_resolves_independently_across_flashblocks() { + let mut info = ExecutionInfo::with_capacity(0); + let (s1, s2, s3) = (U256::from(0x1), U256::from(0x2), U256::from(0x3)); + let mk = |c: Vec| make_account_changes_full(ADDR_A, vec![], vec![], c); + merge_alloy( + &mut info, + vec![mk(vec![(s1, vec![(2, U256::from(11))]), (s3, vec![(4, U256::from(33))])])], + ); + merge_alloy( + &mut info, + vec![mk(vec![ + (s2, vec![(6, U256::from(60)), (8, U256::from(80))]), + (s3, vec![(7, U256::from(70))]), + ])], + ); + merge_alloy( + &mut info, + vec![mk(vec![(s1, vec![(14, U256::from(140))]), (s3, vec![(12, U256::from(120))])])], + ); + + let out = info.cumulative_bal.into_alloy_bal(); + let a = account_in(&out, ADDR_A); + let final_for = |slot| { + a.storage_changes + .iter() + .find(|sc| sc.slot == slot) + .and_then(|sc| sc.changes.last()) + .map(|c| c.new_value) + }; + assert_eq!(final_for(s1), Some(U256::from(140))); + assert_eq!(final_for(s2), Some(U256::from(80))); + assert_eq!(final_for(s3), Some(U256::from(120))); + } + + #[test] + fn merge_read_then_write_across_flashblocks_promotes_to_changes() { + let mut info = ExecutionInfo::with_capacity(0); + let slot = U256::from(0x42); + merge_alloy(&mut info, vec![make_account_changes(ADDR_A, vec![], vec![0x42])]); + merge_alloy( + &mut info, + vec![make_account_changes_full( + ADDR_A, + vec![], + vec![], + vec![(slot, vec![(5, U256::from(100))])], + )], + ); + + let out = info.cumulative_bal.into_alloy_bal(); + let a = account_in(&out, ADDR_A); + assert!(a.storage_reads.is_empty(), "later write must promote slot 
out of storage_reads"); + assert_eq!(a.storage_changes.len(), 1); + let sc = &a.storage_changes[0]; + assert_eq!(sc.slot, slot); + assert_eq!(sc.changes.len(), 1); + assert_eq!(sc.changes[0].block_access_index, 5); + assert_eq!(sc.changes[0].new_value, U256::from(100)); + } + + #[test] + fn merge_write_then_read_across_flashblocks_stays_as_changes() { + let mut info = ExecutionInfo::with_capacity(0); + let slot = U256::from(0x99); + merge_alloy( + &mut info, + vec![make_account_changes_full( + ADDR_A, + vec![], + vec![], + vec![(slot, vec![(3, U256::from(777))])], + )], + ); + merge_alloy(&mut info, vec![make_account_changes(ADDR_A, vec![], vec![0x99])]); + + let out = info.cumulative_bal.into_alloy_bal(); + let a = account_in(&out, ADDR_A); + assert!( + a.storage_reads.is_empty(), + "slot already has writes — must not appear as bare read" + ); + assert_eq!(a.storage_changes.len(), 1); + let sc = &a.storage_changes[0]; + assert_eq!(sc.slot, slot); + assert_eq!(sc.changes.len(), 1, "FB0 write must survive across the FB1 read"); + assert_eq!(sc.changes[0].block_access_index, 3); + assert_eq!(sc.changes[0].new_value, U256::from(777)); + } + + #[test] + fn merge_code_changes_across_flashblocks_with_last_extraction() { + let mut info = ExecutionInfo::with_capacity(0); + let code_v1 = Bytes::from(vec![0x60, 0x00]); + let code_v2 = Bytes::from(vec![0x61, 0xff, 0xff]); + + let mk = |bai: u64, code: Bytes| AccountChanges { + address: ADDR_A, + code_changes: vec![CodeChange { block_access_index: bai, new_code: code }], + ..Default::default() + }; + + merge_alloy(&mut info, vec![mk(2, code_v1.clone())]); + merge_alloy(&mut info, vec![mk(8, code_v2.clone())]); + + let out = info.cumulative_bal.into_alloy_bal(); + let a = account_in(&out, ADDR_A); + assert_eq!(a.code_changes.len(), 2); + assert_eq!(a.code_changes[0].block_access_index, 2); + assert_eq!(a.code_changes[0].new_code, code_v1); + assert_eq!(a.code_changes[1].block_access_index, 8); + 
assert_eq!(a.code_changes[1].new_code, code_v2); + assert_eq!(a.code_changes.last().unwrap().new_code, code_v2); + } + + #[test] + fn merge_output_is_sorted_by_address() { + let mut info = ExecutionInfo::with_capacity(0); + let addr_z = address!("00000000000000000000000000000000000000ff"); + let addr_m = address!("0000000000000000000000000000000000000080"); + let addr_a = address!("0000000000000000000000000000000000000001"); + + merge_alloy(&mut info, vec![make_account_changes(addr_z, vec![(1, 1)], vec![])]); + merge_alloy(&mut info, vec![make_account_changes(addr_a, vec![(2, 2)], vec![])]); + merge_alloy(&mut info, vec![make_account_changes(addr_m, vec![(3, 3)], vec![])]); + + let out = info.cumulative_bal.into_alloy_bal(); + assert_eq!(out.len(), 3); + assert_eq!(out[0].address, addr_a); + assert_eq!(out[1].address, addr_m); + assert_eq!(out[2].address, addr_z); + } + + #[test] + fn merge_preserves_bal_index_ordering_per_slot() { + let mut info = ExecutionInfo::with_capacity(0); + let slot = U256::from(0xab); + let mk = |writes: Vec<(u64, U256)>| { + make_account_changes_full(ADDR_A, vec![], vec![], vec![(slot, writes)]) + }; + merge_alloy(&mut info, vec![mk(vec![(1, U256::from(11)), (3, U256::from(33))])]); + merge_alloy(&mut info, vec![mk(vec![(5, U256::from(55)), (9, U256::from(99))])]); + merge_alloy(&mut info, vec![mk(vec![(12, U256::from(120)), (20, U256::from(200))])]); + + let out = info.cumulative_bal.into_alloy_bal(); + let sc = &account_in(&out, ADDR_A).storage_changes[0]; + let indices: Vec = sc.changes.iter().map(|c| c.block_access_index).collect(); + assert_eq!( + indices, + vec![1, 3, 5, 9, 12, 20], + "writes must remain in non-decreasing bal_index order across flashblock merges", + ); + } + + // EIP-7928 §"BlockAccessList canonical form" — focused compliance pins. 
+ + #[test] + fn eip7928_storage_changes_lex_sorted_by_slot() { + let mut info = ExecutionInfo::with_capacity(0); + let s_high = U256::from(0xff); + let s_mid = U256::from(0x80); + let s_low = U256::from(0x01); + let mk = |slot: U256, idx: u64, val: u64| { + make_account_changes_full( + ADDR_A, + vec![], + vec![], + vec![(slot, vec![(idx, U256::from(val))])], + ) + }; + merge_alloy(&mut info, vec![mk(s_high, 1, 10)]); + merge_alloy(&mut info, vec![mk(s_low, 2, 20)]); + merge_alloy(&mut info, vec![mk(s_mid, 3, 30)]); + + let out = info.cumulative_bal.into_alloy_bal(); + let a = account_in(&out, ADDR_A); + let slots: Vec = a.storage_changes.iter().map(|sc| sc.slot).collect(); + assert_eq!(slots, vec![s_low, s_mid, s_high], "storage_changes must be lex-sorted by slot"); + } + + #[test] + fn eip7928_balance_changes_ascending_index_within_account() { + let mut info = ExecutionInfo::with_capacity(0); + let mk = |bc: Vec<(u64, U256)>| make_account_changes_full(ADDR_A, bc, vec![], vec![]); + merge_alloy(&mut info, vec![mk(vec![(1, U256::from(1)), (4, U256::from(4))])]); + merge_alloy(&mut info, vec![mk(vec![(7, U256::from(7))])]); + merge_alloy(&mut info, vec![mk(vec![(11, U256::from(11)), (15, U256::from(15))])]); + + let out = info.cumulative_bal.into_alloy_bal(); + let indices: Vec = + account_in(&out, ADDR_A).balance_changes.iter().map(|c| c.block_access_index).collect(); + let mut sorted = indices.clone(); + sorted.sort_unstable(); + assert_eq!(indices, sorted, "balance_changes must be ascending by block_access_index"); + } + + #[test] + fn eip7928_no_duplicate_addresses_in_output() { + let mut info = ExecutionInfo::with_capacity(0); + for idx in 0..5 { + merge_alloy( + &mut info, + vec![ + make_account_changes(ADDR_A, vec![(idx, idx * 10)], vec![]), + make_account_changes(ADDR_B, vec![(idx, idx * 100)], vec![]), + ], + ); + } + + let out = info.cumulative_bal.into_alloy_bal(); + let mut addrs: Vec
= out.iter().map(|ac| ac.address).collect(); + let total = addrs.len(); + addrs.sort_unstable(); + addrs.dedup(); + assert_eq!(total, addrs.len(), "each address must appear exactly once in the output"); + assert_eq!(total, 2); + } + + #[test] + fn eip7928_storage_changes_and_storage_reads_are_disjoint_per_account() { + let mut info = ExecutionInfo::with_capacity(0); + let written_slot = U256::from(0x10); + let read_slot = U256::from(0x20); + + merge_alloy( + &mut info, + vec![make_account_changes_full( + ADDR_A, + vec![], + vec![], + vec![(written_slot, vec![(2, U256::from(99))])], + )], + ); + merge_alloy(&mut info, vec![make_account_changes(ADDR_A, vec![], vec![0x20])]); + merge_alloy(&mut info, vec![make_account_changes(ADDR_A, vec![], vec![0x10])]); + + let out = info.cumulative_bal.into_alloy_bal(); + let a = account_in(&out, ADDR_A); + let change_slots: std::collections::HashSet = + a.storage_changes.iter().map(|sc| sc.slot).collect(); + let read_slots: std::collections::HashSet = a.storage_reads.iter().copied().collect(); + assert!( + change_slots.intersection(&read_slots).next().is_none(), + "storage_changes ∩ storage_reads must be empty per EIP-7928", + ); + assert!(change_slots.contains(&written_slot)); + assert!(read_slots.contains(&read_slot)); + } + + #[test] + fn eip7928_no_duplicate_block_access_index_in_change_lists() { + // Across non-overlapping flashblock index ranges (the builder invariant), no two + // entries can share a `block_access_index`. 
+ let mut info = ExecutionInfo::with_capacity(0); + let mk_balance = + |bc: Vec<(u64, U256)>| make_account_changes_full(ADDR_A, bc, vec![], vec![]); + merge_alloy(&mut info, vec![mk_balance(vec![(1, U256::from(1)), (2, U256::from(2))])]); + merge_alloy(&mut info, vec![mk_balance(vec![(3, U256::from(3)), (4, U256::from(4))])]); + + let slot = U256::from(0xff); + let mk_storage = |sc: Vec<(u64, U256)>| { + make_account_changes_full(ADDR_A, vec![], vec![], vec![(slot, sc)]) + }; + merge_alloy(&mut info, vec![mk_storage(vec![(5, U256::from(50))])]); + merge_alloy(&mut info, vec![mk_storage(vec![(6, U256::from(60))])]); + + let out = info.cumulative_bal.into_alloy_bal(); + let a = account_in(&out, ADDR_A); + + let bal_indices: Vec = + a.balance_changes.iter().map(|c| c.block_access_index).collect(); + let mut bal_unique = bal_indices.clone(); + bal_unique.sort_unstable(); + bal_unique.dedup(); + assert_eq!(bal_indices.len(), bal_unique.len(), "no duplicate index in balance_changes"); + + let sc = &a.storage_changes[0]; + let slot_indices: Vec = sc.changes.iter().map(|c| c.block_access_index).collect(); + let mut slot_unique = slot_indices.clone(); + slot_unique.sort_unstable(); + slot_unique.dedup(); + assert_eq!(slot_indices.len(), slot_unique.len(), "no duplicate index per slot"); + } + + #[test] + fn eip7928_empty_change_lists_preserved_for_touched_accounts() { + // Account touched only via reads must still surface in the output with all change lists + // empty, satisfying "accounts with no state changes MUST still be present". 
+ let mut info = ExecutionInfo::with_capacity(0); + merge_alloy(&mut info, vec![make_account_changes(ADDR_A, vec![], vec![1, 2, 3])]); + + let out = info.cumulative_bal.into_alloy_bal(); + assert_eq!(out.len(), 1); + let a = &out[0]; + assert_eq!(a.address, ADDR_A); + assert!(a.balance_changes.is_empty()); + assert!(a.nonce_changes.is_empty()); + assert!(a.code_changes.is_empty()); + assert!(a.storage_changes.is_empty()); + assert_eq!(a.storage_reads, vec![U256::from(1), U256::from(2), U256::from(3)]); + } } diff --git a/crates/builder/src/tests/flashblocks.rs b/crates/builder/src/tests/flashblocks.rs index 843d2fe6..30503e8c 100644 --- a/crates/builder/src/tests/flashblocks.rs +++ b/crates/builder/src/tests/flashblocks.rs @@ -356,3 +356,118 @@ async fn progressive_lag_reduces_flashblocks(rbuilder: LocalInstance) -> eyre::R flashblocks_listener.stop().await } + +/// Validates the EIP-7928 BAL produced via the upstream `State::with_bal_builder()` +/// path is wire-correct end-to-end. Combines two invariants in one e2e run: +/// +/// * **EIP-7928 index assignment**: pre-exec records at `block_access_index = 0`, +/// tx K records at `K + 1`. The base flashblock (index=0) carries the pre-exec +/// entry at `bal_index=0` and the L1-info system tx at `bal_index=1`. +/// * **Per-flashblock incrementality**: subsequent flashblocks (index>=1) carry +/// ONLY their own tx range — no pre-exec leak from FB 0 (caught by the +/// `take_built_alloy_bal()` + `bal_builder = Some(Bal::new())` re-arm pair), +/// and no overlap of `bal_index` across FBs (i.e., FB N max < FB N+1 min). 
+#[rb_test(args = BuilderArgs { + chain_block_time: 1000, + flashblocks: FlashblocksArgs { + enabled: true, + flashblocks_port: 1242, + flashblocks_addr: "127.0.0.1".into(), + flashblocks_block_time: 200, + flashblocks_disable_state_root: true, + ..Default::default() + }, + ..Default::default() +})] +async fn test_bal_eip7928_per_flashblock_invariants(rbuilder: LocalInstance) -> eyre::Result<()> { + use alloy_eip7928::BlockAccessList; + use alloy_rlp::Decodable; + use std::collections::BTreeSet; + + let driver = rbuilder.driver().await?; + let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); + + // Send enough txs that the block produces multiple flashblocks (need + // ≥2 FBs to assert non-trivial incrementality). + for _ in 0..6 { + let _ = driver.create_transaction().random_valid_transfer().send().await?; + } + let _block = driver.build_new_block_with_current_timestamp(None).await?; + tokio::time::sleep(Duration::from_millis(100)).await; + + let flashblocks = flashblocks_listener.get_flashblocks(); + assert!(flashblocks.len() >= 2, "expected at least 2 flashblocks; got {}", flashblocks.len()); + + // For each flashblock, decode its wire BAL and collect every + // `block_access_index` referenced. 
+ fn collect_indices(bal: &BlockAccessList) -> BTreeSet { + let mut set = BTreeSet::new(); + for account in bal { + for c in &account.balance_changes { + set.insert(c.block_access_index); + } + for c in &account.nonce_changes { + set.insert(c.block_access_index); + } + for c in &account.code_changes { + set.insert(c.block_access_index); + } + for sc in &account.storage_changes { + for c in &sc.changes { + set.insert(c.block_access_index); + } + } + } + set + } + + let mut per_fb: Vec<(u64, BTreeSet)> = Vec::new(); + for fb in &flashblocks { + let Some(raw_bal) = fb.metadata.access_list.as_ref() else { continue }; + let mut buf: &[u8] = raw_bal.as_ref(); + let bal = BlockAccessList::decode(&mut buf).expect("BAL must decode"); + per_fb.push((fb.index, collect_indices(&bal))); + } + assert!(!per_fb.is_empty(), "expected at least one flashblock to carry a BAL"); + + // EIP-7928 invariant: base FB carries pre-exec at bal_index=0 AND tx 0 + // (L1-info attribute deposit) at bal_index=1. + let fb0 = per_fb + .iter() + .find(|(idx, _)| *idx == 0) + .map(|(_, set)| set) + .expect("base flashblock must carry a BAL"); + assert!(fb0.contains(&0), "FB 0 must carry pre-exec entry at bal_index=0; got {fb0:?}"); + assert!(fb0.contains(&1), "FB 0 must carry the L1-info system tx at bal_index=1; got {fb0:?}",); + + // Per-FB incrementality: only the base FB may carry bal_index=0. If a + // subsequent FB does, `take_built_alloy_bal() + re-arm` leaked. + for (fb_idx, indices) in &per_fb { + if *fb_idx == 0 { + continue; + } + assert!( + !indices.contains(&0), + "FB {fb_idx} leaked pre-exec entries at bal_index=0: {indices:?}", + ); + } + + // Global monotonicity: across FBs that carry indices, FB N's max is + // strictly less than FB N+1's min. Catches any per-FB cumulative leak + // beyond just the pre-exec slot. 
+ let mut nonempty: Vec<&(u64, BTreeSet)> = + per_fb.iter().filter(|(_, s)| !s.is_empty()).collect(); + nonempty.sort_by_key(|(idx, _)| *idx); + for w in nonempty.windows(2) { + let (a_fb, a_set) = w[0]; + let (b_fb, b_set) = w[1]; + let a_max = *a_set.iter().next_back().expect("non-empty"); + let b_min = *b_set.iter().next().expect("non-empty"); + assert!( + a_max < b_min, + "FB {a_fb} (max bal_index={a_max}) and FB {b_fb} (min bal_index={b_min}) overlap", + ); + } + + flashblocks_listener.stop().await +} diff --git a/crates/builder/src/tests/framework/instance.rs b/crates/builder/src/tests/framework/instance.rs index 518188a2..d0c77108 100644 --- a/crates/builder/src/tests/framework/instance.rs +++ b/crates/builder/src/tests/framework/instance.rs @@ -1,6 +1,5 @@ use crate::{ args::BuilderArgs, - broadcast::XLayerFlashblockMessage, flashblocks::{BuilderConfig, FlashblocksServiceBuilder}, signer::Signer, tests::{ @@ -341,14 +340,21 @@ impl FlashblocksListener { _ = cancellation_token_clone.cancelled() => { break Ok(()); } - Some(Ok(Message::Text(text))) = read.next() => { - let message: XLayerFlashblockMessage = serde_json::from_str(&text).unwrap(); - if let Some(payload) = message.as_payload() { - let timestamped = TimestampedFlashblock { - payload: payload.inner.clone(), - received_at: Instant::now(), - }; - flashblocks_clone.lock().push(timestamped); + Some(Ok(msg)) = read.next() => { + let bytes = match msg { + Message::Binary(b) => b.to_vec(), + Message::Text(t) => t.as_bytes().to_vec(), + _ => continue, + }; + let outer: crate::broadcast::Message = + crate::broadcast::frame::decode(&bytes).unwrap(); + if let crate::broadcast::Message::OpFlashblockPayload(message) = outer && + let Some(payload) = message.as_payload() { + let timestamped = TimestampedFlashblock { + payload: payload.inner.clone(), + received_at: Instant::now(), + }; + flashblocks_clone.lock().push(timestamped); } } } diff --git a/crates/flashblocks/Cargo.toml b/crates/flashblocks/Cargo.toml 
index 16ac684a..e01c29f3 100644 --- a/crates/flashblocks/Cargo.toml +++ b/crates/flashblocks/Cargo.toml @@ -53,7 +53,7 @@ reth-trie-parallel.workspace = true # alloy alloy-consensus.workspace = true alloy-eips = { workspace = true, features = ["serde"] } -alloy-eip7928.workspace = true +alloy-eip7928 = { workspace = true, features = ["serde", "rlp"] } alloy-evm.workspace = true alloy-json-rpc.workspace = true alloy-primitives = { workspace = true, features = ["serde"] } diff --git a/crates/flashblocks/src/cache/raw.rs b/crates/flashblocks/src/cache/raw.rs index 3b219ff3..8a06beec 100644 --- a/crates/flashblocks/src/cache/raw.rs +++ b/crates/flashblocks/src/cache/raw.rs @@ -3,10 +3,12 @@ use parking_lot::RwLock; use ringbuffer::{AllocRingBuffer, RingBuffer}; use std::{collections::BTreeMap, sync::Arc}; +use alloy_eip7928::BlockAccessList; use alloy_eips::{eip2718::WithEncoded, eip4895::Withdrawal}; use alloy_rpc_types_engine::PayloadId; use op_alloy_rpc_types_engine::{OpFlashblockPayload, OpFlashblockPayloadBase}; use reth_primitives_traits::{Recovered, SignedTransaction}; +use reth_revm::state::bal::{AccountBal, Bal}; use xlayer_builder::broadcast::{XLayerFlashblockMessage, XLayerFlashblockPayload}; @@ -27,8 +29,8 @@ pub struct RawFlashblocksCache { } impl RawFlashblocksCache { - pub fn new() -> Self { - let inner = Arc::new(RwLock::new(RawFlashblocksCacheInner::new())); + pub fn new(disable_access_list: bool) -> Self { + let inner = Arc::new(RwLock::new(RawFlashblocksCacheInner::new(disable_access_list))); Self { inner } } @@ -59,11 +61,16 @@ impl RawFlashblocksCache { pub struct RawFlashblocksCacheInner { cache: AllocRingBuffer>, canon_height: u64, + disable_access_list: bool, } impl RawFlashblocksCacheInner { - fn new() -> Self { - Self { cache: AllocRingBuffer::new(MAX_RAW_CACHE_SIZE), canon_height: 0 } + fn new(disable_access_list: bool) -> Self { + Self { + cache: AllocRingBuffer::new(MAX_RAW_CACHE_SIZE), + canon_height: 0, + disable_access_list, + } } pub fn 
handle_canonical_height(&mut self, height: u64) { @@ -124,7 +131,7 @@ impl RawFlashblocksCacheInner { .iter() .rev() .find(|entry| entry.block_number() == Some(height)) - .and_then(|entry| entry.try_to_buildable_args()) + .and_then(|entry| entry.try_to_buildable_args(self.disable_access_list)) } } @@ -136,6 +143,8 @@ struct RawFlashblocksEntry { payloads: BTreeMap, /// Tracks the recovered transactions by index recovered_transactions_by_index: BTreeMap>>>, + /// Tracks the decoded EIP-7928 [`BlockAccessList`] by index + block_access_lists: BTreeMap, /// Tracks if the accumulated sequence has received the first base flashblock has_base: bool, /// The sequencer's target flashblock index. Zero if unset. @@ -149,6 +158,7 @@ impl RawFlashblocksEntry { Self { payloads: BTreeMap::new(), recovered_transactions_by_index: BTreeMap::new(), + block_access_lists: BTreeMap::new(), has_base: false, target_index: 0, sequence_end: false, @@ -179,6 +189,47 @@ impl RawFlashblocksEntry { } let flashblock_index = flashblock.index; let recovered_txs = flashblock.recover_transactions().collect::, _>>()?; + + match flashblock.metadata.block_access_list() { + None => {} + Some(Ok(list)) => { + let mut bal = Bal::new(); + for alloy_account in list { + let address = alloy_account.address; + match AccountBal::try_from_alloy(alloy_account) { + Ok((addr, incoming)) => match bal.accounts.get_mut(&addr) { + Some(existing) => { + existing.account_info.extend(incoming.account_info); + existing.storage.extend(incoming.storage); + } + None => { + bal.accounts.insert(addr, incoming); + } + }, + Err(err) => tracing::warn!( + target: "flashblocks", + flashblock_index, + ?address, + ?err, + "BAL account decode failed at insert, account skipped", + ), + } + } + if !bal.accounts.is_empty() { + self.block_access_lists.insert(flashblock_index, bal); + } + } + Some(Err(e)) => { + tracing::warn!( + target: "flashblocks", + flashblock_index, + block_number = flashblock.metadata.block_number, + error = %e, + 
"Failed to decode RLP access list at insert, flashblock retained without it", + ); + } + } + self.payloads.insert(flashblock_index, flashblock); self.recovered_transactions_by_index.insert(flashblock_index, recovered_txs); Ok(()) @@ -242,12 +293,44 @@ impl RawFlashblocksEntry { .collect() } - fn try_to_buildable_args(&self) -> Option>>>> { + /// Aggregates per-flashblock BALs into a single block-wide [`BlockAccessList`] + /// covering flashblocks `[0..=up_to]`. + fn access_list_up_to(&self, up_to: u64) -> Option { + let mut merged = Bal::new(); + for (_, bal) in self.block_access_lists.range(..=up_to) { + for (addr, incoming) in &bal.accounts { + match merged.accounts.get_mut(addr) { + Some(existing) => { + existing.account_info.extend(incoming.account_info.clone()); + existing.storage.extend(incoming.storage.clone()); + } + None => { + merged.accounts.insert(*addr, incoming.clone()); + } + } + } + } + + if merged.accounts.is_empty() { + return None; + } + Some(merged.into_alloy_bal()) + } + + fn try_to_buildable_args( + &self, + disable_access_list: bool, + ) -> Option>>>> { let best_revision = self.try_get_best_revision()?; Some(BuildArgs { base: self.base()?.clone(), payload_id: self.payload_id()?, transactions: self.transactions_up_to(best_revision), + access_list: if !disable_access_list { + self.access_list_up_to(best_revision) + } else { + None + }, withdrawals: self.withdrawals_at(best_revision), last_flashblock_index: best_revision, target_index: self.target_index, @@ -280,7 +363,7 @@ mod tests { // Arrange let factory = TestFlashBlockFactory::new(); let fb0 = factory.flashblock_at(0).build(); - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); // Act let result = cache.handle_flashblock(wrap(fb0)); @@ -295,7 +378,7 @@ mod tests { let factory = TestFlashBlockFactory::new(); let fb0 = factory.flashblock_at(0).build(); let fb0_dup = factory.flashblock_at(0).build(); - let mut cache = TestRawCache::new(); + let mut cache = 
TestRawCache::new(false); cache.handle_flashblock(wrap(fb0)).expect("first flashblock should succeed"); let result = cache.handle_flashblock(wrap(fb0_dup)); @@ -307,7 +390,7 @@ mod tests { let factory = TestFlashBlockFactory::new(); let fb0 = factory.flashblock_at(0).build(); let payload_id = fb0.payload_id; - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); cache.handle_flashblock(wrap(fb0)).expect("first flashblock should succeed"); let fb_wrong_block = factory @@ -327,7 +410,7 @@ mod tests { let fb0 = factory.flashblock_at(0).build(); let payload_id = fb0.payload_id; let fb2 = factory.builder().index(2).block_number(100).payload_id(payload_id).build(); - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); cache.handle_flashblock(wrap(fb0)).expect("fb0 insert"); let result = cache.handle_flashblock(wrap(fb2)); @@ -338,7 +421,7 @@ mod tests { fn test_raw_entry_get_best_revision_returns_none_without_base() { let factory = TestFlashBlockFactory::new(); let fb1 = factory.builder().index(1).block_number(100).build(); - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); cache.handle_flashblock(wrap(fb1)).expect("fb1 insert"); let entry = cache.cache.iter().next().expect("entry should exist"); @@ -351,7 +434,7 @@ mod tests { let factory = TestFlashBlockFactory::new(); let fb0 = factory.flashblock_at(0).build(); - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); cache.handle_flashblock(wrap(fb0)).expect("fb0 insert"); let entry = cache.cache.iter().next().expect("entry should exist"); let best = entry.try_get_best_revision(); @@ -365,7 +448,7 @@ mod tests { let fb1 = factory.flashblock_after(&fb0).build(); let fb2 = factory.flashblock_after(&fb1).build(); let fb3 = factory.flashblock_after(&fb2).build(); - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); cache.handle_flashblock(wrap(fb0)).expect("fb0"); 
cache.handle_flashblock(wrap(fb1)).expect("fb1"); @@ -383,7 +466,7 @@ mod tests { let payload_id = fb0.payload_id; let fb1 = factory.flashblock_after(&fb0).build(); let fb3 = factory.builder().index(3).block_number(100).payload_id(payload_id).build(); - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); cache.handle_flashblock(wrap(fb0)).expect("fb0"); cache.handle_flashblock(wrap(fb1)).expect("fb1"); @@ -398,7 +481,7 @@ mod tests { let factory = TestFlashBlockFactory::new(); let fb100 = factory.flashblock_at(0).build(); let fb101 = factory.flashblock_for_next_block(&fb100).build(); - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); cache.handle_flashblock(wrap(fb100)).expect("fb100"); cache.handle_flashblock(wrap(fb101)).expect("fb101"); @@ -416,7 +499,7 @@ mod tests { let fb100 = factory.flashblock_at(0).build(); let fb101 = factory.flashblock_for_next_block(&fb100).build(); let fb102 = factory.flashblock_for_next_block(&fb101).build(); - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); cache.handle_flashblock(wrap(fb100)).expect("fb100"); cache.handle_flashblock(wrap(fb101)).expect("fb101"); @@ -431,7 +514,7 @@ mod tests { let factory = TestFlashBlockFactory::new(); let fb100 = factory.flashblock_at(0).build(); let fb101 = factory.flashblock_for_next_block(&fb100).build(); - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); cache.handle_flashblock(wrap(fb100)).expect("fb100"); cache.handle_flashblock(wrap(fb101)).expect("fb101"); @@ -443,7 +526,7 @@ mod tests { fn test_raw_cache_rejects_flashblock_at_or_below_canonical_height() { let factory = TestFlashBlockFactory::new(); let fb100 = factory.flashblock_at(0).build(); - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); cache.handle_canonical_height(100); let result = cache.handle_flashblock(wrap(fb100)); @@ -456,7 +539,7 @@ mod tests { let factory = 
TestFlashBlockFactory::new(); let fb0_seq1 = factory.flashblock_at(0).build(); let fb0_seq2 = factory.flashblock_for_next_block(&fb0_seq1).build(); - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); cache.handle_flashblock(wrap(fb0_seq1.clone())).expect("seq1 fb0"); cache.handle_flashblock(wrap(fb0_seq2.clone())).expect("seq2 fb0"); @@ -471,7 +554,7 @@ mod tests { let factory = TestFlashBlockFactory::new(); let mut prev_fb = factory.flashblock_at(0).build(); let first_block_num = prev_fb.metadata.block_number; - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); cache.handle_flashblock(wrap(prev_fb.clone())).expect("first fb"); // Fill up to MAX_RAW_CACHE_SIZE (10) unique sequences @@ -507,7 +590,7 @@ mod tests { let factory = TestFlashBlockFactory::new(); let fb0 = factory.flashblock_at(0).build(); let expected_block = fb0.metadata.block_number; - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); cache.handle_flashblock(wrap(fb0)).expect("fb0 insert"); let entry = cache.cache.iter().next().expect("entry should exist"); @@ -518,7 +601,7 @@ mod tests { fn test_raw_entry_transaction_count_is_zero_on_empty_flashblock() { let factory = TestFlashBlockFactory::new(); let fb0 = factory.flashblock_at(0).build(); // no transactions set - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); cache.handle_flashblock(wrap(fb0)).expect("fb0 insert"); let entry = cache.cache.iter().next().expect("entry should exist"); @@ -529,7 +612,7 @@ mod tests { fn test_raw_entry_has_base_set_after_inserting_index_zero() { let factory = TestFlashBlockFactory::new(); let fb0 = factory.flashblock_at(0).build(); - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); cache.handle_flashblock(wrap(fb0)).expect("fb0 insert"); let entry = cache.cache.iter().next().expect("entry should exist"); @@ -540,7 +623,7 @@ mod tests { fn 
test_raw_entry_has_base_not_set_when_only_non_zero_index_inserted() { let factory = TestFlashBlockFactory::new(); let fb1 = factory.builder().index(1).block_number(100).build(); - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); cache.handle_flashblock(wrap(fb1)).expect("fb1 insert"); let entry = cache.cache.iter().next().expect("entry should exist"); @@ -551,7 +634,7 @@ mod tests { fn test_raw_flashblocks_cache_handle_flashblock_inserts_via_arc_rwlock() { let factory = TestFlashBlockFactory::new(); let fb0 = factory.flashblock_at(0).build(); - let cache = RawFlashblocksCache::::new(); + let cache = RawFlashblocksCache::::new(false); let result = cache.handle_message(XLayerFlashblockMessage::from_flashblock_payload(wrap(fb0))); @@ -563,7 +646,7 @@ mod tests { let factory = TestFlashBlockFactory::new(); let fb1 = factory.builder().index(1).block_number(100).build(); - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); cache.handle_flashblock(wrap(fb1)).expect("fb1 insert"); let entry = cache.cache.iter().next().expect("entry should exist"); let best = entry.try_get_best_revision(); @@ -580,7 +663,7 @@ mod tests { let block_number = fb0.metadata.block_number; let fb2 = factory.builder().index(2).block_number(block_number).payload_id(payload_id).build(); - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); cache.handle_flashblock(wrap(fb0)).expect("fb0"); cache.handle_flashblock(wrap(fb2)).expect("fb2"); @@ -600,7 +683,7 @@ mod tests { let different_payload_id = alloy_rpc_types_engine::PayloadId::new([ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, ]); - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); cache.handle_flashblock(wrap(fb0.clone())).expect("fb0 insert"); let fb_diff = factory @@ -626,7 +709,7 @@ mod tests { let fb1 = factory.flashblock_after(&fb0).build(); let fb2 = factory.flashblock_after(&fb1).build(); let fb3 = 
factory.flashblock_after(&fb2).build(); - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); cache.handle_flashblock(wrap(fb0)).expect("fb0"); cache.handle_flashblock(wrap(fb1)).expect("fb1"); @@ -648,7 +731,7 @@ mod tests { let fb0 = factory.flashblock_at(0).build(); let payload_id = fb0.payload_id; let expected_block = fb0.metadata.block_number; - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); cache.handle_flashblock(wrap(fb0)).expect("fb0 insert"); let block_number = cache.handle_end_sequence(payload_id).expect("should succeed"); @@ -660,7 +743,7 @@ mod tests { #[test] fn test_handle_end_sequence_errors_for_unknown_payload_id() { - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); let result = cache.handle_end_sequence(PayloadId::new([0xAA; 8])); assert!(result.is_err()); } @@ -671,7 +754,7 @@ mod tests { let fb0 = factory.flashblock_at(0).build(); let payload_id = fb0.payload_id; let block_number = fb0.metadata.block_number; - let mut cache = TestRawCache::new(); + let mut cache = TestRawCache::new(false); cache.handle_flashblock(wrap(fb0)).expect("fb0 insert"); let args_before = cache.try_get_buildable_args(block_number).expect("should be buildable"); @@ -688,7 +771,7 @@ mod tests { let fb0 = factory.flashblock_at(0).build(); let payload_id = fb0.payload_id; let expected_block = fb0.metadata.block_number; - let cache = RawFlashblocksCache::::new(); + let cache = RawFlashblocksCache::::new(false); cache .handle_message(XLayerFlashblockMessage::from_flashblock_payload(wrap(fb0))) @@ -894,4 +977,488 @@ mod tests { assert_eq!(flashblock, roundtrip); } + + // ======================================================================== + // Tests for `access_list_up_to` and `try_to_buildable_args` access-list flow + // ======================================================================== + + use alloy_eip7928::{AccountChanges, BalanceChange}; + use 
alloy_primitives::{address, Address, U256}; + + /// Helper: minimal `AccountChanges` with balance changes and storage reads. + fn make_account_changes( + addr: Address, + balance_changes: Vec<(u64, u64)>, + storage_reads: Vec, + ) -> AccountChanges { + AccountChanges { + address: addr, + balance_changes: balance_changes + .into_iter() + .map(|(tx_idx, bal)| BalanceChange { + block_access_index: tx_idx, + post_balance: U256::from(bal), + }) + .collect(), + storage_reads: storage_reads.into_iter().map(U256::from).collect(), + ..Default::default() + } + } + + const ADDR_A: Address = address!("0000000000000000000000000000000000000001"); + const ADDR_B: Address = address!("0000000000000000000000000000000000000002"); + + #[test] + fn test_access_list_up_to_merges_across_flashblocks() { + // Covers: same-address merge with disjoint tx indices preserved, and + // separate entry for a distinct address in a later flashblock. + let factory = TestFlashBlockFactory::new(); + let ac0_a = make_account_changes(ADDR_A, vec![(0, 100)], vec![]); + let ac1_a = make_account_changes(ADDR_A, vec![(5, 200)], vec![]); + let ac1_b = make_account_changes(ADDR_B, vec![(6, 300)], vec![]); + let fb0 = factory.flashblock_at(0).access_list(Some(vec![ac0_a])).build(); + let fb1 = factory.flashblock_after(&fb0).access_list(Some(vec![ac1_a, ac1_b])).build(); + + let mut cache = TestRawCache::new(false); + cache.handle_flashblock(wrap(fb0)).expect("fb0 insert"); + cache.handle_flashblock(wrap(fb1)).expect("fb1 insert"); + + let args = cache.try_get_buildable_args(100).expect("should be buildable"); + let al = args.access_list.expect("access list present"); + assert_eq!(al.len(), 2, "ADDR_A merged into one entry, ADDR_B separate"); + // Output sorted by address ascending. + assert_eq!(al[0].address, ADDR_A); + assert_eq!(al[1].address, ADDR_B); + + // ADDR_A carries both flashblocks' balance changes with exact values preserved. 
+ assert_eq!(al[0].balance_changes.len(), 2); + let a_changes: Vec<(u64, U256)> = + al[0].balance_changes.iter().map(|c| (c.block_access_index, c.post_balance)).collect(); + assert!(a_changes.contains(&(0, U256::from(100)))); + assert!(a_changes.contains(&(5, U256::from(200)))); + + // ADDR_B carries only fb1's single change. + assert_eq!(al[1].balance_changes.len(), 1); + assert_eq!(al[1].balance_changes[0].block_access_index, 6); + assert_eq!(al[1].balance_changes[0].post_balance, U256::from(300)); + } + + #[test] + fn test_access_list_up_to_dedupes_storage_reads() { + // Same slot read in two flashblocks — must appear once in storage_reads. + let factory = TestFlashBlockFactory::new(); + let ac0 = make_account_changes(ADDR_A, vec![], vec![1, 2]); + let ac1 = make_account_changes(ADDR_A, vec![], vec![1, 3]); + let fb0 = factory.flashblock_at(0).access_list(Some(vec![ac0])).build(); + let fb1 = factory.flashblock_after(&fb0).access_list(Some(vec![ac1])).build(); + + let mut cache = TestRawCache::new(false); + cache.handle_flashblock(wrap(fb0)).expect("fb0 insert"); + cache.handle_flashblock(wrap(fb1)).expect("fb1 insert"); + + let args = cache.try_get_buildable_args(100).expect("should be buildable"); + let entry = &args.access_list.expect("present")[0]; + // Exact dedup + sort: slot 1 once, 2 and 3 retained, ascending order. + assert_eq!(entry.storage_reads, vec![U256::from(1), U256::from(2), U256::from(3)]); + } + + #[test] + fn test_access_list_up_to_skips_missing_and_returns_none_when_all_missing() { + // Covers two behaviors: a missing middle flashblock is skipped (not + // treated as an abort), and if ALL flashblocks lack access list data + // the aggregated output is `None`. 
+ let factory = TestFlashBlockFactory::new(); + let ac0 = make_account_changes(ADDR_A, vec![(0, 100)], vec![]); + let fb0 = factory.flashblock_at(0).access_list(Some(vec![ac0])).build(); + let fb1 = factory.flashblock_after(&fb0).access_list(None).build(); + + let mut cache = TestRawCache::new(false); + cache.handle_flashblock(wrap(fb0)).expect("fb0 insert"); + cache.handle_flashblock(wrap(fb1)).expect("fb1 insert"); + + // Partial data preserved with exact value survived fb1's absence. + let args = cache.try_get_buildable_args(100).expect("should be buildable"); + let al = args.access_list.expect("fb0 data preserved despite fb1 having None"); + assert_eq!(al.len(), 1); + assert_eq!(al[0].address, ADDR_A); + assert_eq!(al[0].balance_changes.len(), 1); + assert_eq!(al[0].balance_changes[0].block_access_index, 0); + assert_eq!(al[0].balance_changes[0].post_balance, U256::from(100)); + + // Swap in a cache where both flashblocks lack access list data. + let fb0_empty = factory.flashblock_at(0).access_list(None).build(); + let fb1_empty = factory.flashblock_after(&fb0_empty).access_list(None).build(); + let mut empty_cache = TestRawCache::new(false); + empty_cache.handle_flashblock(wrap(fb0_empty)).expect("fb0 insert"); + empty_cache.handle_flashblock(wrap(fb1_empty)).expect("fb1 insert"); + let args = empty_cache.try_get_buildable_args(100).expect("should be buildable"); + assert!(args.access_list.is_none(), "no data at all → None"); + } + + #[test] + fn test_buildable_args_disable_flag_strips_access_list() { + // With the disable flag set, even a flashblock carrying access list data + // must produce `BuildArgs.access_list == None`. 
+ let factory = TestFlashBlockFactory::new(); + let ac = make_account_changes(ADDR_A, vec![(0, 100)], vec![]); + let fb0 = factory.flashblock_at(0).access_list(Some(vec![ac])).build(); + + let mut cache_enabled = TestRawCache::new(false); + cache_enabled.handle_flashblock(wrap(fb0.clone())).expect("fb0 insert"); + let args = cache_enabled.try_get_buildable_args(100).expect("should be buildable"); + let al = args.access_list.expect("flag off + data → access list surfaced"); + assert_eq!(al.len(), 1); + assert_eq!(al[0].address, ADDR_A); + assert_eq!(al[0].balance_changes.len(), 1); + assert_eq!(al[0].balance_changes[0].block_access_index, 0); + assert_eq!(al[0].balance_changes[0].post_balance, U256::from(100)); + + let mut cache_disabled = TestRawCache::new(true); + cache_disabled.handle_flashblock(wrap(fb0)).expect("fb0 insert"); + let args = cache_disabled.try_get_buildable_args(100).expect("should be buildable"); + assert!(args.access_list.is_none(), "flag on → access list stripped"); + } + + // ======================================================================== + // Verification tests for `access_list_up_to` against reth's + // `bal_to_hashed_post_state` consumer contract: + // * per-tx-index vectors: `.last()` returns the entry with the largest + // `block_access_index`. + // * per-slot final value: linear iter + insert keyed by slot + // (last-iteration-wins) yields the actual final post-state. + // ======================================================================== + + use alloy_eip7928::{CodeChange, NonceChange, SlotChanges, StorageChange}; + use alloy_primitives::Bytes; + use std::collections::HashMap; + + /// Slot/value list per slot used by [`make_account_changes_full`]. + type SlotChangeSpec = (U256, Vec<(u64, U256)>); + + /// Richer counterpart to [`make_account_changes`] — accepts nonce and + /// storage in addition to balance. 
Per-tx-index vectors are emitted + /// already sorted ascending by `block_access_index` to mirror what the + /// builder produces on the wire post `f699366`. + fn make_account_changes_full( + addr: Address, + balance_changes: Vec<(u64, U256)>, + nonce_changes: Vec<(u64, u64)>, + storage_changes: Vec, + ) -> AccountChanges { + let mut bc: Vec = balance_changes + .into_iter() + .map(|(tx_idx, val)| BalanceChange { block_access_index: tx_idx, post_balance: val }) + .collect(); + bc.sort_unstable_by_key(|c| c.block_access_index); + + let mut nc: Vec = nonce_changes + .into_iter() + .map(|(tx_idx, val)| NonceChange { block_access_index: tx_idx, new_nonce: val }) + .collect(); + nc.sort_unstable_by_key(|c| c.block_access_index); + + let storage_changes: Vec = storage_changes + .into_iter() + .map(|(slot, changes)| { + let mut changes: Vec = changes + .into_iter() + .map(|(tx_idx, val)| StorageChange { + block_access_index: tx_idx, + new_value: val, + }) + .collect(); + changes.sort_unstable_by_key(|c| c.block_access_index); + SlotChanges { slot, changes } + }) + .collect(); + + AccountChanges { + address: addr, + balance_changes: bc, + nonce_changes: nc, + storage_changes, + ..Default::default() + } + } + + /// Mimics reth's `bal_to_hashed_post_state` per-slot extraction: iterate + /// outer `storage_changes` linearly, insert keyed by slot, last wins. 
+ fn extract_final_storage(ac: &AccountChanges) -> HashMap { + let mut out = HashMap::new(); + for sc in &ac.storage_changes { + if let Some(last) = sc.changes.last() { + out.insert(sc.slot, last.new_value); + } + } + out + } + + fn aggregate_for(addr: Address, fbs: Vec) -> AccountChanges { + let mut cache = TestRawCache::new(false); + for fb in fbs { + cache.handle_flashblock(wrap(fb)).expect("flashblock insert"); + } + cache + .try_get_buildable_args(100) + .expect("buildable") + .access_list + .expect("access list present") + .into_iter() + .find(|ac| ac.address == addr) + .expect("address present in aggregated access list") + } + + #[test] + fn last_balance_change_resolves_across_three_flashblocks() { + let factory = TestFlashBlockFactory::new(); + let mk = |bal: Vec<(u64, U256)>| make_account_changes_full(ADDR_A, bal, vec![], vec![]); + let fb0 = factory + .flashblock_at(0) + .access_list(Some(vec![mk(vec![(2, U256::from(20)), (5, U256::from(50))])])) + .build(); + let fb1 = factory + .flashblock_after(&fb0) + .access_list(Some(vec![mk(vec![(8, U256::from(80)), (11, U256::from(110))])])) + .build(); + let fb2 = factory + .flashblock_after(&fb1) + .access_list(Some(vec![mk(vec![(15, U256::from(150)), (20, U256::from(200))])])) + .build(); + + let merged = aggregate_for(ADDR_A, vec![fb0, fb1, fb2]); + + // .last() must return tx_idx 20's value (the global max). 
+        assert_eq!(merged.balance_changes.last().unwrap().post_balance, U256::from(200));
+    }
+
+    #[test]
+    fn last_nonce_change_resolves_across_three_flashblocks() {
+        let factory = TestFlashBlockFactory::new();
+        let mk = |n: Vec<(u64, u64)>| make_account_changes_full(ADDR_A, vec![], n, vec![]);
+        let fb0 =
+            factory.flashblock_at(0).access_list(Some(vec![mk(vec![(1, 1), (3, 2)])])).build();
+        let fb1 = factory
+            .flashblock_after(&fb0)
+            .access_list(Some(vec![mk(vec![(7, 3), (9, 4)])]))
+            .build();
+        let fb2 = factory
+            .flashblock_after(&fb1)
+            .access_list(Some(vec![mk(vec![(12, 5), (18, 6)])]))
+            .build();
+
+        let merged = aggregate_for(ADDR_A, vec![fb0, fb1, fb2]);
+
+        assert_eq!(merged.nonce_changes.last().unwrap().new_nonce, 6);
+    }
+
+    #[test]
+    fn same_slot_modified_in_every_flashblock_resolves_to_latest_write() {
+        // Same slot in every FB — per-slot merging collapses them into ONE
+        // `SlotChanges` entry whose last write must be FB2's tx 20.
+        let factory = TestFlashBlockFactory::new();
+        let slot = U256::from(0x1);
+        let mk = |c: Vec<(u64, U256)>| {
+            make_account_changes_full(ADDR_A, vec![], vec![], vec![(slot, c)])
+        };
+        let fb0 = factory
+            .flashblock_at(0)
+            .access_list(Some(vec![mk(vec![(2, U256::from(200)), (4, U256::from(400))])]))
+            .build();
+        let fb1 = factory
+            .flashblock_after(&fb0)
+            .access_list(Some(vec![mk(vec![(8, U256::from(800)), (10, U256::from(1000))])]))
+            .build();
+        let fb2 = factory
+            .flashblock_after(&fb1)
+            .access_list(Some(vec![mk(vec![(15, U256::from(1500)), (20, U256::from(2000))])]))
+            .build();
+
+        let merged = aggregate_for(ADDR_A, vec![fb0, fb1, fb2]);
+
+        // Upstream `StorageBal::extend` is `BTreeMap`-backed and merges per-slot
+        // writes — one `SlotChanges` entry per slot, regardless of how many
+        // flashblocks touched it.
+ assert_eq!(merged.storage_changes.len(), 1, "duplicate slots are deduped on aggregation"); + let sc = &merged.storage_changes[0]; + assert_eq!(sc.slot, slot); + // All 6 writes (2 per FB across 3 FBs) are flattened into one sorted vec. + assert_eq!(sc.changes.len(), 6); + assert_eq!(sc.changes.last().unwrap().new_value, U256::from(2000)); + assert_eq!(extract_final_storage(&merged).get(&slot).copied(), Some(U256::from(2000))); + } + + #[test] + fn slot_modified_with_gap_resolves_to_latest_touching_flashblock() { + // Slot in FB0 and FB2 only; FB1 touches the address via balance. + let factory = TestFlashBlockFactory::new(); + let slot = U256::from(0x1); + let fb0 = factory + .flashblock_at(0) + .access_list(Some(vec![make_account_changes_full( + ADDR_A, + vec![], + vec![], + vec![(slot, vec![(3, U256::from(100))])], + )])) + .build(); + let fb1 = factory + .flashblock_after(&fb0) + .access_list(Some(vec![make_account_changes_full( + ADDR_A, + vec![(7, U256::from(7777))], + vec![], + vec![], + )])) + .build(); + let fb2 = factory + .flashblock_after(&fb1) + .access_list(Some(vec![make_account_changes_full( + ADDR_A, + vec![], + vec![], + vec![(slot, vec![(13, U256::from(999))])], + )])) + .build(); + + let merged = aggregate_for(ADDR_A, vec![fb0, fb1, fb2]); + + // FB2 (tx 13) wins for storage; FB1 (tx 7) wins for balance. 
+ assert_eq!(extract_final_storage(&merged).get(&slot).copied(), Some(U256::from(999))); + assert_eq!(merged.balance_changes.last().unwrap().post_balance, U256::from(7777)); + } + + #[test] + fn multi_slot_each_resolves_independently_across_flashblocks() { + // s1: FB0(tx2) + FB2(tx14) → FB2 wins + // s2: FB1 only (tx6, tx8) → FB1 tx8 wins + // s3: FB0(tx4) + FB1(tx7) + FB2(tx12) → FB2 wins + let factory = TestFlashBlockFactory::new(); + let (s1, s2, s3) = (U256::from(0x1), U256::from(0x2), U256::from(0x3)); + let mk = |c: Vec| make_account_changes_full(ADDR_A, vec![], vec![], c); + let fb0 = factory + .flashblock_at(0) + .access_list(Some(vec![mk(vec![ + (s1, vec![(2, U256::from(11))]), + (s3, vec![(4, U256::from(33))]), + ])])) + .build(); + let fb1 = factory + .flashblock_after(&fb0) + .access_list(Some(vec![mk(vec![ + (s2, vec![(6, U256::from(60)), (8, U256::from(80))]), + (s3, vec![(7, U256::from(70))]), + ])])) + .build(); + let fb2 = factory + .flashblock_after(&fb1) + .access_list(Some(vec![mk(vec![ + (s1, vec![(14, U256::from(140))]), + (s3, vec![(12, U256::from(120))]), + ])])) + .build(); + + let storage = extract_final_storage(&aggregate_for(ADDR_A, vec![fb0, fb1, fb2])); + + assert_eq!(storage.get(&s1).copied(), Some(U256::from(140))); + assert_eq!(storage.get(&s2).copied(), Some(U256::from(80))); + assert_eq!(storage.get(&s3).copied(), Some(U256::from(120))); + } + + #[test] + fn aggregator_storage_read_then_write_across_flashblocks_promotes_to_changes() { + // FB0 reads `slot` (no writes); FB1 writes `slot` at tx 5. + // Aggregated output: `slot` lives in storage_changes, not storage_reads. 
+ let factory = TestFlashBlockFactory::new(); + let slot = U256::from(0x42); + let fb0 = factory + .flashblock_at(0) + .access_list(Some(vec![make_account_changes(ADDR_A, vec![], vec![0x42])])) + .build(); + let fb1 = factory + .flashblock_after(&fb0) + .access_list(Some(vec![make_account_changes_full( + ADDR_A, + vec![], + vec![], + vec![(slot, vec![(5, U256::from(100))])], + )])) + .build(); + + let merged = aggregate_for(ADDR_A, vec![fb0, fb1]); + + assert!( + merged.storage_reads.is_empty(), + "later write must promote slot out of storage_reads" + ); + assert_eq!(merged.storage_changes.len(), 1, "exactly one slot in storage_changes"); + let sc = &merged.storage_changes[0]; + assert_eq!(sc.slot, slot); + assert_eq!(sc.changes.len(), 1, "single write at tx 5 survives"); + assert_eq!(sc.changes[0].block_access_index, 5); + assert_eq!(sc.changes[0].new_value, U256::from(100)); + } + + #[test] + fn aggregator_storage_write_then_read_across_flashblocks_stays_as_changes() { + // FB0 writes `slot` at tx 3; FB1 reads `slot` (no writes). + // The later read must NOT demote the earlier write. 
+ let factory = TestFlashBlockFactory::new(); + let slot = U256::from(0x99); + let fb0 = factory + .flashblock_at(0) + .access_list(Some(vec![make_account_changes_full( + ADDR_A, + vec![], + vec![], + vec![(slot, vec![(3, U256::from(777))])], + )])) + .build(); + let fb1 = factory + .flashblock_after(&fb0) + .access_list(Some(vec![make_account_changes(ADDR_A, vec![], vec![0x99])])) + .build(); + + let merged = aggregate_for(ADDR_A, vec![fb0, fb1]); + + assert!( + merged.storage_reads.is_empty(), + "slot already has writes — must not appear as a bare read", + ); + assert_eq!(merged.storage_changes.len(), 1); + let sc = &merged.storage_changes[0]; + assert_eq!(sc.slot, slot); + assert_eq!(sc.changes.len(), 1, "the FB0 write must survive across the FB1 read"); + assert_eq!(sc.changes[0].block_access_index, 3); + assert_eq!(sc.changes[0].new_value, U256::from(777)); + } + + #[test] + fn aggregator_code_changes_merge_across_flashblocks_with_last_extraction() { + // FB0 deploys code at tx 2; FB1 redeploys at tx 8 (e.g. CREATE2 reuse). + // After aggregation, code_changes contains both entries, ordered with + // FB0's write first and FB1's last — matching `.last()` semantics that + // reth's BAL→hashed-state consumer relies on. 
+ let factory = TestFlashBlockFactory::new(); + let code_v1 = Bytes::from(vec![0x60, 0x00]); + let code_v2 = Bytes::from(vec![0x61, 0xff, 0xff]); + + let mk = |bai: u64, code: Bytes| AccountChanges { + address: ADDR_A, + code_changes: vec![CodeChange { block_access_index: bai, new_code: code }], + ..Default::default() + }; + + let fb0 = factory.flashblock_at(0).access_list(Some(vec![mk(2, code_v1.clone())])).build(); + let fb1 = + factory.flashblock_after(&fb0).access_list(Some(vec![mk(8, code_v2.clone())])).build(); + + let merged = aggregate_for(ADDR_A, vec![fb0, fb1]); + + assert_eq!(merged.code_changes.len(), 2, "both flashblocks' code writes survive merge"); + assert_eq!(merged.code_changes[0].block_access_index, 2); + assert_eq!(merged.code_changes[0].new_code, code_v1); + let last = merged.code_changes.last().expect("non-empty"); + assert_eq!(last.block_access_index, 8, ".last() must yield the higher index"); + assert_eq!(last.new_code, code_v2); + } } diff --git a/crates/flashblocks/src/execution/mod.rs b/crates/flashblocks/src/execution/mod.rs index 681afc3b..fee2595e 100644 --- a/crates/flashblocks/src/execution/mod.rs +++ b/crates/flashblocks/src/execution/mod.rs @@ -5,6 +5,7 @@ pub mod validator; pub use engine::{XLayerEngineValidator, XLayerEngineValidatorBuilder}; pub use validator::FlashblockSequenceValidator; +use alloy_eip7928::BlockAccessList; use alloy_eips::eip4895::Withdrawal; use alloy_rpc_types_engine::PayloadId; use op_alloy_rpc_types_engine::OpFlashblockPayloadBase; @@ -20,6 +21,7 @@ pub struct BuildArgs { pub base: OpFlashblockPayloadBase, pub payload_id: PayloadId, pub transactions: I, + pub access_list: Option, pub withdrawals: Vec, pub last_flashblock_index: u64, pub target_index: u64, diff --git a/crates/flashblocks/src/execution/validator.rs b/crates/flashblocks/src/execution/validator.rs index 585c2576..a6d62d71 100644 --- a/crates/flashblocks/src/execution/validator.rs +++ b/crates/flashblocks/src/execution/validator.rs @@ -223,9 
+223,6 @@ where withdrawals: Some(args.withdrawals), }; - // TODO: Extract the BAL once flashblocks BAL is supported - let bal = None; - let overlay_factory = calculate_state_root.then(|| { // Create lazy overlay from ancestors - this doesn't block, allowing execution to start // before the trie data is ready. The overlay will be computed on first access. @@ -249,7 +246,7 @@ where provider_builder, overlay_factory.clone(), strategy, - bal, + args.access_list.map(Arc::new), )?; // Use cached state provider before executing, used in execution after prewarming threads @@ -871,7 +868,28 @@ where provider_builder, overlay_factory.expect("overlay factory must exist for StateRootTask strategy"), &self.tree_config, - bal, + // TODO: BAL is intentionally NOT fed into the StateRootTask pipeline — pass `None` + // until upstream BAL+sparse-trie integration stabilizes. + // + // Why: combining BAL with the sparse-trie task is still WIP upstream and the + // currently-released path (reth v2.1.0 / PRs #23393 + #23423) regresses on small + // and large blocks: + // - Empty/small blocks below `SMALL_BLOCK_TX_THRESHOLD`: PR #23393 disables the + // per-tx state hook whenever BAL is present, but the prewarm task only emits + // `FinishedStateUpdates` on the BAL-prewarm branch. Small blocks take the + // skip-prewarm fallback → no terminator is sent → sparse trie hits the 1s + // timeout. Fixed upstream by PR #23833 (gates state-hook removal on whether + // BAL prewarm actually runs). + // - Heavy blocks: PR #23423's `par_iter().for_each_init(...)` streaming sends + // one `HashedStateUpdate` per `AccountChanges`, fragmenting the channel and + // under-utilizing proof workers via the chunk-size=5 dispatch path. + // + // The tx-based prewarm path (BAL=None) is the well-tested upstream flow and is + // what we run in production today. Re-enable BAL here once the upstream WIP + // lands and we verify both modes on devnet stress runs. 
+ // + // Upstream WIP branch: https://github.com/paradigmxyz/reth/tree/bal-devnet-3 + None, )) } StateRootStrategy::Parallel | StateRootStrategy::Synchronous => Ok(self diff --git a/crates/flashblocks/src/lib.rs b/crates/flashblocks/src/lib.rs index 196abb85..52f6ad59 100644 --- a/crates/flashblocks/src/lib.rs +++ b/crates/flashblocks/src/lib.rs @@ -20,10 +20,9 @@ pub use execution::{ }; pub use service::{FlashblocksPersistCtx, FlashblocksRpcCtx, FlashblocksRpcService}; pub use subscription::FlashblocksPubSub; -pub use ws::WsFlashBlockStream; +pub use ws::{WsFlashBlockStream, WsFrame}; use std::sync::Arc; -use xlayer_builder::broadcast::XLayerFlashblockMessage; pub type PendingSequenceRx = tokio::sync::watch::Receiver>>; -pub type ReceivedFlashblocksRx = tokio::sync::broadcast::Receiver>; +pub type ReceivedFlashblocksRx = tokio::sync::broadcast::Receiver>; diff --git a/crates/flashblocks/src/persist.rs b/crates/flashblocks/src/persist.rs index db7bff1f..78ff0f49 100644 --- a/crates/flashblocks/src/persist.rs +++ b/crates/flashblocks/src/persist.rs @@ -21,8 +21,8 @@ pub async fn handle_persistence(mut rx: ReceivedFlashblocksRx, datadir: ChainPat tokio::select! 
{ result = rx.recv() => { match result { - Ok(fb_payload) => { - if let XLayerFlashblockMessage::Payload(payload) = &*fb_payload { + Ok(frame) => { + if let XLayerFlashblockMessage::Payload(payload) = frame.decoded.as_ref() { cache.add_flashblock_payload(payload.inner.clone()); } dirty = true; @@ -65,8 +65,8 @@ pub async fn handle_relay_flashblocks( loop { match rx.recv().await { - Ok(flashblock) => { - if let Err(e) = ws_pub.publish(&flashblock) { + Ok(frame) => { + if let Err(e) = ws_pub.publish(frame.bytes.clone(), &frame.decoded) { warn!( target: "flashblocks", "Failed to relay flashblock to websocket subscribers: {:?}", e diff --git a/crates/flashblocks/src/service.rs b/crates/flashblocks/src/service.rs index 3b5de3fe..7632c1ce 100644 --- a/crates/flashblocks/src/service.rs +++ b/crates/flashblocks/src/service.rs @@ -3,7 +3,7 @@ use crate::{ execution::{FlashblockReceipt, OverlayProviderFactory}, persist::{handle_persistence, handle_relay_flashblocks}, state::{handle_canonical_stream, handle_execution_tasks, handle_incoming_flashblocks}, - ReceivedFlashblocksRx, XLayerEngineValidator, + ReceivedFlashblocksRx, WsFrame, XLayerEngineValidator, }; use futures_util::Stream; use std::{net::SocketAddr, sync::Arc}; @@ -25,7 +25,7 @@ use reth_tasks::TaskExecutor; use xlayer_builder::{ args::FlashblocksArgs, - broadcast::{WebSocketPublisher, XLayerFlashblockMessage}, + broadcast::WebSocketPublisher, metrics::{tokio::FlashblocksTaskMetrics, BuilderMetrics}, }; @@ -35,6 +35,8 @@ pub struct FlashblocksRpcCtx { pub canon_state_rx: CanonStateNotificationStream, /// Flashblocks RPC debug mode to enable state comparison. pub debug_state_comparison: bool, + /// Flag to disable EIP-7928 flashblocks access lists. + pub disable_access_list: bool, } /// Context for handling flashblocks persistence and relaying. @@ -58,7 +60,7 @@ where /// Task executor. task_executor: TaskExecutor, /// Broadcast channel to forward received flashblocks from the subscription. 
- received_flashblocks_tx: Sender>, + received_flashblocks_tx: Sender>, } impl FlashblocksRpcService @@ -133,7 +135,7 @@ where engine_validator: XLayerEngineValidator, incoming_rx: S, ) where - S: Stream> + Unpin + Send + 'static, + S: Stream> + Unpin + Send + 'static, EvmConfig: ConfigureEvm< Primitives = N, NextBlockEnvCtx: From + Unpin + Send, @@ -153,7 +155,7 @@ where V: Send + Sync + 'static, { debug!(target: "flashblocks", "Initializing flashblocks rpc"); - let raw_cache = Arc::new(RawFlashblocksCache::new()); + let raw_cache = Arc::new(RawFlashblocksCache::new(self.rpc_ctx.disable_access_list)); // Spawn incoming raw flashblocks handle. let received_tx = self.received_flashblocks_tx.clone(); diff --git a/crates/flashblocks/src/state.rs b/crates/flashblocks/src/state.rs index c0639d89..3107731e 100644 --- a/crates/flashblocks/src/state.rs +++ b/crates/flashblocks/src/state.rs @@ -2,6 +2,7 @@ use crate::{ cache::{ExecutionTaskQueue, RawFlashblocksCache}, debug::debug_compare_flashblocks_bundle_states, execution::{FlashblockReceipt, OverlayProviderFactory}, + ws::WsFrame, FlashblockStateCache, XLayerEngineValidator, }; use futures_util::{FutureExt, Stream, StreamExt}; @@ -22,25 +23,23 @@ use reth_provider::{ BlockReader, HashedPostStateProvider, HeaderProvider, StateProviderFactory, StateReader, }; -use xlayer_builder::broadcast::XLayerFlashblockMessage; - const CONNECTION_BACKOUT_PERIOD: Duration = Duration::from_secs(5); pub async fn handle_incoming_flashblocks( mut incoming_rx: S, - received_tx: Sender>, + received_tx: Sender>, raw_cache: Arc>, task_queue: ExecutionTaskQueue, ) where - S: Stream> + Unpin + Send + 'static, + S: Stream> + Unpin + Send + 'static, N: NodePrimitives, { info!(target: "flashblocks", "Flashblocks raw handle started"); loop { match incoming_rx.next().await { - Some(Ok(message)) => { + Some(Ok(frame)) => { if let Err(err) = - process_flashblock_payload::(message, &received_tx, &raw_cache, &task_queue) + 
process_flashblock_payload::(frame, &received_tx, &raw_cache, &task_queue) { debug!( target: "flashblocks", @@ -53,9 +52,9 @@ pub async fn handle_incoming_flashblocks( // Batch process all other immediately available flashblocks while let Some(result) = incoming_rx.next().now_or_never().flatten() { match result { - Ok(message) => { + Ok(frame) => { if let Err(err) = process_flashblock_payload::( - message, + frame, &received_tx, &raw_cache, &task_queue, @@ -93,16 +92,17 @@ pub async fn handle_incoming_flashblocks( } fn process_flashblock_payload( - message: XLayerFlashblockMessage, - received_tx: &tokio::sync::broadcast::Sender>, + frame: WsFrame, + received_tx: &tokio::sync::broadcast::Sender>, raw_cache: &RawFlashblocksCache, task_queue: &ExecutionTaskQueue, ) -> eyre::Result<()> { + let decoded = (*frame.decoded).clone(); if received_tx.receiver_count() > 0 { - let _ = received_tx.send(Arc::new(message.clone())); + let _ = received_tx.send(Arc::new(frame)); } // Insert into raw cache and enqueue to execution tasks - task_queue.insert(raw_cache.handle_message(message)?); + task_queue.insert(raw_cache.handle_message(decoded)?); Ok(()) } diff --git a/crates/flashblocks/src/test_utils.rs b/crates/flashblocks/src/test_utils.rs index 3224092c..6eb97a55 100644 --- a/crates/flashblocks/src/test_utils.rs +++ b/crates/flashblocks/src/test_utils.rs @@ -1,6 +1,7 @@ use std::{collections::HashMap, sync::Arc, time::Instant}; use alloy_consensus::{Header, Receipt, TxEip7702}; +use alloy_eip7928::BlockAccessList; use alloy_primitives::{Address, Bloom, Bytes, Signature, TxHash, B256, U256}; use alloy_rpc_types_engine::PayloadId; use op_alloy_consensus::OpTypedTransaction; @@ -173,6 +174,7 @@ impl TestFlashBlockFactory { withdrawals: vec![], withdrawals_root: B256::ZERO, blob_gas_used: None, + access_list: None, } } } @@ -194,6 +196,7 @@ pub(crate) struct TestFlashBlockBuilder { withdrawals: Vec, withdrawals_root: B256, blob_gas_used: Option, + access_list: Option, } impl 
TestFlashBlockBuilder { @@ -258,6 +261,11 @@ impl TestFlashBlockBuilder { self } + pub(crate) fn access_list(mut self, access_list: Option) -> Self { + self.access_list = access_list; + self + } + pub(crate) fn build(mut self) -> OpFlashblockPayload { // Auto-create base for index 0 if not set if self.index == 0 && self.base.is_none() { @@ -289,12 +297,12 @@ impl TestFlashBlockBuilder { withdrawals_root: self.withdrawals_root, blob_gas_used: self.blob_gas_used, }, - metadata: OpFlashblockPayloadMetadata { - block_number: self.block_number, - receipts: Default::default(), - new_account_balances: Default::default(), - access_list: None, - }, + metadata: OpFlashblockPayloadMetadata::new( + self.block_number, + Some(Default::default()), + Some(Default::default()), + self.access_list, + ), } } } diff --git a/crates/flashblocks/src/ws/decoding.rs b/crates/flashblocks/src/ws/decoding.rs deleted file mode 100644 index 96a89b6d..00000000 --- a/crates/flashblocks/src/ws/decoding.rs +++ /dev/null @@ -1,42 +0,0 @@ -use alloy_primitives::bytes::Bytes; -use std::io; - -use xlayer_builder::broadcast::XLayerFlashblockMessage; - -/// A trait for decoding flashblocks from bytes. -pub trait FlashBlockDecoder: Send + 'static { - /// Decodes `bytes` into an [`XLayerFlashblockMessage`]. - fn decode(&self, bytes: Bytes) -> eyre::Result; -} - -/// Default implementation of the decoder. -impl FlashBlockDecoder for () { - fn decode(&self, bytes: Bytes) -> eyre::Result { - decode_flashblock(bytes) - } -} - -pub(crate) fn decode_flashblock(bytes: Bytes) -> eyre::Result { - let bytes = crate::ws::decoding::try_parse_message(bytes)?; - let payload: XLayerFlashblockMessage = - serde_json::from_slice(&bytes).map_err(|e| eyre::eyre!("failed to parse message: {e}"))?; - Ok(payload) -} - -/// Maps `bytes` into a potentially different [`Bytes`]. 
-/// -/// If the bytes start with a "{" character, prepended by any number of ASCII-whitespaces, -/// then it assumes that it is JSON-encoded and returns it as-is. -/// -/// Otherwise, the `bytes` are passed through a brotli decompressor and returned. -fn try_parse_message(bytes: Bytes) -> eyre::Result { - if bytes.trim_ascii_start().starts_with(b"{") { - return Ok(bytes); - } - - let mut decompressor = brotli::Decompressor::new(bytes.as_ref(), 4096); - let mut decompressed = Vec::new(); - io::copy(&mut decompressor, &mut decompressed)?; - - Ok(decompressed.into()) -} diff --git a/crates/flashblocks/src/ws/frame.rs b/crates/flashblocks/src/ws/frame.rs new file mode 100644 index 00000000..be7b8538 --- /dev/null +++ b/crates/flashblocks/src/ws/frame.rs @@ -0,0 +1,36 @@ +//! Encode-once carrier for the RPC-side WS receive pipeline. +//! +//! The WS stream emits [`WsFrame`] items, each carrying both the original wire +//! bytes and the decoded [`XLayerFlashblockMessage`]. Internal consumers +//! (validator pipeline, persistence) read `decoded`; the relay-to-downstream +//! WS subscribers path passes `bytes` straight through to +//! [`xlayer_builder::broadcast::WebSocketPublisher::publish`] without +//! re-encoding. + +use std::{io, sync::Arc}; + +use alloy_primitives::Bytes; +use xlayer_builder::broadcast::{decode, Message, XLayerFlashblockMessage}; + +/// A received WS frame: the wire bytes paired with the decoded message. +#[derive(Clone, Debug)] +pub struct WsFrame { + /// The original wire bytes (compressed or legacy uncompressed JSON). + /// Use these when forwarding to downstream WS subscribers via + /// [`xlayer_builder::broadcast::WebSocketPublisher::publish`]. + pub bytes: Bytes, + /// The decoded structured message. + pub decoded: Arc, +} + +impl WsFrame { + /// Decode wire bytes into a complete frame. 
+    pub fn from_bytes(bytes: Bytes) -> io::Result { + let outer: Message = decode(&bytes)?; + if let Message::OpFlashblockPayload(inner) = outer { + Ok(Self { bytes, decoded: Arc::new(inner) }) + } else { + Err(io::Error::new(io::ErrorKind::InvalidData, "invalid message type on the WS pipe")) + } + } +} diff --git a/crates/flashblocks/src/ws/mod.rs b/crates/flashblocks/src/ws/mod.rs index 3a69d13f..608091b5 100644 --- a/crates/flashblocks/src/ws/mod.rs +++ b/crates/flashblocks/src/ws/mod.rs @@ -1,5 +1,5 @@ -mod decoding; +mod frame; mod stream; -pub use decoding::FlashBlockDecoder; +pub use frame::WsFrame; pub use stream::WsFlashBlockStream; diff --git a/crates/flashblocks/src/ws/stream.rs b/crates/flashblocks/src/ws/stream.rs index 6f09d684..19b5caac 100644 --- a/crates/flashblocks/src/ws/stream.rs +++ b/crates/flashblocks/src/ws/stream.rs @@ -1,4 +1,4 @@ -use crate::ws::FlashBlockDecoder; +use crate::ws::WsFrame; use futures_util::{ stream::{SplitSink, SplitStream}, FutureExt, Sink, Stream, StreamExt, @@ -18,8 +18,6 @@ use tokio_tungstenite::{ use tracing::debug; use url::Url; -use xlayer_builder::broadcast::XLayerFlashblockMessage; - -/// An asynchronous stream of [`XLayerFlashblockMessage`] from a websocket connection. +/// An asynchronous stream of [`WsFrame`]s from a websocket connection. /// /// The stream attempts to connect to a websocket URL and then decode each received item. @@ -30,7 +28,6 @@ pub struct WsFlashBlockStream { ws_url: Url, state: State, connector: Connector, - decoder: Box, connect: ConnectFuture, stream: Option, sink: Option, @@ -43,17 +40,11 @@ impl WsFlashBlockStream { ws_url, state: State::default(), connector: WsConnector, - decoder: Box::new(()), connect: Box::pin(async move { Err(Error::ConnectionClosed)? }), stream: None, sink: None, } } - - /// Sets the [`XLayerFlashblockMessage`] decoder for the websocket stream.
- pub fn with_decoder(self, decoder: Box) -> Self { - Self { decoder, ..self } - } } impl WsFlashBlockStream { @@ -62,7 +53,6 @@ impl WsFlashBlockStream { Self { ws_url, state: State::default(), - decoder: Box::new(()), connector, connect: Box::pin(async move { Err(Error::ConnectionClosed)? }), stream: None, @@ -77,7 +67,7 @@ where S: Sink + Send + Unpin, C: WsConnect + Clone + Send + 'static + Unpin, { - type Item = eyre::Result; + type Item = eyre::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); @@ -121,10 +111,18 @@ where match msg { Ok(Message::Binary(bytes)) => { - return Poll::Ready(Some(this.decoder.decode(bytes))); + let bytes = alloy_primitives::Bytes::from(bytes); + return Poll::Ready(Some( + WsFrame::from_bytes(bytes) + .map_err(|e| eyre::eyre!("failed to decode WS frame: {e}")), + )); } Ok(Message::Text(bytes)) => { - return Poll::Ready(Some(this.decoder.decode(bytes.into()))); + let bytes = alloy_primitives::Bytes::copy_from_slice(bytes.as_bytes()); + return Poll::Ready(Some( + WsFrame::from_bytes(bytes) + .map_err(|e| eyre::eyre!("failed to decode WS frame: {e}")), + )); } Ok(Message::Ping(bytes)) => this.ping(bytes), Ok(Message::Close(frame)) => this.close(frame), @@ -425,6 +423,15 @@ mod tests { } } + /// Wraps a flashblock payload as `XLayerFlashblockMessage::Payload` for the wire format. 
+ fn wrap_msg(block: &OpFlashblockPayload) -> xlayer_builder::broadcast::Message { + xlayer_builder::broadcast::Message::from_flashblock_payload( + xlayer_builder::broadcast::XLayerFlashblockMessage::from_flashblock_payload( + xlayer_builder::broadcast::XLayerFlashblockPayload::new(block.clone(), 0), + ), + ) + } + fn to_json_message, F: Fn(B) -> Message>( wrapper_f: F, ) -> impl Fn(&OpFlashblockPayload) -> Result + use { @@ -439,11 +446,13 @@ mod tests { block: &OpFlashblockPayload, wrapper_f: F, ) -> Result { - Ok(wrapper_f(B::try_from(Bytes::from(serde_json::to_vec(block).unwrap())).unwrap())) + Ok(wrapper_f( + B::try_from(Bytes::from(serde_json::to_vec(&wrap_msg(block)).unwrap())).unwrap(), + )) } fn to_brotli_message(block: &OpFlashblockPayload) -> Result { - let json = serde_json::to_vec(block).unwrap(); + let json = serde_json::to_vec(&wrap_msg(block)).unwrap(); let mut compressed = Vec::new(); brotli::BrotliCompress( &mut json.as_slice(), @@ -470,10 +479,10 @@ mod tests { let ws_url = "http://localhost".parse().unwrap(); let stream = WsFlashBlockStream::with_connector(ws_url, connector); - let actual_messages: Vec<_> = stream.take(1).map(Result::unwrap).collect().await; + let actual_frames: Vec<_> = stream.take(1).map(Result::unwrap).collect().await; let expected_messages = flashblocks.to_vec(); let actual_messages: Vec<_> = - actual_messages.iter().map(|m| &m.as_payload().unwrap().inner).collect(); + actual_frames.iter().map(|frame| &frame.decoded.as_payload().unwrap().inner).collect(); let expected_messages: Vec<_> = expected_messages.iter().collect(); assert_eq!(actual_messages, expected_messages); } @@ -488,10 +497,10 @@ mod tests { let mut stream = WsFlashBlockStream::with_connector(ws_url, connector); let expected_message = flashblock; - let actual_message = + let actual_frame = stream.next().await.expect("Binary message should not be ignored").unwrap(); - assert_eq!(actual_message.as_payload().unwrap().inner, expected_message) + 
assert_eq!(actual_frame.decoded.as_payload().unwrap().inner, expected_message) } #[tokio::test]