Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,504 changes: 3,318 additions & 186 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 14 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,20 @@ version = "0.1.0"
edition = "2024"

[dependencies]
tokio = { version = "1.21.0", features = ["macros", "rt-multi-thread"] }
tokio = { version = "1.21.0", features = ["full"] }

# P2P
libp2p = { version = "0.55.0", features = [
"noise",
"tcp",
"tokio",
"yamux",
"quic",
"request-response",
"cbor",
"gossipsub",
"kad",
] }

# Serde
serde = { version = "^1", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
# Tasks

- [ ] Consensus Algorithm
- [ ] Networking
- [x] Networking
- [ ] Temp Storage
14 changes: 7 additions & 7 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ use clap_derive::ValueEnum;
#[derive(Parser)]
#[command(version, about, long_about = None)]
pub struct Args {
/// Node's port
#[arg(short, long, default_value_t = 8000)]
pub port: u16,

/// Node's role
#[arg(short, long, default_value_t = Role::Sender)]
pub role: Role,

/// Node's address
#[arg(short, long, default_value_t = String::from("127.0.0.1:8000"))]
pub address: String,
/// Peer's address
#[arg(short)]
pub peer_address: Option<String>,

/// Bootstrap Nodes
#[arg(short)]
pub bootstrap: Option<String>,
}

#[derive(Clone, Debug, ValueEnum)]
Expand Down
2 changes: 1 addition & 1 deletion src/comms/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::fmt::Display;

use serde::Serialize;

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, PartialEq, Eq)]
pub enum Message {
RememeberMe,
Comms(String),
Expand Down
7 changes: 7 additions & 0 deletions src/comms/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,8 @@
use libp2p::{Multiaddr, PeerId};

pub mod message;

pub struct Peer {
_id: PeerId,
_addr: Multiaddr,
}
15 changes: 13 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
pub mod cli;
pub mod comms;
pub mod network;
pub mod storage;

pub mod tracing {
use tracing_subscriber::{EnvFilter, layer::SubscriberExt as _};
use tracing_subscriber::{
EnvFilter,
fmt::{self, format::FmtSpan},
layer::SubscriberExt as _,
};

/// Initializes the tracing subscriber with the given environment filter.
pub fn init(env_filter: &str) {
// Formats logs with for stdout
let fmt_layer = fmt::layer()
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.pretty();

let env_filter = EnvFilter::from(env_filter);
let subscriber = tracing_subscriber::registry()
.with(env_filter)
.with(tracing_subscriber::fmt::layer());
.with(fmt_layer);

tracing::subscriber::set_global_default(subscriber)
.expect("Failed to set global default subscriber");
}
Expand Down
85 changes: 50 additions & 35 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,69 @@
use std::{
io::{self, Read, Write as _},
net::{Ipv4Addr, SocketAddrV4, TcpListener, TcpStream},
};
use std::{error::Error, net::Ipv4Addr, time::Duration};

use clap::Parser as _;
use libp2p::{
Multiaddr,
gossipsub::{self, AllowAllSubscriptionFilter, Config, IdentityTransform, MessageAuthenticity},
kad::{self, store::MemoryStore},
multiaddr::Protocol,
noise, tcp, yamux,
};
use peer_node::{
cli::{Args, Role},
comms::message::Message,
cli::Args,
network::{
behaviour::PeerBehavior,
event::{Topic, event_runner},
},
};

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
async fn main() -> Result<(), Box<dyn Error>> {
peer_node::tracing::init("info");
let args = Args::parse();

let address = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), args.port);
let listerner = TcpListener::bind(address)?;
let ip_addr = Ipv4Addr::new(0, 0, 0, 0);

tracing::info!("Node address: {}", address);
tracing::info!("A {}", args.role);
let peer_multi_addr = Multiaddr::from(ip_addr).with(Protocol::Tcp(0));

match args.role {
Role::Receiver => loop {
for mut incoming_stream in listerner.incoming().flatten() {
let mut msg = [0; 5];
let _byte_count = incoming_stream.read(&mut msg)?;
tracing::info!("A {}", args.role);

let msg: Message = String::from_utf8_lossy(&msg).trim().to_string().into();
let mut swarm = libp2p::SwarmBuilder::with_new_identity()
.with_tokio()
.with_tcp(
tcp::Config::default(),
noise::Config::new,
yamux::Config::default,
)?
.with_behaviour(|keypair| {
let peer_id = keypair.public().to_peer_id();
let store = MemoryStore::new(peer_id);
let kademlia = kad::Behaviour::new(peer_id, store);

// If it's a rememberMe, store to some DHT and if Comms: Act as instructed
let gossipsub: gossipsub::Behaviour<IdentityTransform, AllowAllSubscriptionFilter> =
gossipsub::Behaviour::new(
MessageAuthenticity::Signed(keypair.clone()),
Config::default(),
)
.expect("Gossipsub initiation fails");

tracing::info!("Message received: {msg:?}");
PeerBehavior {
kademlia,
gossipsub,
}
},
Role::Sender => {
let mut msg = String::new();
io::stdin().read_line(&mut msg)?;

tracing::info!("Sending: {msg}");

let mut outgoing_stream = TcpStream::connect(args.address)?;

let msg: Message = msg.into();
})?
.with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX)))
.build();

outgoing_stream.write_all(msg.to_string().as_bytes())?;
let topic = gossipsub::IdentTopic::new("peer-network");
swarm.behaviour_mut().gossipsub.subscribe(&topic)?;

// Wait for the message to be sent before exiting
outgoing_stream.flush()?;
swarm.listen_on(peer_multi_addr)?;

Ok(())
}
}
event_runner(
swarm,
args.role,
args.peer_address,
Topic(topic.to_string()),
)
.await
}
8 changes: 8 additions & 0 deletions src/network/behaviour.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use libp2p::swarm::NetworkBehaviour;
use libp2p::{gossipsub, kad};

