Skip to content

Commit dd61743

Browse files
authored
Merge pull request #2 from seun-ja/1-move-to-multiaddr
1 move to multiaddr
2 parents 8270390 + 33b8f71 commit dd61743

File tree

12 files changed

+3564
-233
lines changed

12 files changed

+3564
-233
lines changed

Cargo.lock

Lines changed: 3318 additions & 186 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,20 @@ version = "0.1.0"
44
edition = "2024"
55

66
[dependencies]
7-
tokio = { version = "1.21.0", features = ["macros", "rt-multi-thread"] }
7+
tokio = { version = "1.21.0", features = ["full"] }
8+
9+
# P2P
10+
libp2p = { version = "0.55.0", features = [
11+
"noise",
12+
"tcp",
13+
"tokio",
14+
"yamux",
15+
"quic",
16+
"request-response",
17+
"cbor",
18+
"gossipsub",
19+
"kad",
20+
] }
821

922
# Serde
1023
serde = { version = "^1", features = ["derive"] }

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@
33
# Tasks
44

55
- [ ] Consensus Algorithm
6-
- [ ] Networking
6+
- [x] Networking
77
- [ ] Temp Storage

src/cli.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,17 @@ use clap_derive::ValueEnum;
66
#[derive(Parser)]
77
#[command(version, about, long_about = None)]
88
pub struct Args {
9-
/// Node's port
10-
#[arg(short, long, default_value_t = 8000)]
11-
pub port: u16,
12-
139
/// Node's role
1410
#[arg(short, long, default_value_t = Role::Sender)]
1511
pub role: Role,
1612

17-
/// Node's address
18-
#[arg(short, long, default_value_t = String::from("127.0.0.1:8000"))]
19-
pub address: String,
13+
/// Peer's address
14+
#[arg(short)]
15+
pub peer_address: Option<String>,
16+
17+
/// Bootstrap Nodes
18+
#[arg(short)]
19+
pub bootstrap: Option<String>,
2020
}
2121

2222
#[derive(Clone, Debug, ValueEnum)]

src/comms/message.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::fmt::Display;
22

33
use serde::Serialize;
44

