From adbf9ef1339d0b4d96bb206d3477be9090cd0509 Mon Sep 17 00:00:00 2001 From: zhuofeng Date: Mon, 30 Mar 2026 16:37:34 +0800 Subject: [PATCH] fix: Agent is unaware of forced Server restarts, causing retransmission timeout latency of ~15 minutes. --- agent/src/sender/uniform_sender.rs | 36 ++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/agent/src/sender/uniform_sender.rs b/agent/src/sender/uniform_sender.rs index 909d712292a..252f3916d9c 100644 --- a/agent/src/sender/uniform_sender.rs +++ b/agent/src/sender/uniform_sender.rs @@ -27,6 +27,8 @@ use std::sync::{ use std::thread::{self, JoinHandle}; use std::time::{Duration, Instant, SystemTime}; +use socket2::TcpKeepalive; + use arc_swap::access::Access; use lazy_static::lazy_static; use log::{debug, error, info, warn}; @@ -433,6 +435,10 @@ impl UniformSender { const TCP_WRITE_TIMEOUT: u64 = 3; // s const QUEUE_READ_TIMEOUT: u64 = 3; // s const DEFAULT_RECONNECT_INTERVAL: u8 = 10; // s + const TCP_KEEPALIVE_TIME: u64 = 30; // s, idle time before first keepalive probe + const TCP_KEEPALIVE_INTERVAL: u64 = 10; // s, interval between keepalive probes + const TCP_KEEPALIVE_RETRIES: u32 = 3; // number of keepalive probes before declaring dead + const TCP_USER_TIMEOUT: u64 = 100; // s, max time for unacked data before TCP gives up pub fn new( id: usize, @@ -596,6 +602,36 @@ impl UniformSender { conn.tcp_stream.take(); return; } + #[cfg(target_os = "linux")] + { + // Wrap the raw fd temporarily to configure socket options; forget the + // wrapper so it does not close the fd owned by TcpStream. + let sock = socket2::SockRef::from(&*tcp_stream); + // SO_KEEPALIVE: detect idle dead connections (server went away silently). + // After TCP_KEEPALIVE_TIME seconds of idle the kernel sends probes every + // TCP_KEEPALIVE_INTERVAL seconds; TCP_KEEPALIVE_RETRIES failed probes + // close the connection (~60 s total with these defaults). + let keepalive = TcpKeepalive::new() + .with_time(Duration::from_secs(Self::TCP_KEEPALIVE_TIME)) + .with_interval(Duration::from_secs(Self::TCP_KEEPALIVE_INTERVAL)) + .with_retries(Self::TCP_KEEPALIVE_RETRIES); + if let Err(e) = sock.set_tcp_keepalive(&keepalive) { + debug!("{} sender tcp stream set keepalive failed {}", self.name, e); + } + // TCP_USER_TIMEOUT: limit how long unacknowledged data may remain in + // flight before the kernel gives up. This caps the TCP retransmission + // storm that would otherwise last ~15 min (tcp_retries2 = 15) when the + // remote end vanishes without sending RST/FIN. 100 s corresponds + // roughly to tcp_retries2 = 5. + if let Err(e) = + sock.set_tcp_user_timeout(Some(Duration::from_secs(Self::TCP_USER_TIMEOUT))) + { + debug!( + "{} sender tcp stream set user timeout failed {}", + self.name, e + ); + } + } info!( "{} sender tcp connection to {}:{} succeed.", self.name, conn.dest_ip, conn.dest_port