Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ result
crates/client-ffi/client_ffi.h

# Compiled C FFI example binary
examples/c-ffi/c-client
crates/client-ffi/examples/message-exchange/c-client
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

99 changes: 62 additions & 37 deletions bin/chat-cli/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::mpsc;

use anyhow::Result;
use arboard::Clipboard;
use logos_chat::{ChatClient, ConversationIdOwned, DeliveryService};
use logos_chat::{ChatClient, ConversationIdOwned, DeliveryService, Event};
use serde::{Deserialize, Serialize};

use crate::utils::now;
Expand Down Expand Up @@ -43,7 +43,7 @@ pub struct AppState {

pub struct ChatApp<D: DeliveryService> {
pub client: ChatClient<D>,
inbound: mpsc::Receiver<Vec<u8>>,
events: mpsc::Receiver<Event>,
pub state: AppState,
/// Ephemeral command output — not persisted, cleared on chat switch.
command_output: Vec<DisplayMessage>,
Expand All @@ -56,7 +56,7 @@ pub struct ChatApp<D: DeliveryService> {
impl<D: DeliveryService + 'static> ChatApp<D> {
pub fn new(
client: ChatClient<D>,
inbound: mpsc::Receiver<Vec<u8>>,
events: mpsc::Receiver<Event>,
user_name: &str,
data_dir: &Path,
) -> Result<Self> {
Expand All @@ -76,7 +76,7 @@ impl<D: DeliveryService + 'static> ChatApp<D> {

Ok(Self {
client,
inbound,
events,
state,
command_output: Vec::new(),
input: String::new(),
Expand Down Expand Up @@ -142,41 +142,59 @@ impl<D: DeliveryService + 'static> ChatApp<D> {
}

pub fn process_incoming(&mut self) -> Result<()> {
while let Ok(payload) = self.inbound.try_recv() {
match self.client.receive(&payload) {
Ok(Some(content)) => {
let chat_id = &content.conversation_id;

if !self.state.chats.contains_key(chat_id) && content.is_new_convo {
let session = ChatSession {
chat_id: chat_id.clone(),
nickname: None,
messages: Vec::new(),
};
self.state.chats.insert(chat_id.clone(), session);
let label = chat_id[..8.min(chat_id.len())].to_string();
self.set_active_chat(Some(chat_id.clone()));
self.status = format!("New chat ({label})! Use /nickname to name it.");
}

if !content.data.is_empty() {
let text = String::from_utf8_lossy(&content.data).to_string();
if let Some(session) = self.state.chats.get_mut(chat_id) {
session.messages.push(DisplayMessage {
from_self: false,
content: text,
timestamp: now(),
});
}
}
let mut had_event = false;
while let Ok(event) = self.events.try_recv() {
self.handle_event(event);
had_event = true;
}
if had_event {
self.save_state()?;
}
Ok(())
}

self.save_state()?;
fn handle_event(&mut self, event: Event) {
match event {
Event::ConversationStarted {
conversation_id, ..
} => {
let chat_id = conversation_id.to_string();
if !self.state.chats.contains_key(&chat_id) {
let session = ChatSession {
chat_id: chat_id.clone(),
nickname: None,
messages: Vec::new(),
};
self.state.chats.insert(chat_id.clone(), session);
let label = chat_id[..8.min(chat_id.len())].to_string();
self.set_active_chat(Some(chat_id));
self.status = format!("New chat ({label})! Use /nickname to name it.");
}
}
Event::MessageReceived {
conversation_id,
data,
..
} => {
if data.is_empty() {
return;
}
let chat_id = conversation_id.as_ref();
let text = String::from_utf8_lossy(&data).to_string();
if let Some(session) = self.state.chats.get_mut(chat_id) {
session.messages.push(DisplayMessage {
from_self: false,
content: text,
timestamp: now(),
});
}
Ok(None) => {}
Err(e) => tracing::warn!("receive error: {e:?}"),
}
Event::DeliveryFailed { reason, .. } => {
tracing::warn!("delivery failed: {reason:?}");
self.status = format!("Delivery failed ({reason:?}); peer may not receive.");
}
_ => {}
}
Ok(())
}

pub fn send_message(&mut self, content: &str) -> Result<()> {
Expand All @@ -188,9 +206,13 @@ impl<D: DeliveryService + 'static> ChatApp<D> {

let convo_id: ConversationIdOwned = chat_id.as_str().into();

self.client
let events = self
.client
.send_message(&convo_id, content.as_bytes())
.map_err(|e| anyhow::anyhow!("{e:?}"))?;
for event in events {
self.handle_event(event);
}

if let Some(session) = self.state.chats.get_mut(&chat_id) {
session.messages.push(DisplayMessage {
Expand Down Expand Up @@ -253,10 +275,13 @@ impl<D: DeliveryService + 'static> ChatApp<D> {
return Ok(Some("Usage: /connect <bundle>".to_string()));
}
let initial = format!("Hello from {}!", self.user_name);
let convo_id = self
let (convo_id, events) = self
.client
.create_conversation(args.as_bytes(), initial.as_bytes())
.map_err(|e| anyhow::anyhow!("{e:?}"))?;
for event in events {
self.handle_event(event);
}

let chat_id = convo_id.to_string();
let label = chat_id[..8.min(chat_id.len())].to_string();
Expand Down
85 changes: 7 additions & 78 deletions bin/chat-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ mod ui;
mod utils;

use std::path::{Path, PathBuf};
use std::sync::mpsc;

use anyhow::{Context, Result};
use clap::{Parser, ValueEnum};
Expand Down Expand Up @@ -66,9 +65,9 @@ fn main() -> Result<()> {
match cli.transport {
TransportKind::File => {
let transport_dir = cli.data.join("transport");
let (transport, inbound) = transport::file::FileTransport::new(&transport_dir)
let transport = transport::file::FileTransport::new(&transport_dir)
.context("failed to create file transport")?;
run(transport, inbound, &cli)
run(transport, &cli)
}
#[cfg(logos_delivery)]
TransportKind::LogosDelivery => {
Expand All @@ -82,20 +81,15 @@ fn main() -> Result<()> {
tcp_port: cli.port,
..Default::default()
};
let (transport, inbound) =
Service::start(cfg).context("failed to start logos-delivery")?;
let transport = Service::start(cfg).context("failed to start logos-delivery")?;

println!("Node connected. Initializing chat client...");
run(transport, inbound, &cli)
run(transport, &cli)
}
}
}

fn run<D: DeliveryService + 'static>(
transport: D,
inbound: mpsc::Receiver<Vec<u8>>,
cli: &Cli,
) -> Result<()> {
fn run<D: DeliveryService + 'static>(transport: D, cli: &Cli) -> Result<()> {
let db_path = cli
.db
.clone()
Expand All @@ -105,7 +99,7 @@ fn run<D: DeliveryService + 'static>(
.context("db path contains non-UTF-8 characters")?
.to_string();

let client = logos_chat::ChatClient::open(
let (client, events) = logos_chat::ChatClient::open(
cli.name.clone(),
logos_chat::StorageConfig::Encrypted {
path: db_str,
Expand All @@ -116,7 +110,7 @@ fn run<D: DeliveryService + 'static>(
.map_err(|e| anyhow::anyhow!("{e:?}"))
.context("failed to open chat client")?;

let mut app = ChatApp::new(client, inbound, &cli.name, &cli.data)?;
let mut app = ChatApp::new(client, events, &cli.name, &cli.data)?;

if cli.smoketest {
return Ok(());
Expand All @@ -128,71 +122,6 @@ fn run<D: DeliveryService + 'static>(
result
}

#[cfg_attr(not(logos_delivery), allow(dead_code, unused_variables))]
fn run_logos_delivery(cli: Cli) -> Result<()> {
#[cfg(logos_delivery)]
{
use transport::logos_delivery::{Config, Service};

eprintln!("Starting logos-delivery node (preset={})...", cli.preset);
eprintln!("This may take a few seconds while connecting to the network.");

let logos_cfg = Config {
preset: cli.preset.clone(),
tcp_port: cli.port,
..Default::default()
};
let (delivery, inbound) =
Service::start(logos_cfg).context("failed to start logos-delivery")?;

eprintln!("Node connected. Initializing chat client...");

let data_dir = cli
.db
.as_ref()
.and_then(|p| p.parent())
.map(|p| p.to_path_buf())
.unwrap_or_else(|| cli.data.clone());

let client = match cli.db {
Some(ref path) => {
let db_str = path
.to_str()
.context("db path contains non-UTF-8 characters")?
.to_string();
logos_chat::ChatClient::open(
cli.name.clone(),
logos_chat::StorageConfig::Encrypted {
path: db_str,
key: "chat-cli".to_string(),
},
delivery,
)
.map_err(|e| anyhow::anyhow!("{e:?}"))
.context("failed to open persistent client")?
}
None => logos_chat::ChatClient::new(cli.name.clone(), delivery),
};

let mut app = ChatApp::new(client, inbound, &cli.name, &data_dir)?;

if cli.smoketest {
return Ok(());
}

let mut terminal = ui::init().context("failed to initialize terminal")?;
let result = run_app(&mut terminal, &mut app);
ui::restore().context("failed to restore terminal")?;
return result;
}

#[cfg(not(logos_delivery))]
anyhow::bail!(
"logos-delivery transport is not available in this build.\n\
Build with LOGOS_DELIVERY_LIB_DIR set to enable it."
)
}

fn run_app<D: DeliveryService + 'static>(
terminal: &mut ui::Tui,
app: &mut ChatApp<D>,
Expand Down
29 changes: 16 additions & 13 deletions bin/chat-cli/src/transport/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ use std::collections::BTreeMap;
use std::fs::{self, File, OpenOptions};
use std::io::{self, BufReader, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use std::sync::{Mutex, mpsc};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use logos_chat::{AddressedEnvelope, DeliveryService};
use logos_chat::{AddressedEnvelope, DeliveryService, drain_inbound};

#[derive(Debug, thiserror::Error)]
pub enum FileTransportError {
Expand All @@ -17,16 +17,17 @@ pub enum FileTransportError {
#[derive(Debug)]
pub struct FileTransport {
transport_dir: PathBuf,
inbound: Mutex<mpsc::Receiver<Vec<u8>>>,
}

impl FileTransport {
/// All instances pointing at the same `transport_dir` share one broadcast bus.
///
/// Messages are written to `{transport_dir}/{delivery_address}/{hours_since_epoch}.bin`
/// as length-prefixed frames (`[u32 BE length][payload bytes]`). The background
/// as length-prefixed frames (`[u32 BE length][payload bytes]`). A background
/// thread reads all files under `transport_dir` and forwards every frame to
/// the returned channel; `client.receive()` discards frames it cannot decrypt.
pub fn new(transport_dir: &Path) -> io::Result<(Self, mpsc::Receiver<Vec<u8>>)> {
/// the internal inbound channel; the client discards frames it cannot decrypt.
pub fn new(transport_dir: &Path) -> io::Result<Self> {
fs::create_dir_all(transport_dir)?;

let (tx, rx) = mpsc::sync_channel(1024);
Expand All @@ -36,19 +37,17 @@ impl FileTransport {
.name("file-transport".into())
.spawn(move || poll_reader(dir, tx))?;

Ok((
Self {
transport_dir: transport_dir.to_path_buf(),
},
rx,
))
Ok(Self {
transport_dir: transport_dir.to_path_buf(),
inbound: Mutex::new(rx),
})
}
}

impl DeliveryService for FileTransport {
type Error = FileTransportError;

fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), FileTransportError> {
fn publish(&self, envelope: AddressedEnvelope) -> Result<(), FileTransportError> {
let addr_dir = self.transport_dir.join(&envelope.delivery_address);
fs::create_dir_all(&addr_dir)?;

Expand All @@ -62,10 +61,14 @@ impl DeliveryService for FileTransport {
Ok(())
}

fn subscribe(&mut self, _delivery_address: &str) -> Result<(), Self::Error> {
fn subscribe(&self, _delivery_address: &str) -> Result<(), Self::Error> {
// FileTransport does not support filtering
Ok(())
}

fn pull(&self) -> Vec<Vec<u8>> {
drain_inbound(&self.inbound)
}
}

/// Hours since Unix epoch — used as the rolling filename.
Expand Down
Loading
Loading