Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions agent/src/sender/uniform_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use std::sync::{
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant, SystemTime};

use socket2::TcpKeepalive;

use arc_swap::access::Access;
use lazy_static::lazy_static;
use log::{debug, error, info, warn};
Expand Down Expand Up @@ -433,6 +435,10 @@ impl<T: Sendable> UniformSender<T> {
// Timeouts and socket tuning parameters for the sender's TCP connection.
// All durations are expressed in seconds.
const TCP_WRITE_TIMEOUT: u64 = 3; // s
const QUEUE_READ_TIMEOUT: u64 = 3; // s
const DEFAULT_RECONNECT_INTERVAL: u8 = 10; // s
const TCP_KEEPALIVE_TIME: u64 = 30; // s, idle time before first keepalive probe
const TCP_KEEPALIVE_INTERVAL: u64 = 10; // s, interval between keepalive probes
// NOTE(review): on Linux, tcp(7) states that TCP_USER_TIMEOUT, when set together
// with SO_KEEPALIVE, overrides the probe count when deciding to drop the
// connection -- confirm this retry count has the intended effect.
const TCP_KEEPALIVE_RETRIES: u32 = 3; // number of keepalive probes before declaring dead
const TCP_USER_TIMEOUT: u64 = 100; // s, max time for unacked data before TCP gives up

pub fn new(
id: usize,
Expand Down Expand Up @@ -596,6 +602,36 @@ impl<T: Sendable> UniformSender<T> {
conn.tcp_stream.take();
return;
}
// Linux-only, best-effort socket tuning for the freshly connected stream:
// a failure to set either option is logged at debug level and otherwise ignored.
#[cfg(target_os = "linux")]
{
// Borrow the stream's socket through socket2::SockRef to configure socket
// options. SockRef only borrows the underlying fd, so the fd stays owned by
// (and is closed by) the TcpStream; no explicit forget is needed.
let sock = socket2::SockRef::from(&*tcp_stream);
// SO_KEEPALIVE: detect idle dead connections (server went away silently).
// After TCP_KEEPALIVE_TIME seconds of idle the kernel sends probes every
// TCP_KEEPALIVE_INTERVAL seconds; on its own, TCP_KEEPALIVE_RETRIES failed
// probes would close the connection (30 + 3 * 10 = ~60 s with these values).
// NOTE(review): TCP_USER_TIMEOUT is also set below; per tcp(7) it overrides
// the keepalive probe count, so an idle dead peer is likely detected after
// roughly TCP_USER_TIMEOUT (100 s) instead of ~60 s -- confirm intended.
let keepalive = TcpKeepalive::new()
.with_time(Duration::from_secs(Self::TCP_KEEPALIVE_TIME))
.with_interval(Duration::from_secs(Self::TCP_KEEPALIVE_INTERVAL))
.with_retries(Self::TCP_KEEPALIVE_RETRIES);
if let Err(e) = sock.set_tcp_keepalive(&keepalive) {
debug!("{} sender tcp stream set keepalive failed {}", self.name, e);
}
// TCP_USER_TIMEOUT: limit how long unacknowledged data may remain in
// flight before the kernel gives up. This caps the TCP retransmission
// backoff that would otherwise last ~15 min (tcp_retries2 = 15) when the
// remote end vanishes without sending RST/FIN. 100 s corresponds
// roughly to tcp_retries2 = 8 (the RFC 1122 minimum cited in the kernel's
// ip-sysctl documentation).
if let Err(e) =
sock.set_tcp_user_timeout(Some(Duration::from_secs(Self::TCP_USER_TIMEOUT)))
{
debug!(
"{} sender tcp stream set user timeout failed {}",
self.name, e
);
}
}
info!(
"{} sender tcp connection to {}:{} succeed.",
self.name, conn.dest_ip, conn.dest_port
Expand Down
Loading