From 47e13a67ad21209cc4aa1bdafa08bfaf0e0167e3 Mon Sep 17 00:00:00 2001 From: "joshua.ho@bytedance.com" Date: Mon, 23 Mar 2026 13:42:02 +0800 Subject: [PATCH 1/8] feat(volo-http): add CommonStats struct --- Cargo.lock | 28 +++++++++++------------ volo-http/src/context/mod.rs | 3 +++ volo-http/src/context/stat.rs | 34 ++++++++++++++++++++++++++++ volo-thrift/src/codec/default/mod.rs | 4 ++-- 4 files changed, 53 insertions(+), 16 deletions(-) create mode 100644 volo-http/src/context/stat.rs diff --git a/Cargo.lock b/Cargo.lock index 9458346f..6f48c2ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -670,7 +670,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -758,7 +758,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -1374,7 +1374,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.6.1", + "socket2 0.5.10", "system-configuration", "tokio", "tower-service", @@ -1573,7 +1573,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -2002,7 +2002,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -2611,7 +2611,7 @@ dependencies = [ "quinn-udp", "rustc-hash 2.1.1", "rustls", - "socket2 0.6.1", + "socket2 0.5.10", "thiserror 2.0.17", "tokio", "tracing", @@ -2648,9 +2648,9 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.6.1", + "socket2 0.5.10", "tracing", - "windows-sys 0.60.2", + "windows-sys 0.52.0", ] [[package]] @@ -3002,7 +3002,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.11.0", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -3167,7 +3167,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b55fb86dfd3a2f5f76ea78310a88f96c4ea21a3031f8d212443d56123fd0521" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -3281,9 +3281,9 @@ checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" [[package]] name = "shmipc" -version = "0.1.0" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b567fd4c4565398a0b75345f59906b3f6957a2d3cf2cc8c50a759d829597344b" +checksum = "a5f842e4e077bb5719fd45b7a6dd42b90c5697c46e852a4bec6415ab57142ea0" dependencies = [ "anyhow", "arc-swap", @@ -3527,7 +3527,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix 1.1.3", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -4552,7 +4552,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]] diff --git a/volo-http/src/context/mod.rs b/volo-http/src/context/mod.rs index 2518986a..bbfb50d4 100644 --- a/volo-http/src/context/mod.rs +++ b/volo-http/src/context/mod.rs @@ -11,3 +11,6 @@ pub mod server; #[cfg(feature = "server")] pub use self::server::ServerContext; + +#[cfg(all(feature = "client", feature = "server"))] +pub mod stat; diff --git a/volo-http/src/context/stat.rs b/volo-http/src/context/stat.rs new file mode 100644 index 00000000..eed8ae9f --- /dev/null +++ b/volo-http/src/context/stat.rs @@ -0,0 +1,34 @@ +//! HTTP request and response statistics shared across client and server contexts. + +use chrono::{DateTime, Local}; +use http::{method::Method, status::StatusCode, uri::Uri}; + +/// Shared HTTP statistics captured for every request on both client and server sides +#[derive(Debug, Default, Clone)] +pub struct CommonStats { + /// The time at which request processing began + pub process_start_time: DateTime, + + /// The time at which request processing completed + pub process_end_time: DateTime, + + /// The HTTP method of the request (e.g. `GET`, `POST`) + pub method: Method, + + /// The full URI of the request + pub uri: Uri, + + /// The HTTP status code of the response. + /// + /// Status code may be None if the service failed + pub status_code: Option, + + /// Size of the request body in bytes + pub req_size: i64, + + /// Size of the response body in bytes + pub resp_size: i64, + + /// Whether the request resulted in an error + pub is_error: bool, +} diff --git a/volo-thrift/src/codec/default/mod.rs b/volo-thrift/src/codec/default/mod.rs index 58878167..4b5fa57e 100644 --- a/volo-thrift/src/codec/default/mod.rs +++ b/volo-thrift/src/codec/default/mod.rs @@ -535,7 +535,7 @@ mod tests { } } - #[cfg(feature = "shmipc")] + #[cfg(all(feature = "shmipc", target_os = "linux"))] #[tokio::test] async fn test_decode_unexpected_eof_returns_none_when_shmipc_available() { let (_env, stream) = ShmipcTestEnv::new().await; @@ -562,7 +562,7 @@ mod tests { assert!(result.unwrap().is_none()); } - #[cfg(feature = "shmipc")] + #[cfg(all(feature = "shmipc", target_os = "linux"))] #[tokio::test] async fn test_decode_other_error_returns_error_when_shmipc_available() { let (_env, stream) = ShmipcTestEnv::new().await; From eb3a7263b4d51ae0c88df34cd0695c655633a46f Mon Sep 17 00:00:00 2001 From: "joshua.ho@bytedance.com" Date: Mon, 30 Mar 2026 17:37:59 +0800 Subject: [PATCH 2/8] feat(volo-http): record http lifecycle timings --- volo-http/Cargo.toml | 2 +- volo-http/src/body.rs | 113 +++++++++++++++++++++++++++++++- volo-http/src/context/server.rs | 36 +++++++++- volo-http/src/server/mod.rs | 30 ++++++++- volo-http/src/utils/macros.rs | 4 +- 5 files changed, 179 insertions(+), 6 deletions(-) diff --git a/volo-http/Cargo.toml b/volo-http/Cargo.toml index faac32fb..47984cfa 100644 --- a/volo-http/Cargo.toml +++ b/volo-http/Cargo.toml @@ -117,7 +117,7 @@ client = [ ] # client core server = [ "hyper-util/server", - "dep:ipnet", "dep:matchit", "dep:memchr", "dep:scopeguard", "dep:mime_guess", + "dep:ipnet", "dep:matchit", "dep:memchr", "dep:scopeguard", "dep:mime_guess", "dep:chrono", ] # server core __serde = ["dep:serde"] # a private feature for enabling `serde` by `serde_xxx` diff --git a/volo-http/src/body.rs b/volo-http/src/body.rs index d555abdc..5a13e522 100644 --- a/volo-http/src/body.rs +++ b/volo-http/src/body.rs @@ -2,6 +2,8 @@ //! //! See [`Body`] for more details. +#[cfg(feature = "server")] +use std::sync::{Arc, Mutex}; use std::{ convert::Infallible, error::Error, @@ -22,16 +24,28 @@ use pin_project::pin_project; #[cfg(feature = "json")] use serde::de::DeserializeOwned; +#[cfg(feature = "server")] +use crate::context::server::ServerStats; use crate::error::BoxError; // The `futures_util::stream::BoxStream` does not have `Sync` type BoxStream<'a, T> = Pin + Send + Sync + 'a>>; +#[cfg(feature = "server")] +#[derive(Debug)] +enum BodyStats { + /// Attached to request bodies: tracks when the server reads the request. + Read(Arc>), + /// Attached to response bodies: tracks when the server writes the response. + Write(Arc>), +} /// An implementation for [`http_body::Body`]. #[pin_project] pub struct Body { #[pin] repr: BodyRepr, + #[cfg(feature = "server")] + stats: Option, } #[pin_project(project = BodyProj)] @@ -60,6 +74,8 @@ impl Body { pub fn empty() -> Self { Self { repr: BodyRepr::Full(Full::new(Bytes::new())), + #[cfg(feature = "server")] + stats: None, } } @@ -70,6 +86,8 @@ impl Body { pub fn from_incoming(incoming: Incoming) -> Self { Self { repr: BodyRepr::Hyper(incoming), + #[cfg(feature = "server")] + stats: None, } } @@ -80,6 +98,8 @@ impl Body { { Self { repr: BodyRepr::Stream(StreamBody::new(Box::pin(stream))), + #[cfg(feature = "server")] + stats: None, } } @@ -91,8 +111,28 @@ impl Body { { Self { repr: BodyRepr::Body(BoxBody::new(body.map_err(Into::into))), + #[cfg(feature = "server")] + stats: None, } } + + /// Attach server stats for recording body read timings. + /// + /// This should only be called on request bodies in a server context. + #[cfg(feature = "server")] + pub fn with_read_stats(mut self, stats: Arc>) -> Self { + self.stats = Some(BodyStats::Read(stats)); + self + } + + /// Attach server stats for recording body write timings. + /// + /// This should only be called on response bodies in a server context. + #[cfg(feature = "server")] + pub fn with_write_stats(mut self, stats: Arc>) -> Self { + self.stats = Some(BodyStats::Write(stats)); + self + } } impl http_body::Body for Body { @@ -103,14 +143,75 @@ impl http_body::Body for Body { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, Self::Error>>> { - match self.project().repr.project() { + #[cfg(feature = "server")] + let is_full = matches!(self.repr, BodyRepr::Full(_)); + + let this = self.project(); + + #[cfg(feature = "server")] + match this.stats.as_ref() { + Some(BodyStats::Read(stats)) => { + let mut stats = stats.lock().unwrap(); + if stats.read_body_start().is_none() { + stats.record_read_body_start(); + eprintln!("read_body_start: {:?}", stats.read_body_start()); + } + } + Some(BodyStats::Write(stats)) => { + let mut stats = stats.lock().unwrap(); + if stats.write_start().is_none() { + stats.record_write_start(); + eprintln!("write_start: {:?}", stats.write_start()); + } + } + None => {} + } + + let result = match this.repr.project() { BodyProj::Full(full) => http_body::Body::poll_frame(full, cx).map_err(BoxError::from), BodyProj::Hyper(incoming) => { http_body::Body::poll_frame(incoming, cx).map_err(BoxError::from) } BodyProj::Stream(stream) => http_body::Body::poll_frame(stream, cx), BodyProj::Body(body) => http_body::Body::poll_frame(body, cx), + }; + + #[cfg(feature = "server")] + { + let is_terminal = match this.stats.as_ref() { + // For response bodies, also treat a successful frame as terminal. + // Hyper uses is_end_stream() for Full bodies and won't poll again + // after the last frame, so Poll::Ready(None) never fires. + Some(BodyStats::Write(_)) => { + matches!(result, Poll::Ready(None) | Poll::Ready(Some(Err(_)))) + || (is_full && matches!(result, Poll::Ready(Some(Ok(_))))) + } + // For request bodies, only treat exhaustion or error as terminal. + _ => matches!(result, Poll::Ready(None) | Poll::Ready(Some(Err(_)))), + }; + + if is_terminal { + match this.stats.as_ref() { + Some(BodyStats::Read(stats)) => { + let mut stats = stats.lock().unwrap(); + if stats.read_body_finish().is_none() { + stats.record_read_body_finish(); + eprintln!("read_body_finish: {:?}", stats.read_body_finish()); + } + } + Some(BodyStats::Write(stats)) => { + let mut stats = stats.lock().unwrap(); + if stats.write_finish().is_none() { + stats.record_write_finish(); + eprintln!("write_finish: {:?}", stats.write_finish()); + } + } + None => {} + } + } } + eprintln!("stats: {:?}", this.stats); + result } fn is_end_stream(&self) -> bool { @@ -299,6 +400,8 @@ impl From<&'static str> for Body { fn from(value: &'static str) -> Self { Self { repr: BodyRepr::Full(Full::new(Bytes::from_static(value.as_bytes()))), + #[cfg(feature = "server")] + stats: None, } } } @@ -307,6 +410,8 @@ impl From> for Body { fn from(value: Vec) -> Self { Self { repr: BodyRepr::Full(Full::new(Bytes::from(value))), + #[cfg(feature = "server")] + stats: None, } } } @@ -315,6 +420,8 @@ impl From for Body { fn from(value: Bytes) -> Self { Self { repr: BodyRepr::Full(Full::new(value)), + #[cfg(feature = "server")] + stats: None, } } } @@ -323,6 +430,8 @@ impl From for Body { fn from(value: FastStr) -> Self { Self { repr: BodyRepr::Full(Full::new(value.into_bytes())), + #[cfg(feature = "server")] + stats: None, } } } @@ -331,6 +440,8 @@ impl From for Body { fn from(value: String) -> Self { Self { repr: BodyRepr::Full(Full::new(Bytes::from(value))), + #[cfg(feature = "server")] + stats: None, } } } diff --git a/volo-http/src/context/server.rs b/volo-http/src/context/server.rs index 0b5e34a8..08ff1ed0 100644 --- a/volo-http/src/context/server.rs +++ b/volo-http/src/context/server.rs @@ -1,5 +1,7 @@ //! Context and its utilities of server +use std::sync::{Arc, Mutex}; +use chrono::{DateTime, Local}; use volo::{ context::{Context, Reusable, Role, RpcCx, RpcInfo}, net::Address, @@ -8,7 +10,7 @@ use volo::{ use crate::{ server::param::PathParamsVec, - utils::macros::{impl_deref_and_deref_mut, impl_getter}, + utils::macros::{impl_deref_and_deref_mut, impl_getter, stat_impl}, }; /// RPC context of http server @@ -44,10 +46,42 @@ pub struct ServerCxInner { /// [`PathParamsMap`]: crate::server::param::PathParamsMap /// [`PathParams`]: crate::server::param::PathParams pub params: PathParamsVec, + + /// Statistics of the request + pub stats: Arc>, } impl ServerCxInner { impl_getter!(params, PathParamsVec); + + /// Return the statistics of the request + pub fn stats(&self) -> &Arc> { + &self.stats + } +} + +/// Statistics of server +#[derive(Debug, Default, Clone)] +pub struct ServerStats { + read_header_start: Option>, + read_header_finish: Option>, + read_body_start: Option>, + read_body_finish: Option>, + handle_start: Option>, + handle_finish: Option>, + write_start: Option>, + write_finish: Option>, +} + +impl ServerStats { + stat_impl!(read_header_start); + stat_impl!(read_header_finish); + stat_impl!(read_body_start); + stat_impl!(read_body_finish); + stat_impl!(handle_start); + stat_impl!(handle_finish); + stat_impl!(write_start); + stat_impl!(write_finish); } /// Configuration of the request diff --git a/volo-http/src/server/mod.rs b/volo-http/src/server/mod.rs index a14c1132..492d3c05 100644 --- a/volo-http/src/server/mod.rs +++ b/volo-http/src/server/mod.rs @@ -13,6 +13,7 @@ use std::{ time::Duration, }; +use chrono::{DateTime, Local}; use futures::future::BoxFuture; use hyper_util::{ rt::{TokioExecutor, TokioIo}, @@ -447,12 +448,14 @@ async fn serve( continue; } }; + let accept_time = Local::now(); let hyper_service = HyperService { inner: service.clone(), peer, config: config.clone(), span_provider: span_provider.clone(), + accept_time, }; tokio::spawn(serve_conn( @@ -512,6 +515,7 @@ struct HyperService { peer: Address, config: Config, span_provider: SP, + accept_time: DateTime, } type HyperRequest = http::request::Request; @@ -529,17 +533,41 @@ where fn call(&self, req: HyperRequest) -> Self::Future { let service = self.clone(); + let call_time = Local::now(); Box::pin( METAINFO.scope(RefCell::new(MetaInfo::default()), async move { let mut cx = ServerContext::new(service.peer); cx.rpc_info_mut().set_config(service.config); + + let stats = cx.stats().clone(); + + { + let mut s = stats.lock().unwrap(); + s.set_read_header_start(service.accept_time); + s.set_read_header_finish(call_time); + s.record_handle_start(); + } + + let (parts, body) = req.into_parts(); + let body = Body::from_incoming(body).with_read_stats(stats.clone()); + let req = http::Request::from_parts(parts, body); + let span = service.span_provider.on_serve(&cx); let resp = service .inner - .call(&mut cx, req.map(Body::from_incoming)) + .call(&mut cx, req) .instrument(span) .await .into_response(); + + stats.lock().unwrap().record_handle_finish(); + + // Wrap response body — shares the same Arc, write_start/finish + // will be recorded directly into cx's stats + let (parts, body) = resp.into_parts(); + let body = body.with_write_stats(stats.clone()); + let resp = http::Response::from_parts(parts, body); + eprintln!("{:?}", cx.stats); service.span_provider.leave_serve(&cx); Ok(resp) }), diff --git a/volo-http/src/utils/macros.rs b/volo-http/src/utils/macros.rs index fa4a3ec5..4b3a7308 100644 --- a/volo-http/src/utils/macros.rs +++ b/volo-http/src/utils/macros.rs @@ -94,7 +94,7 @@ macro_rules! impl_getter { #[cfg(feature = "server")] pub(crate) use impl_getter; -#[cfg(feature = "client")] +#[cfg(any(feature = "client", feature = "server"))] macro_rules! stat_impl { ($t: ident) => { paste::paste! { @@ -119,5 +119,5 @@ macro_rules! stat_impl { } }; } -#[cfg(feature = "client")] +#[cfg(any(feature = "client", feature = "server"))] pub(crate) use stat_impl; From 15bb185f0182d1922fb0720adb51df4833b9e017 Mon Sep 17 00:00:00 2001 From: "joshua.ho@bytedance.com" Date: Mon, 30 Mar 2026 17:41:54 +0800 Subject: [PATCH 3/8] chore(volo-http): remove debug lines --- volo-http/src/body.rs | 5 ----- volo-http/src/server/mod.rs | 1 - 2 files changed, 6 deletions(-) diff --git a/volo-http/src/body.rs b/volo-http/src/body.rs index 5a13e522..f6c16d26 100644 --- a/volo-http/src/body.rs +++ b/volo-http/src/body.rs @@ -154,14 +154,12 @@ impl http_body::Body for Body { let mut stats = stats.lock().unwrap(); if stats.read_body_start().is_none() { stats.record_read_body_start(); - eprintln!("read_body_start: {:?}", stats.read_body_start()); } } Some(BodyStats::Write(stats)) => { let mut stats = stats.lock().unwrap(); if stats.write_start().is_none() { stats.record_write_start(); - eprintln!("write_start: {:?}", stats.write_start()); } } None => {} @@ -196,21 +194,18 @@ impl http_body::Body for Body { let mut stats = stats.lock().unwrap(); if stats.read_body_finish().is_none() { stats.record_read_body_finish(); - eprintln!("read_body_finish: {:?}", stats.read_body_finish()); } } Some(BodyStats::Write(stats)) => { let mut stats = stats.lock().unwrap(); if stats.write_finish().is_none() { stats.record_write_finish(); - eprintln!("write_finish: {:?}", stats.write_finish()); } } None => {} } } } - eprintln!("stats: {:?}", this.stats); result } diff --git a/volo-http/src/server/mod.rs b/volo-http/src/server/mod.rs index 492d3c05..5347cc27 100644 --- a/volo-http/src/server/mod.rs +++ b/volo-http/src/server/mod.rs @@ -567,7 +567,6 @@ where let (parts, body) = resp.into_parts(); let body = body.with_write_stats(stats.clone()); let resp = http::Response::from_parts(parts, body); - eprintln!("{:?}", cx.stats); service.span_provider.leave_serve(&cx); Ok(resp) }), From 089f27f0644e3c650c52fd01cb50eff096f06369 Mon Sep 17 00:00:00 2001 From: "joshua.ho@bytedance.com" Date: Wed, 1 Apr 2026 15:08:29 +0800 Subject: [PATCH 4/8] fix(volo-http): move handle start recording to correct place --- volo-http/src/body.rs | 1 + volo-http/src/server/mod.rs | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/volo-http/src/body.rs b/volo-http/src/body.rs index f6c16d26..e77f3ad0 100644 --- a/volo-http/src/body.rs +++ b/volo-http/src/body.rs @@ -194,6 +194,7 @@ impl http_body::Body for Body { let mut stats = stats.lock().unwrap(); if stats.read_body_finish().is_none() { stats.record_read_body_finish(); + stats.record_handle_start(); } } Some(BodyStats::Write(stats)) => { diff --git a/volo-http/src/server/mod.rs b/volo-http/src/server/mod.rs index 5347cc27..e57e9187 100644 --- a/volo-http/src/server/mod.rs +++ b/volo-http/src/server/mod.rs @@ -545,7 +545,6 @@ where let mut s = stats.lock().unwrap(); s.set_read_header_start(service.accept_time); s.set_read_header_finish(call_time); - s.record_handle_start(); } let (parts, body) = req.into_parts(); From c96e61cb746d729a072080a643d83b1494e03290 Mon Sep 17 00:00:00 2001 From: "joshua.ho@bytedance.com" Date: Wed, 1 Apr 2026 17:10:12 +0800 Subject: [PATCH 5/8] fix(volo-http): remove Arc and Mutex from ServerStats --- volo-http/src/body.rs | 109 +------------------------------- volo-http/src/context/server.rs | 9 +-- volo-http/src/server/mod.rs | 32 ++-------- 3 files changed, 7 insertions(+), 143 deletions(-) diff --git a/volo-http/src/body.rs b/volo-http/src/body.rs index e77f3ad0..d555abdc 100644 --- a/volo-http/src/body.rs +++ b/volo-http/src/body.rs @@ -2,8 +2,6 @@ //! //! See [`Body`] for more details. -#[cfg(feature = "server")] -use std::sync::{Arc, Mutex}; use std::{ convert::Infallible, error::Error, @@ -24,28 +22,16 @@ use pin_project::pin_project; #[cfg(feature = "json")] use serde::de::DeserializeOwned; -#[cfg(feature = "server")] -use crate::context::server::ServerStats; use crate::error::BoxError; // The `futures_util::stream::BoxStream` does not have `Sync` type BoxStream<'a, T> = Pin + Send + Sync + 'a>>; -#[cfg(feature = "server")] -#[derive(Debug)] -enum BodyStats { - /// Attached to request bodies: tracks when the server reads the request. - Read(Arc>), - /// Attached to response bodies: tracks when the server writes the response. - Write(Arc>), -} /// An implementation for [`http_body::Body`]. #[pin_project] pub struct Body { #[pin] repr: BodyRepr, - #[cfg(feature = "server")] - stats: Option, } #[pin_project(project = BodyProj)] @@ -74,8 +60,6 @@ impl Body { pub fn empty() -> Self { Self { repr: BodyRepr::Full(Full::new(Bytes::new())), - #[cfg(feature = "server")] - stats: None, } } @@ -86,8 +70,6 @@ impl Body { pub fn from_incoming(incoming: Incoming) -> Self { Self { repr: BodyRepr::Hyper(incoming), - #[cfg(feature = "server")] - stats: None, } } @@ -98,8 +80,6 @@ impl Body { { Self { repr: BodyRepr::Stream(StreamBody::new(Box::pin(stream))), - #[cfg(feature = "server")] - stats: None, } } @@ -111,28 +91,8 @@ impl Body { { Self { repr: BodyRepr::Body(BoxBody::new(body.map_err(Into::into))), - #[cfg(feature = "server")] - stats: None, } } - - /// Attach server stats for recording body read timings. - /// - /// This should only be called on request bodies in a server context. - #[cfg(feature = "server")] - pub fn with_read_stats(mut self, stats: Arc>) -> Self { - self.stats = Some(BodyStats::Read(stats)); - self - } - - /// Attach server stats for recording body write timings. - /// - /// This should only be called on response bodies in a server context. - #[cfg(feature = "server")] - pub fn with_write_stats(mut self, stats: Arc>) -> Self { - self.stats = Some(BodyStats::Write(stats)); - self - } } impl http_body::Body for Body { @@ -143,71 +103,14 @@ impl http_body::Body for Body { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, Self::Error>>> { - #[cfg(feature = "server")] - let is_full = matches!(self.repr, BodyRepr::Full(_)); - - let this = self.project(); - - #[cfg(feature = "server")] - match this.stats.as_ref() { - Some(BodyStats::Read(stats)) => { - let mut stats = stats.lock().unwrap(); - if stats.read_body_start().is_none() { - stats.record_read_body_start(); - } - } - Some(BodyStats::Write(stats)) => { - let mut stats = stats.lock().unwrap(); - if stats.write_start().is_none() { - stats.record_write_start(); - } - } - None => {} - } - - let result = match this.repr.project() { + match self.project().repr.project() { BodyProj::Full(full) => http_body::Body::poll_frame(full, cx).map_err(BoxError::from), BodyProj::Hyper(incoming) => { http_body::Body::poll_frame(incoming, cx).map_err(BoxError::from) } BodyProj::Stream(stream) => http_body::Body::poll_frame(stream, cx), BodyProj::Body(body) => http_body::Body::poll_frame(body, cx), - }; - - #[cfg(feature = "server")] - { - let is_terminal = match this.stats.as_ref() { - // For response bodies, also treat a successful frame as terminal. - // Hyper uses is_end_stream() for Full bodies and won't poll again - // after the last frame, so Poll::Ready(None) never fires. - Some(BodyStats::Write(_)) => { - matches!(result, Poll::Ready(None) | Poll::Ready(Some(Err(_)))) - || (is_full && matches!(result, Poll::Ready(Some(Ok(_))))) - } - // For request bodies, only treat exhaustion or error as terminal. - _ => matches!(result, Poll::Ready(None) | Poll::Ready(Some(Err(_)))), - }; - - if is_terminal { - match this.stats.as_ref() { - Some(BodyStats::Read(stats)) => { - let mut stats = stats.lock().unwrap(); - if stats.read_body_finish().is_none() { - stats.record_read_body_finish(); - stats.record_handle_start(); - } - } - Some(BodyStats::Write(stats)) => { - let mut stats = stats.lock().unwrap(); - if stats.write_finish().is_none() { - stats.record_write_finish(); - } - } - None => {} - } - } } - result } fn is_end_stream(&self) -> bool { @@ -396,8 +299,6 @@ impl From<&'static str> for Body { fn from(value: &'static str) -> Self { Self { repr: BodyRepr::Full(Full::new(Bytes::from_static(value.as_bytes()))), - #[cfg(feature = "server")] - stats: None, } } } @@ -406,8 +307,6 @@ impl From> for Body { fn from(value: Vec) -> Self { Self { repr: BodyRepr::Full(Full::new(Bytes::from(value))), - #[cfg(feature = "server")] - stats: None, } } } @@ -416,8 +315,6 @@ impl From for Body { fn from(value: Bytes) -> Self { Self { repr: BodyRepr::Full(Full::new(value)), - #[cfg(feature = "server")] - stats: None, } } } @@ -426,8 +323,6 @@ impl From for Body { fn from(value: FastStr) -> Self { Self { repr: BodyRepr::Full(Full::new(value.into_bytes())), - #[cfg(feature = "server")] - stats: None, } } } @@ -436,8 +331,6 @@ impl From for Body { fn from(value: String) -> Self { Self { repr: BodyRepr::Full(Full::new(Bytes::from(value))), - #[cfg(feature = "server")] - stats: None, } } } diff --git a/volo-http/src/context/server.rs b/volo-http/src/context/server.rs index 08ff1ed0..fcda69ab 100644 --- a/volo-http/src/context/server.rs +++ b/volo-http/src/context/server.rs @@ -1,5 +1,4 @@ //! Context and its utilities of server -use std::sync::{Arc, Mutex}; use chrono::{DateTime, Local}; use volo::{ @@ -48,16 +47,12 @@ pub struct ServerCxInner { pub params: PathParamsVec, /// Statistics of the request - pub stats: Arc>, + pub stats: ServerStats, } impl ServerCxInner { impl_getter!(params, PathParamsVec); - - /// Return the statistics of the request - pub fn stats(&self) -> &Arc> { - &self.stats - } + impl_getter!(stats, ServerStats); } /// Statistics of server diff --git a/volo-http/src/server/mod.rs b/volo-http/src/server/mod.rs index e57e9187..f0573b66 100644 --- a/volo-http/src/server/mod.rs +++ b/volo-http/src/server/mod.rs @@ -13,7 +13,6 @@ use std::{ time::Duration, }; -use chrono::{DateTime, Local}; use futures::future::BoxFuture; use hyper_util::{ rt::{TokioExecutor, TokioIo}, @@ -448,14 +447,12 @@ async fn serve( continue; } }; - let accept_time = Local::now(); let hyper_service = HyperService { inner: service.clone(), peer, config: config.clone(), span_provider: span_provider.clone(), - accept_time, }; tokio::spawn(serve_conn( @@ -515,7 +512,6 @@ struct HyperService { peer: Address, config: Config, span_provider: SP, - accept_time: DateTime, } type HyperRequest = http::request::Request; @@ -533,39 +529,19 @@ where fn call(&self, req: HyperRequest) -> Self::Future { let service = self.clone(); - let call_time = Local::now(); Box::pin( METAINFO.scope(RefCell::new(MetaInfo::default()), async move { let mut cx = ServerContext::new(service.peer); cx.rpc_info_mut().set_config(service.config); - - let stats = cx.stats().clone(); - - { - let mut s = stats.lock().unwrap(); - s.set_read_header_start(service.accept_time); - s.set_read_header_finish(call_time); - } - - let (parts, body) = req.into_parts(); - let body = Body::from_incoming(body).with_read_stats(stats.clone()); - let req = http::Request::from_parts(parts, body); - let span = service.span_provider.on_serve(&cx); - let resp = service + cx.stats.record_handle_start(); + let resp: http::Response = service .inner - .call(&mut cx, req) + .call(&mut cx, req.map(Body::from_incoming)) .instrument(span) .await .into_response(); - - stats.lock().unwrap().record_handle_finish(); - - // Wrap response body — shares the same Arc, write_start/finish - // will be recorded directly into cx's stats - let (parts, body) = resp.into_parts(); - let body = body.with_write_stats(stats.clone()); - let resp = http::Response::from_parts(parts, body); + cx.stats.record_handle_finish(); service.span_provider.leave_serve(&cx); Ok(resp) }), From 0a46e9a08238da9f66ba53cf34f372385813de73 Mon Sep 17 00:00:00 2001 From: "joshua.ho@bytedance.com" Date: Wed, 8 Apr 2026 11:40:56 +0800 Subject: [PATCH 6/8] chore(volo-http): update CLAUDE.md to include stat.rs --- volo-http/CLAUDE.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/volo-http/CLAUDE.md b/volo-http/CLAUDE.md index abdf464c..60e93f87 100644 --- a/volo-http/CLAUDE.md +++ b/volo-http/CLAUDE.md @@ -12,7 +12,8 @@ volo-http/src/ ├── response.rs # Response type alias ├── context/ # RPC contexts │ ├── client.rs # ClientContext (target, stats, timeout) -│ └── server.rs # ServerContext (RpcInfo, path params, extensions) +│ ├── server.rs # ServerContext (RpcInfo, path params, extensions) +│ └── stat.rs # CommonStats ├── error/ │ ├── client.rs # ClientError │ └── server.rs # ExtractBodyError From 7ca20c8c39e8cd040e8a8cd5a8f0331cbe6e696a Mon Sep 17 00:00:00 2001 From: "joshua.ho@bytedance.com" Date: Wed, 8 Apr 2026 14:25:38 +0800 Subject: [PATCH 7/8] fix(volo-http): correct server handle timings --- volo-http/src/server/handler.rs | 5 ++++- volo-http/src/server/mod.rs | 2 -- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/volo-http/src/server/handler.rs b/volo-http/src/server/handler.rs index 802cc034..7cea8033 100644 --- a/volo-http/src/server/handler.rs +++ b/volo-http/src/server/handler.rs @@ -74,7 +74,10 @@ macro_rules! impl_handler { Ok(value) => value, Err(rejection) => return rejection.into_response(), }; - self($($ty,)* $last).await.into_response() + cx.stats.record_handle_start(); + let result = self($($ty,)* $last).await; + cx.stats.record_handle_finish(); + result.into_response() } } }; diff --git a/volo-http/src/server/mod.rs b/volo-http/src/server/mod.rs index f0573b66..02d4deb6 100644 --- a/volo-http/src/server/mod.rs +++ b/volo-http/src/server/mod.rs @@ -534,14 +534,12 @@ where let mut cx = ServerContext::new(service.peer); cx.rpc_info_mut().set_config(service.config); let span = service.span_provider.on_serve(&cx); - cx.stats.record_handle_start(); let resp: http::Response = service .inner .call(&mut cx, req.map(Body::from_incoming)) .instrument(span) .await .into_response(); - cx.stats.record_handle_finish(); service.span_provider.leave_serve(&cx); Ok(resp) }), From efdf473fa27a5875c02051bf805241ad20573651 Mon Sep 17 00:00:00 2001 From: "joshua.ho@bytedance.com" Date: Fri, 17 Apr 2026 09:47:15 +0800 Subject: [PATCH 8/8] chore(volo-http): bump crate version --- Cargo.lock | 2 +- volo-http/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6f48c2ee..0d45ad95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4308,7 +4308,7 @@ dependencies = [ [[package]] name = "volo-http" -version = "0.5.4" +version = "0.5.5" dependencies = [ "ahash", "async-broadcast", diff --git a/volo-http/Cargo.toml b/volo-http/Cargo.toml index 47984cfa..81cedcec 100644 --- a/volo-http/Cargo.toml +++ b/volo-http/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "volo-http" -version = "0.5.4" +version = "0.5.5" edition.workspace = true homepage.workspace = true repository.workspace = true