Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 60 additions & 24 deletions crates/agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,24 +241,6 @@ async fn async_main(args: Args) -> Result<(), anyhow::Error> {
})
.connect_lazy_with(pg_options);

// A fresh Cloud Run instance with direct VPC egress can take tens of seconds
// before its network interface passes outbound traffic. Retry the initial
// connection against a generous deadline, rather than raising the pool's
// acquire_timeout, which also bounds request-path acquires at runtime.
let connect_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(120);
loop {
match pg_pool.acquire().await {
Ok(_) => break,
Err(err) if tokio::time::Instant::now() < connect_deadline => {
tracing::warn!(error = ?err, "initial database connection failed; retrying");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Err(err) => {
return Err(anyhow::Error::new(err).context("connecting to database"));
}
}
}

// Periodically log information about the connection pool to aid in debugging.
let pool_copy = pg_pool.clone();
tokio::spawn(async move {
Expand All @@ -275,9 +257,27 @@ async fn async_main(args: Args) -> Result<(), anyhow::Error> {
}
});

let system_user_id = control_plane_api::get_user_id_for_email(&args.accounts_email, &pg_pool)
.await
.context("querying for agent user id")?;
// A fresh Cloud Run instance with direct VPC egress can take tens of seconds
// before its network interface passes outbound traffic. Retry transient
// failures of this first query against a generous deadline, rather than
// raising the pool's acquire_timeout, which also bounds request-path
// acquires at runtime. Non-transient errors (bad credentials, TLS or query
// failures) exit immediately so that misconfiguration surfaces fast.
let startup_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(120);
let system_user_id = loop {
match control_plane_api::get_user_id_for_email(&args.accounts_email, &pg_pool).await {
Ok(user_id) => break user_id,
Err(err @ (sqlx::Error::PoolTimedOut | sqlx::Error::Io(_)))
if tokio::time::Instant::now() < startup_deadline =>
{
tracing::warn!(error = ?err, "initial database query failed; retrying");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Err(err) => {
return Err(anyhow::Error::new(err).context("querying for agent user id"));
}
}
};
let jwt_secret: String =
std::env::var("CONTROL_PLANE_JWT_SECRET").context("missing CONTROL_PLANE_JWT_SECRET")?;

Expand Down Expand Up @@ -308,11 +308,31 @@ async fn async_main(args: Args) -> Result<(), anyhow::Error> {
}

// Share-able future which completes when the agent should exit.
let shutdown = tokio::signal::ctrl_c().map(|_| ()).shared();
// Cloud Run and Kubernetes signal shutdown with SIGTERM; SIGINT covers
// interactive use.
let shutdown = async {
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("failed to install SIGTERM handler");
tokio::select! {
_ = tokio::signal::ctrl_c() => {},
_ = sigterm.recv() => {},
}
}
.shared();

// Create the snapshot source and start the refresh loop.
// Snapshot fetches retry internally forever, so a persistent failure (a
// broken query, sops / KMS breakage) would otherwise hang here with the
// port unbound and nothing logged at error level. Bound the wait so that
// startup fails visibly, and fits within Cloud Run's 240s startup probe
// window even after the database retry budget above.
let snapshot_source = control_plane_api::snapshot::PgSnapshotSource::new(pg_pool.clone());
let snapshot_watch = tokens::watch(snapshot_source).ready_owned().await;
let snapshot_watch = tokio::time::timeout(
std::time::Duration::from_secs(60),
tokens::watch(snapshot_source).ready_owned(),
)
.await
.context("timed out fetching the initial authorization snapshot")?;

let controller_publication_cooldown =
chrono::Duration::from_std(args.controller_publication_cooldown)?;
Expand Down Expand Up @@ -425,7 +445,23 @@ async fn async_main(args: Args) -> Result<(), anyhow::Error> {
};

std::mem::drop(logs_tx);
let ((), (), ()) = tokio::try_join!(api_server, logs_sink, automations_fut)?;

// Don't join on the logs sink: it completes only once every sender clone
// has dropped, and several are owned by locals of this function (App,
// Publisher, and friends), so joining deadlocks the graceful-shutdown
// path and the process must be SIGKILLed. Instead serve until shutdown,
// surfacing an early sink failure as fatal, then give the sink a bounded
// window to drain logs buffered in the channel before exiting.
tokio::pin!(logs_sink);
tokio::select! {
result = async { tokio::try_join!(api_server, automations_fut) } => {
let ((), ()) = result?;
}
result = &mut logs_sink => {
return result.and(Err(anyhow::anyhow!("logs sink stopped while serving")));
}
}
let _ = tokio::time::timeout(std::time::Duration::from_secs(5), logs_sink).await;

Ok(())
}
Expand Down
Loading