5-
#[derive(Debug, Serialize)]
5+
#[derive(Debug, Serialize, PartialEq, Eq)]
66
pub enum Message {
77
RememeberMe,
88
Comms(String),

src/comms/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,8 @@
1+
use libp2p::{Multiaddr, PeerId};
2+
13
pub mod message;
4+
5+
pub struct Peer {
6+
_id: PeerId,
7+
_addr: Multiaddr,
8+
}

src/lib.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,27 @@
11
pub mod cli;
22
pub mod comms;
3+
pub mod network;
34
pub mod storage;
45

56
pub mod tracing {
6-
use tracing_subscriber::{EnvFilter, layer::SubscriberExt as _};
7+
use tracing_subscriber::{
8+
EnvFilter,
9+
fmt::{self, format::FmtSpan},
10+
layer::SubscriberExt as _,
11+
};
712

813
/// Initializes the tracing subscriber with the given environment filter.
914
pub fn init(env_filter: &str) {
15+
// Formats logs with for stdout
16+
let fmt_layer = fmt::layer()
17+
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
18+
.pretty();
19+
1020
let env_filter = EnvFilter::from(env_filter);
1121
let subscriber = tracing_subscriber::registry()
1222
.with(env_filter)
13-
.with(tracing_subscriber::fmt::layer());
23+
.with(fmt_layer);
24+
1425
tracing::subscriber::set_global_default(subscriber)
1526
.expect("Failed to set global default subscriber");
1627
}

src/main.rs

Lines changed: 50 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,69 @@
1-
use std::{
2-
io::{self, Read, Write as _},
3-
net::{Ipv4Addr, SocketAddrV4, TcpListener, TcpStream},
4-
};
1+
use std::{error::Error, net::Ipv4Addr, time::Duration};
52

63
use clap::Parser as _;
4+
use libp2p::{
5+
Multiaddr,
6+
gossipsub::{self, AllowAllSubscriptionFilter, Config, IdentityTransform, MessageAuthenticity},
7+
kad::{self, store::MemoryStore},
8+
multiaddr::Protocol,
9+
noise, tcp, yamux,
10+
};
711
use peer_node::{
8-
cli::{Args, Role},
9-
comms::message::Message,
12+
cli::Args,
13+
network::{
14+
behaviour::PeerBehavior,
15+
event::{Topic, event_runner},
16+
},
1017
};
1118

1219
#[tokio::main]
13-
async fn main() -> Result<(), std::io::Error> {
20+
async fn main() -> Result<(), Box<dyn Error>> {
1421
peer_node::tracing::init("info");
1522
let args = Args::parse();
1623

17-
let address = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), args.port);
18-
let listerner = TcpListener::bind(address)?;
24+
let ip_addr = Ipv4Addr::new(0, 0, 0, 0);
1925

20-
tracing::info!("Node address: {}", address);
21-
tracing::info!("A {}", args.role);
26+
let peer_multi_addr = Multiaddr::from(ip_addr).with(Protocol::Tcp(0));
2227

23-
match args.role {
24-
Role::Receiver => loop {
25-
for mut incoming_stream in listerner.incoming().flatten() {
26-
let mut msg = [0; 5];
27-
let _byte_count = incoming_stream.read(&mut msg)?;
28+
tracing::info!("A {}", args.role);
2829

29-
let msg: Message = String::from_utf8_lossy(&msg).trim().to_string().into();
30+
let mut swarm = libp2p::SwarmBuilder::with_new_identity()
31+
.with_tokio()
32+
.with_tcp(
33+
tcp::Config::default(),
34+
noise::Config::new,
35+
yamux::Config::default,
36+
)?
37+
.with_behaviour(|keypair| {
38+
let peer_id = keypair.public().to_peer_id();
39+
let store = MemoryStore::new(peer_id);
40+
let kademlia = kad::Behaviour::new(peer_id, store);
3041

31-
// If it's a rememberMe, store to some DHT and if Comms: Act as instructed
42+
let gossipsub: gossipsub::Behaviour<IdentityTransform, AllowAllSubscriptionFilter> =
43+
gossipsub::Behaviour::new(
44+
MessageAuthenticity::Signed(keypair.clone()),
45+
Config::default(),
46+
)
47+
.expect("Gossipsub initiation fails");
3248

33-
tracing::info!("Message received: {msg:?}");
49+
PeerBehavior {
50+
kademlia,
51+
gossipsub,
3452
}
35-
},
36-
Role::Sender => {
37-
let mut msg = String::new();
38-
io::stdin().read_line(&mut msg)?;
39-
40-
tracing::info!("Sending: {msg}");
41-
42-
let mut outgoing_stream = TcpStream::connect(args.address)?;
43-
44-
let msg: Message = msg.into();
53+
})?
54+
.with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX)))
55+
.build();
4556

46-
outgoing_stream.write_all(msg.to_string().as_bytes())?;
57+
let topic = gossipsub::IdentTopic::new("peer-network");
58+
swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
4759

48-
// Wait for the message to be sent before exiting
49-
outgoing_stream.flush()?;
60+
swarm.listen_on(peer_multi_addr)?;
5061

51-
Ok(())
52-
}
53-
}
62+
event_runner(
63+
swarm,
64+
args.role,
65+
args.peer_address,
66+
Topic(topic.to_string()),
67+
)
68+
.await
5469
}

src/network/behaviour.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
use libp2p::swarm::NetworkBehaviour;
2+
use libp2p::{gossipsub, kad};
3+
4+
#[derive(NetworkBehaviour)]
5+
pub struct PeerBehavior {
6+
pub kademlia: kad::Behaviour<kad::store::MemoryStore>,
7+
pub gossipsub: gossipsub::Behaviour,
8+
}

