From 5f4f6ed9aabe7880206d46182ba0cdcf330369a4 Mon Sep 17 00:00:00 2001 From: vlad Date: Wed, 27 May 2026 07:52:32 +0100 Subject: [PATCH 1/2] some fixes --- crates/common/src/spine/messages.rs | 1 + crates/network/src/tile.rs | 5 +- crates/peer/src/manager.rs | 101 ++++++++++---------- crates/peer/src/manager/rpc.rs | 138 ++++++++++++---------------- crates/peer/src/state.rs | 14 +++ 5 files changed, 131 insertions(+), 128 deletions(-) diff --git a/crates/common/src/spine/messages.rs b/crates/common/src/spine/messages.rs index f045644..32276f8 100644 --- a/crates/common/src/spine/messages.rs +++ b/crates/common/src/spine/messages.rs @@ -191,6 +191,7 @@ pub enum PeerEvent { }, P2pDisconnect { p2p_peer: usize, + peer_id: PeerId, }, P2pCannotCreateStream { p2p_peer: usize, diff --git a/crates/network/src/tile.rs b/crates/network/src/tile.rs index 3b0fe0d..de622f4 100644 --- a/crates/network/src/tile.rs +++ b/crates/network/src/tile.rs @@ -114,7 +114,10 @@ impl Tile for NetworkTile { adapter.produce(PeerEvent::P2pPeerIdentity { p2p_peer: peer, identify }); } NetEvent::PeerDisconnected { peer } => { - adapter.produce(PeerEvent::P2pDisconnect { p2p_peer: peer.connection }); + adapter.produce(PeerEvent::P2pDisconnect { + p2p_peer: peer.connection, + peer_id: peer.peer_id, + }); } NetEvent::StreamReady { stream: _ } => { // TODO notifiy new stream? diff --git a/crates/peer/src/manager.rs b/crates/peer/src/manager.rs index a56ca64..ac019ca 100644 --- a/crates/peer/src/manager.rs +++ b/crates/peer/src/manager.rs @@ -19,17 +19,15 @@ use silver_common::{ use crate::{ database::PeerDatabase, - manager::rpc::{OutboundCounts, PeerInboundState}, scoring, state::{ArchivedState, IpPrefix, MsgIdMap, PeerState, TopicScore}, }; -mod rpc; +pub(crate) mod rpc; /// Initial capacity hints — chosen so normal steady-state activity doesn't /// rehash. Undersizing is fine correctness-wise; this is a perf nudge. const PEERS_CAP: usize = 256; -const MESH_CAP: usize = 96; const IP_COLOC_CAP: usize = 128; const ARCHIVE_CAP: usize = 512; const SYNC_AGG_CAP: usize = 64; @@ -38,6 +36,9 @@ pub struct PeerManager { /// Live peers keyed by connection handle. peers: HashMap, + /// In-progress dials, mapping PeerId to when the dial was initiated. + dialing: HashMap, + /// Counters persisted across reconnect by PeerId. GC'd on tick. archived: HashMap, @@ -51,9 +52,6 @@ pub struct PeerManager { /// Our mesh per topic: connections we've grafted onto. Bounded by d_high. mesh: HashMap>, - /// Backoff after PRUNE — don't re-graft same (peer, topic) until deadline. - backoffs: HashMap<(usize, GossipTopic), Instant>, - /// Outstanding IHAVE→IWANT promises, keyed by `MessageId`. Each entry /// holds every (conn, deadline) that has promised that id. Any one /// peer's delivery via `PeerEvent::NewGossip` clears the entry for all @@ -143,16 +141,7 @@ pub struct PeerManager { /// the 300s periodic. just_synced: bool, - /// Outstanding outbound RPC requests per peer, broken down by - /// protocol. Incremented when we send a request, decremented when - /// the terminal response for that protocol arrives. Cleared on - /// peer disconnect. Indexed by `protocol.ordinal() as usize`. - pub(crate) outbound_in_flight: OutboundCounts, - /// Per-peer inbound rate-limit state — token bucket per protocol - /// using lighthouse defaults. Refill is lazy (credit on access). - /// Cleared on disconnect. - pub(crate) inbound_buckets: HashMap, /// Outstanding catch-up `BlocksByRange` request issued by the PM-owned /// driver. At most one in flight per target. Cleared when the range is @@ -236,11 +225,11 @@ impl PeerManager { let rejected = RejectedRoots::new(syncing.rejected_cap); Self { peers: HashMap::with_capacity(PEERS_CAP), + dialing: HashMap::with_capacity(64), archived: HashMap::with_capacity(ARCHIVE_CAP), ip_colocations: HashMap::with_capacity(IP_COLOC_CAP), our_topics, mesh, - backoffs: HashMap::with_capacity(MESH_CAP), promises: MsgIdMap::with_capacity_and_hasher(4096, Default::default()), banned_ips: HashMap::with_capacity(64), ip_eviction_counts: HashMap::with_capacity(64), @@ -261,8 +250,6 @@ impl PeerManager { metadata, is_synced: false, just_synced: false, - outbound_in_flight: OutboundCounts::with_capacity(PEERS_CAP), - inbound_buckets: HashMap::with_capacity(PEERS_CAP), inflight_syncreq: None, burnt_for_target: FxHashSet::with_capacity_and_hasher(PEERS_CAP, Default::default()), finalized_counts: FxHashMap::with_capacity_and_hasher(SYNC_AGG_CAP, Default::default()), @@ -651,7 +638,8 @@ impl PeerManager { tracing::info!("New p2p peer: {ip:?}:{port}, local dial? {local_dial}"); self.on_connected(p2p_peer_id, peer_id_full, ip, port, now, emit, local_dial); } - PeerEvent::P2pDisconnect { p2p_peer } => { + PeerEvent::P2pDisconnect { p2p_peer, peer_id } => { + self.dialing.remove(&peer_id); self.on_disconnected(p2p_peer, now, emit); } PeerEvent::P2pCannotCreateStream { p2p_peer, .. } | @@ -702,7 +690,7 @@ impl PeerManager { self.add_behaviour_penalty(p2p_peer, 1.0); } PeerEvent::DiscNodeFound { enr } => { - self.on_disc_node_found(enr, emit); + self.on_disc_node_found(enr, now, emit); } PeerEvent::DiscExternalAddress { address: _ } => { // Informational — network tile handles advertisement. @@ -784,6 +772,11 @@ impl PeerManager { // 8) Trigger discovery if we're under target. self.maybe_request_discovery(now, emit); + + // 9) Prune stale dials (older than 15 seconds) + self.dialing.retain(|_, &mut time| { + now.saturating_duration_since(time) < std::time::Duration::from_secs(15) + }); } // ── Lifecycle ─────────────────────────────────────────────────────── @@ -802,6 +795,8 @@ impl PeerManager { let addr = SocketAddr::new(ip_bytes_to_addr(ip), port); let mut state = PeerState::new(peer_id, addr, now); + self.dialing.remove(&peer_id); + // Inherit counters if we remember this PeerId. if let Some(archive) = self.archived.remove(&peer_id) { state.restore_from_archive(archive); @@ -860,9 +855,6 @@ impl PeerManager { } } - // Drop any outstanding backoffs referencing this conn. - self.backoffs.retain(|(c, _), _| *c != conn); - // Clear this peer from the global IHAVE-promise index. They can't // fulfil anything anymore and shouldn't be re-penalised on sweep. self.promises.retain(|_hash, waiters| { @@ -902,12 +894,7 @@ impl PeerManager { self.inflight_syncreq = None; } - // Drop RPC bookkeeping for the gone connection. Outstanding - // responses can never arrive — leaving counters pinned would - // lock out future retries on reconnect. Rate-limit bucket reset - // so a reconnect starts fresh at full tokens (matches lighthouse). - self.outbound_in_flight.remove(&conn); - self.inbound_buckets.remove(&conn); + } // ── Gossip event handlers ─────────────────────────────────────────── @@ -935,7 +922,7 @@ impl PeerManager { // is below d_low, pull the peer in. if we_want && mesh_size < self.params.d_low as usize && - !self.is_backed_off((conn, topic), now) + !self.is_backed_off(conn, topic, now) { self.do_graft(conn, peer_id, topic, now, emit); } @@ -978,21 +965,19 @@ impl PeerManager { tracing::info!(p2p_peer = conn, ?topic, "PM peer GRAFTed us"); } - /// Peer pruned us. Record a backoff so we don't re-graft immediately, - /// and mark their P3b penalty so the deficit carries forward. fn on_remote_prune(&mut self, conn: usize, topic: GossipTopic, now: Instant) { - if let Some(peer) = self.peers.get_mut(&conn) && - let Some(t) = peer.topic_stats.get_mut(&topic) - { - // Carry any active deficit into the failure penalty. - if t.mesh_active && t.mesh_deliveries < self.params.mesh_message_deliveries_threshold { - t.mesh_failure_penalty += - self.params.mesh_message_deliveries_threshold - t.mesh_deliveries; + if let Some(peer) = self.peers.get_mut(&conn) { + if let Some(t) = peer.topic_stats.get_mut(&topic) { + // Carry any active deficit into the failure penalty. + if t.mesh_active && t.mesh_deliveries < self.params.mesh_message_deliveries_threshold { + t.mesh_failure_penalty += + self.params.mesh_message_deliveries_threshold - t.mesh_deliveries; + } + t.meshed_since = None; + t.mesh_active = false; } - t.meshed_since = None; - t.mesh_active = false; + peer.backoffs.insert(topic, now + self.params.prune_backoff); } - self.backoffs.insert((conn, topic), now + self.params.prune_backoff); tracing::debug!(p2p_peer = conn, ?topic, "PM peer PRUNEd us"); } @@ -1203,7 +1188,7 @@ impl PeerManager { /// (priority) and we're under `max_priority_peers`. /// /// Network tile handles in-flight dial / already-connected dedup. - fn on_disc_node_found(&mut self, enr: Enr, emit: &mut impl FnMut(PeerControl)) { + fn on_disc_node_found(&mut self, enr: Enr, now: Instant, emit: &mut impl FnMut(PeerControl)) { // 1. Fork-digest gate. Spec-conformant CL nodes always advertise `eth2`; // missing-or-mismatched is a drop (matches lighthouse). if let Some(my_digest) = self.our_fork_digest { @@ -1241,6 +1226,10 @@ impl PeerManager { tracing::warn!(?peer_id, "known peer id"); return; } + if self.dialing.contains_key(&peer_id) { + tracing::warn!(?peer_id, "already dialing peer id"); + return; + } if self.archived.contains_key(&peer_id) { tracing::warn!(?peer_id, "archived peer id"); return; @@ -1259,7 +1248,7 @@ impl PeerManager { // ours, meaning the peer can fill a subnet we care about. Lets us go past // `target_peers` up to `max_priority_peers` for under-meshed validator // subnets. - let connected = self.peers.len(); + let connected = self.peers.len() + self.dialing.len(); let priority = enr_matches_subnets(&enr, self.required_attnets, self.required_syncnets); let dial = connected < self.params.target_peers || (priority && connected < self.params.max_priority_peers); @@ -1267,6 +1256,7 @@ impl PeerManager { tracing::debug!(connected, "not dialling"); return; } + self.dialing.insert(peer_id, now); emit(PeerControl::P2pDial { p2p: peer_id, enr }); } @@ -1394,8 +1384,11 @@ impl PeerManager { // ── Internal helpers ──────────────────────────────────────────────── - fn is_backed_off(&self, key: (usize, GossipTopic), now: Instant) -> bool { - self.backoffs.get(&key).is_some_and(|d| now < *d) + fn is_backed_off(&self, conn: usize, topic: GossipTopic, now: Instant) -> bool { + self.peers + .get(&conn) + .and_then(|p| p.backoffs.get(&topic)) + .is_some_and(|d| now < *d) } fn do_graft( @@ -1444,8 +1437,8 @@ impl PeerManager { } t.meshed_since = None; t.mesh_active = false; + peer.backoffs.insert(topic, now + self.params.prune_backoff); } - self.backoffs.insert((conn, topic), now + self.params.prune_backoff); emit(PeerControl::P2pGossipPrune { p2p: peer_id, p2p_connection: conn, topic }); } @@ -1575,7 +1568,7 @@ impl PeerManager { if peer.cached_score < self.params.gossip_threshold { return None; } - if self.is_backed_off((*conn, topic), now) { + if self.is_backed_off(*conn, topic, now) { return None; } Some((*conn, peer.cached_score)) @@ -2010,7 +2003,11 @@ mod tests { mgr.handle_event(PeerEvent::P2pGossipInvalidFrame { p2p_peer: 1 }, now, &mut |c| { cap.0.push(c) }); - mgr.handle_event(PeerEvent::P2pDisconnect { p2p_peer: 1 }, now, &mut |c| cap.0.push(c)); + mgr.handle_event( + PeerEvent::P2pDisconnect { p2p_peer: 1, peer_id: peer_id(1) }, + now, + &mut |c| cap.0.push(c), + ); assert_eq!(mgr.archived_count(), 1); connect(&mut mgr, &mut cap, 99, 1, now); @@ -2027,7 +2024,11 @@ mod tests { params.archived_ttl = Duration::from_secs(10); let (mut mgr, mut cap) = fixture(vec![], params); connect(&mut mgr, &mut cap, 1, 1, now); - mgr.handle_event(PeerEvent::P2pDisconnect { p2p_peer: 1 }, now, &mut |c| cap.0.push(c)); + mgr.handle_event( + PeerEvent::P2pDisconnect { p2p_peer: 1, peer_id: peer_id(1) }, + now, + &mut |c| cap.0.push(c), + ); assert_eq!(mgr.archived_count(), 1); now += Duration::from_secs(11); diff --git a/crates/peer/src/manager/rpc.rs b/crates/peer/src/manager/rpc.rs index c4da011..85aaf76 100644 --- a/crates/peer/src/manager/rpc.rs +++ b/crates/peer/src/manager/rpc.rs @@ -9,7 +9,6 @@ //! controller tile that drives the spine remains a thin shell. use std::{ - collections::HashMap, ops::Deref, time::{Duration, Instant}, }; @@ -83,7 +82,7 @@ pub(crate) struct PeerInboundState { last_refill: [Option; N_STREAM_PROTOCOLS], } -pub(crate) type OutboundCounts = HashMap; + /// Map a received `RpcResponse::Error` to an `RpcSeverity`. Returns `None` /// when the error is informational and shouldn't impact peer score (notably @@ -126,27 +125,7 @@ fn severity_for_error_response(code: u8, protocol: StreamProtocol) -> Option u32 { - counts.get(&peer).map_or(0, |c| c[protocol.ordinal() as usize]) -} -fn record_outbound(counts: &mut OutboundCounts, peer: usize, protocol: StreamProtocol) { - let entry = counts.entry(peer).or_insert([0; N_STREAM_PROTOCOLS]); - entry[protocol.ordinal() as usize] += 1; -} - -/// Decrement the outbound slot for `(peer, protocol)`; drop the peer's -/// entry when every protocol's count is zero so the map stays small in -/// steady state. -fn release_outbound(counts: &mut OutboundCounts, peer: usize, protocol: StreamProtocol) { - if let Some(c) = counts.get_mut(&peer) { - let slot = &mut c[protocol.ordinal() as usize]; - *slot = slot.saturating_sub(1); - if c.iter().all(|&v| v == 0) { - counts.remove(&peer); - } - } -} /// Does this `response` terminate an outbound RPC stream we initiated? /// Single-chunk protocols (Status/Ping/MetaData/Goodbye) have no `Complete` @@ -168,8 +147,7 @@ fn is_terminal_response(protocol: StreamProtocol, response: &RpcResponse) -> boo /// always admitted. First contact seeds the bucket at `max_tokens`, /// matching lighthouse's burst-allowed semantics. fn try_admit_inbound( - buckets: &mut HashMap, - peer: usize, + state: &mut PeerInboundState, protocol: StreamProtocol, now: Instant, ) -> bool { @@ -177,7 +155,6 @@ fn try_admit_inbound( let Some(quota) = INBOUND_QUOTAS[idx].as_ref() else { return true; }; - let state = buckets.entry(peer).or_default(); if state.last_refill[idx].is_none() { state.tokens[idx] = quota.max_tokens; @@ -235,7 +212,8 @@ impl PeerManager { self.database .live_peers_supporting(protocol) .filter_map(|p| { - if outbound_count(&self.outbound_in_flight, p, protocol) >= + let peer = self.peers.get(&p)?; + if peer.outbound_in_flight[protocol.ordinal() as usize] >= MAX_RPC_PROTOCOL_IN_FLIGHT { return None; @@ -244,7 +222,7 @@ impl PeerManager { if overlap == 0 { return None; } - let score = self.score(p).unwrap_or(f64::NEG_INFINITY); + let score = peer.cached_score; Some((p, overlap, score)) }) .max_by(|a, b| { @@ -279,7 +257,10 @@ impl PeerManager { // unconditionally for the unquota'd protocols // (gossip/identity). let protocol = request.protocol(); - if !try_admit_inbound(&mut self.inbound_buckets, stream_id.peer(), protocol, now) { + let Some(peer) = self.peers.get_mut(&stream_id.peer()) else { + return; + }; + if !try_admit_inbound(&mut peer.inbound_state, protocol, now) { // Goodbye expects no response — silently drop; // anything else gets the standard rate-limit // error chunk. @@ -438,11 +419,9 @@ impl PeerManager { let current_peer_metadata_seq = self.peer_metadata_seq(stream_id.peer()); let metadata_seq = u64::from_le_bytes(ping); if !matches!(current_peer_metadata_seq, Some(seq) if seq == metadata_seq) { - record_outbound( - &mut self.outbound_in_flight, - stream_id.peer(), - StreamProtocol::Metadata, - ); + if let Some(peer) = self.peers.get_mut(&stream_id.peer()) { + peer.outbound_in_flight[StreamProtocol::Metadata.ordinal() as usize] += 1; + } emit(PeerControl::P2pSend(P2pSend::Rpc(RpcOutbound::Request( RpcRequestOutbound { application_id: 0, @@ -480,7 +459,10 @@ impl PeerManager { _ => {} } if let Some(protocol) = terminal_protocol { - release_outbound(&mut self.outbound_in_flight, stream_id.peer(), protocol); + if let Some(peer) = self.peers.get_mut(&stream_id.peer()) { + peer.outbound_in_flight[protocol.ordinal() as usize] = + peer.outbound_in_flight[protocol.ordinal() as usize].saturating_sub(1); + } } } } @@ -578,7 +560,9 @@ impl PeerManager { // application_id is peer-local; use start_slot as a unique correlator. let application_id = start_slot; - record_outbound(&mut self.outbound_in_flight, peer_id, StreamProtocol::BeaconBlocksByRange); + if let Some(peer) = self.peers.get_mut(&peer_id) { + peer.outbound_in_flight[StreamProtocol::BeaconBlocksByRange.ordinal() as usize] += 1; + } tracing::info!( peer_id, start_slot, @@ -626,11 +610,9 @@ impl PeerManager { ) else { break; }; - record_outbound( - &mut self.outbound_in_flight, - peer, - StreamProtocol::DataColumnSidecarsByRoot, - ); + if let Some(p_state) = self.peers.get_mut(&peer) { + p_state.outbound_in_flight[StreamProtocol::DataColumnSidecarsByRoot.ordinal() as usize] += 1; + } emit(PeerControl::P2pDataColumnsRequest { app_id: request_id, peer, @@ -676,11 +658,9 @@ impl PeerManager { else { break; }; - record_outbound( - &mut self.outbound_in_flight, - peer, - StreamProtocol::DataColumnSidecarsByRoot, - ); + if let Some(p_state) = self.peers.get_mut(&peer) { + p_state.outbound_in_flight[StreamProtocol::DataColumnSidecarsByRoot.ordinal() as usize] += 1; + } emit(PeerControl::P2pDataColumnsRequest { app_id: request_id, peer, @@ -734,13 +714,16 @@ impl PeerManager { ); continue; } - if outbound_count(&self.outbound_in_flight, peer, StreamProtocol::BeaconBlocksByRange) >= + let Some(peer_state) = self.peers.get(&peer) else { + continue; + }; + if peer_state.outbound_in_flight[StreamProtocol::BeaconBlocksByRange.ordinal() as usize] >= MAX_RPC_PROTOCOL_IN_FLIGHT { tracing::warn!("too many rpcs in flight already"); continue; } - let s = self.score(peer).unwrap_or(f64::NEG_INFINITY); + let s = peer_state.cached_score; if best.is_none_or(|(_, bs)| s > bs) { best = Some((peer, s)); } @@ -757,7 +740,9 @@ impl PeerManager { let ping = RpcRequest::Ping(MetadataView::seq_number(metadata).to_le_bytes()); let peers: Vec = self.live_peers().collect(); for peer in peers { - record_outbound(&mut self.outbound_in_flight, peer, StreamProtocol::Ping); + if let Some(p_state) = self.peers.get_mut(&peer) { + p_state.outbound_in_flight[StreamProtocol::Ping.ordinal() as usize] += 1; + } emit(PeerControl::P2pSend(P2pSend::Rpc(RpcOutbound::Request(RpcRequestOutbound { application_id: 0, peer, @@ -896,69 +881,68 @@ mod tests { #[test] fn inbound_unlimited_protocols_always_admit() { // Gossip + Identity have no quota → permit unconditionally. - let mut buckets = HashMap::new(); + let mut state = PeerInboundState::default(); let now = Instant::now(); for _ in 0..1000 { - assert!(try_admit_inbound(&mut buckets, 7, StreamProtocol::GossipSub, now)); - assert!(try_admit_inbound(&mut buckets, 7, StreamProtocol::Identity, now)); + assert!(try_admit_inbound(&mut state, StreamProtocol::GossipSub, now)); + assert!(try_admit_inbound(&mut state, StreamProtocol::Identity, now)); } - // No state should be allocated for unquota'd protocols. - assert!(buckets.is_empty()); } #[test] fn inbound_burst_up_to_max_then_denies() { - let mut buckets = HashMap::new(); + let mut state = PeerInboundState::default(); let now = Instant::now(); // Ping quota = 2 / 10 s. First two admits succeed, the third // hits an empty bucket (no time has passed → no refill). - assert!(try_admit_inbound(&mut buckets, 1, StreamProtocol::Ping, now)); - assert!(try_admit_inbound(&mut buckets, 1, StreamProtocol::Ping, now)); - assert!(!try_admit_inbound(&mut buckets, 1, StreamProtocol::Ping, now)); - assert!(!try_admit_inbound(&mut buckets, 1, StreamProtocol::Ping, now)); + assert!(try_admit_inbound(&mut state, StreamProtocol::Ping, now)); + assert!(try_admit_inbound(&mut state, StreamProtocol::Ping, now)); + assert!(!try_admit_inbound(&mut state, StreamProtocol::Ping, now)); + assert!(!try_admit_inbound(&mut state, StreamProtocol::Ping, now)); } #[test] fn inbound_refills_after_period() { - let mut buckets = HashMap::new(); + let mut state = PeerInboundState::default(); let t0 = Instant::now(); // Drain Ping bucket (2 tokens). - assert!(try_admit_inbound(&mut buckets, 1, StreamProtocol::Ping, t0)); - assert!(try_admit_inbound(&mut buckets, 1, StreamProtocol::Ping, t0)); - assert!(!try_admit_inbound(&mut buckets, 1, StreamProtocol::Ping, t0)); + assert!(try_admit_inbound(&mut state, StreamProtocol::Ping, t0)); + assert!(try_admit_inbound(&mut state, StreamProtocol::Ping, t0)); + assert!(!try_admit_inbound(&mut state, StreamProtocol::Ping, t0)); // After a full period (10 s) the bucket is full again. let t1 = t0 + Duration::from_secs(10); - assert!(try_admit_inbound(&mut buckets, 1, StreamProtocol::Ping, t1)); - assert!(try_admit_inbound(&mut buckets, 1, StreamProtocol::Ping, t1)); - assert!(!try_admit_inbound(&mut buckets, 1, StreamProtocol::Ping, t1)); + assert!(try_admit_inbound(&mut state, StreamProtocol::Ping, t1)); + assert!(try_admit_inbound(&mut state, StreamProtocol::Ping, t1)); + assert!(!try_admit_inbound(&mut state, StreamProtocol::Ping, t1)); } #[test] fn inbound_continuous_refill_partial() { // BlocksByRange = 128 / 10 s ⇒ one token per ~78 ms. // Drain then wait 200 ms — expect ~2 tokens to have been credited. - let mut buckets = HashMap::new(); + let mut state = PeerInboundState::default(); let t0 = Instant::now(); for _ in 0..128 { - assert!(try_admit_inbound(&mut buckets, 5, StreamProtocol::BeaconBlocksByRange, t0)); + assert!(try_admit_inbound(&mut state, StreamProtocol::BeaconBlocksByRange, t0)); } - assert!(!try_admit_inbound(&mut buckets, 5, StreamProtocol::BeaconBlocksByRange, t0)); + assert!(!try_admit_inbound(&mut state, StreamProtocol::BeaconBlocksByRange, t0)); let t1 = t0 + Duration::from_millis(200); - assert!(try_admit_inbound(&mut buckets, 5, StreamProtocol::BeaconBlocksByRange, t1)); - assert!(try_admit_inbound(&mut buckets, 5, StreamProtocol::BeaconBlocksByRange, t1)); + assert!(try_admit_inbound(&mut state, StreamProtocol::BeaconBlocksByRange, t1)); + assert!(try_admit_inbound(&mut state, StreamProtocol::BeaconBlocksByRange, t1)); // ~2 tokens credited; the third call at the same instant should fail. - assert!(!try_admit_inbound(&mut buckets, 5, StreamProtocol::BeaconBlocksByRange, t1)); + assert!(!try_admit_inbound(&mut state, StreamProtocol::BeaconBlocksByRange, t1)); } #[test] fn inbound_per_peer_independent() { // Draining peer A's bucket must not affect peer B. - let mut buckets = HashMap::new(); + let mut state_a = PeerInboundState::default(); + let mut state_b = PeerInboundState::default(); let now = Instant::now(); - assert!(try_admit_inbound(&mut buckets, 1, StreamProtocol::Ping, now)); - assert!(try_admit_inbound(&mut buckets, 1, StreamProtocol::Ping, now)); - assert!(!try_admit_inbound(&mut buckets, 1, StreamProtocol::Ping, now)); - assert!(try_admit_inbound(&mut buckets, 2, StreamProtocol::Ping, now)); - assert!(try_admit_inbound(&mut buckets, 2, StreamProtocol::Ping, now)); + assert!(try_admit_inbound(&mut state_a, StreamProtocol::Ping, now)); + assert!(try_admit_inbound(&mut state_a, StreamProtocol::Ping, now)); + assert!(!try_admit_inbound(&mut state_a, StreamProtocol::Ping, now)); + assert!(try_admit_inbound(&mut state_b, StreamProtocol::Ping, now)); + assert!(try_admit_inbound(&mut state_b, StreamProtocol::Ping, now)); } } diff --git a/crates/peer/src/state.rs b/crates/peer/src/state.rs index 6029a78..3dcc258 100644 --- a/crates/peer/src/state.rs +++ b/crates/peer/src/state.rs @@ -22,6 +22,8 @@ pub(crate) const TOPICS_PER_PEER_CAP: usize = 96; pub(crate) type MsgIdBuild = BuildHasherDefault; +use crate::manager::rpc::{N_STREAM_PROTOCOLS, PeerInboundState}; + /// One live peer's state. pub(crate) struct PeerState { // Identity — `PeerId` + connection handle are both needed to emit any @@ -54,6 +56,15 @@ pub(crate) struct PeerState { pub ihaves_received: u16, // gates P7 via max_ihave_messages pub iwant_ids_sent: u16, // gates P7 via max_ihave_length + // Inbound protocol rate-limit state + pub inbound_state: PeerInboundState, + + // Outbound requests in flight count per protocol + pub outbound_in_flight: [u32; N_STREAM_PROTOCOLS], + + // Prune backoff deadlines per topic + pub backoffs: HashMap, + // Cached score value + recomputation timestamp. pub cached_score: f64, pub score_valid_at: Instant, @@ -72,6 +83,9 @@ impl PeerState { behaviour_penalty: 0.0, ihaves_received: 0, iwant_ids_sent: 0, + inbound_state: PeerInboundState::default(), + outbound_in_flight: [0; N_STREAM_PROTOCOLS], + backoffs: HashMap::new(), cached_score: 0.0, score_valid_at: now, } From c321591730abf8486986d6b2360a08411a0ed67d Mon Sep 17 00:00:00 2001 From: vladimir-ea Date: Wed, 27 May 2026 08:19:56 +0100 Subject: [PATCH 2/2] fmt --- crates/peer/src/manager.rs | 13 ++++--------- crates/peer/src/manager/rpc.rs | 19 +++++++------------ 2 files changed, 11 insertions(+), 21 deletions(-) diff --git a/crates/peer/src/manager.rs b/crates/peer/src/manager.rs index ac019ca..77987b8 100644 --- a/crates/peer/src/manager.rs +++ b/crates/peer/src/manager.rs @@ -141,8 +141,6 @@ pub struct PeerManager { /// the 300s periodic. just_synced: bool, - - /// Outstanding catch-up `BlocksByRange` request issued by the PM-owned /// driver. At most one in flight per target. Cleared when the range is /// fully delivered (BS head_slot has advanced through it), when the @@ -893,8 +891,6 @@ impl PeerManager { { self.inflight_syncreq = None; } - - } // ── Gossip event handlers ─────────────────────────────────────────── @@ -969,7 +965,9 @@ impl PeerManager { if let Some(peer) = self.peers.get_mut(&conn) { if let Some(t) = peer.topic_stats.get_mut(&topic) { // Carry any active deficit into the failure penalty. - if t.mesh_active && t.mesh_deliveries < self.params.mesh_message_deliveries_threshold { + if t.mesh_active && + t.mesh_deliveries < self.params.mesh_message_deliveries_threshold + { t.mesh_failure_penalty += self.params.mesh_message_deliveries_threshold - t.mesh_deliveries; } @@ -1385,10 +1383,7 @@ impl PeerManager { // ── Internal helpers ──────────────────────────────────────────────── fn is_backed_off(&self, conn: usize, topic: GossipTopic, now: Instant) -> bool { - self.peers - .get(&conn) - .and_then(|p| p.backoffs.get(&topic)) - .is_some_and(|d| now < *d) + self.peers.get(&conn).and_then(|p| p.backoffs.get(&topic)).is_some_and(|d| now < *d) } fn do_graft( diff --git a/crates/peer/src/manager/rpc.rs b/crates/peer/src/manager/rpc.rs index 85aaf76..d8e92e6 100644 --- a/crates/peer/src/manager/rpc.rs +++ b/crates/peer/src/manager/rpc.rs @@ -82,8 +82,6 @@ pub(crate) struct PeerInboundState { last_refill: [Option; N_STREAM_PROTOCOLS], } - - /// Map a received `RpcResponse::Error` to an `RpcSeverity`. Returns `None` /// when the error is informational and shouldn't impact peer score (notably /// `ResourceUnavailable` for blob/column-by-root, where missing data is @@ -125,8 +123,6 @@ fn severity_for_error_response(code: u8, protocol: StreamProtocol) -> Option boo /// Protocols with no quota in `INBOUND_QUOTAS` (gossip/identity) are /// always admitted. First contact seeds the bucket at `max_tokens`, /// matching lighthouse's burst-allowed semantics. -fn try_admit_inbound( - state: &mut PeerInboundState, - protocol: StreamProtocol, - now: Instant, -) -> bool { +fn try_admit_inbound(state: &mut PeerInboundState, protocol: StreamProtocol, now: Instant) -> bool { let idx = protocol.ordinal() as usize; let Some(quota) = INBOUND_QUOTAS[idx].as_ref() else { return true; @@ -420,7 +412,8 @@ impl PeerManager { let metadata_seq = u64::from_le_bytes(ping); if !matches!(current_peer_metadata_seq, Some(seq) if seq == metadata_seq) { if let Some(peer) = self.peers.get_mut(&stream_id.peer()) { - peer.outbound_in_flight[StreamProtocol::Metadata.ordinal() as usize] += 1; + peer.outbound_in_flight + [StreamProtocol::Metadata.ordinal() as usize] += 1; } emit(PeerControl::P2pSend(P2pSend::Rpc(RpcOutbound::Request( RpcRequestOutbound { @@ -611,7 +604,8 @@ impl PeerManager { break; }; if let Some(p_state) = self.peers.get_mut(&peer) { - p_state.outbound_in_flight[StreamProtocol::DataColumnSidecarsByRoot.ordinal() as usize] += 1; + p_state.outbound_in_flight + [StreamProtocol::DataColumnSidecarsByRoot.ordinal() as usize] += 1; } emit(PeerControl::P2pDataColumnsRequest { app_id: request_id, @@ -659,7 +653,8 @@ impl PeerManager { break; }; if let Some(p_state) = self.peers.get_mut(&peer) { - p_state.outbound_in_flight[StreamProtocol::DataColumnSidecarsByRoot.ordinal() as usize] += 1; + p_state.outbound_in_flight + [StreamProtocol::DataColumnSidecarsByRoot.ordinal() as usize] += 1; } emit(PeerControl::P2pDataColumnsRequest { app_id: request_id,