diff --git a/Cargo.toml b/Cargo.toml index 59ad2b767..9e545fa31 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -86,6 +86,8 @@ lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = " proptest = "1.0.0" regex = "1.5.6" criterion = { version = "0.7.0", features = ["async_tokio"] } +rcgen = "0.14.6" +tokio-rustls = "0.26" [target.'cfg(not(no_download))'.dev-dependencies] electrsd = { version = "0.36.1", default-features = false, features = ["legacy", "esplora_a33e97e1", "corepc-node_27_2"] } diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index c4ebf56a6..97987200a 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -346,6 +346,9 @@ enum NodeError { "LiquidityFeeTooHigh", "InvalidBlindedPaths", "AsyncPaymentServicesDisabled", + "LiquiditySetWebhookFailed", + "LiquidityRemoveWebhookFailed", + "LiquidityListWebhooksFailed" }; dictionary NodeStatus { diff --git a/src/builder.rs b/src/builder.rs index ff84505b4..8b8db3634 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -61,7 +61,8 @@ use crate::io::{ self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; use crate::liquidity::{ - LSPS1ClientConfig, LSPS2ClientConfig, LSPS2ServiceConfig, LiquiditySourceBuilder, + LSPS1ClientConfig, LSPS2ClientConfig, LSPS2ServiceConfig, LSPS5ClientConfig, + LiquiditySourceBuilder, }; use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger}; use crate::message_handler::NodeCustomMessageHandler; @@ -71,7 +72,8 @@ use crate::runtime::Runtime; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, KeysManager, - MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister, SyncAndAsyncKVStore, + LSPS5ServiceConfig, MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister, + SyncAndAsyncKVStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -119,6 +121,12 @@ struct LiquiditySourceConfig { lsps2_client: Option, // Act as an LSPS2 service. lsps2_service: Option, + // Act as an LSPS5 client connecting to the given service. + lsps5_client: Option, + // Act as an LSPS5 service. + lsps5_service: Option, + // Optional custom HTTP client to be used by LSPS5 service. + http_client: Option, } #[derive(Clone)] @@ -444,6 +452,44 @@ impl NodeBuilder { self } + /// Configures the [`Node`] instance to source webhook notifications from the given + /// [bLIP-55 / LSPS5] service. + /// + /// This allows the client to register webhook endpoints with the LSP to receive + /// push notifications for Lightning events when the client is offline. + /// + /// [bLIP-55 / LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md + pub fn set_liquidity_source_lsps5( + &mut self, node_id: PublicKey, address: SocketAddress, + ) -> &mut Self { + let liquidity_source_config = + self.liquidity_source_config.get_or_insert(LiquiditySourceConfig::default()); + let lsps5_client_config = LSPS5ClientConfig { node_id, address }; + liquidity_source_config.lsps5_client = Some(lsps5_client_config); + self + } + + /// Configures the [`Node`] instance to provide an [LSPS5] service, enabling clients + /// to register webhooks for push notifications. + /// + /// [LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md + pub fn set_liquidity_provider_lsps5( + &mut self, service_config: LSPS5ServiceConfig, + ) -> &mut Self { + let liquidity_source_config = + self.liquidity_source_config.get_or_insert(LiquiditySourceConfig::default()); + liquidity_source_config.lsps5_service = Some(service_config); + self + } + + /// Sets a custom HTTP client to be used by the LSPS5 service. + pub fn set_liquidity_http_client(&mut self, http_client: reqwest::Client) -> &mut Self { + let liquidity_source_config = + self.liquidity_source_config.get_or_insert(LiquiditySourceConfig::default()); + liquidity_source_config.http_client = Some(http_client); + self + } + /// Sets the used storage directory path. pub fn set_storage_dir_path(&mut self, storage_dir_path: String) -> &mut Self { self.config.storage_dir_path = storage_dir_path; @@ -845,6 +891,30 @@ impl ArcedNodeBuilder { self.inner.write().unwrap().set_liquidity_provider_lsps2(service_config); } + /// Configures the [`Node`] instance to source webhook notifications from the given + /// [bLIP-55 / LSPS5] service. + /// + /// This allows the client to register webhook endpoints with the LSP to receive + /// push notifications for Lightning events when the client is offline. + /// + /// [bLIP-55 / LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md + pub fn set_liquidity_source_lsps5(&self, node_id: PublicKey, address: SocketAddress) { + self.inner.write().unwrap().set_liquidity_source_lsps5(node_id, address); + } + + /// Configures the [`Node`] instance to provide an [LSPS5] service, enabling clients + /// to register webhooks for push notifications. + /// + /// [LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md + pub fn set_liquidity_provider_lsps5(&self, service_config: LSPS5ServiceConfig) { + self.inner.write().unwrap().set_liquidity_provider_lsps5(service_config); + } + + /// Sets a custom HTTP client to be used by the LSPS5 service. + pub fn set_liquidity_http_client(&self, http_client: reqwest::Client) { + self.inner.write().unwrap().set_liquidity_http_client(http_client); + } + /// Sets the used storage directory path. pub fn set_storage_dir_path(&self, storage_dir_path: String) { self.inner.write().unwrap().set_storage_dir_path(storage_dir_path); @@ -1547,6 +1617,15 @@ fn build_with_store_internal( liquidity_source_builder.lsps2_service(promise_secret, config.clone()) }); + lsc.lsps5_client.as_ref().map(|config| { + liquidity_source_builder.lsps5_client(config.node_id, config.address.clone()) + }); + + lsc.lsps5_service.as_ref().map(|config| { + let http_client = lsc.http_client.clone().unwrap_or_else(|| reqwest::Client::new()); + liquidity_source_builder.lsps5_service_with_http_client(config.clone(), http_client) + }); + let liquidity_source = runtime .block_on(async move { liquidity_source_builder.build().await.map(Arc::new) })?; let custom_message_handler = diff --git a/src/error.rs b/src/error.rs index 20b1cceab..59a3c9500 100644 --- a/src/error.rs +++ b/src/error.rs @@ -127,6 +127,12 @@ pub enum Error { InvalidBlindedPaths, /// Asynchronous payment services are disabled. AsyncPaymentServicesDisabled, + /// Failed to set a webhook with the LSP. + LiquiditySetWebhookFailed, + /// Failed to remove a webhook with the LSP. + LiquidityRemoveWebhookFailed, + /// Failed to list webhooks with the LSP. + LiquidityListWebhooksFailed, } impl fmt::Display for Error { @@ -205,6 +211,15 @@ impl fmt::Display for Error { Self::AsyncPaymentServicesDisabled => { write!(f, "Asynchronous payment services are disabled.") }, + Self::LiquiditySetWebhookFailed => { + write!(f, "Failed to set a webhook with the LSP.") + }, + Self::LiquidityRemoveWebhookFailed => { + write!(f, "Failed to remove a webhook with the LSP.") + }, + Self::LiquidityListWebhooksFailed => { + write!(f, "Failed to list webhooks with the LSP.") + }, } } } diff --git a/src/lib.rs b/src/lib.rs index fdaa0f4f1..4ab44abc8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -146,7 +146,7 @@ use lightning::ln::msgs::SocketAddress; use lightning::routing::gossip::NodeAlias; use lightning::util::persist::KVStoreSync; use lightning_background_processor::process_events_async; -use liquidity::{LSPS1Liquidity, LiquiditySource}; +use liquidity::{LSPS1Liquidity, LSPS5Liquidity, LiquiditySource}; use logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use payment::asynchronous::om_mailbox::OnionMessageMailbox; use payment::asynchronous::static_invoice_store::StaticInvoiceStore; @@ -1004,6 +1004,32 @@ impl Node { )) } + /// Returns a liquidity handler allowing to handle webhooks and notifications via the [bLIP-55 / LSPS5] protocol. + /// + /// [bLIP-55 / LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md + #[cfg(not(feature = "uniffi"))] + pub fn lsps5_liquidity(&self) -> LSPS5Liquidity { + LSPS5Liquidity::new( + Arc::clone(&self.runtime), + Arc::clone(&self.connection_manager), + self.liquidity_source.clone(), + Arc::clone(&self.logger), + ) + } + + /// Returns a liquidity handler allowing to handle webhooks and notifications via the [bLIP-55 / LSPS5] protocol. + /// + /// [bLIP-55 / LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md + #[cfg(feature = "uniffi")] + pub fn lsps5_liquidity(&self) -> Arc { + Arc::new(LSPS5Liquidity::new( + Arc::clone(&self.runtime), + Arc::clone(&self.connection_manager), + self.liquidity_source.clone(), + Arc::clone(&self.logger), + )) + } + /// Retrieve a list of known channels. pub fn list_channels(&self) -> Vec { self.channel_manager.list_channels().into_iter().map(|c| c.into()).collect() diff --git a/src/liquidity.rs b/src/liquidity.rs index 74e6098dd..e5fb4dd10 100644 --- a/src/liquidity.rs +++ b/src/liquidity.rs @@ -34,6 +34,12 @@ use lightning_liquidity::lsps2::event::{LSPS2ClientEvent, LSPS2ServiceEvent}; use lightning_liquidity::lsps2::msgs::{LSPS2OpeningFeeParams, LSPS2RawOpeningFeeParams}; use lightning_liquidity::lsps2::service::LSPS2ServiceConfig as LdkLSPS2ServiceConfig; use lightning_liquidity::lsps2::utils::compute_opening_fee; +use lightning_liquidity::lsps5::client::LSPS5ClientConfig as LdkLSPS5ClientConfig; +use lightning_liquidity::lsps5::event::{LSPS5ClientEvent, LSPS5ServiceEvent}; +use lightning_liquidity::lsps5::msgs::{ + LSPS5Error, ListWebhooksResponse, RemoveWebhookResponse, SetWebhookResponse, +}; +use lightning_liquidity::lsps5::service::LSPS5ServiceConfig as LdkLSPS5ServiceConfig; use lightning_liquidity::{LiquidityClientConfig, LiquidityServiceConfig}; use lightning_types::payment::PaymentHash; use rand::Rng; @@ -54,6 +60,29 @@ const LIQUIDITY_REQUEST_TIMEOUT_SECS: u64 = 5; const LSPS2_GETINFO_REQUEST_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24); const LSPS2_CHANNEL_CLTV_EXPIRY_DELTA: u32 = 72; +/// Error type for HTTP client operations. +#[derive(Debug)] +pub enum HttpClientError { + /// Network or connection error. + Network(String), + /// HTTP status error. + Status(u16), + /// Other error. + Other(String), +} + +impl std::fmt::Display for HttpClientError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + HttpClientError::Network(msg) => write!(f, "Network error: {}", msg), + HttpClientError::Status(code) => write!(f, "HTTP error: {}", code), + HttpClientError::Other(msg) => write!(f, "HTTP client error: {}", msg), + } + } +} + +impl std::error::Error for HttpClientError {} + struct LSPS1Client { lsp_node_id: PublicKey, lsp_address: SocketAddress, @@ -145,6 +174,29 @@ pub struct LSPS2ServiceConfig { pub client_trusts_lsp: bool, } +struct LSPS5Client { + lsp_node_id: PublicKey, + lsp_address: SocketAddress, + ldk_client_config: LdkLSPS5ClientConfig, + pending_set_webhook_requests: + Mutex>>>, + pending_list_webhooks_requests: + Mutex>>>, + pending_remove_webhook_requests: + Mutex>>>, +} + +#[derive(Debug, Clone)] +pub(crate) struct LSPS5ClientConfig { + pub node_id: PublicKey, + pub address: SocketAddress, +} + +struct LSPS5Service { + ldk_service_config: LdkLSPS5ServiceConfig, + http_client: reqwest::Client, +} + pub(crate) struct LiquiditySourceBuilder where L::Target: LdkLogger, @@ -152,6 +204,8 @@ where lsps1_client: Option, lsps2_client: Option, lsps2_service: Option, + lsps5_client: Option, + lsps5_service: Option, wallet: Arc, channel_manager: Arc, keys_manager: Arc, @@ -174,10 +228,14 @@ where let lsps1_client = None; let lsps2_client = None; let lsps2_service = None; + let lsps5_client = None; + let lsps5_service = None; Self { lsps1_client, lsps2_client, lsps2_service, + lsps5_client, + lsps5_service, wallet, channel_manager, keys_manager, @@ -234,17 +292,58 @@ where self } - pub(crate) async fn build(self) -> Result, BuildError> { - let liquidity_service_config = self.lsps2_service.as_ref().map(|s| { - let lsps2_service_config = Some(s.ldk_service_config.clone()); - let lsps5_service_config = None; - let advertise_service = s.service_config.advertise_service; - LiquidityServiceConfig { lsps2_service_config, lsps5_service_config, advertise_service } + pub(crate) fn lsps5_client( + &mut self, lsp_node_id: PublicKey, lsp_address: SocketAddress, + ) -> &mut Self { + let ldk_client_config = LdkLSPS5ClientConfig {}; + + let pending_set_webhook_requests = Mutex::new(HashMap::new()); + let pending_list_webhooks_requests = Mutex::new(HashMap::new()); + let pending_remove_webhook_requests = Mutex::new(HashMap::new()); + + self.lsps5_client = Some(LSPS5Client { + ldk_client_config, + lsp_node_id, + lsp_address, + pending_set_webhook_requests, + pending_list_webhooks_requests, + pending_remove_webhook_requests, }); + self + } + + pub(crate) fn lsps5_service_with_http_client( + &mut self, service_config: LdkLSPS5ServiceConfig, http_client: reqwest::Client, + ) -> &mut Self { + self.lsps5_service = Some(LSPS5Service { ldk_service_config: service_config, http_client }); + self + } + + pub(crate) async fn build(self) -> Result, BuildError> { + let lsps2_service_config = + self.lsps2_service.as_ref().map(|s| s.ldk_service_config.clone()); + let lsps5_service_config = + self.lsps5_service.as_ref().map(|s| s.ldk_service_config.clone()); + let advertise_service = self + .lsps2_service + .as_ref() + .map(|s| s.service_config.advertise_service) + .unwrap_or(false); + + let liquidity_service_config = + if lsps2_service_config.is_some() || lsps5_service_config.is_some() { + Some(LiquidityServiceConfig { + lsps2_service_config, + lsps5_service_config, + advertise_service, + }) + } else { + None + }; let lsps1_client_config = self.lsps1_client.as_ref().map(|s| s.ldk_client_config.clone()); let lsps2_client_config = self.lsps2_client.as_ref().map(|s| s.ldk_client_config.clone()); - let lsps5_client_config = None; + let lsps5_client_config = self.lsps5_client.as_ref().map(|s| s.ldk_client_config.clone()); let liquidity_client_config = Some(LiquidityClientConfig { lsps1_client_config, lsps2_client_config, @@ -271,6 +370,8 @@ where lsps1_client: self.lsps1_client, lsps2_client: self.lsps2_client, lsps2_service: self.lsps2_service, + lsps5_client: self.lsps5_client, + lsps5_service: self.lsps5_service, wallet: self.wallet, channel_manager: self.channel_manager, peer_manager: RwLock::new(None), @@ -289,6 +390,8 @@ where lsps1_client: Option, lsps2_client: Option, lsps2_service: Option, + lsps5_client: Option, + lsps5_service: Option, wallet: Arc, channel_manager: Arc, peer_manager: RwLock>>, @@ -318,6 +421,10 @@ where self.lsps2_client.as_ref().map(|s| (s.lsp_node_id, s.lsp_address.clone())) } + pub(crate) fn get_lsps5_lsp_details(&self) -> Option<(PublicKey, SocketAddress)> { + self.lsps5_client.as_ref().map(|s| (s.lsp_node_id, s.lsp_address.clone())) + } + pub(crate) fn lsps2_channel_needs_manual_broadcast( &self, counterparty_node_id: PublicKey, user_channel_id: u128, ) -> bool { @@ -910,6 +1017,362 @@ where ); } }, + LiquidityEvent::LSPS5Client(LSPS5ClientEvent::WebhookRegistered { + request_id, + counterparty_node_id, + num_webhooks, + max_webhooks, + no_change, + .. + }) => { + if let Some(lsps5_client) = self.lsps5_client.as_ref() { + if counterparty_node_id != lsps5_client.lsp_node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + let response = Ok(SetWebhookResponse { num_webhooks, max_webhooks, no_change }); + + match lsps5_client + .pending_set_webhook_requests + .lock() + .unwrap() + .remove(&request_id) + { + Some(sender) => { + if sender.send(response).is_err() { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + } + }, + None => { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + }, + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS5Client::WebhookRegistered event!" + ); + } + }, + LiquidityEvent::LSPS5Client(LSPS5ClientEvent::WebhookRegistrationFailed { + request_id, + counterparty_node_id, + error, + app_name, + url, + }) => { + if let Some(lsps5_client) = self.lsps5_client.as_ref() { + if counterparty_node_id != lsps5_client.lsp_node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + match lsps5_client + .pending_set_webhook_requests + .lock() + .unwrap() + .remove(&request_id) + { + Some(sender) => { + log_error!( + self.logger, + "Webhook registration failed for app '{}' with url '{}': {:?}", + app_name, + url, + error + ); + if sender.send(Err(error)).is_err() { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + } + }, + None => { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + }, + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS5Client::WebhookRegistrationFailed event!" + ); + } + }, + LiquidityEvent::LSPS5Client(LSPS5ClientEvent::WebhooksListed { + request_id, + counterparty_node_id, + app_names, + max_webhooks, + }) => { + if let Some(lsps5_client) = self.lsps5_client.as_ref() { + if counterparty_node_id != lsps5_client.lsp_node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + let response = + Ok(ListWebhooksResponse { app_names: app_names.clone(), max_webhooks }); + + match lsps5_client + .pending_list_webhooks_requests + .lock() + .unwrap() + .remove(&request_id) + { + Some(sender) => { + if sender.send(response).is_err() { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + } + }, + None => { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + }, + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS5Client::WebhooksListed event!" + ); + } + }, + LiquidityEvent::LSPS5Client(LSPS5ClientEvent::WebhookRemoved { + request_id, + counterparty_node_id, + .. + }) => { + if let Some(lsps5_client) = self.lsps5_client.as_ref() { + if counterparty_node_id != lsps5_client.lsp_node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + match lsps5_client + .pending_remove_webhook_requests + .lock() + .unwrap() + .remove(&request_id) + { + Some(sender) => { + if sender.send(Ok(RemoveWebhookResponse {})).is_err() { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + } + }, + None => { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + }, + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS5Client::WebhookRemoved event!" + ); + } + }, + LiquidityEvent::LSPS5Client(LSPS5ClientEvent::WebhookRemovalFailed { + request_id, + counterparty_node_id, + error, + app_name, + }) => { + if let Some(lsps5_client) = self.lsps5_client.as_ref() { + if counterparty_node_id != lsps5_client.lsp_node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + match lsps5_client + .pending_remove_webhook_requests + .lock() + .unwrap() + .remove(&request_id) + { + Some(sender) => { + log_error!( + self.logger, + "Webhook removal failed for app '{}': {:?}", + app_name, + error + ); + if sender.send(Err(error)).is_err() { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + } + }, + None => { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + }, + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS5Client::WebhookRemovalFailed event!" + ); + } + }, + LiquidityEvent::LSPS5Service(LSPS5ServiceEvent::SendWebhookNotification { + counterparty_node_id: _, + app_name, + url, + notification, + headers, + }) => { + if self.liquidity_manager.lsps5_service_handler().is_some() { + let http_client = if let Some(http_client) = + self.lsps5_service.as_ref().map(|s| s.http_client.clone()) + { + http_client + } else { + log_error!(self.logger, "Failed to handle LSPS5ServiceEvent as LSPS5 liquidity service was not configured.",); + return; + }; + + log_info!( + self.logger, + "Sending webhook notification for {} to {}: {:?}", + app_name, + url, + notification + ); + + let notification_str = serde_json::to_string(¬ification) + .unwrap_or_else(|_| format!("{:?}", notification)); + + let client = http_client; + + let mut header_map = reqwest::header::HeaderMap::new(); + for (key, value) in headers.iter() { + if let (Ok(header_name), Ok(header_value)) = ( + reqwest::header::HeaderName::from_bytes(key.as_bytes()), + reqwest::header::HeaderValue::from_str(value.as_str()), + ) { + header_map.insert(header_name, header_value); + } + } + + let result = client + .post(url.as_str()) + .headers(header_map) + .body(notification_str) + .send() + .await; + + match result { + Ok(response) => { + if !response.status().is_success() { + log_error!( + self.logger, + "Webhook call failed with status {} for {} to {}", + response.status(), + app_name, + url + ); + } + }, + Err(e) => { + log_error!( + self.logger, + "Failed to send webhook notification for {} to {}: {}", + app_name, + url, + e + ); + }, + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS5ServiceEvent::SendWebhookNotification event!" + ); + } + }, e => { log_error!(self.logger, "Received unexpected liquidity event: {:?}", e); }, @@ -1336,6 +1799,184 @@ where }) } + pub(crate) async fn lsps5_set_webhook( + &self, app_name: String, webhook_url: String, + ) -> Result { + let lsps5_client = self.lsps5_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let client_handler = self.liquidity_manager.lsps5_client_handler().ok_or_else(|| { + log_error!(self.logger, "LSPS5 liquidity client was not configured."); + Error::LiquiditySourceUnavailable + })?; + + let (sender, receiver) = oneshot::channel(); + let request_id = match client_handler.set_webhook( + lsps5_client.lsp_node_id, + app_name.clone(), + webhook_url.clone(), + ) { + Ok(request_id) => request_id, + Err(e) => { + log_error!( + self.logger, + "Failed to send set webhook request to liquidity service: {:?}", + e + ); + return Err(Error::LiquiditySetWebhookFailed); + }, + }; + + lsps5_client + .pending_set_webhook_requests + .lock() + .unwrap() + .insert(request_id.clone(), sender); + + match tokio::time::timeout(Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), receiver) + .await + { + Ok(Ok(result)) => result.map_err(|e| { + log_error!(self.logger, "Failed to set webhook: {:?}", e); + Error::LiquiditySetWebhookFailed + }), + Ok(Err(e)) => { + log_error!( + self.logger, + "Failed to handle response from liquidity service: {:?}", + e + ); + Err(Error::LiquidityRequestFailed) + }, + Err(e) => { + lsps5_client.pending_set_webhook_requests.lock().unwrap().remove(&request_id); + log_error!(self.logger, "Liquidity request timed out: {}", e); + Err(Error::LiquidityRequestFailed) + }, + } + } + + pub(crate) async fn lsps5_list_webhooks(&self) -> Result { + let lsps5_client = self.lsps5_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let client_handler = self.liquidity_manager.lsps5_client_handler().ok_or_else(|| { + log_error!(self.logger, "LSPS5 liquidity client was not configured."); + Error::LiquiditySourceUnavailable + })?; + + let (sender, receiver) = oneshot::channel(); + let request_id = client_handler.list_webhooks(lsps5_client.lsp_node_id); + lsps5_client + .pending_list_webhooks_requests + .lock() + .unwrap() + .insert(request_id.clone(), sender); + + match tokio::time::timeout(Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), receiver) + .await + { + Ok(Ok(result)) => result.map_err(|e| { + log_error!(self.logger, "Failed to list webhooks: {:?}", e); + Error::LiquidityListWebhooksFailed + }), + Ok(Err(e)) => { + log_error!( + self.logger, + "Failed to handle response from liquidity service: {:?}", + e + ); + Err(Error::LiquidityRequestFailed) + }, + Err(e) => { + lsps5_client.pending_list_webhooks_requests.lock().unwrap().remove(&request_id); + log_error!(self.logger, "Liquidity request timed out: {}", e); + Err(Error::LiquidityRequestFailed) + }, + } + } + + pub(crate) async fn lsps5_remove_webhook( + &self, app_name: String, + ) -> Result { + let lsps5_client = self.lsps5_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let client_handler = self.liquidity_manager.lsps5_client_handler().ok_or_else(|| { + log_error!(self.logger, "LSPS5 liquidity client was not configured."); + Error::LiquiditySourceUnavailable + })?; + + let (sender, receiver) = oneshot::channel(); + let request_id = + match client_handler.remove_webhook(lsps5_client.lsp_node_id, app_name.clone()) { + Ok(request_id) => request_id, + Err(_) => return Err(Error::LiquidityRemoveWebhookFailed), + }; + + lsps5_client + .pending_remove_webhook_requests + .lock() + .unwrap() + .insert(request_id.clone(), sender); + + match tokio::time::timeout(Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), receiver) + .await + { + Ok(Ok(result)) => result.map_err(|e| { + log_error!(self.logger, "Failed to remove webhook: {:?}", e); + Error::LiquidityRemoveWebhookFailed + }), + Ok(Err(e)) => { + log_error!( + self.logger, + "Failed to handle response from liquidity service: {:?}", + e + ); + Err(Error::LiquidityRequestFailed) + }, + Err(e) => { + lsps5_client.pending_remove_webhook_requests.lock().unwrap().remove(&request_id); + log_error!(self.logger, "Liquidity request timed out: {}", e); + Err(Error::LiquidityRequestFailed) + }, + } + } + + pub(crate) fn lsps5_notify_payment_incoming(&self, client_id: PublicKey) -> Result<(), Error> { + let handler = self + .liquidity_manager + .lsps5_service_handler() + .ok_or(Error::LiquiditySourceUnavailable)?; + handler.notify_payment_incoming(client_id).map_err(|_| Error::LiquidityRequestFailed) + } + + pub(crate) fn lsps5_notify_expiry_soon( + &self, client_id: PublicKey, timeout: u32, + ) -> Result<(), Error> { + let handler = self + .liquidity_manager + .lsps5_service_handler() + .ok_or(Error::LiquiditySourceUnavailable)?; + handler.notify_expiry_soon(client_id, timeout).map_err(|_| Error::LiquidityRequestFailed) + } + + pub(crate) fn lsps5_notify_liquidity_management_request( + &self, client_id: PublicKey, + ) -> Result<(), Error> { + let handler = self + .liquidity_manager + .lsps5_service_handler() + .ok_or(Error::LiquiditySourceUnavailable)?; + handler + .notify_liquidity_management_request(client_id) + .map_err(|_| Error::LiquidityRequestFailed) + } + + pub(crate) fn lsps5_notify_onion_message_incoming( + &self, client_id: PublicKey, + ) -> Result<(), Error> { + let handler = self + .liquidity_manager + .lsps5_service_handler() + .ok_or(Error::LiquiditySourceUnavailable)?; + handler.notify_onion_message_incoming(client_id).map_err(|_| Error::LiquidityRequestFailed) + } + pub(crate) async fn handle_channel_ready( &self, user_channel_id: u128, channel_id: &ChannelId, counterparty_node_id: &PublicKey, ) { @@ -1540,3 +2181,213 @@ impl LSPS1Liquidity { Ok(response) } } + +/// A liquidity handler for managing LSPS5 webhook notifications. +/// +/// Should be retrieved by calling [`Node::lsps5_liquidity`]. +/// +/// This handler allows clients to register webhook endpoints with their LSP to receive +/// push notifications for Lightning events when the client is offline. +/// +/// [bLIP-55 / LSPS5]: https://github.com/lightning/blips/blob/master/blip-0055.md +/// [`Node::lsps5_liquidity`]: crate::Node::lsps5_liquidity +#[derive(Clone)] +pub struct LSPS5Liquidity { + runtime: Arc, + connection_manager: Arc>>, + liquidity_source: Option>>>, + logger: Arc, +} + +impl LSPS5Liquidity { + pub(crate) fn new( + runtime: Arc, connection_manager: Arc>>, + liquidity_source: Option>>>, logger: Arc, + ) -> Self { + Self { runtime, connection_manager, liquidity_source, logger } + } + + fn set_webhook_impl( + &self, app_name: String, webhook_url: String, + ) -> Result { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + + let (lsp_node_id, lsp_address) = + liquidity_source.get_lsps5_lsp_details().ok_or(Error::LiquiditySourceUnavailable)?; + + let con_node_id = lsp_node_id; + let con_addr = lsp_address.clone(); + let con_cm = Arc::clone(&self.connection_manager); + + // We need to use our main runtime here as a local runtime might not be around to poll + // connection futures going forward. + self.runtime.block_on(async move { + con_cm.connect_peer_if_necessary(con_node_id, con_addr).await + })?; + + log_info!(self.logger, "Connected to LSP {}@{}. ", lsp_node_id, lsp_address); + + let liquidity_source = Arc::clone(&liquidity_source); + let response = self.runtime.block_on(async move { + liquidity_source.lsps5_set_webhook(app_name, webhook_url).await + })?; + + Ok(response) + } + + /// Connects to the configured LSP and registers a webhook URL for receiving LSPS5 notifications. + /// + /// The webhook will receive signed push notifications for Lightning events such as incoming + /// payments when the client is offline. + #[cfg(not(feature = "uniffi"))] + pub fn set_webhook( + &self, app_name: String, webhook_url: String, + ) -> Result { + self.set_webhook_impl(app_name, webhook_url) + } + + /// Connects to the configured LSP and registers a webhook URL for receiving LSPS5 notifications. + /// + /// The webhook will receive signed push notifications for Lightning events such as incoming + /// payments when the client is offline. + #[cfg(feature = "uniffi")] + pub fn set_webhook( + &self, app_name: String, webhook_url: String, + ) -> Result { + self.set_webhook_impl(app_name, webhook_url) + } + + fn list_webhooks_impl(&self) -> Result { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let (lsp_node_id, lsp_address) = + liquidity_source.get_lsps5_lsp_details().ok_or(Error::LiquiditySourceUnavailable)?; + let con_node_id = lsp_node_id; + let con_addr = lsp_address.clone(); + let con_cm = Arc::clone(&self.connection_manager); + + // We need to use our main runtime here as a local runtime might not be around to poll + // connection futures going forward. + self.runtime.block_on(async move { + con_cm.connect_peer_if_necessary(con_node_id, con_addr).await + })?; + + let liquidity_source = Arc::clone(&liquidity_source); + let response = + self.runtime.block_on(async move { liquidity_source.lsps5_list_webhooks().await })?; + Ok(response) + } + + /// Lists all currently configured webhooks at the configured LSP. + #[cfg(not(feature = "uniffi"))] + pub fn list_webhooks(&self) -> Result { + self.list_webhooks_impl() + } + + /// Lists all currently configured webhooks at the configured LSP. + #[cfg(feature = "uniffi")] + pub fn list_webhooks(&self) -> Result { + self.list_webhooks_impl().map(|response| response.into()) + } + + fn remove_webhook_impl(&self, app_name: String) -> Result { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + + let (lsp_node_id, lsp_address) = + liquidity_source.get_lsps5_lsp_details().ok_or(Error::LiquiditySourceUnavailable)?; + + let con_node_id = lsp_node_id; + let con_addr = lsp_address.clone(); + let con_cm = Arc::clone(&self.connection_manager); + + // We need to use our main runtime here as a local runtime might not be around to poll + // connection futures going forward. + self.runtime.block_on(async move { + con_cm.connect_peer_if_necessary(con_node_id, con_addr).await + })?; + + log_info!(self.logger, "Connected to LSP {}@{}. ", lsp_node_id, lsp_address); + + let liquidity_source = Arc::clone(&liquidity_source); + let response = self + .runtime + .block_on(async move { liquidity_source.lsps5_remove_webhook(app_name).await })?; + + Ok(response) + } + + /// Removes a previously-configured webhook at the configured LSP. + #[cfg(not(feature = "uniffi"))] + pub fn remove_webhook(&self, app_name: String) -> Result { + self.remove_webhook_impl(app_name) + } + + /// Removes a previously-configured webhook at the configured LSP. + #[cfg(feature = "uniffi")] + pub fn remove_webhook(&self, app_name: String) -> Result { + self.remove_webhook_impl(app_name) + } + + /// Notifies the configured LSP about an incoming payment. + /// + /// This is called by the LSP service to trigger webhook notifications to the specified client. + pub fn notify_payment_incoming(&self, client_id: PublicKey) -> Result<(), Error> { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + liquidity_source.lsps5_notify_payment_incoming(client_id) + } + + /// Notifies the configured LSP about an invoice expiring soon. + /// + /// This is called by the LSP service to trigger webhook notifications to the specified client. + pub fn notify_expiry_soon(&self, client_id: PublicKey, timeout: u32) -> Result<(), Error> { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + liquidity_source.lsps5_notify_expiry_soon(client_id, timeout) + } + + /// Notifies the configured LSP about a liquidity management request. + /// + /// This is called by the LSP service to trigger webhook notifications to the specified client. + pub fn notify_liquidity_management_request(&self, client_id: PublicKey) -> Result<(), Error> { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + liquidity_source.lsps5_notify_liquidity_management_request(client_id) + } + + /// Notifies the configured LSP about an incoming onion message. + /// + /// This is called by the LSP service to trigger webhook notifications to the specified client. + pub fn notify_onion_message_incoming(&self, client_id: PublicKey) -> Result<(), Error> { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + liquidity_source.lsps5_notify_onion_message_incoming(client_id) + } +} + +#[cfg(feature = "uniffi")] +// Re-export LSPS5 response types for uniffi +pub use lightning_liquidity::lsps5::msgs::{ + RemoveWebhookResponse as LSPS5RemoveWebhookResponse, + SetWebhookResponse as LSPS5SetWebhookResponse, +}; + +#[cfg(feature = "uniffi")] +/// Wrapper for ListWebhooksResponse that converts LSPS5AppName to String for uniffi +#[derive(Clone, Debug)] +pub struct LSPS5ListWebhooksResponse { + pub app_names: Vec, + pub max_webhooks: u32, +} + +#[cfg(feature = "uniffi")] +impl From for LSPS5ListWebhooksResponse { + fn from(response: ListWebhooksResponse) -> Self { + Self { + app_names: response.app_names.into_iter().map(|name| name.to_string()).collect(), + max_webhooks: response.max_webhooks, + } + } +} diff --git a/src/types.rs b/src/types.rs index ea4de2a63..61e4cca10 100644 --- a/src/types.rs +++ b/src/types.rs @@ -306,6 +306,8 @@ pub(crate) type BumpTransactionEventHandler = pub(crate) type PaymentStore = DataStore>; +pub type LSPS5ServiceConfig = lightning_liquidity::lsps5::service::LSPS5ServiceConfig; + /// A local, potentially user-provided, identifier of a channel. /// /// By default, this will be randomly generated for the user to ensure local uniqueness. diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 892afedcc..004b4c719 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -36,6 +36,7 @@ use lightning::ln::channelmanager::PaymentId; use lightning::routing::gossip::{NodeAlias, NodeId}; use lightning::routing::router::RouteParametersConfig; use lightning_invoice::{Bolt11InvoiceDescription, Description}; +use lightning_liquidity::lsps5::service::LSPS5ServiceConfig; use lightning_types::payment::{PaymentHash, PaymentPreimage}; use log::LevelFilter; @@ -2297,3 +2298,322 @@ async fn lsps2_lsp_trusts_client_but_client_does_not_claim() { Some(6) ); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn lsps5_client_webhook_integration() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); + let sync_config = EsploraSyncConfig { background_sync_config: None }; + + // Setup LSPS5 service provider node with custom HTTP client for testing + let test_http_client = + reqwest::Client::builder().danger_accept_invalid_certs(true).build().unwrap(); + + // Setup LSPS5 service provider node + let service_config = random_config(true); + setup_builder!(service_builder, service_config.node_config); + service_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); + let lsps5_service_config = LSPS5ServiceConfig { max_webhooks_per_client: 2 }; + service_builder.set_liquidity_provider_lsps5(lsps5_service_config); + service_builder.set_liquidity_http_client(test_http_client); + let service_node = service_builder.build(service_config.node_entropy.into()).unwrap(); + service_node.start().unwrap(); + let service_node_id = service_node.node_id(); + let service_addr = service_node.onchain_payment().new_address().unwrap(); + let service_socket_addr = service_node.listening_addresses().unwrap().first().unwrap().clone(); + + // Setup LSPS5 client node + let client_config = random_config(true); + setup_builder!(client_builder, client_config.node_config); + client_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); + client_builder.set_liquidity_source_lsps5(service_node_id, service_socket_addr.clone()); + let client_node = client_builder.build(client_config.node_entropy.into()).unwrap(); + client_node.start().unwrap(); + let client_node_id = client_node.node_id(); + let client_addr = client_node.onchain_payment().new_address().unwrap(); + + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![service_addr, client_addr], + Amount::from_sat(10_000_000), + ) + .await; + service_node.sync_wallets().unwrap(); + client_node.sync_wallets().unwrap(); + + open_channel(&client_node, &service_node, 5_000_000, false, &electrsd).await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + service_node.sync_wallets().unwrap(); + client_node.sync_wallets().unwrap(); + expect_channel_ready_event!(client_node, service_node.node_id()); + expect_channel_ready_event!(service_node, client_node.node_id()); + + // Setup certificate for HTTPS testing + let (cert_path, tls_acceptor) = setup_test_certificate(); + + // Setup HTTPS webhook server + let (mut webhook_rx, shutdown_tx) = setup_webhook_server(tls_acceptor).await; + + // Test webhook registration + let lsps5_client = client_node.lsps5_liquidity(); + let app_name_1 = "test-app".to_string(); + let webhook_url_1 = "https://example.com/webhook".to_string(); + + // Register first webhook + let response = lsps5_client + .set_webhook(app_name_1.clone(), webhook_url_1.clone()) + .expect("Failed to register webhook"); + assert_eq!(response.num_webhooks, 1, "Expected 1 webhook after first registration"); + assert_eq!(response.max_webhooks, 2, "Expected max_webhooks to be 2"); + assert!(!response.no_change); + expect_webhook_notification(&mut webhook_rx, "lsps5.webhook_registered").await; + + // Register second webhook + let app_name_2 = "test-app-2".to_string(); + let webhook_url_2 = "https://example.com/webhook-2".to_string(); + let response = lsps5_client + .set_webhook(app_name_2.clone(), webhook_url_2.clone()) + .expect("Failed to register webhook"); + assert_eq!(response.num_webhooks, 2, "Expected 2 webhooks after second registration"); + assert_eq!(response.max_webhooks, 2, "Expected max_webhooks to be 2"); + assert!(!response.no_change); + expect_webhook_notification(&mut webhook_rx, "lsps5.webhook_registered").await; + + // Register the same webhook again, should return no_change=true + let response = lsps5_client + .set_webhook(app_name_2.clone(), webhook_url_2.clone()) + .expect("Failed to register webhook again"); + assert_eq!(response.num_webhooks, 2, "Expected 2 webhooks after registering same webhook"); + assert_eq!(response.max_webhooks, 2, "Expected max_webhooks to be 2"); + assert!(response.no_change); + + // Attempt to register a third webhook, which should fail due to max_webhooks_per_client=2 + let app_name_3 = "test-app-3".to_string(); + let webhook_url_3 = "https://example.com/webhook-3".to_string(); + assert_eq!( + Err(NodeError::LiquiditySetWebhookFailed), + lsps5_client.set_webhook(app_name_3.clone(), webhook_url_3.clone()) + ); + + // list registered webhooks + let registered_webhooks = lsps5_client.list_webhooks().expect("Failed to list webhooks"); + assert_eq!(registered_webhooks.app_names.len(), 2, "Expected 2 registered webhooks"); + assert!( + registered_webhooks.app_names.iter().any(|name| name.as_str() == app_name_1), + "Expected app_name_1 to be in registered webhooks" + ); + assert!( + registered_webhooks.app_names.iter().any(|name| name.as_str() == app_name_2), + "Expected app_name_2 to be in registered webhooks" + ); + + // Delete non-existing webhook + let non_existing_app_name = "non-existing-app".to_string(); + assert_eq!( + Err(NodeError::LiquidityRemoveWebhookFailed), + lsps5_client.remove_webhook(non_existing_app_name.clone()) + ); + + // Delete a registered webhook + lsps5_client.remove_webhook(app_name_1.clone()).expect("Failed to delete first webhook"); + + // List registered webhooks after deletion + let registered_webhooks = + lsps5_client.list_webhooks().expect("Failed to list webhooks after deletion"); + assert_eq!(registered_webhooks.app_names.len(), 1, "Expected 1 webhook after deletion"); + assert!( + registered_webhooks.app_names.iter().any(|name| name.as_str() == app_name_2), + "Expected remaining webhook to be app_name_2" + ); + assert!( + !registered_webhooks.app_names.iter().any(|name| name.as_str() == app_name_1), + "Expected app_name_1 to be removed from registered webhooks" + ); + + // Test each notification type + let lsps5_service = service_node.lsps5_liquidity(); + lsps5_service.notify_payment_incoming(client_node_id).expect("notify_payment_incoming failed"); + expect_webhook_notification(&mut webhook_rx, "lsps5.payment_incoming").await; + + // Sleep for 65 seconds to ensure we're above the NOTIFICATION_COOLDOWN_TIME (60 seconds) + // This prevents NotificationRateLimitError + tokio::time::sleep(tokio::time::Duration::from_secs(65)).await; + + lsps5_service + .notify_onion_message_incoming(client_node_id) + .expect("notify_onion_message_incoming failed"); + expect_webhook_notification(&mut webhook_rx, "lsps5.onion_message_incoming").await; + // Sleep for 65 seconds to ensure we're above the NOTIFICATION_COOLDOWN_TIME (60 seconds) + // This prevents NotificationRateLimitError + tokio::time::sleep(tokio::time::Duration::from_secs(65)).await; + + lsps5_service + .notify_liquidity_management_request(client_node_id) + .expect("notify_liquidity_management_request failed"); + expect_webhook_notification(&mut webhook_rx, "lsps5.liquidity_management_request").await; + // Sleep for 65 seconds to ensure we're above the NOTIFICATION_COOLDOWN_TIME (60 seconds) + // This prevents NotificationRateLimitError + tokio::time::sleep(tokio::time::Duration::from_secs(65)).await; + + lsps5_service.notify_expiry_soon(client_node_id, 3600).expect("notify_expiry_soon failed"); + expect_webhook_notification(&mut webhook_rx, "lsps5.expiry_soon").await; + + // Cleanup + // Shutdown webhook server + let _ = shutdown_tx.send(()); + + // Remove certificate file and env vars + std::env::remove_var("SSL_CERT_FILE"); + std::env::remove_var("REQUESTS_CA_BUNDLE"); + let _ = std::fs::remove_file(cert_path); + + service_node.stop().unwrap(); + client_node.stop().unwrap(); +} + +fn setup_test_certificate() -> (std::path::PathBuf, tokio_rustls::TlsAcceptor) { + let subject_alt_names = vec!["localhost".to_string(), "127.0.0.1".to_string()]; + let rcgen::CertifiedKey { cert, signing_key } = + rcgen::generate_simple_self_signed(subject_alt_names) + .expect("Failed to generate certificate"); + + let cert_der = cert.der().to_vec(); + let cert_pem = cert.pem(); + let key_der = signing_key.serialize_der(); + + // Write cert to temporary file + let temp_dir = std::env::temp_dir(); + let cert_path = temp_dir.join("test_webhook_cert.pem"); + std::fs::write(&cert_path, cert_pem).expect("Failed to write certificate"); + + // Set environment variables for certificate + std::env::set_var("SSL_CERT_FILE", &cert_path); + std::env::set_var("REQUESTS_CA_BUNDLE", &cert_path); + + // Configure TLS + let cert_chain = vec![rustls::pki_types::CertificateDer::from(cert_der)]; + let private_key = + rustls::pki_types::PrivateKeyDer::try_from(key_der).expect("Failed to parse private key"); + + let tls_config = rustls::ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(cert_chain, private_key) + .expect("Failed to configure TLS"); + + let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_config)); + + (cert_path, tls_acceptor) +} + +async fn setup_webhook_server( + tls_acceptor: tokio_rustls::TlsAcceptor, +) -> (tokio::sync::mpsc::Receiver<(String, String)>, tokio::sync::oneshot::Sender<()>) { + let listener = + tokio::net::TcpListener::bind("127.0.0.1:0").await.expect("Failed to bind webhook server"); + + // Channel sends (method, body) tuples + let (tx, rx) = tokio::sync::mpsc::channel(10); + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); + + tokio::spawn(async move { + loop { + tokio::select! { + _ = &mut shutdown_rx => { + println!("Webhook server shutting down"); + break; + } + result = listener.accept() => { + match result { + Ok((socket, _)) => { + let tls_acceptor = tls_acceptor.clone(); + let tx = tx.clone(); + + tokio::spawn(async move { + if let Err(e) = handle_webhook_connection(socket, tls_acceptor, tx).await { + eprintln!("Error handling webhook connection: {}", e); + } + }); + } + Err(e) => { + eprintln!("Error accepting connection: {}", e); + } + } + } + } + } + }); + + // Give server time to start + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + (rx, shutdown_tx) +} + +async fn handle_webhook_connection( + socket: tokio::net::TcpStream, tls_acceptor: tokio_rustls::TlsAcceptor, + tx: tokio::sync::mpsc::Sender<(String, String)>, +) -> Result<(), Box> { + let mut tls_stream = tls_acceptor.accept(socket).await?; + + // Read HTTP request + let mut buf = vec![0u8; 8192]; + let n = tokio::io::AsyncReadExt::read(&mut tls_stream, &mut buf).await?; + buf.truncate(n); + + let request_str = String::from_utf8_lossy(&buf); + println!("Received webhook HTTPS request:\n{}", request_str); + + // Extract body + let body = if let Some(body_start) = request_str.find("\r\n\r\n") { + request_str[body_start + 4..].trim().to_string() + } else { + return Err("No body found in request".into()); + }; + + // Parse JSON to get method + let json: serde_json::Value = serde_json::from_str(&body)?; + let method = json["method"].as_str().ok_or("Missing method field")?.to_string(); + + // Send (method, body) tuple + tx.send((method, body)).await?; + + // Send HTTP response + let response = "HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"; + tokio::io::AsyncWriteExt::write_all(&mut tls_stream, response.as_bytes()).await?; + tokio::io::AsyncWriteExt::flush(&mut tls_stream).await?; + + Ok(()) +} + +async fn expect_webhook_notification( + rx: &mut tokio::sync::mpsc::Receiver<(String, String)>, expected_method: &str, +) { + match tokio::time::timeout(std::time::Duration::from_secs(5), rx.recv()).await { + Ok(Some((method, body))) => { + assert_eq!( + method, expected_method, + "Expected method '{}' but got '{}'", + expected_method, method + ); + assert!(!body.is_empty(), "Webhook body should not be empty"); + + // Verify JSON structure + let json: serde_json::Value = + serde_json::from_str(&body).expect("Body should be valid JSON"); + + assert_eq!(json["jsonrpc"], "2.0", "Expected JSON-RPC 2.0"); + assert_eq!(json["method"], expected_method, "Method should match in body"); + + println!("✓ Verified notification: {}", expected_method); + println!(" Body: {}", body); + }, + Ok(None) => { + panic!("Webhook channel closed before receiving '{}'", expected_method); + }, + Err(_) => { + panic!("Timeout waiting for webhook notification '{}'", expected_method); + }, + } +}