Skip to content
Closed
8 changes: 6 additions & 2 deletions crates/core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ pub(crate) enum NodeEvent {
/// Register expectation for an inbound connection from the given peer.
ExpectPeerConnection {
peer: PeerId,
courtesy: bool,
},
}

Expand Down Expand Up @@ -444,8 +445,11 @@ impl Display for NodeEvent {
"Local subscribe complete (tx: {tx}, key: {key}, subscribed: {subscribed})"
)
}
NodeEvent::ExpectPeerConnection { peer } => {
write!(f, "ExpectPeerConnection (from {peer})")
NodeEvent::ExpectPeerConnection { peer, courtesy } => {
write!(
f,
"ExpectPeerConnection (from {peer}, courtesy: {courtesy})"
)
}
}
}
Expand Down
64 changes: 39 additions & 25 deletions crates/core/src/node/network_bridge/p2p_protoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,29 +311,14 @@ impl P2pConnManager {
match *event {
ConnEvent::InboundMessage(inbound) => {
let remote = inbound.remote_addr;
let mut msg = inbound.msg;
let msg = inbound.msg;
tracing::info!(
tx = %msg.id(),
msg_type = %msg,
remote = ?remote,
peer = %ctx.bridge.op_manager.ring.connection_manager.get_peer_key().unwrap(),
"Received inbound message from peer - processing"
);
// Only the hop that owns the transport socket (gateway/first hop in
// practice) knows the UDP source address; tag the connect request here
// so downstream relays don't guess at the joiner's address.
if let (
Some(remote_addr),
NetMessage::V1(NetMessageV1::Connect(ConnectMsg::Request {
payload,
..
})),
) = (remote, &mut msg)
{
if payload.observed_addr.is_none() {
payload.observed_addr = Some(remote_addr);
}
}
ctx.handle_inbound_message(msg, &op_manager, &mut state)
.await?;
}
Expand Down Expand Up @@ -649,14 +634,14 @@ impl P2pConnManager {
)
.await?;
}
NodeEvent::ExpectPeerConnection { peer } => {
tracing::debug!(%peer, "ExpectPeerConnection event received; registering inbound expectation via handshake driver");
NodeEvent::ExpectPeerConnection { peer, courtesy } => {
tracing::debug!(%peer, ?courtesy, "ExpectPeerConnection event received; registering inbound expectation via handshake driver");
state.outbound_handler.expect_incoming(peer.addr);
if let Err(error) = handshake_cmd_sender
.send(HandshakeCommand::ExpectInbound {
peer: peer.clone(),
transaction: None,
courtesy: false,
courtesy,
})
.await
{
Expand Down Expand Up @@ -1393,7 +1378,7 @@ impl P2pConnManager {
"Inbound connection established"
);

self.handle_successful_connection(peer_id, connection, state, None)
self.handle_successful_connection(peer_id, connection, state, None, courtesy)
.await?;
}
HandshakeEvent::OutboundEstablished {
Expand All @@ -1408,7 +1393,7 @@ impl P2pConnManager {
transaction = %transaction,
"Outbound connection established"
);
self.handle_successful_connection(peer, connection, state, None)
self.handle_successful_connection(peer, connection, state, None, courtesy)
.await?;
}
HandshakeEvent::OutboundFailed {
Expand All @@ -1417,11 +1402,12 @@ impl P2pConnManager {
error,
courtesy,
} => {
tracing::info!(
tracing::warn!(
remote = %peer.addr,
courtesy,
transaction = %transaction,
?error,
open_connections = self.bridge.op_manager.ring.open_connections(),
"Outbound connection failed"
);

Expand Down Expand Up @@ -1523,6 +1509,7 @@ impl P2pConnManager {
connection: PeerConnection,
state: &mut EventListenerState,
remaining_checks: Option<usize>,
courtesy: bool,
) -> anyhow::Result<()> {
let pending_txs = state
.awaiting_connection_txs
Expand Down Expand Up @@ -1613,7 +1600,7 @@ impl P2pConnManager {
self.bridge
.op_manager
.ring
.add_connection(loc, peer_id.clone(), false)
.add_connection(loc, peer_id.clone(), false, courtesy)
.await;
}
Ok(())
Expand All @@ -1625,7 +1612,6 @@ impl P2pConnManager {
state: &mut EventListenerState,
handshake_commands: &HandshakeCommandSender,
) -> anyhow::Result<EventResult> {
let _ = state;
match event {
Some(ConnEvent::InboundMessage(mut inbound)) => {
let tx = *inbound.msg.id();
Expand Down Expand Up @@ -1670,6 +1656,33 @@ impl P2pConnManager {
}
}
}

let should_connect =
!self.connections.keys().any(|peer| peer.addr == remote_addr)
&& !state.awaiting_connection.contains_key(&remote_addr);

if should_connect {
if let Some(sender_peer) = extract_sender_from_message(&inbound.msg) {
tracing::info!(
"Received message from unconnected peer {}, establishing connection proactively",
sender_peer.peer
);

let tx = Transaction::new::<crate::operations::connect::ConnectMsg>();
let (callback, _rx) = tokio::sync::mpsc::channel(10);

let _ = self
.handle_connect_peer(
sender_peer.peer.clone(),
Box::new(callback),
tx,
handshake_commands,
state,
false,
)
.await;
}
}
}

tracing::debug!(
Expand All @@ -1683,9 +1696,10 @@ impl P2pConnManager {
))
}
Some(ConnEvent::TransportClosed { remote_addr, error }) => {
tracing::debug!(
tracing::warn!(
remote = %remote_addr,
?error,
open_connections = self.bridge.op_manager.ring.open_connections(),
"peer_connection_listener reported transport closure"
);
if let Some(peer) = self
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/node/testing_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -940,8 +940,8 @@ where
NodeEvent::QueryNodeDiagnostics { .. } => {
unimplemented!()
}
NodeEvent::ExpectPeerConnection { peer } => {
tracing::debug!(%peer, "ExpectPeerConnection ignored in testing impl");
NodeEvent::ExpectPeerConnection { peer, courtesy } => {
tracing::debug!(%peer, courtesy, "ExpectPeerConnection ignored in testing impl");
continue;
}
},
Expand Down
48 changes: 30 additions & 18 deletions crates/core/src/operations/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ pub(crate) trait RelayContext {
fn self_location(&self) -> &PeerKeyLocation;

/// Determine whether we should accept the joiner immediately.
fn should_accept(&self, joiner: &PeerKeyLocation) -> bool;
fn should_accept(&self, joiner: &PeerKeyLocation, courtesy: bool) -> bool;

/// Choose the next hop for the request, avoiding peers already visited.
fn select_next_hop(
Expand All @@ -180,10 +180,16 @@ pub(crate) trait RelayContext {
}

/// Result of processing a request at a relay.
#[derive(Debug, Clone)]
pub(crate) struct ExpectedConnection {
pub peer: PeerKeyLocation,
pub courtesy: bool,
}

#[derive(Debug, Default)]
pub(crate) struct RelayActions {
pub accept_response: Option<ConnectResponse>,
pub expect_connection_from: Option<PeerKeyLocation>,
pub expect_connection_from: Option<ExpectedConnection>,
pub forward: Option<(PeerKeyLocation, ConnectRequest)>,
pub observed_address: Option<(PeerKeyLocation, SocketAddr)>,
}
Expand Down Expand Up @@ -212,16 +218,20 @@ impl RelayState {
}
}

if !self.accepted_locally && ctx.should_accept(&self.request.joiner) {
let acceptor = ctx.self_location().clone();
let courtesy = ctx.courtesy_hint(&acceptor, &self.request.joiner);

if !self.accepted_locally && ctx.should_accept(&self.request.joiner, courtesy) {
self.accepted_locally = true;
let acceptor = ctx.self_location().clone();
let courtesy = ctx.courtesy_hint(&acceptor, &self.request.joiner);
self.courtesy_hint = courtesy;
actions.accept_response = Some(ConnectResponse {
acceptor: acceptor.clone(),
courtesy,
});
actions.expect_connection_from = Some(self.request.joiner.clone());
actions.expect_connection_from = Some(ExpectedConnection {
peer: self.request.joiner.clone(),
courtesy,
});
}

if self.forwarded_to.is_none() && self.request.ttl > 0 {
Expand Down Expand Up @@ -276,14 +286,14 @@ impl RelayContext for RelayEnv<'_> {
&self.self_location
}

fn should_accept(&self, joiner: &PeerKeyLocation) -> bool {
fn should_accept(&self, joiner: &PeerKeyLocation, courtesy: bool) -> bool {
let location = joiner
.location
.unwrap_or_else(|| Location::from_address(&joiner.peer.addr));
self.op_manager
.ring
.connection_manager
.should_accept(location, &joiner.peer)
.should_accept(location, &joiner.peer, courtesy)
}

fn select_next_hop(
Expand All @@ -300,10 +310,7 @@ impl RelayContext for RelayEnv<'_> {
}

fn courtesy_hint(&self, _acceptor: &PeerKeyLocation, _joiner: &PeerKeyLocation) -> bool {
// Courtesy slots still piggyback on regular connections. Flag the first acceptance so the
// joiner can prioritise it, and keep the logic simple until dedicated courtesy tracking
// is wired in (see courtesy-connection-budget branch).
self.op_manager.ring.open_connections() == 0
self.op_manager.ring.is_gateway()
}
}

Expand Down Expand Up @@ -594,10 +601,11 @@ impl Operation for ConnectOp {
.await?;
}

if let Some(peer) = actions.expect_connection_from {
if let Some(expected) = actions.expect_connection_from {
op_manager
.notify_node_event(NodeEvent::ExpectPeerConnection {
peer: peer.peer.clone(),
peer: expected.peer.peer.clone(),
courtesy: expected.courtesy,
})
.await?;
}
Expand Down Expand Up @@ -656,6 +664,7 @@ impl Operation for ConnectOp {
.notify_node_event(
crate::message::NodeEvent::ExpectPeerConnection {
peer: new_acceptor.peer.peer.clone(),
courtesy: new_acceptor.courtesy,
},
)
.await?;
Expand Down Expand Up @@ -783,7 +792,7 @@ pub(crate) async fn join_ring_request(
if !op_manager
.ring
.connection_manager
.should_accept(location, &gateway.peer)
.should_accept(location, &gateway.peer, false)
{
return Err(OpError::ConnError(ConnectionError::UnwantedConnection));
}
Expand Down Expand Up @@ -994,7 +1003,7 @@ mod tests {
&self.self_loc
}

fn should_accept(&self, _joiner: &PeerKeyLocation) -> bool {
fn should_accept(&self, _joiner: &PeerKeyLocation, _courtesy: bool) -> bool {
self.accept
}

Expand Down Expand Up @@ -1045,7 +1054,10 @@ mod tests {
let response = actions.accept_response.expect("expected acceptance");
assert_eq!(response.acceptor.peer, self_loc.peer);
assert!(response.courtesy);
assert_eq!(actions.expect_connection_from.unwrap().peer, joiner.peer);
assert_eq!(
actions.expect_connection_from.unwrap().peer.peer,
joiner.peer
);
assert!(actions.forward.is_none());
}

Expand Down Expand Up @@ -1215,6 +1227,6 @@ mod tests {
let expect_conn = accept_actions
.expect_connection_from
.expect("acceptance should request inbound connection from joiner");
assert_eq!(expect_conn.peer, joiner.peer);
assert_eq!(expect_conn.peer, joiner);
}
}
11 changes: 11 additions & 0 deletions crates/core/src/operations/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ pub(crate) async fn request_get(
get_op: GetOp,
skip_list: HashSet<PeerId>,
) -> Result<(), OpError> {
let mut skip_list = skip_list;
// Always avoid bouncing straight back to ourselves.
skip_list.insert(
op_manager
.ring
.connection_manager
.own_location()
.peer
.clone(),
);
let (mut candidates, id, key_val, _fetch_contract) = if let Some(GetState::PrepareRequest {
key,
id,
Expand Down Expand Up @@ -1271,6 +1281,7 @@ async fn try_forward_or_return(

let mut new_skip_list = skip_list.clone();
new_skip_list.insert(this_peer.peer.clone());
new_skip_list.insert(sender.peer.clone());

let new_htl = htl.saturating_sub(1);

Expand Down
Loading
Loading