src/network/event.rs

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
use std::error::Error;
2+
3+
use libp2p::{
4+
Multiaddr, Swarm,
5+
futures::StreamExt as _,
6+
gossipsub::{self, TopicHash},
7+
kad,
8+
swarm::SwarmEvent,
9+
};
10+
use tokio::{
11+
io::{self, AsyncBufReadExt as _},
12+
select,
13+
};
14+
15+
use crate::{cli::Role, comms::message::Message};
16+
17+
use super::behaviour::{PeerBehavior, PeerBehaviorEvent};
18+
19+
pub async fn event_runner(
20+
mut swarm: Swarm<PeerBehavior>,
21+
role: Role,
22+
peer_address: Option<String>,
23+
topic: Topic,
24+
) -> Result<(), Box<dyn Error>> {
25+
match role {
26+
Role::Receiver => loop {
27+
if let Some(event) = swarm.next().await {
28+
match event {
29+
SwarmEvent::NewListenAddr {
30+
listener_id,
31+
address,
32+
} => {
33+
tracing::info!("Listening with listener {listener_id} on {address}");
34+
}
35+
SwarmEvent::IncomingConnection {
36+
connection_id,
37+
local_addr,
38+
send_back_addr,
39+
} => {
40+
tracing::info!(
41+
"Incoming connection {connection_id} from {local_addr} to {send_back_addr}"
42+
);
43+
}
44+
SwarmEvent::ConnectionClosed {
45+
peer_id,
46+
connection_id,
47+
endpoint,
48+
num_established,
49+
cause,
50+
} => {
51+
tracing::info!(
52+
"Connection {connection_id} with peer {peer_id} closed, endpoint: {endpoint:?}, num_established: {num_established}, cause: {cause:?}"
53+
);
54+
}
55+
SwarmEvent::Behaviour(PeerBehaviorEvent::Gossipsub(
56+
gossipsub::Event::Message {
57+
propagation_source,
58+
message,
59+
..
60+
},
61+
)) => {
62+
let message: Message =
63+
String::from_utf8_lossy(&message.data).to_string().into();
64+
65+
tracing::info!(
66+
"Got a message: {message} from PeerId: {propagation_source}",
67+
);
68+
}
69+
SwarmEvent::Behaviour(PeerBehaviorEvent::Kademlia(
70+
kad::Event::InboundRequest { request },
71+
)) => {
72+
tracing::info!("{request:?}")
73+
}
74+
_ => {}
75+
}
76+
}
77+
},
78+
Role::Sender => {
79+
if let Some(addr) = peer_address {
80+
let peer_addr: Multiaddr = addr.parse()?;
81+
if let Err(err) = swarm.dial(peer_addr) {
82+
tracing::error!("Dialing peer address: {} fails. reason: {}", addr, err);
83+
}
84+
} else {
85+
tracing::warn!("No peer address provided");
86+
}
87+
88+
let mut stdin = io::BufReader::new(io::stdin()).lines();
89+
loop {
90+
select! {
91+
Ok(Some(line)) = stdin.next_line() => {
92+
if let Err(e) = swarm
93+
.behaviour_mut().gossipsub
94+
.publish(topic.clone(), line.as_bytes()) {
95+
tracing::warn!("Publish error: {e:?}");
96+
}
97+
}
98+
event = swarm.select_next_some() => match event {
99+
SwarmEvent::NewListenAddr { address, .. } => {
100+
tracing::info!("Listening on {address:?}")
101+
}
102+
SwarmEvent::Behaviour(PeerBehaviorEvent::Gossipsub(
103+
gossipsub::Event::Message {
104+
propagation_source,
105+
message,
106+
..
107+
},
108+
)) => {
109+
let message: Message =
110+
String::from_utf8_lossy(&message.data).to_string().into();
111+
112+
tracing::info!(
113+
"Got a message: {message} from PeerId: {propagation_source}",
114+
);
115+
}
116+
SwarmEvent::ConnectionClosed {
117+
peer_id,
118+
connection_id,
119+
endpoint,
120+
num_established,
121+
cause,
122+
} => {
123+
tracing::info!(
124+
"Connection {connection_id} with peer {peer_id} closed, endpoint: {endpoint:?}, num_established: {num_established}, cause: {cause:?}"
125+
);
126+
}
127+
_ => {}
128+
}
129+
}
130+
}
131+
}
132+
}
133+
}
134+
135+
#[derive(Clone)]
136+
pub struct Topic(pub String);
137+
138+
impl From<Topic> for TopicHash {
139+
fn from(val: Topic) -> Self {
140+
TopicHash::from_raw(&val.0)
141+
}
142+
}

0 commit comments

Comments
 (0)