Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/common/src/spine/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ pub enum PeerEvent {
},
P2pDisconnect {
p2p_peer: usize,
peer_id: PeerId,
},
P2pCannotCreateStream {
p2p_peer: usize,
Expand Down
5 changes: 4 additions & 1 deletion crates/network/src/tile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ impl Tile<SilverSpine> for NetworkTile {
adapter.produce(PeerEvent::P2pPeerIdentity { p2p_peer: peer, identify });
}
NetEvent::PeerDisconnected { peer } => {
adapter.produce(PeerEvent::P2pDisconnect { p2p_peer: peer.connection });
adapter.produce(PeerEvent::P2pDisconnect {
p2p_peer: peer.connection,
peer_id: peer.peer_id,
});
}
NetEvent::StreamReady { stream: _ } => {
// TODO notify new stream?
Expand Down
102 changes: 49 additions & 53 deletions crates/peer/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,15 @@ use silver_common::{

use crate::{
database::PeerDatabase,
manager::rpc::{OutboundCounts, PeerInboundState},
scoring,
state::{ArchivedState, IpPrefix, MsgIdMap, PeerState, TopicScore},
};

mod rpc;
pub(crate) mod rpc;

/// Initial capacity hints — chosen so normal steady-state activity doesn't
/// rehash. Undersizing is fine correctness-wise; this is a perf nudge.
const PEERS_CAP: usize = 256;
const MESH_CAP: usize = 96;
const IP_COLOC_CAP: usize = 128;
const ARCHIVE_CAP: usize = 512;
const SYNC_AGG_CAP: usize = 64;
Expand All @@ -38,6 +36,9 @@ pub struct PeerManager {
/// Live peers keyed by connection handle.
peers: HashMap<usize, PeerState>,

/// In-progress dials, mapping PeerId to when the dial was initiated.
dialing: HashMap<PeerId, Instant>,

/// Counters persisted across reconnect by PeerId. GC'd on tick.
archived: HashMap<PeerId, ArchivedState>,

Expand All @@ -51,9 +52,6 @@ pub struct PeerManager {
/// Our mesh per topic: connections we've grafted onto. Bounded by d_high.
mesh: HashMap<GossipTopic, Vec<usize>>,

/// Backoff after PRUNE — don't re-graft same (peer, topic) until deadline.
backoffs: HashMap<(usize, GossipTopic), Instant>,

/// Outstanding IHAVE→IWANT promises, keyed by `MessageId`. Each entry
/// holds every (conn, deadline) that has promised that id. Any one
/// peer's delivery via `PeerEvent::NewGossip` clears the entry for all
Expand Down Expand Up @@ -143,17 +141,6 @@ pub struct PeerManager {
/// the 300s periodic.
just_synced: bool,

/// Outstanding outbound RPC requests per peer, broken down by
/// protocol. Incremented when we send a request, decremented when
/// the terminal response for that protocol arrives. Cleared on
/// peer disconnect. Indexed by `protocol.ordinal() as usize`.
pub(crate) outbound_in_flight: OutboundCounts,

/// Per-peer inbound rate-limit state — token bucket per protocol
/// using lighthouse defaults. Refill is lazy (credit on access).
/// Cleared on disconnect.
pub(crate) inbound_buckets: HashMap<usize, PeerInboundState>,

/// Outstanding catch-up `BlocksByRange` request issued by the PM-owned
/// driver. At most one in flight per target. Cleared when the range is
/// fully delivered (BS head_slot has advanced through it), when the
Expand Down Expand Up @@ -236,11 +223,11 @@ impl PeerManager {
let rejected = RejectedRoots::new(syncing.rejected_cap);
Self {
peers: HashMap::with_capacity(PEERS_CAP),
dialing: HashMap::with_capacity(64),
archived: HashMap::with_capacity(ARCHIVE_CAP),
ip_colocations: HashMap::with_capacity(IP_COLOC_CAP),
our_topics,
mesh,
backoffs: HashMap::with_capacity(MESH_CAP),
promises: MsgIdMap::with_capacity_and_hasher(4096, Default::default()),
banned_ips: HashMap::with_capacity(64),
ip_eviction_counts: HashMap::with_capacity(64),
Expand All @@ -261,8 +248,6 @@ impl PeerManager {
metadata,
is_synced: false,
just_synced: false,
outbound_in_flight: OutboundCounts::with_capacity(PEERS_CAP),
inbound_buckets: HashMap::with_capacity(PEERS_CAP),
inflight_syncreq: None,
burnt_for_target: FxHashSet::with_capacity_and_hasher(PEERS_CAP, Default::default()),
finalized_counts: FxHashMap::with_capacity_and_hasher(SYNC_AGG_CAP, Default::default()),
Expand Down Expand Up @@ -651,7 +636,8 @@ impl PeerManager {
tracing::info!("New p2p peer: {ip:?}:{port}, local dial? {local_dial}");
self.on_connected(p2p_peer_id, peer_id_full, ip, port, now, emit, local_dial);
}
PeerEvent::P2pDisconnect { p2p_peer } => {
PeerEvent::P2pDisconnect { p2p_peer, peer_id } => {
self.dialing.remove(&peer_id);
self.on_disconnected(p2p_peer, now, emit);
}
PeerEvent::P2pCannotCreateStream { p2p_peer, .. } |
Expand Down Expand Up @@ -702,7 +688,7 @@ impl PeerManager {
self.add_behaviour_penalty(p2p_peer, 1.0);
}
PeerEvent::DiscNodeFound { enr } => {
self.on_disc_node_found(enr, emit);
self.on_disc_node_found(enr, now, emit);
}
PeerEvent::DiscExternalAddress { address: _ } => {
// Informational — network tile handles advertisement.
Expand Down Expand Up @@ -784,6 +770,11 @@ impl PeerManager {

// 8) Trigger discovery if we're under target.
self.maybe_request_discovery(now, emit);

// 9) Prune stale dials (older than 15 seconds)
self.dialing.retain(|_, &mut time| {
now.saturating_duration_since(time) < std::time::Duration::from_secs(15)
});
}

// ── Lifecycle ───────────────────────────────────────────────────────
Expand All @@ -802,6 +793,8 @@ impl PeerManager {
let addr = SocketAddr::new(ip_bytes_to_addr(ip), port);
let mut state = PeerState::new(peer_id, addr, now);

self.dialing.remove(&peer_id);

// Inherit counters if we remember this PeerId.
if let Some(archive) = self.archived.remove(&peer_id) {
state.restore_from_archive(archive);
Expand Down Expand Up @@ -860,9 +853,6 @@ impl PeerManager {
}
}

// Drop any outstanding backoffs referencing this conn.
self.backoffs.retain(|(c, _), _| *c != conn);

// Clear this peer from the global IHAVE-promise index. They can't
// fulfil anything anymore and shouldn't be re-penalised on sweep.
self.promises.retain(|_hash, waiters| {
Expand Down Expand Up @@ -901,13 +891,6 @@ impl PeerManager {
{
self.inflight_syncreq = None;
}

// Drop RPC bookkeeping for the gone connection. Outstanding
// responses can never arrive — leaving counters pinned would
// lock out future retries on reconnect. Rate-limit bucket reset
// so a reconnect starts fresh at full tokens (matches lighthouse).
self.outbound_in_flight.remove(&conn);
self.inbound_buckets.remove(&conn);
}

// ── Gossip event handlers ───────────────────────────────────────────
Expand Down Expand Up @@ -935,7 +918,7 @@ impl PeerManager {
// is below d_low, pull the peer in.
if we_want &&
mesh_size < self.params.d_low as usize &&
!self.is_backed_off((conn, topic), now)
!self.is_backed_off(conn, topic, now)
{
self.do_graft(conn, peer_id, topic, now, emit);
}
Expand Down Expand Up @@ -978,21 +961,21 @@ impl PeerManager {
tracing::info!(p2p_peer = conn, ?topic, "PM peer GRAFTed us");
}

/// Peer pruned us. Record a backoff so we don't re-graft immediately,
/// and mark their P3b penalty so the deficit carries forward.
fn on_remote_prune(&mut self, conn: usize, topic: GossipTopic, now: Instant) {
if let Some(peer) = self.peers.get_mut(&conn) &&
let Some(t) = peer.topic_stats.get_mut(&topic)
{
// Carry any active deficit into the failure penalty.
if t.mesh_active && t.mesh_deliveries < self.params.mesh_message_deliveries_threshold {
t.mesh_failure_penalty +=
self.params.mesh_message_deliveries_threshold - t.mesh_deliveries;
if let Some(peer) = self.peers.get_mut(&conn) {
if let Some(t) = peer.topic_stats.get_mut(&topic) {
// Carry any active deficit into the failure penalty.
if t.mesh_active &&
t.mesh_deliveries < self.params.mesh_message_deliveries_threshold
{
t.mesh_failure_penalty +=
self.params.mesh_message_deliveries_threshold - t.mesh_deliveries;
}
t.meshed_since = None;
t.mesh_active = false;
}
t.meshed_since = None;
t.mesh_active = false;
peer.backoffs.insert(topic, now + self.params.prune_backoff);
}
self.backoffs.insert((conn, topic), now + self.params.prune_backoff);
tracing::debug!(p2p_peer = conn, ?topic, "PM peer PRUNEd us");
}

Expand Down Expand Up @@ -1203,7 +1186,7 @@ impl PeerManager {
/// (priority) and we're under `max_priority_peers`.
///
/// Network tile handles in-flight dial / already-connected dedup.
fn on_disc_node_found(&mut self, enr: Enr, emit: &mut impl FnMut(PeerControl)) {
fn on_disc_node_found(&mut self, enr: Enr, now: Instant, emit: &mut impl FnMut(PeerControl)) {
// 1. Fork-digest gate. Spec-conformant CL nodes always advertise `eth2`;
// missing-or-mismatched is a drop (matches lighthouse).
if let Some(my_digest) = self.our_fork_digest {
Expand Down Expand Up @@ -1241,6 +1224,10 @@ impl PeerManager {
tracing::warn!(?peer_id, "known peer id");
return;
}
if self.dialing.contains_key(&peer_id) {
tracing::warn!(?peer_id, "already dialing peer id");
return;
}
if self.archived.contains_key(&peer_id) {
tracing::warn!(?peer_id, "archived peer id");
return;
Expand All @@ -1259,14 +1246,15 @@ impl PeerManager {
// ours, meaning the peer can fill a subnet we care about. Lets us go past
// `target_peers` up to `max_priority_peers` for under-meshed validator
// subnets.
let connected = self.peers.len();
let connected = self.peers.len() + self.dialing.len();
let priority = enr_matches_subnets(&enr, self.required_attnets, self.required_syncnets);
let dial = connected < self.params.target_peers ||
(priority && connected < self.params.max_priority_peers);
if !dial {
tracing::debug!(connected, "not dialling");
return;
}
self.dialing.insert(peer_id, now);
emit(PeerControl::P2pDial { p2p: peer_id, enr });
}

Expand Down Expand Up @@ -1394,8 +1382,8 @@ impl PeerManager {

// ── Internal helpers ────────────────────────────────────────────────

fn is_backed_off(&self, key: (usize, GossipTopic), now: Instant) -> bool {
self.backoffs.get(&key).is_some_and(|d| now < *d)
fn is_backed_off(&self, conn: usize, topic: GossipTopic, now: Instant) -> bool {
self.peers.get(&conn).and_then(|p| p.backoffs.get(&topic)).is_some_and(|d| now < *d)
}

fn do_graft(
Expand Down Expand Up @@ -1444,8 +1432,8 @@ impl PeerManager {
}
t.meshed_since = None;
t.mesh_active = false;
peer.backoffs.insert(topic, now + self.params.prune_backoff);
}
self.backoffs.insert((conn, topic), now + self.params.prune_backoff);
emit(PeerControl::P2pGossipPrune { p2p: peer_id, p2p_connection: conn, topic });
}

Expand Down Expand Up @@ -1575,7 +1563,7 @@ impl PeerManager {
if peer.cached_score < self.params.gossip_threshold {
return None;
}
if self.is_backed_off((*conn, topic), now) {
if self.is_backed_off(*conn, topic, now) {
return None;
}
Some((*conn, peer.cached_score))
Expand Down Expand Up @@ -2010,7 +1998,11 @@ mod tests {
mgr.handle_event(PeerEvent::P2pGossipInvalidFrame { p2p_peer: 1 }, now, &mut |c| {
cap.0.push(c)
});
mgr.handle_event(PeerEvent::P2pDisconnect { p2p_peer: 1 }, now, &mut |c| cap.0.push(c));
mgr.handle_event(
PeerEvent::P2pDisconnect { p2p_peer: 1, peer_id: peer_id(1) },
now,
&mut |c| cap.0.push(c),
);
assert_eq!(mgr.archived_count(), 1);

connect(&mut mgr, &mut cap, 99, 1, now);
Expand All @@ -2027,7 +2019,11 @@ mod tests {
params.archived_ttl = Duration::from_secs(10);
let (mut mgr, mut cap) = fixture(vec![], params);
connect(&mut mgr, &mut cap, 1, 1, now);
mgr.handle_event(PeerEvent::P2pDisconnect { p2p_peer: 1 }, now, &mut |c| cap.0.push(c));
mgr.handle_event(
PeerEvent::P2pDisconnect { p2p_peer: 1, peer_id: peer_id(1) },
now,
&mut |c| cap.0.push(c),
);
assert_eq!(mgr.archived_count(), 1);

now += Duration::from_secs(11);
Expand Down
Loading
Loading