#[derive(NetworkBehaviour)]
pub struct PeerBehavior {
pub kademlia: kad::Behaviour<kad::store::MemoryStore>,
pub gossipsub: gossipsub::Behaviour,
}
142 changes: 142 additions & 0 deletions src/network/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
use std::error::Error;

use libp2p::{
Multiaddr, Swarm,
futures::StreamExt as _,
gossipsub::{self, TopicHash},
kad,
swarm::SwarmEvent,
};
use tokio::{
io::{self, AsyncBufReadExt as _},
select,
};

use crate::{cli::Role, comms::message::Message};

use super::behaviour::{PeerBehavior, PeerBehaviorEvent};

pub async fn event_runner(
mut swarm: Swarm<PeerBehavior>,
role: Role,
peer_address: Option<String>,
topic: Topic,
) -> Result<(), Box<dyn Error>> {
match role {
Role::Receiver => loop {
if let Some(event) = swarm.next().await {
match event {
SwarmEvent::NewListenAddr {
listener_id,
address,
} => {
tracing::info!("Listening with listener {listener_id} on {address}");
}
SwarmEvent::IncomingConnection {
connection_id,
local_addr,
send_back_addr,
} => {
tracing::info!(
"Incoming connection {connection_id} from {local_addr} to {send_back_addr}"
);
}
SwarmEvent::ConnectionClosed {
peer_id,
connection_id,
endpoint,
num_established,
cause,
} => {
tracing::info!(
"Connection {connection_id} with peer {peer_id} closed, endpoint: {endpoint:?}, num_established: {num_established}, cause: {cause:?}"
);
}
SwarmEvent::Behaviour(PeerBehaviorEvent::Gossipsub(
gossipsub::Event::Message {
propagation_source,
message,
..
},
)) => {
let message: Message =
String::from_utf8_lossy(&message.data).to_string().into();

tracing::info!(
"Got a message: {message} from PeerId: {propagation_source}",
);
}
SwarmEvent::Behaviour(PeerBehaviorEvent::Kademlia(
kad::Event::InboundRequest { request },
)) => {
tracing::info!("{request:?}")
}
_ => {}
}
}
},
Role::Sender => {
if let Some(addr) = peer_address {
let peer_addr: Multiaddr = addr.parse()?;
if let Err(err) = swarm.dial(peer_addr) {
tracing::error!("Dialing peer address: {} fails. reason: {}", addr, err);
}
} else {
tracing::warn!("No peer address provided");
}

let mut stdin = io::BufReader::new(io::stdin()).lines();
loop {
select! {
Ok(Some(line)) = stdin.next_line() => {
if let Err(e) = swarm
.behaviour_mut().gossipsub
.publish(topic.clone(), line.as_bytes()) {
tracing::warn!("Publish error: {e:?}");
}
}
event = swarm.select_next_some() => match event {
SwarmEvent::NewListenAddr { address, .. } => {
tracing::info!("Listening on {address:?}")
}
SwarmEvent::Behaviour(PeerBehaviorEvent::Gossipsub(
gossipsub::Event::Message {
propagation_source,
message,
..
},
)) => {
let message: Message =
String::from_utf8_lossy(&message.data).to_string().into();

tracing::info!(
"Got a message: {message} from PeerId: {propagation_source}",
);
}
SwarmEvent::ConnectionClosed {
peer_id,
connection_id,
endpoint,
num_established,
cause,
} => {
tracing::info!(
"Connection {connection_id} with peer {peer_id} closed, endpoint: {endpoint:?}, num_established: {num_established}, cause: {cause:?}"
);
}
_ => {}
}
}
}
}
}
}

#[derive(Clone)]
pub struct Topic(pub String);

impl From<Topic> for TopicHash {
fn from(val: Topic) -> Self {
TopicHash::from_raw(&val.0)
}
}
2 changes: 2 additions & 0 deletions src/network/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod behaviour;
pub mod event;
1 change: 1 addition & 0 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@