diff --git a/Cargo.toml b/Cargo.toml index 02d491e..6186414 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,12 +29,21 @@ reqwest = { version = "0.12", features = ["json"], default-features = false, op # default async runtime tokio = { version = "1", features = ["time"], optional = true } +bitcoin-ohttp = { version = "0.6.0", optional = true} +url = {version = "2.5.7", optional = true} +bhttp = { version = "0.6.1", optional = true} +http = { version = "1.3.1", optional = true} + [dev-dependencies] serde_json = "1.0" tokio = { version = "1.20.1", features = ["full"] } electrsd = { version = "0.33.0", features = ["legacy", "esplora_a33e97e1", "corepc-node_28_0"] } lazy_static = "1.4.0" +ohttp-relay = { git = "https://github.com/payjoin/ohttp-relay.git", branch = "main", features = ["_test-util"]} +hyper = {version = "1.8.1", features = ["full"]} +hyper-util = {version = "0.1.19"} +http-body-util = "0.1.1" [features] default = ["blocking", "async", "async-https", "tokio"] @@ -43,6 +52,7 @@ blocking-https = ["blocking", "minreq/https"] blocking-https-rustls = ["blocking", "minreq/https-rustls"] blocking-https-native = ["blocking", "minreq/https-native"] blocking-https-bundled = ["blocking", "minreq/https-bundled"] +async-ohttp = ["async", "bitcoin-ohttp", "bhttp", "reqwest", "tokio", "url", "http"] tokio = ["dep:tokio"] async = ["reqwest", "reqwest/socks", "tokio?/time"] diff --git a/src/async.rs b/src/async.rs index 91a64a8..3b67e4f 100644 --- a/src/async.rs +++ b/src/async.rs @@ -26,6 +26,8 @@ use log::{debug, error, info, trace}; use reqwest::{header, Client, Response}; +#[cfg(feature = "async-ohttp")] +use crate::ohttp::OhttpClient; use crate::{ AddressStats, BlockInfo, BlockStatus, BlockSummary, Builder, Error, MempoolRecentTx, MempoolStats, MerkleProof, OutputStatus, ScriptHashStats, Tx, TxStatus, Utxo, @@ -43,6 +45,9 @@ pub struct AsyncClient { /// Marker for the type of sleeper used marker: PhantomData, + /// Ohttp config + #[cfg(feature = "async-ohttp")] + ohttp_client: Option, } impl AsyncClient { @@ -77,6 +82,8 @@ impl AsyncClient { client: client_builder.build()?, max_retries: builder.max_retries, marker: PhantomData, + #[cfg(feature = "async-ohttp")] + ohttp_client: None, }) } @@ -86,9 +93,17 @@ impl AsyncClient { client, max_retries: crate::DEFAULT_MAX_RETRIES, marker: PhantomData, + #[cfg(feature = "async-ohttp")] + ohttp_client: None, } } + #[cfg(feature = "async-ohttp")] + pub(crate) fn set_ohttp_client(mut self, ohttp_client: OhttpClient) -> Self { + self.ohttp_client = Some(ohttp_client); + self + } + /// Make an HTTP GET request to given URL, deserializing to any `T` that /// implement [`bitcoin::consensus::Decodable`]. /// @@ -557,12 +572,32 @@ impl AsyncClient { let mut attempts = 0; loop { - match self.client.get(url).send().await? { + let res = { + #[cfg(feature = "async-ohttp")] + if let Some(ohttp_client) = &self.ohttp_client { + let (body, ctx) = ohttp_client.ohttp_encapsulate("get", url, None)?; + let res = self + .client + .post(ohttp_client.relay_url().to_string()) + .header("Content-Type", "message/ohttp-req") + .body(body) + .send() + .await?; + let body = res.bytes().await?.to_vec(); + ohttp_client.ohttp_decapsulate(ctx, body)?.into() + } else { + self.client.get(url).send().await? + } + #[cfg(not(feature = "async-ohttp"))] + self.client.get(url).send().await? + }; + match res { resp if attempts < self.max_retries && is_status_retryable(resp.status()) => { S::sleep(delay).await; attempts += 1; delay *= 2; } + resp => return Ok(resp), } } diff --git a/src/lib.rs b/src/lib.rs index 1e9b0ed..1471cad 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,6 +83,9 @@ pub mod r#async; #[cfg(feature = "blocking")] pub mod blocking; +#[cfg(feature = "async-ohttp")] +pub(crate) mod ohttp; + pub use api::*; #[cfg(feature = "blocking")] pub use blocking::BlockingClient; @@ -195,6 +198,20 @@ impl Builder { pub fn build_async_with_sleeper(self) -> Result, Error> { AsyncClient::from_builder(self) } + + #[cfg(feature = "async-ohttp")] + pub async fn build_async_with_ohttp( + self, + ohttp_relay_url: &str, + ohttp_gateway_url: &str, + ) -> Result { + use crate::ohttp::OhttpClient; + + let ohttp_client = OhttpClient::new(ohttp_relay_url, ohttp_gateway_url).await?; + Ok(self + .build_async_with_sleeper()? + .set_ohttp_client(ohttp_client)) + } } /// Errors that can happen during a request to `Esplora` servers. @@ -230,6 +247,18 @@ pub enum Error { InvalidHttpHeaderValue(String), /// The server sent an invalid response InvalidResponse, + /// Error from Ohttp library + #[cfg(feature = "async-ohttp")] + Ohttp(bitcoin_ohttp::Error), + /// Error when reading and writing to bhttp payloads + #[cfg(feature = "async-ohttp")] + Bhttp(bhttp::Error), + /// Error when converting the http response to and from bhttp response + #[cfg(feature = "async-ohttp")] + Http(http::Error), + /// Error when parsing the URL + #[cfg(feature = "async-ohttp")] + UrlParsing(url::ParseError), } impl fmt::Display for Error { @@ -344,6 +373,194 @@ mod test { (blocking_client, async_client) } + #[cfg(feature = "async-ohttp")] + fn find_free_port() -> u16 { + let listener = std::net::TcpListener::bind("0.0.0.0:0").unwrap(); + listener.local_addr().unwrap().port() + } + + #[cfg(feature = "async-ohttp")] + async fn start_ohttp_relay( + gateway_url: ohttp_relay::GatewayUri, + ) -> ( + u16, + tokio::task::JoinHandle>>, + ) { + let port = find_free_port(); + let relay = ohttp_relay::listen_tcp(port, gateway_url).await.unwrap(); + + (port, relay) + } + + #[cfg(feature = "async-ohttp")] + async fn start_ohttp_gateway() -> (u16, tokio::task::JoinHandle<()>) { + use http_body_util::Full; + use hyper::body::Incoming; + use hyper::service::service_fn; + use hyper::Response; + use hyper::{Method, Request}; + use hyper_util::rt::TokioIo; + use tokio::net::TcpListener; + + let port = find_free_port(); + let listener = TcpListener::bind(format!("0.0.0.0:{}", port)) + .await + .unwrap(); + + let handle = tokio::spawn(async move { + let key_config = bitcoin_ohttp::KeyConfig::new( + 0, + bitcoin_ohttp::hpke::Kem::K256Sha256, + vec![bitcoin_ohttp::SymmetricSuite::new( + bitcoin_ohttp::hpke::Kdf::HkdfSha256, + bitcoin_ohttp::hpke::Aead::ChaCha20Poly1305, + )], + ) + .expect("valid key config"); + let server = bitcoin_ohttp::Server::new(key_config).expect("valid server"); + let server = std::sync::Arc::new(server); + loop { + match listener.accept().await { + Ok((stream, _)) => { + let io = TokioIo::new(stream); + let server = server.clone(); + let service = service_fn(move |req: Request| { + let server = server.clone(); + async move { + let path = req.uri().path(); + if path == "/.well-known/ohttp-gateway" + && req.method() == Method::GET + { + let key_config = server.config().encode().unwrap(); + Ok::<_, hyper::Error>( + Response::builder() + .status(200) + .header("content-type", "application/ohttp-keys") + .body(Full::new(hyper::body::Bytes::from(key_config))) + .unwrap(), + ) + } else if path == "/.well-known/ohttp-gateway" + && req.method() == Method::POST + { + use http_body_util::BodyExt; + + // Assert that the content-type header is set to + // "message/ohttp-req". + let content_type_header = req + .headers() + .get("content-type") + .expect("content-type header should be set by the client"); + assert_eq!(content_type_header, "message/ohttp-req"); + + let bytes = req.collect().await?.to_bytes(); + let (bhttp_body, response_ctx) = + server.decapsulate(bytes.iter().as_slice()).unwrap(); + // Reconstruct the inner HTTP message from the bhttp message. + let mut r = std::io::Cursor::new(bhttp_body); + let m: bhttp::Message = bhttp::Message::read_bhttp(&mut r) + .expect("Should be valid bhttp message"); + let base_url = format!( + "http://{}", + ELECTRSD.esplora_url.as_ref().unwrap() + ); + let path = + String::from_utf8(m.control().path().unwrap().to_vec()) + .unwrap(); + let _ = + Method::from_bytes(m.control().method().unwrap()).unwrap(); + // TODO: Use the actual method from the bhttp message + // This will be refactored out to use bitreq + let req = reqwest::Request::new( + Method::GET, + url::Url::parse(&(base_url + &path)).unwrap(), + ); + let mut req_builder = reqwest::RequestBuilder::from_parts( + reqwest::Client::new(), + req, + ); + for field in m.header().iter() { + req_builder = + req_builder.header(field.name(), field.value()); + } + + let res = req_builder.send().await.unwrap(); + // Convert HTTP response to bhttp response + let mut m: bhttp::Message = bhttp::Message::response( + res.status().as_u16().try_into().unwrap(), + ); + m.write_content(res.bytes().await.unwrap()); + let mut bhttp_res = vec![]; + m.write_bhttp(bhttp::Mode::IndeterminateLength, &mut bhttp_res) + .unwrap(); + // Now we need to encapsulate the response + let encapsulated_response = + response_ctx.encapsulate(&bhttp_res).unwrap(); + + Ok::<_, hyper::Error>( + Response::builder() + .status(200) + .header("content-type", "message/ohttp-res") + .body(Full::new(hyper::body::Bytes::copy_from_slice( + &encapsulated_response, + ))) + .unwrap(), + ) + } else { + Ok::<_, hyper::Error>( + Response::builder() + .status(404) + .body(Full::new(hyper::body::Bytes::from("Not Found"))) + .unwrap(), + ) + } + } + }); + + tokio::spawn(async move { + if let Err(err) = hyper::server::conn::http1::Builder::new() + .serve_connection(io, service) + .await + { + eprintln!("Error serving connection: {:?}", err); + } + }); + } + Err(e) => { + eprintln!("Error accepting connection: {:?}", e); + break; + } + } + } + }); + println!("OHTTP gateway started on port {}", port); + + (port, handle) + } + #[cfg(feature = "async-ohttp")] + #[tokio::test] + async fn test_ohttp_e2e() { + let (_, async_client) = setup_clients().await; + let block_hash = async_client.get_block_hash(1).await.unwrap(); + let esplora_url = ELECTRSD.esplora_url.as_ref().unwrap(); + let (gateway_port, _) = start_ohttp_gateway().await; + let gateway_origin = format!("http://localhost:{gateway_port}"); + let (relay_port, _) = + start_ohttp_relay(gateway_origin.parse::().unwrap()).await; + let gateway_url = format!( + "http://localhost:{}/.well-known/ohttp-gateway", + gateway_port + ); + let relay_url = format!("http://localhost:{}", relay_port); + + let ohttp_client = Builder::new(&format!("http://{}", esplora_url)) + .build_async_with_ohttp(&relay_url, &gateway_url) + .await + .unwrap(); + + let res = ohttp_client.get_block_hash(1).await.unwrap(); + assert_eq!(res, block_hash); + } + #[cfg(all(feature = "blocking", feature = "async"))] fn generate_blocks_and_wait(num: usize) { let cur_height = BITCOIND.client.get_block_count().unwrap().0; diff --git a/src/ohttp.rs b/src/ohttp.rs new file mode 100644 index 0000000..5353121 --- /dev/null +++ b/src/ohttp.rs @@ -0,0 +1,102 @@ +use crate::Error; +use bitcoin_ohttp as ohttp; +use reqwest::Client; +use url::Url; + +#[derive(Debug, Clone)] +pub struct OhttpClient { + key_config: ohttp::KeyConfig, + relay_url: Url, +} + +impl OhttpClient { + /// Will attempt to fetch the key config from the gateway and then create a new client. + /// Keyconfig is fetched directly from the gateway thus revealing our network metadata. + /// TODO: use the relay HTTP connect proxy to fetch to. + pub(crate) async fn new(relay_url: &str, ohttp_gateway_url: &str) -> Result { + let gateway_url = Url::parse(ohttp_gateway_url).map_err(Error::UrlParsing)?; + let res = Client::new() + .get(gateway_url) + .send() + .await + .map_err(Error::Reqwest)?; + let body = res.bytes().await.map_err(Error::Reqwest)?; + let key_config = ohttp::KeyConfig::decode(&body).map_err(Error::Ohttp)?; + Ok(Self { + key_config, + relay_url: Url::parse(relay_url).map_err(Error::UrlParsing)?, + }) + } + + pub(crate) fn relay_url(&self) -> &Url { + &self.relay_url + } + + pub(crate) fn ohttp_encapsulate( + &self, + method: &str, + target_resource: &str, + body: Option<&[u8]>, + ) -> Result<(Vec, ohttp::ClientResponse), Error> { + use std::fmt::Write; + + // Bitcoin-hpke takes keyconfig as mutable ref but it doesnt mutate it should fix it + // upstream but for now we can clone it to avoid changing self to mutable self + let mut key_config = self.key_config.clone(); + + let ctx = ohttp::ClientRequest::from_config(&mut key_config).map_err(Error::Ohttp)?; + let url = url::Url::parse(target_resource).map_err(Error::UrlParsing)?; + let authority_bytes = url.host().map_or_else(Vec::new, |host| { + let mut authority = host.to_string(); + if let Some(port) = url.port() { + write!(authority, ":{port}").unwrap(); + } + authority.into_bytes() + }); + let mut bhttp_message = bhttp::Message::request( + method.as_bytes().to_vec(), + url.scheme().as_bytes().to_vec(), + authority_bytes, + url.path().as_bytes().to_vec(), + ); + // TODO: do we need to add headers? + if let Some(body) = body { + bhttp_message.write_content(body); + } + + let mut bhttp_req = Vec::new(); + bhttp_message + .write_bhttp(bhttp::Mode::IndeterminateLength, &mut bhttp_req) + .map_err(Error::Bhttp)?; + let (encapsulated, ohttp_ctx) = ctx.encapsulate(&bhttp_req).map_err(Error::Ohttp)?; + + Ok((encapsulated, ohttp_ctx)) + } + + pub(crate) fn ohttp_decapsulate( + &self, + res_ctx: ohttp::ClientResponse, + ohttp_body: Vec, + ) -> Result>, Error> { + let bhttp_body = res_ctx.decapsulate(&ohttp_body).map_err(Error::Ohttp)?; + let mut r = std::io::Cursor::new(bhttp_body); + let m: bhttp::Message = bhttp::Message::read_bhttp(&mut r).map_err(Error::Bhttp)?; + let mut builder = http::Response::builder(); + for field in m.header().iter() { + builder = builder.header(field.name(), field.value()); + } + builder + .status({ + let code = m + .control() + .status() + .ok_or(bhttp::Error::InvalidStatus) + .map_err(Error::Bhttp)?; + http::StatusCode::from_u16(code.code()) + .map_err(|_| bhttp::Error::InvalidStatus) + .map_err(Error::Bhttp)? + }) + .body(m.content().to_vec()) + .map_err(Error::Http) + } +}