Commit c9b4e7c

Prevent download storms in FallbackRepository
Use a moka Cache to funnel multiple requests for the same missing payload into a single task that syncs the payload locally. Any failure to sync the payload will be cached for 5 minutes, after which another request for that digest will attempt to sync it again.

This was inspired by observing that syncing a single spfs tag would report having "repaired" several different digests multiple times, often over a dozen times each. This suggests that Syncer may be unexpectedly trying to process the same digest multiple times. Each of these digests was for a large payload (500MB+), and the sync was happening over a high-latency link. Without this change, it can end up performing excessive duplicate copies over a limited-bandwidth link.

Signed-off-by: J Robert Ray <[email protected]>
1 parent 656426f commit c9b4e7c
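
For readers unfamiliar with moka, here is a minimal, self-contained sketch of the coalescing pattern this commit adopts, built on moka's future::Cache and try_get_with. The sync_from_secondary function, the string digest key, and the tokio setup are illustrative stand-ins rather than spfs APIs; the point is that concurrent lookups of the same missing key share a single evaluation of the init future, and a successful result is kept for the configured TTL.

use std::sync::Arc;
use std::time::Duration;

use moka::future::Cache;

// Illustrative stand-in for an expensive, fallible download from a
// secondary repository. Not an spfs API.
async fn sync_from_secondary(digest: &str) -> Result<(), String> {
    println!("syncing {digest} from a secondary repository...");
    tokio::time::sleep(Duration::from_millis(100)).await;
    Ok(())
}

#[tokio::main]
async fn main() {
    // Entries live for 5 minutes, matching the TTL used in this commit.
    let cache: Cache<String, ()> = Cache::builder()
        .time_to_live(Duration::from_secs(300))
        .build();

    // Fire several concurrent requests for the same digest. try_get_with
    // coalesces them: only one task runs sync_from_secondary, the others
    // wait for its result instead of starting duplicate downloads.
    let digest = "abc123".to_string();
    let mut tasks = Vec::new();
    for _ in 0..8 {
        let cache = cache.clone();
        let digest = digest.clone();
        tasks.push(tokio::spawn(async move {
            cache
                .try_get_with(digest.clone(), async move {
                    sync_from_secondary(&digest).await
                })
                .await
        }));
    }
    for task in tasks {
        // Any failure is handed to every waiter as the same Arc'd error.
        let result: Result<(), Arc<String>> = task.await.unwrap();
        println!("{result:?}");
    }
}

With this in place, a burst of concurrent requests for one digest triggers at most one sync attempt instead of one per caller, which is exactly the storm this commit is preventing.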

File tree

5 files changed (+127 −55 lines)

Cargo.lock

Lines changed: 58 additions & 0 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -76,6 +76,7 @@ is_default_derive_macro = { path = "crates/is_default_derive_macro" }
 itertools = "0.14"
 libc = "0.2.172"
 miette = "7.0"
+moka = "0.12.11"
 nix = { version = "0.29", features = ["mount", "sched", "user"] }
 nom = "7.1"
 nom-supreme = "0.8"

crates/spfs/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -58,6 +58,7 @@ libc = { workspace = true }
 linux-raw-sys = "0.8.0"
 linux-syscall = "1.0.0"
 miette = { workspace = true }
+moka = { workspace = true, features = ["future"] }
 nix = { workspace = true, features = ["fs"] }
 nonempty = "0.8.1"
 num_cpus = "1.13.1"

crates/spfs/src/storage/fallback/repository.rs

Lines changed: 66 additions & 55 deletions
@@ -8,6 +8,7 @@ use std::sync::Arc;
 
 use chrono::{DateTime, Utc};
 use futures::Stream;
+use moka::future::Cache;
 use relative_path::RelativePath;
 
 use crate::config::{ToAddress, default_fallback_repo_include_secondary_tags};
@@ -85,6 +86,7 @@ pub struct FallbackProxy {
     primary: Arc<OpenFsRepository>,
     secondary: Vec<crate::storage::RepositoryHandle>,
     include_secondary_tags: bool,
+    open_payload_cache: Cache<encoding::Digest, ()>,
 }
 
 impl FallbackProxy {
@@ -103,6 +105,12 @@ impl FallbackProxy {
             primary: primary.into(),
             secondary,
             include_secondary_tags,
+            open_payload_cache: Cache::builder()
+                // The TTL thought here is that failed attempts will only get
+                // cached for so long and then reattempted on a subsequent
+                // open_payload call, if any.
+                .time_to_live(std::time::Duration::from_secs(300))
+                .build(),
         }
     }
 }
@@ -162,11 +170,11 @@ impl storage::FromConfig for FallbackProxy {
             Ok(secondary)
         };
         let (primary, secondary) = tokio::try_join!(primary, secondary)?;
-        Ok(Self {
-            primary: primary.into(),
+        Ok(Self::new(
+            Arc::new(primary),
             secondary,
-            include_secondary_tags: config.include_secondary_tags,
-        })
+            config.include_secondary_tags,
+        ))
     }
 }
 
@@ -302,62 +310,65 @@ impl PayloadStorage for FallbackProxy {
         &self,
         digest: encoding::Digest,
     ) -> PayloadResult<(Pin<Box<dyn BlobRead>>, std::path::PathBuf)> {
-        let mut fallbacks = self.secondary.iter();
-
-        'retry_open: loop {
-            let missing_payload_error = match self.primary.open_payload(digest).await {
-                Ok(r) => return Ok(r),
-                Err(err @ PayloadError::UnknownPayload(_)) => err,
-                Err(err) => return Err(err),
-            };
+        if let ok @ Ok(_) = self.primary.open_payload(digest).await {
+            return ok;
+        }
 
-            let mut repair_failure = None;
-
-            let dest_repo = self.primary.clone().into();
-            for fallback in fallbacks.by_ref() {
-                let syncer = crate::Syncer::new(fallback, &dest_repo)
-                    .with_policy(crate::sync::SyncPolicy::ResyncEverything)
-                    .with_reporter(
-                        // There may already be a progress bar in use in this
-                        // context, so don't make another one here.
-                        SyncReporters::silent(),
-                    );
-                match syncer.sync_payload(digest).await {
-                    Ok(_) => {
-                        // Warn for non-sentry users; info for sentry users.
-                        #[cfg(not(feature = "sentry"))]
-                        {
-                            tracing::warn!("Repaired a missing payload! {digest}",);
+        // If a payload is not available in the primary, we want only one task
+        // attempting to download it from the secondary repositories. Other
+        // concurrent attempts to open the same payload should wait for
+        // that attempt to complete, and then try to open the payload again.
+        let mut fallbacks = self.secondary.iter();
+        let dest_repo = self.primary.clone().into();
+        self.open_payload_cache
+            .try_get_with(digest, async move {
+                let mut repair_failure = None;
+                for fallback in fallbacks.by_ref() {
+                    let syncer = crate::Syncer::new(fallback, &dest_repo)
+                        .with_policy(crate::sync::SyncPolicy::ResyncEverything)
+                        .with_reporter(
+                            // There may already be a progress bar in use in this
+                            // context, so don't make another one here.
+                            SyncReporters::silent(),
+                        );
+                    match syncer.sync_payload(digest).await {
+                        Ok(_) => {
+                            // Warn for non-sentry users; info for sentry users.
+                            #[cfg(not(feature = "sentry"))]
+                            {
+                                tracing::warn!("Repaired a missing payload! {digest}",);
+                            }
+                            #[cfg(feature = "sentry")]
+                            {
+                                tracing::info!("Repaired a missing payload! {digest}",);
+                                tracing::error!(target: "sentry", object = %digest, "Repaired a missing payload!");
+                            }
+                            return Ok(());
                         }
-                        #[cfg(feature = "sentry")]
-                        {
-                            tracing::info!("Repaired a missing payload! {digest}",);
-                            tracing::error!(target: "sentry", object = %digest, "Repaired a missing payload!");
+                        Err(err) => {
+                            #[cfg(feature = "sentry")]
+                            tracing::error!(
+                                target: "sentry",
+                                object = %digest,
+                                ?err,
+                                "Could not repair a missing payload"
+                            );
+
+                            repair_failure = Some(PayloadError::String(format!("failed to repair payload: {err}")));
                         }
-                        continue 'retry_open;
-                    }
-                    Err(err) => {
-                        #[cfg(feature = "sentry")]
-                        tracing::error!(
-                            target: "sentry",
-                            object = %digest,
-                            ?err,
-                            "Could not repair a missing payload"
-                        );
-
-                        repair_failure = Some(err);
                     }
                 }
-            }
-
-            if let Some(err) = repair_failure {
-                // The different fallbacks may fail for different reasons,
-                // we just show the most recent failure here.
-                tracing::warn!("Could not repair a missing payload: {err}");
-            }
-
-            return Err(missing_payload_error);
-        }
+                if let Some(err) = repair_failure {
+                    return Err(err);
+                }
+                // Probably can only get here if there were no secondary repos.
+                Err(PayloadError::String("no repositories could successfully read the payload".into()))
+            })
+            .await.map_err(|err| (*err).clone())?;
+
+        // Then each caller needs to try to open the payload again, to get their
+        // own handle to it.
+        self.primary.open_payload(digest).await
     }
 
     async fn remove_payload(&self, digest: encoding::Digest) -> PayloadResult<()> {
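
One subtlety in the hunk above: try_get_with hands the same initialization error to every waiting caller as an Err(Arc<E>), which is presumably why the change converts sync failures into the cloneable PayloadError::String and then recovers an owned error with .map_err(|err| (*err).clone()). Below is a minimal sketch of that shape under the same assumptions; SyncError is a hypothetical stand-in for a cloneable error type, not an spfs type.

use std::sync::Arc;
use std::time::Duration;

use moka::future::Cache;

// Stand-in for PayloadError::String: any Clone + Send + Sync error works.
#[derive(Clone, Debug)]
struct SyncError(String);

#[tokio::main]
async fn main() {
    let cache: Cache<u64, ()> = Cache::builder()
        .time_to_live(Duration::from_secs(300))
        .build();

    // The init future fails; every coalesced waiter receives the same
    // Arc-wrapped error rather than an owned one.
    let shared: Result<(), Arc<SyncError>> = cache
        .try_get_with(42, async { Err(SyncError("link down".into())) })
        .await;

    // Recover an owned error for the caller, mirroring the diff's
    // `.map_err(|err| (*err).clone())`.
    let owned: Result<(), SyncError> = shared.map_err(|err| (*err).clone());
    println!("{owned:?}");
}

Using () as the cached value keeps the cache cheap: it only records that a repair attempt completed for a digest, while each caller still re-opens the payload from the primary to get its own handle, as the final comment in the diff notes.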

cspell.json

Lines changed: 1 addition & 0 deletions
@@ -426,6 +426,7 @@
     "mksource",
     "mksrc",
     "modversions",
+    "moka",
     "mountpoint",
     "mpfr",
     "mrtm",
