diff --git a/.gitignore b/.gitignore index 9ccb3e10..2df56b0b 100644 --- a/.gitignore +++ b/.gitignore @@ -39,4 +39,4 @@ result crates/client-ffi/client_ffi.h # Compiled C FFI example binary -examples/c-ffi/c-client +crates/client-ffi/examples/message-exchange/c-client diff --git a/Cargo.lock b/Cargo.lock index 20f7ed03..c883fa9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1422,6 +1422,7 @@ dependencies = [ "storage", "tempfile", "thiserror", + "tracing", "x25519-dalek", ] @@ -1742,6 +1743,7 @@ dependencies = [ "libchat", "tempfile", "thiserror", + "tracing", ] [[package]] diff --git a/bin/chat-cli/src/app.rs b/bin/chat-cli/src/app.rs index de9e47b2..98b03771 100644 --- a/bin/chat-cli/src/app.rs +++ b/bin/chat-cli/src/app.rs @@ -5,7 +5,7 @@ use std::sync::mpsc; use anyhow::Result; use arboard::Clipboard; -use logos_chat::{ChatClient, ConversationIdOwned, DeliveryService}; +use logos_chat::{ChatClient, ConversationIdOwned, DeliveryService, Event}; use serde::{Deserialize, Serialize}; use crate::utils::now; @@ -43,7 +43,7 @@ pub struct AppState { pub struct ChatApp { pub client: ChatClient, - inbound: mpsc::Receiver>, + events: mpsc::Receiver, pub state: AppState, /// Ephemeral command output — not persisted, cleared on chat switch. command_output: Vec, @@ -56,7 +56,7 @@ pub struct ChatApp { impl ChatApp { pub fn new( client: ChatClient, - inbound: mpsc::Receiver>, + events: mpsc::Receiver, user_name: &str, data_dir: &Path, ) -> Result { @@ -76,7 +76,7 @@ impl ChatApp { Ok(Self { client, - inbound, + events, state, command_output: Vec::new(), input: String::new(), @@ -142,41 +142,59 @@ impl ChatApp { } pub fn process_incoming(&mut self) -> Result<()> { - while let Ok(payload) = self.inbound.try_recv() { - match self.client.receive(&payload) { - Ok(Some(content)) => { - let chat_id = &content.conversation_id; - - if !self.state.chats.contains_key(chat_id) && content.is_new_convo { - let session = ChatSession { - chat_id: chat_id.clone(), - nickname: None, - messages: Vec::new(), - }; - self.state.chats.insert(chat_id.clone(), session); - let label = chat_id[..8.min(chat_id.len())].to_string(); - self.set_active_chat(Some(chat_id.clone())); - self.status = format!("New chat ({label})! Use /nickname to name it."); - } - - if !content.data.is_empty() { - let text = String::from_utf8_lossy(&content.data).to_string(); - if let Some(session) = self.state.chats.get_mut(chat_id) { - session.messages.push(DisplayMessage { - from_self: false, - content: text, - timestamp: now(), - }); - } - } + let mut had_event = false; + while let Ok(event) = self.events.try_recv() { + self.handle_event(event); + had_event = true; + } + if had_event { + self.save_state()?; + } + Ok(()) + } - self.save_state()?; + fn handle_event(&mut self, event: Event) { + match event { + Event::ConversationStarted { + conversation_id, .. + } => { + let chat_id = conversation_id.to_string(); + if !self.state.chats.contains_key(&chat_id) { + let session = ChatSession { + chat_id: chat_id.clone(), + nickname: None, + messages: Vec::new(), + }; + self.state.chats.insert(chat_id.clone(), session); + let label = chat_id[..8.min(chat_id.len())].to_string(); + self.set_active_chat(Some(chat_id)); + self.status = format!("New chat ({label})! Use /nickname to name it."); + } + } + Event::MessageReceived { + conversation_id, + data, + .. + } => { + if data.is_empty() { + return; + } + let chat_id = conversation_id.as_ref(); + let text = String::from_utf8_lossy(&data).to_string(); + if let Some(session) = self.state.chats.get_mut(chat_id) { + session.messages.push(DisplayMessage { + from_self: false, + content: text, + timestamp: now(), + }); } - Ok(None) => {} - Err(e) => tracing::warn!("receive error: {e:?}"), } + Event::DeliveryFailed { reason, .. } => { + tracing::warn!("delivery failed: {reason:?}"); + self.status = format!("Delivery failed ({reason:?}); peer may not receive."); + } + _ => {} } - Ok(()) } pub fn send_message(&mut self, content: &str) -> Result<()> { @@ -188,9 +206,13 @@ impl ChatApp { let convo_id: ConversationIdOwned = chat_id.as_str().into(); - self.client + let events = self + .client .send_message(&convo_id, content.as_bytes()) .map_err(|e| anyhow::anyhow!("{e:?}"))?; + for event in events { + self.handle_event(event); + } if let Some(session) = self.state.chats.get_mut(&chat_id) { session.messages.push(DisplayMessage { @@ -253,10 +275,13 @@ impl ChatApp { return Ok(Some("Usage: /connect ".to_string())); } let initial = format!("Hello from {}!", self.user_name); - let convo_id = self + let (convo_id, events) = self .client .create_conversation(args.as_bytes(), initial.as_bytes()) .map_err(|e| anyhow::anyhow!("{e:?}"))?; + for event in events { + self.handle_event(event); + } let chat_id = convo_id.to_string(); let label = chat_id[..8.min(chat_id.len())].to_string(); diff --git a/bin/chat-cli/src/main.rs b/bin/chat-cli/src/main.rs index 366b1005..0077908f 100644 --- a/bin/chat-cli/src/main.rs +++ b/bin/chat-cli/src/main.rs @@ -4,7 +4,6 @@ mod ui; mod utils; use std::path::{Path, PathBuf}; -use std::sync::mpsc; use anyhow::{Context, Result}; use clap::{Parser, ValueEnum}; @@ -66,9 +65,9 @@ fn main() -> Result<()> { match cli.transport { TransportKind::File => { let transport_dir = cli.data.join("transport"); - let (transport, inbound) = transport::file::FileTransport::new(&transport_dir) + let transport = transport::file::FileTransport::new(&transport_dir) .context("failed to create file transport")?; - run(transport, inbound, &cli) + run(transport, &cli) } #[cfg(logos_delivery)] TransportKind::LogosDelivery => { @@ -82,20 +81,15 @@ fn main() -> Result<()> { tcp_port: cli.port, ..Default::default() }; - let (transport, inbound) = - Service::start(cfg).context("failed to start logos-delivery")?; + let transport = Service::start(cfg).context("failed to start logos-delivery")?; println!("Node connected. Initializing chat client..."); - run(transport, inbound, &cli) + run(transport, &cli) } } } -fn run( - transport: D, - inbound: mpsc::Receiver>, - cli: &Cli, -) -> Result<()> { +fn run(transport: D, cli: &Cli) -> Result<()> { let db_path = cli .db .clone() @@ -105,7 +99,7 @@ fn run( .context("db path contains non-UTF-8 characters")? .to_string(); - let client = logos_chat::ChatClient::open( + let (client, events) = logos_chat::ChatClient::open( cli.name.clone(), logos_chat::StorageConfig::Encrypted { path: db_str, @@ -116,7 +110,7 @@ fn run( .map_err(|e| anyhow::anyhow!("{e:?}")) .context("failed to open chat client")?; - let mut app = ChatApp::new(client, inbound, &cli.name, &cli.data)?; + let mut app = ChatApp::new(client, events, &cli.name, &cli.data)?; if cli.smoketest { return Ok(()); @@ -128,71 +122,6 @@ fn run( result } -#[cfg_attr(not(logos_delivery), allow(dead_code, unused_variables))] -fn run_logos_delivery(cli: Cli) -> Result<()> { - #[cfg(logos_delivery)] - { - use transport::logos_delivery::{Config, Service}; - - eprintln!("Starting logos-delivery node (preset={})...", cli.preset); - eprintln!("This may take a few seconds while connecting to the network."); - - let logos_cfg = Config { - preset: cli.preset.clone(), - tcp_port: cli.port, - ..Default::default() - }; - let (delivery, inbound) = - Service::start(logos_cfg).context("failed to start logos-delivery")?; - - eprintln!("Node connected. Initializing chat client..."); - - let data_dir = cli - .db - .as_ref() - .and_then(|p| p.parent()) - .map(|p| p.to_path_buf()) - .unwrap_or_else(|| cli.data.clone()); - - let client = match cli.db { - Some(ref path) => { - let db_str = path - .to_str() - .context("db path contains non-UTF-8 characters")? - .to_string(); - logos_chat::ChatClient::open( - cli.name.clone(), - logos_chat::StorageConfig::Encrypted { - path: db_str, - key: "chat-cli".to_string(), - }, - delivery, - ) - .map_err(|e| anyhow::anyhow!("{e:?}")) - .context("failed to open persistent client")? - } - None => logos_chat::ChatClient::new(cli.name.clone(), delivery), - }; - - let mut app = ChatApp::new(client, inbound, &cli.name, &data_dir)?; - - if cli.smoketest { - return Ok(()); - } - - let mut terminal = ui::init().context("failed to initialize terminal")?; - let result = run_app(&mut terminal, &mut app); - ui::restore().context("failed to restore terminal")?; - return result; - } - - #[cfg(not(logos_delivery))] - anyhow::bail!( - "logos-delivery transport is not available in this build.\n\ - Build with LOGOS_DELIVERY_LIB_DIR set to enable it." - ) -} - fn run_app( terminal: &mut ui::Tui, app: &mut ChatApp, diff --git a/bin/chat-cli/src/transport/file.rs b/bin/chat-cli/src/transport/file.rs index 097af473..5b3acdb5 100644 --- a/bin/chat-cli/src/transport/file.rs +++ b/bin/chat-cli/src/transport/file.rs @@ -2,11 +2,11 @@ use std::collections::BTreeMap; use std::fs::{self, File, OpenOptions}; use std::io::{self, BufReader, Read, Seek, SeekFrom, Write}; use std::path::{Path, PathBuf}; -use std::sync::mpsc; +use std::sync::{Mutex, mpsc}; use std::thread; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use logos_chat::{AddressedEnvelope, DeliveryService}; +use logos_chat::{AddressedEnvelope, DeliveryService, drain_inbound}; #[derive(Debug, thiserror::Error)] pub enum FileTransportError { @@ -17,16 +17,17 @@ pub enum FileTransportError { #[derive(Debug)] pub struct FileTransport { transport_dir: PathBuf, + inbound: Mutex>>, } impl FileTransport { /// All instances pointing at the same `transport_dir` share one broadcast bus. /// /// Messages are written to `{transport_dir}/{delivery_address}/{hours_since_epoch}.bin` - /// as length-prefixed frames (`[u32 BE length][payload bytes]`). The background + /// as length-prefixed frames (`[u32 BE length][payload bytes]`). A background /// thread reads all files under `transport_dir` and forwards every frame to - /// the returned channel; `client.receive()` discards frames it cannot decrypt. - pub fn new(transport_dir: &Path) -> io::Result<(Self, mpsc::Receiver>)> { + /// the internal inbound channel; the client discards frames it cannot decrypt. + pub fn new(transport_dir: &Path) -> io::Result { fs::create_dir_all(transport_dir)?; let (tx, rx) = mpsc::sync_channel(1024); @@ -36,19 +37,17 @@ impl FileTransport { .name("file-transport".into()) .spawn(move || poll_reader(dir, tx))?; - Ok(( - Self { - transport_dir: transport_dir.to_path_buf(), - }, - rx, - )) + Ok(Self { + transport_dir: transport_dir.to_path_buf(), + inbound: Mutex::new(rx), + }) } } impl DeliveryService for FileTransport { type Error = FileTransportError; - fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), FileTransportError> { + fn publish(&self, envelope: AddressedEnvelope) -> Result<(), FileTransportError> { let addr_dir = self.transport_dir.join(&envelope.delivery_address); fs::create_dir_all(&addr_dir)?; @@ -62,10 +61,14 @@ impl DeliveryService for FileTransport { Ok(()) } - fn subscribe(&mut self, _delivery_address: &str) -> Result<(), Self::Error> { + fn subscribe(&self, _delivery_address: &str) -> Result<(), Self::Error> { // FileTransport does not support filtering Ok(()) } + + fn pull(&self) -> Vec> { + drain_inbound(&self.inbound) + } } /// Hours since Unix epoch — used as the rolling filename. diff --git a/bin/chat-cli/src/transport/logos_delivery.rs b/bin/chat-cli/src/transport/logos_delivery.rs index 1b6dce44..e3213229 100644 --- a/bin/chat-cli/src/transport/logos_delivery.rs +++ b/bin/chat-cli/src/transport/logos_delivery.rs @@ -18,7 +18,7 @@ use std::time::Duration; use base64::Engine; use base64::engine::general_purpose::STANDARD as BASE64; -use logos_chat::{AddressedEnvelope, DeliveryService}; +use logos_chat::{AddressedEnvelope, DeliveryService, drain_inbound}; use tracing::{error, info, warn}; use wrapper::LogosNodeCtx; @@ -123,12 +123,14 @@ pub struct Service { outbound: mpsc::SyncSender, #[allow(dead_code)] subscribers: SubscriberList, + inbound: Arc>>>, } impl Service { - /// Start the embedded logos-delivery node. Returns the service and a - /// receiver for inbound raw payloads. - pub fn start(cfg: Config) -> Result<(Self, mpsc::Receiver>), DeliveryError> { + /// Start the embedded logos-delivery node. Returns the service handle — + /// inbound payloads are drained via the `DeliveryService::pull` trait + /// method. + pub fn start(cfg: Config) -> Result { let (out_tx, out_rx) = mpsc::sync_channel::(256); let subscribers: SubscriberList = Arc::new(Mutex::new(Vec::new())); let (ready_tx, ready_rx) = mpsc::channel::>(); @@ -167,13 +169,11 @@ impl Service { return Err(e); } - Ok(( - Self { - outbound: out_tx, - subscribers, - }, - inbound_rx, - )) + Ok(Self { + outbound: out_tx, + subscribers, + inbound: Arc::new(Mutex::new(inbound_rx)), + }) } fn node_thread( @@ -281,7 +281,7 @@ impl Service { impl DeliveryService for Service { type Error = DeliveryError; - fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), DeliveryError> { + fn publish(&self, envelope: AddressedEnvelope) -> Result<(), DeliveryError> { let msg = WakuMessage { content_topic: content_topic_for(&envelope.delivery_address), payload: BASE64.encode(&envelope.data), @@ -301,8 +301,12 @@ impl DeliveryService for Service { reply_rx.recv().map_err(|_| DeliveryError::ChannelClosed)? } - fn subscribe(&mut self, _: &str) -> Result<(), ::Error> { + fn subscribe(&self, _: &str) -> Result<(), ::Error> { // This Service does not support filtering Ok(()) } + + fn pull(&self) -> Vec> { + drain_inbound(&self.inbound) + } } diff --git a/core/conversations/Cargo.toml b/core/conversations/Cargo.toml index eff48230..265d63aa 100644 --- a/core/conversations/Cargo.toml +++ b/core/conversations/Cargo.toml @@ -25,6 +25,7 @@ prost = "0.14.1" rand_core = { version = "0.6" } safer-ffi = "0.1.13" thiserror = "2.0.17" +tracing = "0.1" x25519-dalek = { version = "2.0.1", features = ["static_secrets", "reusable_secrets", "getrandom"] } [dev-dependencies] diff --git a/core/conversations/src/context.rs b/core/conversations/src/context.rs index a01b4f7b..f0e5191c 100644 --- a/core/conversations/src/context.rs +++ b/core/conversations/src/context.rs @@ -1,6 +1,4 @@ -use std::cell::{Ref, RefMut}; -use std::sync::Arc; -use std::{cell::RefCell, rc::Rc}; +use std::sync::{Arc, Mutex, MutexGuard}; use crate::account::LogosAccount; use crate::conversation::{Convo, GroupConvo}; @@ -9,10 +7,11 @@ use crate::{DeliveryService, RegistrationService}; use crate::{ conversation::{Conversation, Id, PrivateV1Convo}, errors::ChatError, + event::Event, inbox::Inbox, inbox_v2::InboxV2, proto::{EncryptedPayload, EnvelopeV1, Message}, - types::{AccountId, AddressedEnvelope, ContentData}, + types::AccountId, }; use crypto::{Identity, PublicKey}; use storage::{ChatStore, ConversationKind}; @@ -20,12 +19,17 @@ use storage::{ChatStore, ConversationKind}; pub use crate::conversation::{ConversationId, ConversationIdOwned}; pub use crate::inbox::Introduction; +/// Delivery address used by the legacy PrivateV1 inbox path. Consumers must +/// subscribe to this address to receive private-conversation invitations and +/// follow-up frames. +pub(crate) const PRIVATE_V1_INBOX_ADDRESS: &str = "delivery_address"; + // This is the main entry point to the conversations api. // Ctx manages lifetimes of objects to process and generate payloads. pub struct Context { - identity: Rc, - ds: Rc>, - store: Rc>, + identity: Arc, + ds: Arc, + store: Arc>, inbox: Inbox, pq_inbox: InboxV2, } @@ -48,33 +52,34 @@ where ) -> Result { let name = name.into(); - // Services for sharing with Converastions/Inboxes - let ds = Rc::new(RefCell::new(delivery)); - let contact_registry = Rc::new(RefCell::new(registration)); - let store = Rc::new(RefCell::new(store)); + // Services for sharing with Conversations/Inboxes + let ds = Arc::new(delivery); + let contact_registry = Arc::new(Mutex::new(registration)); + let store = Arc::new(Mutex::new(store)); // Load or create identity - let identity = if let Some(identity) = store.borrow().load_identity()? { + let identity = if let Some(identity) = store.lock().unwrap().load_identity()? { identity } else { let identity = Identity::new(&name); - store.borrow_mut().save_identity(&identity)?; + store.lock().unwrap().save_identity(&identity)?; identity }; - let identity = Rc::new(identity); - let inbox = Inbox::new(Rc::clone(&store), Rc::clone(&identity)); + let identity = Arc::new(identity); + let inbox = Inbox::new(Arc::clone(&store), Arc::clone(&identity)); let pq_inbox = InboxV2::new( LogosAccount::new_test(name), - ds.clone(), + Arc::clone(&ds), contact_registry.clone(), store.clone(), ); - // Subscribe - ds.borrow_mut() - .subscribe(&pq_inbox.delivery_address()) + // Subscribe to both inbox addresses so DS::pull yields their traffic. + ds.subscribe(&pq_inbox.delivery_address()) + .map_err(ChatError::generic)?; + ds.subscribe(PRIVATE_V1_INBOX_ADDRESS) .map_err(ChatError::generic)?; Ok(Self { @@ -98,21 +103,22 @@ where let name = name.into(); let identity = Identity::new(&name); - // Services for sharing with Converastions/Inboxes - let ds = Rc::new(RefCell::new(delivery)); - let contact_registry = Rc::new(RefCell::new(registration)); - let store = Rc::new(RefCell::new(chat_store)); + // Services for sharing with Conversations/Inboxes + let ds = Arc::new(delivery); + let contact_registry = Arc::new(Mutex::new(registration)); + let store = Arc::new(Mutex::new(chat_store)); store - .borrow_mut() + .lock() + .unwrap() .save_identity(&identity) .expect("in-memory storage should not fail"); - let identity = Rc::new(identity); - let inbox = Inbox::new(store.clone(), Rc::clone(&identity)); + let identity = Arc::new(identity); + let inbox = Inbox::new(store.clone(), Arc::clone(&identity)); let mut pq_inbox = InboxV2::new( LogosAccount::new_test(name), - ds.clone(), + Arc::clone(&ds), contact_registry.clone(), store.clone(), ); @@ -120,8 +126,9 @@ where // TODO: (P2) Initialize Account in Context or upper client. pq_inbox.register()?; - ds.borrow_mut() - .subscribe(&pq_inbox.delivery_address()) + ds.subscribe(&pq_inbox.delivery_address()) + .map_err(ChatError::generic)?; + ds.subscribe(PRIVATE_V1_INBOX_ADDRESS) .map_err(ChatError::generic)?; Ok(Self { @@ -133,19 +140,22 @@ where }) } - pub fn ds(&self) -> RefMut<'_, DS> { - self.ds.borrow_mut() + pub fn ds(&self) -> &DS { + &self.ds } - pub fn store(&self) -> Ref<'_, CS> { - self.store.borrow() + pub fn delivery_arc(&self) -> Arc { + Arc::clone(&self.ds) + } + + pub fn store(&self) -> MutexGuard<'_, CS> { + self.store.lock().unwrap() } pub fn identity(&self) -> &Identity { &self.identity } - /// Returns the unique identifier associated with the account pub fn account_id(&self) -> &AccountId { self.pq_inbox.account_id() } @@ -162,43 +172,48 @@ where &mut self, remote_bundle: &Introduction, content: &[u8], - ) -> Result<(ConversationIdOwned, Vec), ChatError> { + ) -> Result<(ConversationIdOwned, Vec), ChatError> { let (mut convo, payloads) = self .inbox - .invite_to_private_convo(remote_bundle, content, Rc::clone(&self.store)) + .invite_to_private_convo(remote_bundle, content, Arc::clone(&self.store)) .unwrap_or_else(|_| todo!("Log/Surface Error")); let remote_id = Inbox::::inbox_identifier_for_key(*remote_bundle.installation_key()); - let payload_bytes = payloads - .into_iter() - .map(|p| p.into_envelope(remote_id.clone())) - .collect(); - let convo_id = convo.persist()?; - Ok((convo_id, payload_bytes)) + + let mut events = Vec::new(); + for payload in payloads { + let envelope = payload.into_envelope(remote_id.clone()); + if let Err(e) = self.ds.publish(envelope) { + tracing::warn!("publish failed for convo {convo_id}: {e}"); + events.push(Event::transport_failure(convo_id.clone())); + } + } + Ok((convo_id, events)) } + #[allow(clippy::type_complexity)] pub fn create_group_convo( &mut self, participants: &[&AccountId], - ) -> Result>, ChatError> { + ) -> Result<(Box>, Vec), ChatError> { // TODO: (P1) Ensure errors are handled propertly. This is a high chance for desynchronized state. // MlsGroup persistence, conversation persistence, and invite delivery all happen seperately let mut convo = self.pq_inbox.create_group_v1()?; self.store - .borrow_mut() + .lock() + .unwrap() .save_conversation(&storage::ConversationMeta { local_convo_id: convo.id().to_string(), remote_convo_id: "0".into(), kind: ConversationKind::GroupV1, })?; - convo.add_member(participants)?; - - Ok(Box::new(convo)) + let events = convo.add_member(participants)?; + Ok((Box::new(convo), events)) } pub fn list_conversations(&self) -> Result, ChatError> { - let records = self.store.borrow().load_conversations()?; + let records = self.store.lock().unwrap().load_conversations()?; Ok(records .into_iter() .map(|r| Arc::from(r.local_convo_id.as_str())) @@ -209,18 +224,25 @@ where &mut self, convo_id: ConversationId, content: &[u8], - ) -> Result, ChatError> { + ) -> Result, ChatError> { let mut convo = self.load_convo(convo_id)?; let payloads = convo.send_message(content)?; let remote_id = convo.remote_id(); - Ok(payloads - .into_iter() - .map(|p| p.into_envelope(remote_id.clone())) - .collect()) + let convo_id_owned: ConversationIdOwned = Arc::from(convo_id); + + let mut events = Vec::new(); + for payload in payloads { + let envelope = payload.into_envelope(remote_id.clone()); + if let Err(e) = self.ds.publish(envelope) { + tracing::warn!("publish failed for convo {convo_id}: {e}"); + events.push(Event::transport_failure(convo_id_owned.clone())); + } + } + Ok(events) } // Decode bytes and send to protocol for processing. - pub fn handle_payload(&mut self, payload: &[u8]) -> Result, ChatError> { + pub fn handle_payload(&mut self, payload: &[u8]) -> Result, ChatError> { let env = EnvelopeV1::decode(payload)?; // TODO: Impl Conversation hinting @@ -229,45 +251,39 @@ where match convo_id { c if c == self.inbox.id() => self.dispatch_to_inbox(&env.payload), c if c == self.pq_inbox.id() => self.dispatch_to_inbox2(&env.payload), - c if self.store.borrow().has_conversation(&c)? => { + c if self.store.lock().unwrap().has_conversation(&c)? => { self.dispatch_to_convo(&c, &env.payload) } - _ => Ok(Some(ContentData { - conversation_id: "".into(), - data: vec![], - is_new_convo: false, - })), + c => { + tracing::warn!("dropping payload for unknown conversation hint {c}"); + Ok(Vec::new()) + } } } // Dispatch encrypted payload to Inbox, and register the created Conversation - fn dispatch_to_inbox( - &mut self, - enc_payload_bytes: &[u8], - ) -> Result, ChatError> { + fn dispatch_to_inbox(&mut self, enc_payload_bytes: &[u8]) -> Result, ChatError> { // EncryptedPayloads are not used by GroupConvos at this time, else this can be performed in `handle_payload` // TODO: (P1) reconcile envelope parsing between Covno and GroupConvo let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?; let public_key_hex = Inbox::::extract_ephemeral_key_hex(&enc_payload)?; - let (convo, content) = + let (convo, events) = self.inbox - .handle_frame(enc_payload, &public_key_hex, Rc::clone(&self.store))?; + .handle_frame(enc_payload, &public_key_hex, Arc::clone(&self.store))?; match convo { Conversation::Private(mut convo) => convo.persist()?, }; self.store - .borrow_mut() + .lock() + .unwrap() .remove_ephemeral_key(&public_key_hex)?; - Ok(content) + Ok(events) } - // Dispatch encrypted payload to Inbox, and register the created Conversation - fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result, ChatError> { - self.pq_inbox.handle_frame(payload)?; - - Ok(None) + fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result, ChatError> { + self.pq_inbox.handle_frame(payload) } // Dispatch encrypted payload to its corresponding conversation @@ -275,7 +291,7 @@ where &mut self, convo_id: ConversationId, enc_payload_bytes: &[u8], - ) -> Result, ChatError> { + ) -> Result, ChatError> { let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?; let mut convo = self.load_convo(convo_id)?; convo.handle_frame(enc_payload) @@ -297,7 +313,8 @@ where fn load_convo(&mut self, convo_id: ConversationId) -> Result, ChatError> { let record = self .store - .borrow() + .lock() + .unwrap() .load_conversation(convo_id)? .ok_or_else(|| ChatError::NoConvo(convo_id.into()))?; @@ -326,7 +343,8 @@ where ) -> Result>, ChatError> { let record = self .store - .borrow() + .lock() + .unwrap() .load_conversation(convo_id)? .ok_or_else(|| ChatError::NoConvo(convo_id.into()))?; diff --git a/core/conversations/src/conversation.rs b/core/conversations/src/conversation.rs index 702ca932..83cbf1c7 100644 --- a/core/conversations/src/conversation.rs +++ b/core/conversations/src/conversation.rs @@ -3,8 +3,9 @@ mod privatev1; use crate::{ DeliveryService, + event::Event, service_traits::KeyPackageProvider, - types::{AccountId, AddressedEncryptedPayload, ContentData}, + types::{AccountId, AddressedEncryptedPayload}, }; use chat_proto::logoschat::encryption::EncryptedPayload; use std::fmt::Debug; @@ -28,13 +29,11 @@ pub trait Convo: Id + Debug { /// Decrypts and processes an incoming encrypted frame. /// - /// Returns `Ok(Some(ContentData))` if the frame contains user content, - /// `Ok(None)` for protocol frames (e.g., placeholders), or an error if - /// decryption or frame parsing fails. - fn handle_frame( - &mut self, - enc_payload: EncryptedPayload, - ) -> Result, ChatError>; + /// Returns the events generated by processing the frame: typically a + /// single `Event::MessageReceived` for user content, an empty vector for + /// protocol frames (e.g. placeholders), or an error if decryption or + /// frame parsing fails. + fn handle_frame(&mut self, enc_payload: EncryptedPayload) -> Result, ChatError>; fn remote_id(&self) -> String; @@ -43,11 +42,15 @@ pub trait Convo: Id + Debug { } pub trait GroupConvo: Convo { - fn add_member(&mut self, members: &[&AccountId]) -> Result<(), ChatError>; - - // This is intended to replace `send_message`. The trait change is that it automatically - // sends the payload directly. - fn send_content(&mut self, content: &[u8]) -> Result<(), ChatError>; + /// Invite new members and publish the resulting MLS commit + welcome + /// messages. Returns the observation events generated while publishing — + /// typically empty on success, `DeliveryFailed` if a publish errored. + fn add_member(&mut self, members: &[&AccountId]) -> Result, ChatError>; + + /// Encrypt and publish `content`. This is the publish-side counterpart of + /// `Convo::send_message`. Returns the observation events generated while + /// publishing — typically empty on success. + fn send_content(&mut self, content: &[u8]) -> Result, ChatError>; } pub enum Conversation { diff --git a/core/conversations/src/conversation/group_v1.rs b/core/conversations/src/conversation/group_v1.rs index eadc4417..ea07e96b 100644 --- a/core/conversations/src/conversation/group_v1.rs +++ b/core/conversations/src/conversation/group_v1.rs @@ -2,8 +2,7 @@ /// Properties: /// - Harvest Now Decrypt Later (HNDL) protection provided by XWING /// - Multiple -use std::cell::RefCell; -use std::rc::Rc; +use std::sync::{Arc, Mutex}; use blake2::{Blake2b, Digest, digest::consts::U6}; use chat_proto::logoschat::encryption::{EncryptedPayload, Plaintext, encrypted_payload}; @@ -18,8 +17,9 @@ use crate::types::AccountId; use crate::{ DeliveryService, conversation::{ChatError, ConversationId, Convo, GroupConvo, Id}, + event::Event, service_traits::KeyPackageProvider, - types::{AddressedEncryptedPayload, ContentData}, + types::AddressedEncryptedPayload, }; /// Provides the identity information needed to participate in an MLS group. @@ -58,16 +58,16 @@ pub trait MlsContext { fn invite_user( &self, - ds: &mut DS, + ds: &DS, account_id: &AccountId, welcome: &MlsMessageOut, ) -> Result<(), ChatError>; } pub struct GroupV1Convo { - ctx: Rc>, - ds: Rc>, - keypkg_provider: Rc>, + ctx: Arc>, + ds: Arc, + keypkg_provider: Arc>, mls_group: MlsGroup, convo_id: String, } @@ -80,7 +80,7 @@ where { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("GroupV1Convo") - .field("name", &self.ctx.borrow().ident().friendly_name()) + .field("name", &self.ctx.lock().unwrap().ident().friendly_name()) .field("convo_id", &self.convo_id) .field("mls_epoch", &self.mls_group.epoch()) .finish_non_exhaustive() @@ -95,13 +95,13 @@ where { // Create a new conversation with the creator as the only participant. pub fn new( - ctx: Rc>, - ds: Rc>, - keypkg_provider: Rc>, + ctx: Arc>, + ds: Arc, + keypkg_provider: Arc>, ) -> Result { let config = Self::mls_create_config(); let mls_group = { - let ctx_ref = ctx.borrow(); + let ctx_ref = ctx.lock().unwrap(); MlsGroup::new( ctx_ref.provider(), ctx_ref.ident(), @@ -111,7 +111,7 @@ where .unwrap() }; let convo_id = hex::encode(mls_group.group_id().as_slice()); - Self::subscribe(&mut ds.borrow_mut(), &convo_id)?; + Self::subscribe(&*ds, &convo_id)?; Ok(Self { ctx, @@ -124,13 +124,13 @@ where // Constructs a new conversation upon receiving a MlsWelcome message. pub fn new_from_welcome( - ctx: Rc>, - ds: Rc>, - keypkg_provider: Rc>, + ctx: Arc>, + ds: Arc, + keypkg_provider: Arc>, welcome: Welcome, ) -> Result { let mls_group = { - let ctx_borrow = ctx.borrow(); + let ctx_borrow = ctx.lock().unwrap(); let provider = ctx_borrow.provider(); StagedWelcome::build_from_welcome(provider, &Self::mls_join_config(), welcome) @@ -142,7 +142,7 @@ where }; let convo_id = hex::encode(mls_group.group_id().as_slice()); - Self::subscribe(&mut *ds.borrow_mut(), &convo_id)?; + Self::subscribe(&*ds, &convo_id)?; Ok(Self { ctx, @@ -154,17 +154,17 @@ where } pub fn load( - ctx: Rc>, - ds: Rc>, - keypkg_provider: Rc>, + ctx: Arc>, + ds: Arc, + keypkg_provider: Arc>, convo_id: String, group_id: GroupId, ) -> Result { - let mls_group = MlsGroup::load(ctx.borrow().provider().storage(), &group_id) + let mls_group = MlsGroup::load(ctx.lock().unwrap().provider().storage(), &group_id) .map_err(ChatError::generic)? .ok_or_else(|| ChatError::NoConvo("mls group not found".into()))?; - Self::subscribe(&mut *ds.borrow_mut(), &convo_id)?; + Self::subscribe(&*ds, &convo_id)?; Ok(GroupV1Convo { ctx, @@ -176,7 +176,7 @@ where } // Configure the delivery service to listen for the required delivery addresses. - fn subscribe(ds: &mut DS, convo_id: &str) -> Result<(), ChatError> { + fn subscribe(ds: &DS, convo_id: &str) -> Result<(), ChatError> { ds.subscribe(&Self::delivery_address_from_id(convo_id)) .map_err(ChatError::generic)?; ds.subscribe(&Self::ctrl_delivery_address_from_id(convo_id)) @@ -220,23 +220,28 @@ where Self::ctrl_delivery_address_from_id(&self.convo_id) } - fn key_package_for_account(&self, ident: &AccountId) -> Result { + // `provider` is passed in rather than acquired from `self.ctx` so callers + // that already hold the `ctx` lock can use this; `std::sync::Mutex` is not + // reentrant. + fn key_package_for_account( + &self, + ident: &AccountId, + provider: &LibcruxProvider, + ) -> Result { let retrieved_bytes = self .keypkg_provider - .borrow() + .lock() + .unwrap() .retrieve(ident) .map_err(|e: KP::Error| ChatError::Generic(e.to_string()))?; - // dbg!(ctx.contact_registry()); let Some(keypkg_bytes) = retrieved_bytes else { return Err(ChatError::Protocol("Contact Not Found".into())); }; let key_package_in = KeyPackageIn::tls_deserialize(&mut keypkg_bytes.as_slice())?; - let keypkg = key_package_in.validate( - self.ctx.borrow().provider().crypto(), - ProtocolVersion::Mls10, - )?; //TODO: P3 - Hardcoded Protocol Version + let keypkg = key_package_in.validate(provider.crypto(), ProtocolVersion::Mls10)?; + //TODO: P3 - Hardcoded Protocol Version Ok(keypkg) } } @@ -262,7 +267,7 @@ where &mut self, content: &[u8], ) -> Result, ChatError> { - let ctx_ref = self.ctx.borrow(); + let ctx_ref = self.ctx.lock().unwrap(); let provider = ctx_ref.provider(); let mls_message_out = self .mls_group @@ -281,10 +286,7 @@ where Ok(vec![a]) } - fn handle_frame( - &mut self, - encoded_payload: EncryptedPayload, - ) -> Result, ChatError> { + fn handle_frame(&mut self, encoded_payload: EncryptedPayload) -> Result, ChatError> { let bytes = match encoded_payload.encryption { Some(encrypted_payload::Encryption::Plaintext(pt)) => pt.payload, _ => { @@ -302,12 +304,12 @@ where .try_into_protocol_message() .map_err(ChatError::generic)?; - let ctx_borrow = self.ctx.borrow(); + let ctx_borrow = self.ctx.lock().unwrap(); let provider = ctx_borrow.provider(); if protocol_message.epoch() < self.mls_group.epoch() { // TODO: (P1) Add logging for messages arriving from past epoch. - return Ok(None); + return Ok(Vec::new()); } let processed = self @@ -316,20 +318,19 @@ where .map_err(ChatError::generic)?; match processed.into_content() { - ProcessedMessageContent::ApplicationMessage(msg) => Ok(Some(ContentData { - conversation_id: hex::encode(self.mls_group.group_id().as_slice()), + ProcessedMessageContent::ApplicationMessage(msg) => Ok(vec![Event::MessageReceived { + conversation_id: Arc::from(hex::encode(self.mls_group.group_id().as_slice())), data: msg.into_bytes(), - is_new_convo: false, - })), + }]), ProcessedMessageContent::StagedCommitMessage(commit) => { self.mls_group .merge_staged_commit(provider, *commit) .map_err(ChatError::generic)?; - Ok(None) + Ok(Vec::new()) } _ => { // TODO: (P2) Log unknown message type - Ok(None) + Ok(Vec::new()) } } } @@ -354,8 +355,8 @@ where // commit — the Commit message Alice broadcasts to all members // welcome — the Welcome message sent privately to each new joiner // _group_info — used for external joins; ignore for now - fn add_member(&mut self, members: &[&AccountId]) -> Result<(), ChatError> { - let ctx_ref = self.ctx.borrow(); + fn add_member(&mut self, members: &[&AccountId]) -> Result, ChatError> { + let ctx_ref = self.ctx.lock().unwrap(); let provider = ctx_ref.provider(); if members.len() > 50 { @@ -369,7 +370,7 @@ where // The account_id is kept so invites can be addressed properly let keypkgs = members .iter() - .map(|ident| self.key_package_for_account(ident)) + .map(|ident| self.key_package_for_account(ident, provider)) .collect::, ChatError>>()?; let (commit, welcome, _group_info) = self @@ -381,7 +382,7 @@ where // TODO: (P3) Evaluate privacy/performance implications of an aggregated Welcome for multiple users for account_id in members { - ctx_ref.invite_user(&mut *self.ds.borrow_mut(), account_id, &welcome)?; + ctx_ref.invite_user(&*self.ds, account_id, &welcome)?; } let encrypted_payload = EncryptedPayload { @@ -398,20 +399,24 @@ where // TODO: (P1) Make GroupConvos agnostic to framing so its less error prone and more let env = addr_enc_payload.into_envelope(self.convo_id.clone()); - self.ds - .borrow_mut() - .publish(env) - .map_err(|e| ChatError::Generic(format!("Publish: {e}"))) + let mut events = Vec::new(); + if let Err(e) = self.ds.publish(env) { + tracing::warn!("commit publish failed for group {}: {e}", self.convo_id); + events.push(Event::transport_failure(Arc::from(self.id()))); + } + Ok(events) } - fn send_content(&mut self, content: &[u8]) -> Result<(), ChatError> { + fn send_content(&mut self, content: &[u8]) -> Result, ChatError> { let payloads = self.send_message(content)?; + let mut events = Vec::new(); for payload in payloads { - self.ds - .borrow_mut() - .publish(payload.into_envelope(self.id().into())) - .map_err(|e| ChatError::Delivery(e.to_string()))?; + let envelope = payload.into_envelope(self.id().into()); + if let Err(e) = self.ds.publish(envelope) { + tracing::warn!("publish failed for group {}: {e}", self.convo_id); + events.push(Event::transport_failure(Arc::from(self.id()))); + } } - Ok(()) + Ok(events) } } diff --git a/core/conversations/src/conversation/privatev1.rs b/core/conversations/src/conversation/privatev1.rs index f4f39e55..fd273a1d 100644 --- a/core/conversations/src/conversation/privatev1.rs +++ b/core/conversations/src/conversation/privatev1.rs @@ -9,15 +9,17 @@ use chat_proto::logoschat::{ use crypto::{PrivateKey, PublicKey, SymmetricKey32}; use double_ratchets::{Header, InstallationKeyPair, RatchetState, restore_ratchet_state}; use prost::{Message, bytes::Bytes}; -use std::{cell::RefCell, fmt::Debug, rc::Rc, sync::Arc}; +use std::fmt::Debug; +use std::sync::{Arc, Mutex}; use storage::{ConversationKind, ConversationMeta, ConversationStore}; use crate::{ - context::ConversationIdOwned, + context::{ConversationIdOwned, PRIVATE_V1_INBOX_ADDRESS}, conversation::{ChatError, ConversationId, Convo, Id}, errors::EncryptionError, + event::Event, proto, - types::{AddressedEncryptedPayload, ContentData}, + types::AddressedEncryptedPayload, utils::timestamp_millis, }; use double_ratchets::{to_ratchet_record, to_skipped_key_records}; @@ -60,18 +62,18 @@ pub struct PrivateV1Convo { local_convo_id: String, remote_convo_id: String, dr_state: RatchetState, - store: Rc>, + store: Arc>, } impl PrivateV1Convo { /// Reconstructs a PrivateV1Convo from persisted metadata and ratchet state. pub fn new( - store: Rc>, + store: Arc>, local_convo_id: String, remote_convo_id: String, ) -> Result { - let dr_record = store.borrow().load_ratchet_state(&local_convo_id)?; - let skipped_keys = store.borrow().load_skipped_keys(&local_convo_id)?; + let dr_record = store.lock().unwrap().load_ratchet_state(&local_convo_id)?; + let skipped_keys = store.lock().unwrap().load_skipped_keys(&local_convo_id)?; let dr_state: RatchetState = restore_ratchet_state(dr_record, skipped_keys); Ok(Self { @@ -83,7 +85,7 @@ impl PrivateV1Convo { } pub fn new_initiator( - store: Rc>, + store: Arc>, seed_key: SymmetricKey32, remote: PublicKey, ) -> Self { @@ -106,7 +108,7 @@ impl PrivateV1Convo { } pub fn new_responder( - store: Rc>, + store: Arc>, seed_key: SymmetricKey32, dh_self: &PrivateKey, ) -> Self { @@ -182,12 +184,11 @@ impl PrivateV1Convo { } // Handler for application content - fn handle_content(&self, data: Vec) -> Option { - Some(ContentData { - conversation_id: self.id().into(), + fn handle_content(&self, data: Vec) -> Vec { + vec![Event::MessageReceived { + conversation_id: Arc::from(self.id()), data, - is_new_convo: false, - }) + }] } /// Persists a conversation's metadata and ratchet state to DB. @@ -197,8 +198,8 @@ impl PrivateV1Convo { remote_convo_id: self.remote_id(), kind: self.convo_type(), }; - self.store.borrow_mut().save_conversation(&convo_info)?; - self.save_ratchet_state(&mut *self.store.borrow_mut())?; + self.store.lock().unwrap().save_conversation(&convo_info)?; + self.save_ratchet_state(&mut *self.store.lock().unwrap())?; Ok(Arc::from(self.id())) } @@ -230,18 +231,15 @@ impl Convo for PrivateV1Convo { let data = self.encrypt(frame); - self.save_ratchet_state::(&mut *self.store.borrow_mut())?; + self.save_ratchet_state::(&mut *self.store.lock().unwrap())?; Ok(vec![AddressedEncryptedPayload { - delivery_address: "delivery_address".into(), + delivery_address: PRIVATE_V1_INBOX_ADDRESS.into(), data, }]) } - fn handle_frame( - &mut self, - encoded_payload: EncryptedPayload, - ) -> Result, ChatError> { + fn handle_frame(&mut self, encoded_payload: EncryptedPayload) -> Result, ChatError> { // Extract expected frame let frame = self .decrypt(encoded_payload) @@ -251,12 +249,12 @@ impl Convo for PrivateV1Convo { return Err(ChatError::ProtocolExpectation("None", "Some".into())); }; - self.save_ratchet_state(&mut *self.store.borrow_mut())?; + self.save_ratchet_state(&mut *self.store.lock().unwrap())?; // Handle FrameTypes let output = match frame_type { FrameType::Content(bytes) => self.handle_content(bytes.into()), - FrameType::Placeholder(_) => None, + FrameType::Placeholder(_) => Vec::new(), }; Ok(output) @@ -291,11 +289,11 @@ mod tests { let saro = PrivateKey::random(); let raya = PrivateKey::random(); - let saro_storage = Rc::new(RefCell::new( + let saro_storage = Arc::new(Mutex::new( ChatStorage::new(StorageConfig::InMemory).unwrap(), )); - let raya_storage = Rc::new(RefCell::new( + let raya_storage = Arc::new(Mutex::new( ChatStorage::new(StorageConfig::InMemory).unwrap(), )); diff --git a/core/conversations/src/event.rs b/core/conversations/src/event.rs new file mode 100644 index 00000000..7ca1a380 --- /dev/null +++ b/core/conversations/src/event.rs @@ -0,0 +1,56 @@ +//! Observable events surfaced to the application layer. See +//! `docs/adr/0001-client-event-system.md`. + +use crate::conversation::ConversationIdOwned; + +/// Opaque correlation handle for outbound envelopes. Reserved for future +/// delivery-receipt support; no path produces a non-`None` value yet. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct EnvelopeId([u8; 16]); + +impl EnvelopeId { + pub fn as_bytes(&self) -> &[u8; 16] { + &self.0 + } +} + +#[derive(Debug, Clone)] +#[non_exhaustive] +pub enum Event { + #[non_exhaustive] + ConversationStarted { + conversation_id: ConversationIdOwned, + }, + #[non_exhaustive] + MessageReceived { + conversation_id: ConversationIdOwned, + data: Vec, + }, + #[non_exhaustive] + DeliveryFailed { + conversation_id: ConversationIdOwned, + /// `None` when the failure isn't tied to a specific outbound envelope. + envelope_id: Option, + reason: FailureReason, + }, +} + +impl Event { + pub fn transport_failure(conversation_id: ConversationIdOwned) -> Self { + Self::DeliveryFailed { + conversation_id, + envelope_id: None, + reason: FailureReason::Transport, + } + } + + pub fn is_delivery_failure(&self) -> bool { + matches!(self, Self::DeliveryFailed { .. }) + } +} + +#[derive(Debug, Clone)] +#[non_exhaustive] +pub enum FailureReason { + Transport, +} diff --git a/core/conversations/src/inbox/handler.rs b/core/conversations/src/inbox/handler.rs index ca022405..748b5774 100644 --- a/core/conversations/src/inbox/handler.rs +++ b/core/conversations/src/inbox/handler.rs @@ -3,30 +3,30 @@ use chat_proto::logoschat::encryption::EncryptedPayload; use prost::Message; use prost::bytes::Bytes; use rand_core::OsRng; -use std::cell::RefCell; -use std::rc::Rc; +use std::sync::{Arc, Mutex}; use storage::{ConversationStore, EphemeralKeyStore, RatchetStore}; use crypto::{PrekeyBundle, SymmetricKey32}; -use crate::context::Introduction; +use crate::context::{Introduction, PRIVATE_V1_INBOX_ADDRESS}; use crate::conversation::{ChatError, Conversation, ConversationId, Convo, Id, PrivateV1Convo}; use crate::crypto::{CopyBytes, PrivateKey, PublicKey}; +use crate::event::Event; use crate::inbox::handshake::InboxHandshake; use crate::proto; -use crate::types::{AddressedEncryptedPayload, ContentData}; +use crate::types::AddressedEncryptedPayload; use crypto::Identity; -/// Compute the deterministic Delivery_address for an installation +// TODO: Derive per-installation address; today every PrivateV1 client shares +// the same one. fn delivery_address_for_installation(_: PublicKey) -> String { - // TODO: Implement Delivery Address - "delivery_address".into() + PRIVATE_V1_INBOX_ADDRESS.into() } pub struct Inbox { - ident: Rc, + ident: Arc, local_convo_id: String, - store: Rc>, + store: Arc>, } impl std::fmt::Debug for Inbox { @@ -39,7 +39,7 @@ impl std::fmt::Debug for Inbox { } impl Inbox { - pub fn new(store: Rc>, ident: Rc) -> Self { + pub fn new(store: Arc>, ident: Arc) -> Self { let local_convo_id = Self::inbox_identifier_for_key(ident.public_key()); Self { ident, @@ -57,7 +57,8 @@ impl Inbox { let public_key_hex = hex::encode(ephemeral_key.as_bytes()); self.store - .borrow_mut() + .lock() + .unwrap() .save_ephemeral_key(&public_key_hex, &ephemeral)?; let intro = Introduction::new(self.ident.secret(), ephemeral_key, OsRng); @@ -68,7 +69,7 @@ impl Inbox { &self, remote_bundle: &Introduction, initial_message: &[u8], - private_store: Rc>, + private_store: Arc>, ) -> Result<(PrivateV1Convo, Vec), ChatError> { let mut rng = OsRng; @@ -120,16 +121,19 @@ impl Inbox { } /// Handles an incoming inbox frame. The caller must provide the ephemeral private key - /// looked up from storage. Returns the created conversation and optional content data. + /// looked up from storage. Returns the created conversation and the events + /// observed while processing the invite (a `ConversationStarted` followed by + /// any events from the embedded initial frame). pub fn handle_frame( &self, enc_payload: EncryptedPayload, public_key_hex: &str, - private_store: Rc>, - ) -> Result<(Conversation, Option), ChatError> { + private_store: Arc>, + ) -> Result<(Conversation, Vec), ChatError> { let ephemeral_key = self .store - .borrow() + .lock() + .unwrap() .load_ephemeral_key(public_key_hex)? .ok_or(ChatError::UnknownEphemeralKey())?; @@ -152,16 +156,12 @@ impl Inbox { return Err(ChatError::Protocol("missing initial encpayload".into())); }; - // Set is_new_convo for content data - let content = match convo.handle_frame(enc_payload)? { - Some(v) => ContentData { - is_new_convo: true, - ..v - }, - None => return Err(ChatError::Protocol("expected contentData".into())), - }; + let mut events = vec![Event::ConversationStarted { + conversation_id: Arc::from(convo.id()), + }]; + events.extend(convo.handle_frame(enc_payload)?); - Ok((Conversation::Private(convo), Some(content))) + Ok((Conversation::Private(convo), events)) } } } @@ -257,25 +257,23 @@ impl Id for Inbox { #[cfg(test)] mod tests { - use std::cell::RefCell; - use super::*; use chat_sqlite::{ChatStorage, StorageConfig}; #[test] fn test_invite_privatev1_roundtrip() { - let saro_storage = Rc::new(RefCell::new( + let saro_storage = Arc::new(Mutex::new( ChatStorage::new(StorageConfig::InMemory).unwrap(), )); - let raya_storage = Rc::new(RefCell::new( + let raya_storage = Arc::new(Mutex::new( ChatStorage::new(StorageConfig::InMemory).unwrap(), )); let saro_ident = Identity::new("saro"); - let saro_inbox = Inbox::new(Rc::clone(&saro_storage), saro_ident.into()); + let saro_inbox = Inbox::new(Arc::clone(&saro_storage), saro_ident.into()); let raya_ident = Identity::new("raya"); - let raya_inbox = Inbox::new(Rc::clone(&raya_storage), raya_ident.into()); + let raya_inbox = Inbox::new(Arc::clone(&raya_storage), raya_ident.into()); let bundle = raya_inbox.create_intro_bundle().unwrap(); diff --git a/core/conversations/src/inbox_v2.rs b/core/conversations/src/inbox_v2.rs index ef7b0308..f3273b7f 100644 --- a/core/conversations/src/inbox_v2.rs +++ b/core/conversations/src/inbox_v2.rs @@ -1,5 +1,4 @@ -use std::cell::RefCell; -use std::rc::Rc; +use std::sync::{Arc, Mutex}; use chat_proto::logoschat::envelope::EnvelopeV1; use openmls::prelude::tls_codec::Serialize; @@ -16,7 +15,8 @@ use crate::RegistrationService; use crate::account::LogosAccount; use crate::conversation::GroupConvo; use crate::conversation::group_v1::MlsContext; -use crate::conversation::{GroupV1Convo, IdentityProvider}; +use crate::conversation::{GroupV1Convo, Id, IdentityProvider}; +use crate::event::Event; use crate::types::AccountId; use crate::utils::{blake2b_hex, hash_size}; pub struct PqMlsContext { @@ -37,7 +37,7 @@ impl MlsContext for PqMlsContext { fn invite_user( &self, - ds: &mut DS, + ds: &DS, account_id: &AccountId, welcome: &MlsMessageOut, ) -> Result<(), ChatError> { @@ -79,10 +79,10 @@ fn conversation_id_for(account_id: &AccountId) -> String { /// such as MLS. pub struct InboxV2 { account_id: AccountId, - ds: Rc>, - reg_service: Rc>, - store: Rc>, - ctx: Rc>, + ds: Arc, + reg_service: Arc>, + store: Arc>, + ctx: Arc>, } impl InboxV2 @@ -93,9 +93,9 @@ where { pub fn new( account: LogosAccount, - ds: Rc>, - reg_service: Rc>, - store: Rc>, + ds: Arc, + reg_service: Arc>, + store: Arc>, ) -> Self { let account_id = account.account_id().clone(); let provider = LibcruxProvider::new().unwrap(); @@ -104,7 +104,7 @@ where ds, reg_service, store, - ctx: Rc::new(RefCell::new(PqMlsContext { + ctx: Arc::new(Mutex::new(PqMlsContext { ident_provider: account, provider, })), @@ -122,9 +122,10 @@ where // TODO: (P3) Each keypackage can only be used once either enable... // "LastResort" package or publish multiple self.reg_service - .borrow_mut() + .lock() + .unwrap() .register( - &self.ctx.borrow().ident_provider.friendly_name(), + &self.ctx.lock().unwrap().ident_provider.friendly_name(), keypackage_bytes, ) .map_err(ChatError::generic) @@ -142,7 +143,7 @@ where GroupV1Convo::new(self.ctx.clone(), self.ds.clone(), self.reg_service.clone()) } - pub fn handle_frame(&self, payload_bytes: &[u8]) -> Result<(), ChatError> { + pub fn handle_frame(&self, payload_bytes: &[u8]) -> Result, ChatError> { let inbox_frame = InboxV2Frame::decode(payload_bytes)?; let Some(payload) = inbox_frame.payload else { @@ -156,7 +157,7 @@ where } } - fn persist_convo(&self, convo: impl GroupConvo) -> Result<(), ChatError> { + fn persist_convo(&self, convo: &impl GroupConvo) -> Result<(), ChatError> { // TODO: (P2) Remove remote_convo_id this is an implementation detail specific to PrivateV1 // TODO: (P3) Implement From for ConversationMeta let meta = ConversationMeta { @@ -164,12 +165,12 @@ where remote_convo_id: "0".into(), kind: storage::ConversationKind::GroupV1, }; - self.store.borrow_mut().save_conversation(&meta)?; + self.store.lock().unwrap().save_conversation(&meta)?; // TODO: (P1) Persist state Ok(()) } - fn handle_heavy_invite(&self, invite: GroupV1HeavyInvite) -> Result<(), ChatError> { + fn handle_heavy_invite(&self, invite: GroupV1HeavyInvite) -> Result, ChatError> { let (msg_in, _rest) = MlsMessageIn::tls_deserialize_bytes(invite.welcome_bytes.as_slice())?; let MlsMessageBodyIn::Welcome(welcome) = msg_in.extract() else { @@ -185,11 +186,13 @@ where self.reg_service.clone(), welcome, )?; - self.persist_convo(convo) + let conversation_id = Arc::from(convo.id()); + self.persist_convo(&convo)?; + Ok(vec![Event::ConversationStarted { conversation_id }]) } fn create_keypackage(&self) -> Result { - let ctx_borrow = self.ctx.borrow(); + let ctx_borrow = self.ctx.lock().unwrap(); let capabilities = Capabilities::builder() .ciphersuites(vec![ Ciphersuite::MLS_256_XWING_CHACHA20POLY1305_SHA256_Ed25519, diff --git a/core/conversations/src/lib.rs b/core/conversations/src/lib.rs index b48e5214..e9e4deac 100644 --- a/core/conversations/src/lib.rs +++ b/core/conversations/src/lib.rs @@ -3,6 +3,7 @@ mod context; mod conversation; mod crypto; mod errors; +mod event; mod inbox; mod inbox_v2; mod proto; @@ -16,6 +17,7 @@ pub use chat_sqlite::StorageConfig; pub use context::{Context, ConversationId, ConversationIdOwned, Introduction}; pub use conversation::GroupConvo; pub use errors::ChatError; -pub use service_traits::{DeliveryService, RegistrationService}; -pub use types::{AccountId, AddressedEnvelope, ContentData}; +pub use event::{EnvelopeId, Event, FailureReason}; +pub use service_traits::{DeliveryService, RegistrationService, drain_inbound}; +pub use types::{AccountId, AddressedEnvelope}; pub use utils::hex_trunc; diff --git a/core/conversations/src/service_traits.rs b/core/conversations/src/service_traits.rs index 669de34b..08f64cfd 100644 --- a/core/conversations/src/service_traits.rs +++ b/core/conversations/src/service_traits.rs @@ -1,18 +1,31 @@ /// Service traits define the functionality which must be externally supplied by /// platform clients. Platforms can alter the behaviour of the chat core by supplying /// different implementations. +use std::sync::{Mutex, mpsc}; use std::{fmt::Debug, fmt::Display}; use crate::types::{AccountId, AddressedEnvelope}; +pub fn drain_inbound(rx: &Mutex>>) -> Vec> { + let rx = rx.lock().unwrap(); + let mut out = Vec::new(); + while let Ok(bytes) = rx.try_recv() { + out.push(bytes); + } + out +} + /// A Delivery service is responsible for payload transport. -/// This interface allows Conversations to send payloads on the wire as well as -/// register interest in delivery_addresses. Client implementations are responsible -/// for providing the inbound payloads to Context::handle_payload. -pub trait DeliveryService: Debug { +/// This interface allows Conversations to send payloads on the wire, register +/// interest in delivery_addresses, and pull inbound payloads. +pub trait DeliveryService: Debug + Send + Sync { type Error: Display + Debug; - fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Self::Error>; - fn subscribe(&mut self, delivery_address: &str) -> Result<(), Self::Error>; + fn publish(&self, envelope: AddressedEnvelope) -> Result<(), Self::Error>; + fn subscribe(&self, delivery_address: &str) -> Result<(), Self::Error>; + + /// Return every inbound payload that has arrived since the last call. + /// Non-blocking; returns an empty vec when nothing is available. + fn pull(&self) -> Vec>; } /// Manages key bundle storage for MLS group creation/addition while contacts are diff --git a/core/conversations/src/types.rs b/core/conversations/src/types.rs index 95642349..b6c5bfd2 100644 --- a/core/conversations/src/types.rs +++ b/core/conversations/src/types.rs @@ -48,15 +48,6 @@ impl Debug for AddressedEnvelope { } } -// This struct represents the result of processed inbound data. -// It wraps content payload with a conversation_id -#[derive(Debug)] -pub struct ContentData { - pub conversation_id: String, - pub data: Vec, - pub is_new_convo: bool, -} - // Internal type Definitions // Used by Conversations to attach addresses to outbound encrypted payloads diff --git a/core/integration_tests_core/tests/mls_integration.rs b/core/integration_tests_core/tests/mls_integration.rs index 03119e73..fb1631b1 100644 --- a/core/integration_tests_core/tests/mls_integration.rs +++ b/core/integration_tests_core/tests/mls_integration.rs @@ -1,37 +1,37 @@ use std::ops::{Deref, DerefMut}; use components::{EphemeralRegistry, LocalBroadcaster, MemStore}; -use libchat::{ContentData, Context, GroupConvo, hex_trunc}; +use libchat::{Context, DeliveryService, Event, GroupConvo, hex_trunc}; // Simple client Functionality for testing struct Client { inner: Context, - on_content: Option>, + on_event: Option>, } impl Client { fn init( ctx: Context, - cb: Option, + cb: Option, ) -> Self { Client { inner: ctx, - on_content: cb.map(|f| Box::new(f) as Box), + on_event: cb.map(|f| Box::new(f) as Box), } } fn process_messages(&mut self) { let messages: Vec<_> = { - let mut ds = self.ds(); - std::iter::from_fn(|| ds.poll()).collect() + let ds = self.ds(); + ds.pull() }; for data in messages { - let res = self.handle_payload(&data).unwrap(); - if let Some(cb) = &self.on_content - && let Some(content_data) = res - { - cb(content_data); + let events = self.handle_payload(&data).unwrap(); + if let Some(cb) = &self.on_event { + for event in events { + cb(event); + } } } } @@ -60,12 +60,25 @@ impl DerefMut for Client { } // Higher order function to handle printing -fn pretty_print(prefix: impl Into) -> Box { +fn pretty_print(prefix: impl Into) -> Box { let prefix = prefix.into(); - Box::new(move |c: ContentData| { - let cid = hex_trunc(c.conversation_id.as_bytes()); - let content = String::from_utf8(c.data).unwrap(); - println!("{} ({:?}) {}", prefix, cid, content) + Box::new(move |e: Event| match e { + Event::ConversationStarted { + conversation_id, .. + } => { + let cid = hex_trunc(conversation_id.as_bytes()); + println!("{prefix} ({cid:?}) [conversation started]"); + } + Event::MessageReceived { + conversation_id, + data, + .. + } => { + let cid = hex_trunc(conversation_id.as_bytes()); + let content = String::from_utf8(data).unwrap(); + println!("{prefix} ({cid:?}) {content}"); + } + _ => {} }) } @@ -93,7 +106,7 @@ fn create_group() { const RAYA: usize = 1; let raya_id = clients[RAYA].account_id().clone(); - let s_convo = clients[SARO].create_group_convo(&[&raya_id]).unwrap(); + let (s_convo, _events) = clients[SARO].create_group_convo(&[&raya_id]).unwrap(); let convo_id = s_convo.id(); @@ -141,3 +154,49 @@ fn create_group() { process(&mut clients); } + +/// Regression for the silent-group-join bug fixed by the event system: when +/// Saro creates a group with Raya, Raya processes a Welcome message that +/// carries no application content. The application must still observe a +/// `ConversationStarted` event so the new group becomes visible. +#[test] +fn group_join_emits_conversation_started() { + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); + + let mut saro = + Context::new_with_name("saro", ds.new_consumer(), rs.clone(), MemStore::new()).unwrap(); + let mut raya = Context::new_with_name("raya", ds, rs, MemStore::new()).unwrap(); + + let raya_account_id = raya.account_id().clone(); + + let (group_convo, _events) = saro.create_group_convo(&[&raya_account_id]).unwrap(); + let expected_group_id = group_convo.id().to_string(); + + // Drain everything Raya's transport produced and collect every event. + let payloads: Vec<_> = { + let ds = raya.ds(); + ds.pull() + }; + let mut events = Vec::new(); + for data in payloads { + events.extend(raya.handle_payload(&data).unwrap()); + } + + // Welcome carries no content, so we expect exactly one ConversationStarted + // and nothing else. Prior to the bug fix Raya received Ok(None) and the + // new group was invisible to the application layer. + assert_eq!( + events.len(), + 1, + "expected exactly one event, got {events:?}" + ); + match &events[0] { + Event::ConversationStarted { + conversation_id, .. + } => { + assert_eq!(conversation_id.as_ref(), expected_group_id.as_str()); + } + other => panic!("expected ConversationStarted, got {other:?}"), + } +} diff --git a/core/integration_tests_core/tests/private_integration.rs b/core/integration_tests_core/tests/private_integration.rs index 165ba769..5805b7a0 100644 --- a/core/integration_tests_core/tests/private_integration.rs +++ b/core/integration_tests_core/tests/private_integration.rs @@ -1,24 +1,54 @@ use chat_sqlite::{ChatStorage, StorageConfig}; -use libchat::{Context, Introduction}; +use libchat::{Context, ConversationIdOwned, DeliveryService, Event, Introduction}; use storage::{ConversationStore, IdentityStore}; use tempfile::tempdir; use components::{EphemeralRegistry, LocalBroadcaster}; +fn poll_one(ctx: &Context) -> Vec { + ctx.ds() + .pull() + .into_iter() + .next() + .expect("expected payload in delivery queue") +} + fn send_and_verify( sender: &mut Context, receiver: &mut Context, convo_id: &str, content: &[u8], ) { - let payloads = sender.send_content(convo_id, content).unwrap(); - let payload = payloads.first().unwrap(); - let received = receiver - .handle_payload(&payload.data) - .unwrap() - .expect("expected content"); - assert_eq!(content, received.data.as_slice()); - assert!(!received.is_new_convo); + let events = sender.send_content(convo_id, content).unwrap(); + assert!(events.is_empty(), "unexpected send events: {events:?}"); + + let payload = poll_one(receiver); + let events = receiver.handle_payload(&payload).unwrap(); + match events.as_slice() { + [Event::MessageReceived { data, .. }] => assert_eq!(data.as_slice(), content), + other => panic!("expected [MessageReceived], got {other:?}"), + } +} + +fn expect_invite(events: &[Event], expected_data: &[u8]) -> ConversationIdOwned { + match events { + [ + Event::ConversationStarted { + conversation_id: started, + .. + }, + Event::MessageReceived { + conversation_id: received, + data, + .. + }, + ] => { + assert_eq!(started, received); + assert_eq!(data.as_slice(), expected_data); + started.clone() + } + other => panic!("expected [ConversationStarted, MessageReceived], got {other:?}"), + } } #[test] @@ -36,18 +66,13 @@ fn ctx_integration() { // Saro initiates conversation with Raya let mut content = vec![10]; - let (saro_convo_id, payloads) = saro.create_private_convo(&intro, &content).unwrap(); + let (saro_convo_id, events) = saro.create_private_convo(&intro, &content).unwrap(); + assert!(events.is_empty(), "unexpected create events: {events:?}"); // Raya receives initial message - let payload = payloads.first().unwrap(); - let initial_content = raya - .handle_payload(&payload.data) - .unwrap() - .expect("expected initial content"); - - let raya_convo_id = initial_content.conversation_id; - assert_eq!(content, initial_content.data); - assert!(initial_content.is_new_convo); + let payload = poll_one(&raya); + let events = raya.handle_payload(&payload).unwrap(); + let raya_convo_id = expect_invite(&events, &content); // Exchange messages back and forth for _ in 0..10 { @@ -68,8 +93,6 @@ fn identity_persistence() { let pubkey1 = ctx1.identity().public_key(); let name1 = ctx1.installation_name().to_string(); - // For persistence tests with file-based storage, we'd need a shared db. - // With in-memory, we just verify the identity was created. assert_eq!(name1, "alice"); assert!(!pubkey1.as_bytes().iter().all(|&b| b == 0)); } @@ -104,11 +127,12 @@ fn conversation_metadata_persistence() { let bundle = alice.create_intro_bundle().unwrap(); let intro = Introduction::try_from(bundle.as_slice()).unwrap(); - let (_, payloads) = bob.create_private_convo(&intro, b"hi").unwrap(); + let (_, events) = bob.create_private_convo(&intro, b"hi").unwrap(); + assert!(events.is_empty()); - let payload = payloads.first().unwrap(); - let content = alice.handle_payload(&payload.data).unwrap().unwrap(); - assert!(content.is_new_convo); + let payload = poll_one(&alice); + let events = alice.handle_payload(&payload).unwrap(); + expect_invite(&events, b"hi"); let convos = alice.store().load_conversations().unwrap(); assert_eq!(convos.len(), 1); @@ -125,39 +149,44 @@ fn conversation_full_flow() { let bundle = alice.create_intro_bundle().unwrap(); let intro = Introduction::try_from(bundle.as_slice()).unwrap(); - let (bob_convo_id, payloads) = bob.create_private_convo(&intro, b"hello").unwrap(); + let (bob_convo_id, events) = bob.create_private_convo(&intro, b"hello").unwrap(); + assert!(events.is_empty()); - let payload = payloads.first().unwrap(); - let content = alice.handle_payload(&payload.data).unwrap().unwrap(); - let alice_convo_id = content.conversation_id; + let payload = poll_one(&alice); + let events = alice.handle_payload(&payload).unwrap(); + let alice_convo_id = expect_invite(&events, b"hello"); - let payloads = alice.send_content(&alice_convo_id, b"reply 1").unwrap(); - let payload = payloads.first().unwrap(); - bob.handle_payload(&payload.data).unwrap().unwrap(); + let events = alice.send_content(&alice_convo_id, b"reply 1").unwrap(); + assert!(events.is_empty()); + let payload = poll_one(&bob); + bob.handle_payload(&payload).unwrap(); - let payloads = bob.send_content(&bob_convo_id, b"reply 2").unwrap(); - let payload = payloads.first().unwrap(); - alice.handle_payload(&payload.data).unwrap().unwrap(); + let events = bob.send_content(&bob_convo_id, b"reply 2").unwrap(); + assert!(events.is_empty()); + let payload = poll_one(&alice); + alice.handle_payload(&payload).unwrap(); // Verify conversation list let convo_ids = alice.list_conversations().unwrap(); assert_eq!(convo_ids.len(), 1); // Continue exchanging messages - let payloads = bob.send_content(&bob_convo_id, b"more messages").unwrap(); - let payload = payloads.first().unwrap(); - let content = alice - .handle_payload(&payload.data) - .expect("should decrypt") - .expect("should have content"); - assert_eq!(content.data, b"more messages"); + let events = bob.send_content(&bob_convo_id, b"more messages").unwrap(); + assert!(events.is_empty()); + let payload = poll_one(&alice); + let events = alice.handle_payload(&payload).expect("should decrypt"); + match events.as_slice() { + [Event::MessageReceived { data, .. }] => assert_eq!(data.as_slice(), b"more messages"), + other => panic!("expected [MessageReceived], got {other:?}"), + } // Alice can also send back - let payloads = alice.send_content(&alice_convo_id, b"alice reply").unwrap(); - let payload = payloads.first().unwrap(); - let content = bob - .handle_payload(&payload.data) - .unwrap() - .expect("bob should receive"); - assert_eq!(content.data, b"alice reply"); + let events = alice.send_content(&alice_convo_id, b"alice reply").unwrap(); + assert!(events.is_empty()); + let payload = poll_one(&bob); + let events = bob.handle_payload(&payload).unwrap(); + match events.as_slice() { + [Event::MessageReceived { data, .. }] => assert_eq!(data.as_slice(), b"alice reply"), + other => panic!("expected [MessageReceived], got {other:?}"), + } } diff --git a/crates/client-ffi/examples/message-exchange/src/main.c b/crates/client-ffi/examples/message-exchange/src/main.c index 2c498617..1d2e5147 100644 --- a/crates/client-ffi/examples/message-exchange/src/main.c +++ b/crates/client-ffi/examples/message-exchange/src/main.c @@ -1,8 +1,9 @@ /* * message-exchange: Saro-Raya message exchange written entirely in C. * - * Demonstrates that the client-ffi C API is straightforward to consume - * directly — no Rust glue required. Build with the provided Makefile. + * Demonstrates the push-inbound / drain-events flow of the client-ffi API: + * the C consumer pushes received bytes into the client and drains observed + * events back out. The translator thread inside the client does the work. */ #include "client_ffi.h" @@ -16,8 +17,6 @@ /* ------------------------------------------------------------------ * Convenience macros for building slice_ref_uint8_t values. - * SLICE(p, n) — arbitrary pointer + length. - * STR(s) — string literal (length computed at compile time). * ------------------------------------------------------------------ */ #define SLICE(p, n) ((slice_ref_uint8_t){ .ptr = (const uint8_t *)(p), .len = (n) }) @@ -83,19 +82,20 @@ static int32_t deliver_cb( } /* ------------------------------------------------------------------ - * Helper: pop one envelope from the bus and push it into receiver. - * Returns a heap-allocated result; caller frees with - * push_inbound_result_free(). + * Helper: pop one envelope from the bus, push it into `receiver`, then + * drain whatever events come out. Caller frees the returned EventList. * ------------------------------------------------------------------ */ -static PushInboundResult_t *route(ClientHandle_t *receiver) +static EventList_t *route(ClientHandle_t *receiver) { const uint8_t *data; size_t len; int ok = queue_pop(&bus, &data, &len); assert(ok && "expected an envelope in the bus"); - PushInboundResult_t *r = client_receive(receiver, SLICE(data, len)); - assert(push_inbound_result_error_code(r) == 0 && "push_inbound failed"); + int rc = client_push_inbound(receiver, SLICE(data, len)); + assert(rc == 0 && "client_push_inbound failed"); + EventList_t *r = client_drain_events(receiver, 5000); + assert(event_list_error_code(r) == 0 && "drain_events failed"); return r; } @@ -125,19 +125,20 @@ int main(void) assert(create_convo_result_error_code(saro_convo) == 0); create_intro_result_free(raya_intro); - /* Route saro -> raya */ - PushInboundResult_t *recv = route(raya); + /* Route saro -> raya. Welcome carries [ConversationStarted, MessageReceived]. */ + EventList_t *recv = route(raya); - assert(push_inbound_result_has_content(recv) && "expected content from saro"); - assert(push_inbound_result_is_new_convo(recv) && "expected new-conversation flag"); + assert(event_list_len(recv) == 2 && "expected 2 events"); + assert(event_list_tag(recv, 0) == EVENT_TAG_CONVERSATION_STARTED); + assert(event_list_tag(recv, 1) == EVENT_TAG_MESSAGE_RECEIVED); - slice_ref_uint8_t content = push_inbound_result_content(recv); + slice_ref_uint8_t cid_ref = event_list_conversation_id(recv, 0); + slice_ref_uint8_t content = event_list_message_data(recv, 1); assert(content.len == 10); assert(memcmp(content.ptr, "hello raya", 10) == 0); printf("Raya received: \"%.*s\"\n", (int)content.len, content.ptr); /* Copy Raya's convo_id before freeing recv */ - slice_ref_uint8_t cid_ref = push_inbound_result_convo_id(recv); uint8_t raya_cid[256]; size_t raya_cid_len = cid_ref.len; if (raya_cid_len >= sizeof(raya_cid)) { @@ -145,7 +146,7 @@ int main(void) return 1; } memcpy(raya_cid, cid_ref.ptr, raya_cid_len); - push_inbound_result_free(recv); + event_list_free(recv); /* Raya replies */ ErrorCode_t rc = client_send_message( @@ -153,13 +154,13 @@ int main(void) assert(rc == ERROR_CODE_NONE); recv = route(saro); - assert(push_inbound_result_has_content(recv) && "expected content from raya"); - assert(!push_inbound_result_is_new_convo(recv) && "unexpected new-convo flag"); - content = push_inbound_result_content(recv); + assert(event_list_len(recv) == 1); + assert(event_list_tag(recv, 0) == EVENT_TAG_MESSAGE_RECEIVED); + content = event_list_message_data(recv, 0); assert(content.len == 7); assert(memcmp(content.ptr, "hi saro", 7) == 0); printf("Saro received: \"%.*s\"\n", (int)content.len, content.ptr); - push_inbound_result_free(recv); + event_list_free(recv); /* Multiple back-and-forth rounds */ slice_ref_uint8_t saro_cid = create_convo_result_id(saro_convo); @@ -171,11 +172,12 @@ int main(void) assert(rc == ERROR_CODE_NONE); recv = route(raya); - assert(push_inbound_result_has_content(recv)); - content = push_inbound_result_content(recv); + assert(event_list_len(recv) == 1); + assert(event_list_tag(recv, 0) == EVENT_TAG_MESSAGE_RECEIVED); + content = event_list_message_data(recv, 0); assert((int)content.len == mlen); assert(memcmp(content.ptr, msg, (size_t)mlen) == 0); - push_inbound_result_free(recv); + event_list_free(recv); char reply[32]; int rlen = snprintf(reply, sizeof(reply), "reply %d", i); @@ -185,11 +187,12 @@ int main(void) assert(rc == ERROR_CODE_NONE); recv = route(saro); - assert(push_inbound_result_has_content(recv)); - content = push_inbound_result_content(recv); + assert(event_list_len(recv) == 1); + assert(event_list_tag(recv, 0) == EVENT_TAG_MESSAGE_RECEIVED); + content = event_list_message_data(recv, 0); assert((int)content.len == rlen); assert(memcmp(content.ptr, reply, (size_t)rlen) == 0); - push_inbound_result_free(recv); + event_list_free(recv); } /* Cleanup */ diff --git a/crates/client-ffi/src/api.rs b/crates/client-ffi/src/api.rs index d08eab57..72050d4c 100644 --- a/crates/client-ffi/src/api.rs +++ b/crates/client-ffi/src/api.rs @@ -1,8 +1,9 @@ use safer_ffi::prelude::*; -use std::sync::Arc; +use std::sync::{Arc, Mutex, mpsc}; +use std::time::Duration; use crate::delivery::{CDelivery, DeliverFn}; -use logos_chat::{ChatClient, ClientError}; +use logos_chat::{ChatClient, ClientError, Event}; // --------------------------------------------------------------------------- // Opaque client handle @@ -10,7 +11,11 @@ use logos_chat::{ChatClient, ClientError}; #[derive_ReprC] #[repr(opaque)] -pub struct ClientHandle(pub(crate) ChatClient); +pub struct ClientHandle { + client: ChatClient, + push_tx: mpsc::Sender>, + event_rx: Mutex>, +} // --------------------------------------------------------------------------- // Error codes @@ -46,12 +51,23 @@ pub struct CreateConvoResult { #[derive_ReprC] #[repr(opaque)] -pub struct PushInboundResult { +pub struct EventList { error_code: i32, - has_content: bool, - is_new_convo: bool, - convo_id: Option, - content: Option>, + events: Vec, +} + +#[derive_ReprC] +#[repr(i32)] +pub enum EventTag { + /// A new conversation was started (responder side). + ConversationStarted = 0, + /// User content was received on an existing conversation. + MessageReceived = 1, + /// Delivery of a previously-sent envelope failed. + DeliveryFailed = 2, + /// Returned when the index is out of bounds or the variant is unknown to + /// this binary (e.g. a new `Event` variant from a newer library version). + Unknown = -1, } // --------------------------------------------------------------------------- @@ -60,6 +76,9 @@ pub struct PushInboundResult { /// Create an ephemeral in-memory client. Returns NULL if `callback` is None or /// `name` is not valid UTF-8. Free with `client_destroy`. +/// +/// Inbound bytes are fed via `client_push_inbound`; events are consumed via +/// `client_drain_events`. #[ffi_export] fn client_create( name: c_slice::Ref<'_, u8>, @@ -70,8 +89,16 @@ fn client_create( Err(_) => return None, }; callback?; - let delivery = CDelivery { callback }; - Some(Box::new(ClientHandle(ChatClient::new(name_str, delivery))).into()) + let (delivery, push_tx) = CDelivery::new(callback); + let (client, event_rx) = ChatClient::new(name_str, delivery); + Some( + Box::new(ClientHandle { + client, + push_tx, + event_rx: Mutex::new(event_rx), + }) + .into(), + ) } /// Free a client handle. Must not be used after this call. @@ -89,7 +116,7 @@ fn client_destroy(handle: repr_c::Box) { #[ffi_export] fn client_installation_name(handle: &ClientHandle) -> c_slice::Box { handle - .0 + .client .installation_name() .as_bytes() .to_vec() @@ -110,7 +137,7 @@ fn client_installation_name_free(name: c_slice::Box) { /// Free with `create_intro_result_free`. #[ffi_export] fn client_create_intro_bundle(handle: &mut ClientHandle) -> repr_c::Box { - let result = match handle.0.create_intro_bundle() { + let result = match handle.client.create_intro_bundle() { Ok(bytes) => CreateIntroResult { error_code: ErrorCode::None as i32, data: Some(bytes), @@ -154,13 +181,20 @@ fn client_create_conversation( content: c_slice::Ref<'_, u8>, ) -> repr_c::Box { let result = match handle - .0 + .client .create_conversation(bundle.as_slice(), content.as_slice()) { - Ok(convo_id) => CreateConvoResult { - error_code: ErrorCode::None as i32, - convo_id: Some(convo_id.to_string()), - }, + Ok((convo_id, events)) => { + let error_code = if events.iter().any(Event::is_delivery_failure) { + ErrorCode::DeliveryFail as i32 + } else { + ErrorCode::None as i32 + }; + CreateConvoResult { + error_code, + convo_id: Some(convo_id.to_string()), + } + } Err(ClientError::Chat(_)) => CreateConvoResult { error_code: ErrorCode::BadIntro as i32, convo_id: None, @@ -206,80 +240,117 @@ fn client_send_message( Err(_) => return ErrorCode::BadUtf8, }; let convo_id_owned: logos_chat::ConversationIdOwned = Arc::from(id_str); - match handle.0.send_message(&convo_id_owned, content.as_slice()) { - Ok(()) => ErrorCode::None, + match handle + .client + .send_message(&convo_id_owned, content.as_slice()) + { + Ok(events) if events.iter().any(Event::is_delivery_failure) => ErrorCode::DeliveryFail, + Ok(_) => ErrorCode::None, Err(ClientError::Delivery(_)) => ErrorCode::DeliveryFail, Err(_) => ErrorCode::UnknownError, } } // --------------------------------------------------------------------------- -// Push inbound +// Inbound + event drain // --------------------------------------------------------------------------- -/// Decrypt an inbound payload. `has_content` is false for protocol frames. -/// Free with `push_inbound_result_free`. +/// Queue an inbound payload for processing. Events surfaced from it are +/// observed via `client_drain_events`. Returns 0 on success, negative on +/// shutdown. #[ffi_export] -fn client_receive( - handle: &mut ClientHandle, - payload: c_slice::Ref<'_, u8>, -) -> repr_c::Box { - let result = match handle.0.receive(payload.as_slice()) { - Ok(Some(cd)) => PushInboundResult { - error_code: ErrorCode::None as i32, - has_content: true, - is_new_convo: cd.is_new_convo, - convo_id: Some(cd.conversation_id), - content: Some(cd.data), - }, - Ok(None) => PushInboundResult { +fn client_push_inbound(handle: &mut ClientHandle, payload: c_slice::Ref<'_, u8>) -> i32 { + match handle.push_tx.send(payload.as_slice().to_vec()) { + Ok(()) => 0, + Err(_) => -1, + } +} + +/// Wait up to `timeout_ms` for the next event and then drain everything else +/// that's currently buffered. Returns an `EventList` (possibly empty on +/// timeout). Free with `event_list_free`. +#[ffi_export] +fn client_drain_events(handle: &mut ClientHandle, timeout_ms: u64) -> repr_c::Box { + let rx = handle.event_rx.lock().unwrap(); + let timeout = Duration::from_millis(timeout_ms); + let Ok(first) = rx.recv_timeout(timeout) else { + return Box::new(EventList { error_code: ErrorCode::None as i32, - has_content: false, - is_new_convo: false, - convo_id: None, - content: None, - }, - Err(_) => PushInboundResult { - error_code: ErrorCode::UnknownError as i32, - has_content: false, - is_new_convo: false, - convo_id: None, - content: None, - }, + events: Vec::new(), + }) + .into(); }; - Box::new(result).into() + let mut events = vec![first]; + // Brief settle window so events from the same payload arrive together + // rather than across separate drain calls. + std::thread::sleep(Duration::from_micros(500)); + while let Ok(e) = rx.try_recv() { + events.push(e); + } + Box::new(EventList { + error_code: ErrorCode::None as i32, + events, + }) + .into() } #[ffi_export] -fn push_inbound_result_error_code(r: &PushInboundResult) -> i32 { +fn event_list_error_code(r: &EventList) -> i32 { r.error_code } #[ffi_export] -fn push_inbound_result_has_content(r: &PushInboundResult) -> bool { - r.has_content +fn event_list_len(r: &EventList) -> usize { + r.events.len() } +/// Returns the variant tag for the event at `idx`, or `EventTag::Unknown` +/// if `idx` is out of bounds. #[ffi_export] -fn push_inbound_result_is_new_convo(r: &PushInboundResult) -> bool { - r.is_new_convo +fn event_list_tag(r: &EventList, idx: usize) -> EventTag { + match r.events.get(idx) { + Some(Event::ConversationStarted { .. }) => EventTag::ConversationStarted, + Some(Event::MessageReceived { .. }) => EventTag::MessageReceived, + Some(Event::DeliveryFailed { .. }) => EventTag::DeliveryFailed, + _ => EventTag::Unknown, + } } -/// Returns an empty slice when has_content is false. +/// Returns the conversation id (UTF-8 bytes) for the event at `idx`, +/// or an empty slice if `idx` is out of bounds. /// The slice is valid only while `r` is alive. #[ffi_export] -fn push_inbound_result_convo_id(r: &PushInboundResult) -> c_slice::Ref<'_, u8> { - r.convo_id.as_deref().unwrap_or("").as_bytes().into() +fn event_list_conversation_id(r: &EventList, idx: usize) -> c_slice::Ref<'_, u8> { + let bytes: &[u8] = match r.events.get(idx) { + Some( + Event::ConversationStarted { + conversation_id, .. + } + | Event::MessageReceived { + conversation_id, .. + } + | Event::DeliveryFailed { + conversation_id, .. + }, + ) => conversation_id.as_bytes(), + _ => &[], + }; + bytes.into() } -/// Returns an empty slice when has_content is false. +/// Returns the message bytes for a `MessageReceived` event at `idx`. +/// Returns an empty slice for any other variant or out-of-bounds index. /// The slice is valid only while `r` is alive. #[ffi_export] -fn push_inbound_result_content(r: &PushInboundResult) -> c_slice::Ref<'_, u8> { - r.content.as_deref().unwrap_or(&[]).into() +fn event_list_message_data(r: &EventList, idx: usize) -> c_slice::Ref<'_, u8> { + let bytes: &[u8] = match r.events.get(idx) { + Some(Event::MessageReceived { data, .. }) => data.as_slice(), + _ => &[], + }; + bytes.into() } #[ffi_export] -fn push_inbound_result_free(r: repr_c::Box) { +fn event_list_free(r: repr_c::Box) { drop(r) } diff --git a/crates/client-ffi/src/delivery.rs b/crates/client-ffi/src/delivery.rs index 6b4d70ef..a3d924d6 100644 --- a/crates/client-ffi/src/delivery.rs +++ b/crates/client-ffi/src/delivery.rs @@ -1,5 +1,7 @@ +use std::sync::{Mutex, mpsc}; + use libchat::AddressedEnvelope; -use logos_chat::DeliveryService; +use logos_chat::{DeliveryService, drain_inbound}; /// C callback invoked for each outbound envelope. Return 0 or positive on success, negative on /// error. `addr_ptr/addr_len` is the delivery address; `data_ptr/data_len` is the encrypted @@ -14,16 +16,32 @@ pub type DeliverFn = Option< ) -> i32, >; +/// `DeliveryService` for FFI consumers. Outbound publishes invoke the C +/// `DeliverFn` callback; inbound payloads are fed through a `Sender>` +/// returned at construction. #[derive(Debug)] - pub struct CDelivery { - pub callback: DeliverFn, + callback: DeliverFn, + inbound: Mutex>>, +} + +impl CDelivery { + /// Returns the delivery together with the `Sender` that feeds its + /// inbound side. + pub fn new(callback: DeliverFn) -> (Self, mpsc::Sender>) { + let (tx, rx) = mpsc::channel(); + let delivery = Self { + callback, + inbound: Mutex::new(rx), + }; + (delivery, tx) + } } impl DeliveryService for CDelivery { type Error = i32; - fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), i32> { + fn publish(&self, envelope: AddressedEnvelope) -> Result<(), i32> { let cb = self.callback.expect("callback must be non-null"); let addr = envelope.delivery_address.as_bytes(); let data = envelope.data.as_slice(); @@ -31,8 +49,12 @@ impl DeliveryService for CDelivery { if rc < 0 { Err(rc) } else { Ok(()) } } - fn subscribe(&mut self, _delivery_address: &str) -> Result<(), Self::Error> { - // TODO: (P1) CDelivery does not support delivery_address filtering + fn subscribe(&self, _delivery_address: &str) -> Result<(), Self::Error> { + // TODO: (P1) CDelivery does not support delivery_address filtering. Ok(()) } + + fn pull(&self) -> Vec> { + drain_inbound(&self.inbound) + } } diff --git a/crates/client/Cargo.toml b/crates/client/Cargo.toml index 3b61d95b..6a0790d1 100644 --- a/crates/client/Cargo.toml +++ b/crates/client/Cargo.toml @@ -9,11 +9,12 @@ crate-type = ["rlib"] [dependencies] # Workspace dependencies (sorted) chat-sqlite = { workspace = true } -components = { workspace = true} +components = { workspace = true } libchat = { workspace = true } # External dependencies (sorted) thiserror = "2" +tracing = "0.1" [dev-dependencies] # External dependencies (sorted) diff --git a/crates/client/examples/message-exchange/main.rs b/crates/client/examples/message-exchange/main.rs index 2698290e..27a0c3ba 100644 --- a/crates/client/examples/message-exchange/main.rs +++ b/crates/client/examples/message-exchange/main.rs @@ -1,33 +1,51 @@ -use logos_chat::{ChatClient, ConversationIdOwned, InProcessDelivery}; -use std::sync::Arc; +use std::sync::mpsc; +use std::time::Duration; + +use logos_chat::{ChatClient, ConversationIdOwned, Event, InProcessDelivery}; fn main() { let delivery = InProcessDelivery::new(Default::default()); - let mut cursor = delivery.cursor_at_tail("delivery_address"); - let mut saro = ChatClient::new("saro", delivery.clone()); - let mut raya = ChatClient::new("raya", delivery); + let (mut saro, saro_events) = ChatClient::new("saro", delivery.clone()); + let (mut raya, raya_events) = ChatClient::new("raya", delivery); let raya_bundle = raya.create_intro_bundle().unwrap(); - saro.create_conversation(&raya_bundle, b"hello raya") + let (_saro_convo_id, _events) = saro + .create_conversation(&raya_bundle, b"hello raya") .unwrap(); - let raw = cursor.next().unwrap(); - let content = raya.receive(&raw).unwrap().unwrap(); - println!( - "Raya received: {:?}", - std::str::from_utf8(&content.data).unwrap() - ); + let raya_convo_id = expect_invite(&raya_events, "Raya"); - let raya_convo_id: ConversationIdOwned = Arc::from(content.conversation_id.as_str()); raya.send_message(&raya_convo_id, b"hi saro").unwrap(); - - let raw = cursor.next().unwrap(); - let content = saro.receive(&raw).unwrap().unwrap(); - println!( - "Saro received: {:?}", - std::str::from_utf8(&content.data).unwrap() - ); + expect_message(&saro_events, "Saro"); println!("Message exchange complete."); } + +fn expect_invite(events: &mpsc::Receiver, who: &str) -> ConversationIdOwned { + let started = events.recv_timeout(Duration::from_secs(5)).unwrap(); + let convo_id = match started { + Event::ConversationStarted { + conversation_id, .. + } => conversation_id, + other => panic!("expected ConversationStarted, got {other:?}"), + }; + let received = events.recv_timeout(Duration::from_secs(5)).unwrap(); + match received { + Event::MessageReceived { data, .. } => { + println!("{who} received: {:?}", std::str::from_utf8(&data).unwrap()); + } + other => panic!("expected MessageReceived, got {other:?}"), + } + convo_id +} + +fn expect_message(events: &mpsc::Receiver, who: &str) { + let event = events.recv_timeout(Duration::from_secs(5)).unwrap(); + match event { + Event::MessageReceived { data, .. } => { + println!("{who} received: {:?}", std::str::from_utf8(&data).unwrap()); + } + other => panic!("expected MessageReceived, got {other:?}"), + } +} diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 95a36af0..bb103b62 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -1,96 +1,153 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex, mpsc}; +use std::thread::{self, JoinHandle}; +use std::time::Duration; + use libchat::{ - AddressedEnvelope, ChatError, ChatStorage, ContentData, Context, ConversationIdOwned, - DeliveryService, Introduction, StorageConfig, + ChatError, ChatStorage, Context, ConversationIdOwned, DeliveryService, Event, Introduction, + StorageConfig, }; use components::EphemeralRegistry; use crate::errors::ClientError; +type ChatContext = Context; + +const IDLE_POLL_INTERVAL: Duration = Duration::from_millis(50); + +/// High-level chat client. Construction returns the handle together with a +/// `Receiver` for inbound observation. pub struct ChatClient { - ctx: Context, + ctx: Arc>>, + shutdown: Arc, + translator: Option>, } impl ChatClient { - /// Create an in-memory, ephemeral client. Identity is lost on drop. - pub fn new(name: impl Into, delivery: D) -> Self { + /// In-memory, ephemeral client. Identity is lost on drop. + pub fn new(name: impl Into, delivery: D) -> (Self, mpsc::Receiver) { let registry = EphemeralRegistry::new(); let store = ChatStorage::in_memory(); - Self { - ctx: Context::new_with_name(name, delivery, registry, store).unwrap(), - } + let ctx = Context::new_with_name(name, delivery, registry, store).unwrap(); + Self::wrap(ctx) } - /// Open or create a persistent client backed by `StorageConfig`. - /// - /// If an identity already exists in storage it is loaded; otherwise a new - /// one is created and saved. + /// Persistent client backed by `config`. Identity is loaded if present, + /// otherwise created and saved. pub fn open( name: impl Into, config: StorageConfig, delivery: D, - ) -> Result> { + ) -> Result<(Self, mpsc::Receiver), ClientError> { let store = ChatStorage::new(config).map_err(ChatError::from)?; let registry = EphemeralRegistry::new(); let ctx = Context::new_from_store(name, delivery, registry, store)?; - Ok(Self { ctx }) + Ok(Self::wrap(ctx)) } - /// Returns the installation name (identity label) of this client. - pub fn installation_name(&self) -> &str { - self.ctx.installation_name() + pub fn installation_name(&self) -> String { + self.ctx.lock().unwrap().installation_name().to_string() } - /// Produce a serialised introduction bundle for sharing out-of-band. pub fn create_intro_bundle(&mut self) -> Result, ClientError> { - self.ctx.create_intro_bundle().map_err(Into::into) + self.ctx + .lock() + .unwrap() + .create_intro_bundle() + .map_err(Into::into) } - /// Parse intro bundle bytes, initiate a private conversation, and deliver - /// all outbound envelopes. Returns this side's conversation ID. pub fn create_conversation( &mut self, intro_bundle: &[u8], initial_content: &[u8], - ) -> Result> { + ) -> Result<(ConversationIdOwned, Vec), ClientError> { let intro = Introduction::try_from(intro_bundle)?; - let (convo_id, envelopes) = self.ctx.create_private_convo(&intro, initial_content)?; - self.dispatch_all(envelopes)?; - Ok(convo_id) + self.ctx + .lock() + .unwrap() + .create_private_convo(&intro, initial_content) + .map_err(Into::into) } - /// List all conversation IDs known to this client. pub fn list_conversations(&self) -> Result, ClientError> { - self.ctx.list_conversations().map_err(Into::into) + self.ctx + .lock() + .unwrap() + .list_conversations() + .map_err(Into::into) } - /// Encrypt `content` and dispatch all outbound envelopes. pub fn send_message( &mut self, convo_id: &ConversationIdOwned, content: &[u8], - ) -> Result<(), ClientError> { - let envelopes = self.ctx.send_content(convo_id.as_ref(), content)?; - self.dispatch_all(envelopes) + ) -> Result, ClientError> { + self.ctx + .lock() + .unwrap() + .send_content(convo_id.as_ref(), content) + .map_err(Into::into) } - /// Decrypt an inbound payload. Returns `Some(ContentData)` for user - /// content, `None` for protocol frames. - pub fn receive( - &mut self, - payload: &[u8], - ) -> Result, ClientError> { - self.ctx.handle_payload(payload).map_err(Into::into) + fn wrap(ctx: ChatContext) -> (Self, mpsc::Receiver) { + let delivery = ctx.delivery_arc(); + let ctx = Arc::new(Mutex::new(ctx)); + let (event_tx, event_rx) = mpsc::channel(); + let shutdown = Arc::new(AtomicBool::new(false)); + let translator_ctx = Arc::clone(&ctx); + let translator_shutdown = Arc::clone(&shutdown); + let translator = thread::spawn(move || { + translator_loop(delivery, translator_ctx, event_tx, translator_shutdown) + }); + ( + Self { + ctx, + shutdown, + translator: Some(translator), + }, + event_rx, + ) + } +} + +impl Drop for ChatClient { + fn drop(&mut self) { + self.shutdown.store(true, Ordering::Release); + if let Some(handle) = self.translator.take() { + // Best-effort: a panicked translator should not poison Drop. + let _ = handle.join(); + } } +} - fn dispatch_all( - &mut self, - envelopes: Vec, - ) -> Result<(), ClientError> { - for env in envelopes { - let mut delivery = self.ctx.ds(); - delivery.publish(env).map_err(ClientError::Delivery)?; +fn translator_loop( + delivery: Arc, + ctx: Arc>>, + event_tx: mpsc::Sender, + shutdown: Arc, +) { + while !shutdown.load(Ordering::Acquire) { + let batch = delivery.pull(); + if batch.is_empty() { + thread::sleep(IDLE_POLL_INTERVAL); + continue; + } + for bytes in batch { + let events = match ctx.lock().unwrap().handle_payload(&bytes) { + Ok(events) => events, + Err(e) => { + tracing::warn!("handle_payload error: {e:?}"); + continue; + } + }; + for event in events { + if event_tx.send(event).is_err() { + tracing::info!("translator exiting: event receiver dropped"); + return; + } + } } - Ok(()) } } diff --git a/crates/client/src/delivery_in_process.rs b/crates/client/src/delivery_in_process.rs index 3feff062..c65e9cc3 100644 --- a/crates/client/src/delivery_in_process.rs +++ b/crates/client/src/delivery_in_process.rs @@ -1,7 +1,7 @@ use crate::{AddressedEnvelope, DeliveryService}; use std::collections::HashMap; use std::convert::Infallible; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex}; type Message = Vec; @@ -12,7 +12,7 @@ type Message = Vec; /// so multiple consumers on the same address each see every message. #[derive(Clone, Default, Debug)] pub struct MessageBus { - log: Arc>>>, + log: Arc>>>, } impl MessageBus { @@ -29,7 +29,7 @@ impl MessageBus { /// Returns a cursor positioned at the current tail of `address`. /// The cursor will only see messages delivered after this call. pub fn cursor_at_tail(&self, address: &str) -> Cursor { - let pos = self.log.read().unwrap().get(address).map_or(0, |v| v.len()); + let pos = self.log.lock().unwrap().get(address).map_or(0, |v| v.len()); Cursor { bus: self.clone(), address: address.to_string(), @@ -41,12 +41,12 @@ impl MessageBus { // Unwrap produces a panic when the lock is poisoned. // It would most likely indicate log corruption (e.g. incomplete write from another thread), // so panic propagation seems appropriate. - self.log.read().unwrap().get(address)?.get(pos).cloned() + self.log.lock().unwrap().get(address)?.get(pos).cloned() } fn push(&self, address: String, data: Message) { self.log - .write() + .lock() .unwrap() .entry(address) .or_default() @@ -77,40 +77,87 @@ impl Iterator for Cursor { /// In-process delivery service backed by a [`MessageBus`]. /// /// Cheap to clone — all clones share the same underlying bus, so multiple -/// clients can share one logical delivery service. Construct with a -/// [`MessageBus`] and use [`cursor`](InProcessDelivery::cursor) / -/// [`cursor_at_tail`](InProcessDelivery::cursor_at_tail) to read messages. -#[derive(Clone, Default, Debug)] -pub struct InProcessDelivery(MessageBus); +/// clients can share one logical delivery service. Each clone has its own +/// per-address cursor for [`DeliveryService::pull`]; tests that prefer to +/// pull directly can use [`cursor`](InProcessDelivery::cursor) / +/// [`cursor_at_tail`](InProcessDelivery::cursor_at_tail) instead. +#[derive(Default, Debug)] +pub struct InProcessDelivery { + bus: MessageBus, + state: Mutex, +} + +#[derive(Default, Debug, Clone)] +struct DeliveryState { + cursors: HashMap, +} impl InProcessDelivery { /// Create a delivery service backed by `bus`. pub fn new(bus: MessageBus) -> Self { - Self(bus) + Self { + bus, + state: Mutex::new(DeliveryState::default()), + } } - /// Returns a cursor positioned at the beginning of `address`. pub fn cursor(&self, address: &str) -> Cursor { - self.0.cursor(address) + self.bus.cursor(address) } - /// Returns a cursor positioned at the current tail of `address`. - /// The cursor will only see messages delivered after this call. pub fn cursor_at_tail(&self, address: &str) -> Cursor { - self.0.cursor_at_tail(address) + self.bus.cursor_at_tail(address) + } +} + +impl Clone for InProcessDelivery { + fn clone(&self) -> Self { + Self { + bus: self.bus.clone(), + state: Mutex::new(self.state.lock().unwrap().clone()), + } } } impl DeliveryService for InProcessDelivery { type Error = Infallible; - fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Infallible> { - self.0.push(envelope.delivery_address, envelope.data); + fn publish(&self, envelope: AddressedEnvelope) -> Result<(), Infallible> { + self.bus.push(envelope.delivery_address, envelope.data); Ok(()) } - fn subscribe(&mut self, _delivery_address: &str) -> Result<(), Self::Error> { - // TODO: (P1) implement subscribe + fn subscribe(&self, delivery_address: &str) -> Result<(), Self::Error> { + // Initialise the cursor at the current tail so the subscriber only + // sees subsequent messages on this address. + let pos = self + .bus + .log + .lock() + .unwrap() + .get(delivery_address) + .map_or(0, |v| v.len()); + self.state + .lock() + .unwrap() + .cursors + .entry(delivery_address.to_string()) + .or_insert(pos); Ok(()) } + + fn pull(&self) -> Vec> { + let mut out = Vec::new(); + let log = self.bus.log.lock().unwrap(); + let mut state = self.state.lock().unwrap(); + for (addr, cursor) in state.cursors.iter_mut() { + if let Some(messages) = log.get(addr) { + while *cursor < messages.len() { + out.push(messages[*cursor].clone()); + *cursor += 1; + } + } + } + out + } } diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index a0cac6f9..6ef1a235 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -8,5 +8,6 @@ pub use errors::ClientError; // Re-export types callers need to interact with ChatClient pub use libchat::{ - AddressedEnvelope, ContentData, ConversationIdOwned, DeliveryService, StorageConfig, + AddressedEnvelope, ConversationIdOwned, DeliveryService, EnvelopeId, Event, FailureReason, + StorageConfig, drain_inbound, }; diff --git a/crates/client/tests/saro_and_raya.rs b/crates/client/tests/saro_and_raya.rs index 4aba9a72..748d3379 100644 --- a/crates/client/tests/saro_and_raya.rs +++ b/crates/client/tests/saro_and_raya.rs @@ -1,68 +1,159 @@ +use std::sync::mpsc; +use std::time::Duration; + use logos_chat::{ - ChatClient, ContentData, ConversationIdOwned, Cursor, InProcessDelivery, StorageConfig, + AddressedEnvelope, ChatClient, DeliveryService, Event, InProcessDelivery, StorageConfig, }; -use std::sync::Arc; - -fn receive(receiver: &mut ChatClient, cursor: &mut Cursor) -> ContentData { - let raw = cursor.next().expect("expected envelope"); - receiver - .receive(&raw) - .expect("receive failed") - .expect("expected content") + +fn expect_event(events: &mpsc::Receiver, label: &str, mut f: F) -> T +where + F: FnMut(Event) -> Result, +{ + let event = events + .recv_timeout(Duration::from_secs(5)) + .unwrap_or_else(|_| panic!("timed out waiting for {label}")); + f(event).unwrap_or_else(|other| panic!("expected {label}, got {other:?}")) } #[test] fn saro_raya_message_exchange() { let delivery = InProcessDelivery::new(Default::default()); - let mut cursor = delivery.cursor_at_tail("delivery_address"); - let mut saro = ChatClient::new("saro", delivery.clone()); - let mut raya = ChatClient::new("raya", delivery); + let (mut saro, saro_events) = ChatClient::new("saro", delivery.clone()); + let (mut raya, raya_events) = ChatClient::new("raya", delivery); let raya_bundle = raya.create_intro_bundle().unwrap(); - let saro_convo_id = saro + let (saro_convo_id, send_events) = saro .create_conversation(&raya_bundle, b"hello raya") .unwrap(); + assert!( + send_events.is_empty(), + "unexpected send events: {send_events:?}" + ); - let content = receive(&mut raya, &mut cursor); - assert_eq!(content.data, b"hello raya"); - assert!(content.is_new_convo); - - let raya_convo_id: ConversationIdOwned = Arc::from(content.conversation_id.as_str()); + // The invite payload yields ConversationStarted then MessageReceived. + let raya_convo_id = expect_event(&raya_events, "ConversationStarted", |e| match e { + Event::ConversationStarted { + conversation_id, .. + } => Ok(conversation_id), + other => Err(other), + }); + expect_event(&raya_events, "MessageReceived", |e| match e { + Event::MessageReceived { + conversation_id, + data, + .. + } => { + assert_eq!(conversation_id, raya_convo_id); + assert_eq!(data.as_slice(), b"hello raya"); + Ok(()) + } + other => Err(other), + }); - raya.send_message(&raya_convo_id, b"hi saro").unwrap(); - let content = receive(&mut saro, &mut cursor); - assert_eq!(content.data, b"hi saro"); - assert!(!content.is_new_convo); + let send_events = raya.send_message(&raya_convo_id, b"hi saro").unwrap(); + assert!(send_events.is_empty()); + expect_event(&saro_events, "MessageReceived", |e| match e { + Event::MessageReceived { data, .. } => { + assert_eq!(data.as_slice(), b"hi saro"); + Ok(()) + } + other => Err(other), + }); for i in 0u8..5 { let msg = format!("msg {i}"); - saro.send_message(&saro_convo_id, msg.as_bytes()).unwrap(); - let content = receive(&mut raya, &mut cursor); - assert_eq!(content.data, msg.as_bytes()); + let send_events = saro.send_message(&saro_convo_id, msg.as_bytes()).unwrap(); + assert!(send_events.is_empty()); + expect_event( + &raya_events, + &format!("MessageReceived(msg {i})"), + |e| match e { + Event::MessageReceived { data, .. } => { + assert_eq!(data.as_slice(), msg.as_bytes()); + Ok(()) + } + other => Err(other), + }, + ); let reply = format!("reply {i}"); - raya.send_message(&raya_convo_id, reply.as_bytes()).unwrap(); - let content = receive(&mut saro, &mut cursor); - assert_eq!(content.data, reply.as_bytes()); + let send_events = raya.send_message(&raya_convo_id, reply.as_bytes()).unwrap(); + assert!(send_events.is_empty()); + expect_event( + &saro_events, + &format!("MessageReceived(reply {i})"), + |e| match e { + Event::MessageReceived { data, .. } => { + assert_eq!(data.as_slice(), reply.as_bytes()); + Ok(()) + } + other => Err(other), + }, + ); } assert_eq!(saro.list_conversations().unwrap().len(), 1); assert_eq!(raya.list_conversations().unwrap().len(), 1); } +#[derive(Debug, Default)] +struct FailingDelivery; + +impl DeliveryService for FailingDelivery { + type Error = &'static str; + + fn publish(&self, _: AddressedEnvelope) -> Result<(), Self::Error> { + Err("simulated transport failure") + } + + fn subscribe(&self, _: &str) -> Result<(), Self::Error> { + Ok(()) + } + + fn pull(&self) -> Vec> { + Vec::new() + } +} + +#[test] +fn dropping_client_shuts_down_translator() { + let (client, events) = ChatClient::new("saro", InProcessDelivery::default()); + drop(client); + // Drop must join the translator thread; once joined, the translator's + // Sender is gone and recv returns Disconnected immediately. + let res = events.recv_timeout(Duration::from_secs(5)); + assert!(matches!(res, Err(mpsc::RecvTimeoutError::Disconnected))); +} + +#[test] +fn publish_failure_surfaces_as_event() { + // Spin a real raya just to mint a valid intro bundle. + let (mut raya, _) = ChatClient::new("raya", InProcessDelivery::default()); + let bundle = raya.create_intro_bundle().unwrap(); + + let (mut saro, _) = ChatClient::new("saro", FailingDelivery); + let (_, send_events) = saro.create_conversation(&bundle, b"hello").unwrap(); + assert!( + send_events.iter().any(Event::is_delivery_failure), + "expected a DeliveryFailed event, got {send_events:?}" + ); +} + #[test] fn open_persistent_client() { let dir = tempfile::tempdir().unwrap(); let db_path = dir.path().join("test.db").to_string_lossy().to_string(); let config = StorageConfig::File(db_path); - let client1 = ChatClient::open("saro", config.clone(), InProcessDelivery::default()).unwrap(); - let name1 = client1.installation_name().to_string(); + let (client1, _events1) = + ChatClient::open("saro", config.clone(), InProcessDelivery::default()).unwrap(); + let name1 = client1.installation_name(); drop(client1); - let client2 = ChatClient::open("saro", config, InProcessDelivery::default()).unwrap(); - let name2 = client2.installation_name().to_string(); + let (client2, _events2) = + ChatClient::open("saro", config, InProcessDelivery::default()).unwrap(); + let name2 = client2.installation_name(); assert_eq!( name1, name2, diff --git a/extensions/components/src/delivery/local_broadcaster.rs b/extensions/components/src/delivery/local_broadcaster.rs index 5889def1..35305e74 100644 --- a/extensions/components/src/delivery/local_broadcaster.rs +++ b/extensions/components/src/delivery/local_broadcaster.rs @@ -1,87 +1,56 @@ use std::{ - cell::RefCell, collections::{HashSet, VecDeque}, hash::{DefaultHasher, Hash, Hasher}, - rc::Rc, + sync::{Arc, Mutex}, }; use libchat::{AddressedEnvelope, DeliveryService}; -#[derive(Debug)] -struct BroadcasterShared { - /// Per-address message queue; all published messages are appended here. - messages: VecDeque, - base_index: usize, +#[derive(Debug, Default)] +struct SharedStore { + /// Append-only log of every published envelope. + messages: VecDeque, } -impl BroadcasterShared { - pub fn read(&self, cursor: usize) -> Option<&T> { - self.messages.get(cursor + self.base_index) - } - - pub fn tail(&self) -> usize { - self.messages.len() + self.base_index - } +#[derive(Clone, Debug, Default)] +struct ConsumerState { + /// Position in the shared log this consumer has scanned up to. + cursor: usize, + /// Addresses this consumer is interested in. + subscriptions: HashSet, + /// IDs of envelopes this consumer itself published — used to filter them + /// out when scanning the log (a consumer doesn't receive its own output). + outbound_msgs: HashSet, } -#[derive(Clone, Debug)] +/// `DeliveryService` for tests and local examples. +/// +/// Each clone is an independent consumer (own cursor, subscriptions, and +/// outbound filter) over a shared in-memory log. +#[derive(Debug)] pub struct LocalBroadcaster { - shared: Rc>>, - cursor: usize, - subscriptions: HashSet, - outbound_msgs: Vec, + shared: Arc>, + state: Mutex, } -/// This is Lightweight DeliveryService which can be used for tests -/// and local examples. Messages are not delivered until `poll` is called -/// which allows for more fine grain test cases. impl LocalBroadcaster { pub fn new() -> Self { - let shared = Rc::new(RefCell::new(BroadcasterShared { - messages: VecDeque::new(), - base_index: 0, - })); - - let cursor = shared.borrow().tail(); Self { - shared, - cursor, - subscriptions: HashSet::new(), - outbound_msgs: Vec::new(), + shared: Arc::new(Mutex::new(SharedStore::default())), + state: Mutex::new(ConsumerState::default()), } } - /// Returns a new consumer that shares the same message store but has its - /// own independent cursor — it starts from the beginning of each address - /// queue regardless of what any other consumer has already processed. + /// Returns a new consumer that shares the same underlying log but starts + /// at the current tail — historical messages are skipped. pub fn new_consumer(&self) -> Self { - let inner = self.shared.clone(); - let cursor = inner.borrow().tail(); + let cursor = self.shared.lock().unwrap().messages.len(); Self { - shared: inner, - cursor, - subscriptions: HashSet::new(), - outbound_msgs: Vec::new(), - } - } - - /// Pulls all messages this consumer has not yet seen on `address`, - /// applying any registered filter. Advances the cursor so the same - /// messages are not returned again. - pub fn poll(&mut self) -> Option> { - loop { - let next = self.cursor; - match self.shared.borrow().read(next) { - None => return None, - Some(ae) => { - self.cursor = next + 1; - if self.subscriptions.contains(ae.delivery_address.as_str()) - && self.is_inbound(ae) - { - return Some(ae.data.clone()); - } - } - } + shared: Arc::clone(&self.shared), + state: Mutex::new(ConsumerState { + cursor, + ..ConsumerState::default() + }), } } @@ -90,11 +59,6 @@ impl LocalBroadcaster { msg.data.as_slice().hash(&mut hasher); hasher.finish() } - - fn is_inbound(&self, msg: &AddressedEnvelope) -> bool { - let mid = Self::msg_id(msg); - !self.outbound_msgs.contains(&mid) - } } impl Default for LocalBroadcaster { @@ -103,20 +67,47 @@ impl Default for LocalBroadcaster { } } +impl Clone for LocalBroadcaster { + fn clone(&self) -> Self { + Self { + shared: Arc::clone(&self.shared), + state: Mutex::new(self.state.lock().unwrap().clone()), + } + } +} + impl DeliveryService for LocalBroadcaster { type Error = String; - fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Self::Error> { - self.outbound_msgs.push(Self::msg_id(&envelope)); - self.shared.borrow_mut().messages.push_back(envelope); - + fn publish(&self, envelope: AddressedEnvelope) -> Result<(), Self::Error> { + let id = Self::msg_id(&envelope); + self.state.lock().unwrap().outbound_msgs.insert(id); + self.shared.lock().unwrap().messages.push_back(envelope); Ok(()) } - fn subscribe(&mut self, delivery_address: &str) -> Result<(), Self::Error> { - // Strict temporal ordering of subscriptions is not enforced. - // Subscriptions are evaluated on polling, not when the message is published - self.subscriptions.insert(delivery_address.to_string()); + fn subscribe(&self, delivery_address: &str) -> Result<(), Self::Error> { + self.state + .lock() + .unwrap() + .subscriptions + .insert(delivery_address.to_string()); Ok(()) } + + fn pull(&self) -> Vec> { + let mut out = Vec::new(); + let shared = self.shared.lock().unwrap(); + let mut state = self.state.lock().unwrap(); + while state.cursor < shared.messages.len() { + let ae = &shared.messages[state.cursor]; + state.cursor += 1; + if state.subscriptions.contains(&ae.delivery_address) + && !state.outbound_msgs.contains(&Self::msg_id(ae)) + { + out.push(ae.data.clone()); + } + } + out + } } diff --git a/flake.nix b/flake.nix index e588d489..873dc456 100644 --- a/flake.nix +++ b/flake.nix @@ -79,7 +79,7 @@ } ); - devShells = forAllSystems ({ pkgs }: + devShells = forAllSystems ({ pkgs, ... }: let rustToolchain = pkgs.rust-bin.fromRustupToolchainFile ./rust_toolchain.toml; in @@ -89,6 +89,7 @@ rustToolchain pkgs.pkg-config pkgs.cmake + pkgs.perl ]; }; }