Skip to content

Commit 9c78d98

Browse files
committed
Create error type specific to Sync domain
This new error type gives better context about the kinds of errors that can happen during a sync, and it also limits the enum variants compared to spfs::Error so that errors which can't occur aren't represented. This work was originally motivated by the need for an Error type that is Clone, which is why the wrapped errors are all Arc; however, it turned out that FallbackProxy::open_payload still has to return spfs::Error, so having a Clone SyncError didn't help. The actual need is for an Error type that open_payload can return and that is Clone. A Clone error type is needed because the moka Cache stores errors in an Arc, since it must be able to clone the error itself; to get an error out of the cache and then return it, the error type needs to be Clone. Signed-off-by: J Robert Ray <[email protected]>
1 parent d1bf725 commit 9c78d98

File tree

7 files changed

+143
-45
lines changed

7 files changed

+143
-45
lines changed

crates/spfs/src/error.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@
55
use std::io;
66
use std::path::PathBuf;
77
use std::str::Utf8Error;
8+
use std::sync::Arc;
89

910
use miette::Diagnostic;
11+
use spfs_encoding::PartialDigest;
1012
use thiserror::Error;
1113

12-
use crate::{encoding, graph, storage};
14+
use crate::tracking::TagSpec;
15+
use crate::{Digest, encoding, graph, storage};
1316

1417
#[derive(Diagnostic, Debug, Error)]
1518
#[diagnostic(
@@ -384,3 +387,39 @@ impl OsError for std::io::Error {
384387
}
385388

386389
pub type Result<T> = std::result::Result<T, Error>;
390+
391+
#[derive(Diagnostic, Debug, Clone, Error)]
392+
#[diagnostic(
393+
url(
394+
"https://spkenv.dev/error_codes#{}",
395+
self.code().unwrap_or_else(|| Box::new("spfs::generic"))
396+
)
397+
)]
398+
pub enum SyncError {
399+
#[error("reference '{0}' could not be parsed: {1}")]
400+
ReferenceParseError(String, Arc<Error>),
401+
#[error("tag '{0}' could not be resolved: {1}")]
402+
TagResolveError(TagSpec, Arc<Error>),
403+
#[error("tag '{0}' could not be inserted: {1}")]
404+
TagInsertError(TagSpec, Arc<Error>),
405+
#[error("partial digest '{0}' could not be resolved: {1}")]
406+
DigestResolveError(PartialDigest, Arc<Error>),
407+
#[error("flatbuffer object digest() failed: {0}")]
408+
ObjectDigestError(Arc<Error>),
409+
#[error("object '{0}' could not be read: {1}")]
410+
ObjectReadError(Digest, Arc<Error>),
411+
#[error("object write failed: {0}")]
412+
ObjectWriteError(Arc<Error>),
413+
#[error("manifest '{0}' could not be read: {1}")]
414+
ManifestReadError(Digest, Arc<Error>),
415+
#[error("payload '{0}' could not be read: {1}")]
416+
PayloadReadError(Digest, Arc<Error>),
417+
#[error("payload '{0}' could not be written: {1}")]
418+
PayloadWriteError(Digest, Arc<Error>),
419+
#[error(
420+
"source repository provided payload that did not match the requested digest: wanted {0}, got {1}. wrote {2} bytes"
421+
)]
422+
PayloadDigestMismatch(Digest, Digest, u64),
423+
}
424+
425+
pub type SyncResult<T> = std::result::Result<T, SyncError>;

crates/spfs/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ pub use clean::Cleaner;
5050
pub use commit::Committer;
5151
pub use diff::{diff, diff_runtime_changes, runtime_active_changes};
5252
pub use encoding::Digest;
53-
pub use error::{Error, OsError, OsErrorExt, Result};
53+
pub use error::{Error, OsError, OsErrorExt, Result, SyncError, SyncResult};
5454
pub use resolve::{
5555
RenderResult,
5656
compute_environment_manifest,

crates/spfs/src/sync.rs

Lines changed: 89 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use tokio::sync::Semaphore;
2727
use crate::graph::AnnotationValue;
2828
use crate::prelude::*;
2929
use crate::sync::reporter::SyncItemResult;
30-
use crate::{Error, Result, encoding, graph, storage, tracking};
30+
use crate::{Error, SyncError, SyncResult, encoding, graph, storage, tracking};
3131

3232
/// The default limit for concurrent manifest sync operations
3333
/// per-syncer if not otherwise specified using
@@ -177,13 +177,15 @@ impl<'src, 'dst> Syncer<'src, 'dst> {
177177
/// Sync the object(s) referenced by the given string.
178178
///
179179
/// Any valid [`crate::tracking::EnvSpec`] is accepted as a reference.
180-
pub async fn sync_ref<R: AsRef<str>>(&self, reference: R) -> Result<SyncEnvResult> {
181-
let env_spec = reference.as_ref().parse()?;
180+
pub async fn sync_ref<R: AsRef<str>>(&self, reference: R) -> SyncResult<SyncEnvResult> {
181+
let env_spec = reference.as_ref().parse().map_err(|err: Error| {
182+
SyncError::ReferenceParseError(reference.as_ref().to_string(), err.into())
183+
})?;
182184
self.sync_env(env_spec).await
183185
}
184186

185187
/// Sync all of the objects identified by the given env.
186-
pub async fn sync_env(&self, env: tracking::EnvSpec) -> Result<SyncEnvResult> {
188+
pub async fn sync_env(&self, env: tracking::EnvSpec) -> SyncResult<SyncEnvResult> {
187189
self.reporter.visit_env(&env);
188190
let mut futures = FuturesUnordered::new();
189191
for item in env.iter().cloned() {
@@ -199,13 +201,16 @@ impl<'src, 'dst> Syncer<'src, 'dst> {
199201
}
200202

201203
/// Sync one environment item and any associated data.
202-
pub async fn sync_env_item(&self, item: tracking::EnvSpecItem) -> Result<SyncEnvItemResult> {
204+
pub async fn sync_env_item(
205+
&self,
206+
item: tracking::EnvSpecItem,
207+
) -> SyncResult<SyncEnvItemResult> {
203208
tracing::debug!(?item, "Syncing item");
204209
self.reporter.visit_env_item(&item);
205210
let res = match item {
206211
tracking::EnvSpecItem::Digest(digest) => match self.sync_object_digest(digest).await {
207212
Ok(r) => SyncEnvItemResult::Object(r),
208-
Err(Error::UnknownObject(digest)) => self
213+
Err(SyncError::ObjectReadError(digest, _)) => self
209214
.sync_payload(digest)
210215
.await
211216
.map(SyncEnvItemResult::Payload)?,
@@ -227,21 +232,28 @@ impl<'src, 'dst> Syncer<'src, 'dst> {
227232
}
228233

229234
/// Sync the identified tag instance and its target.
230-
pub async fn sync_tag(&self, tag: tracking::TagSpec) -> Result<SyncTagResult> {
235+
pub async fn sync_tag(&self, tag: tracking::TagSpec) -> SyncResult<SyncTagResult> {
231236
if self.policy.check_existing_tags() && self.dest.resolve_tag(&tag).await.is_ok() {
232237
return Ok(SyncTagResult::Skipped);
233238
}
234239
self.reporter.visit_tag(&tag);
235-
let resolved = self.src.resolve_tag(&tag).await?;
240+
let resolved = self
241+
.src
242+
.resolve_tag(&tag)
243+
.await
244+
.map_err(|err| SyncError::TagResolveError(tag.clone(), err.into()))?;
236245
let result = match self.sync_object_digest(resolved.target).await {
237246
Ok(r) => SyncItemResult::Object(r),
238-
Err(Error::UnknownObject(digest)) => self
247+
Err(SyncError::ObjectReadError(digest, _)) => self
239248
.sync_payload(digest)
240249
.await
241250
.map(SyncItemResult::Payload)?,
242251
Err(e) => return Err(e),
243252
};
244-
self.dest.insert_tag(&resolved).await?;
253+
self.dest
254+
.insert_tag(&resolved)
255+
.await
256+
.map_err(|err| SyncError::TagInsertError(tag.clone(), err.into()))?;
245257
let res = SyncTagResult::Synced { tag, result };
246258
self.reporter.synced_tag(&res);
247259
Ok(res)
@@ -250,7 +262,7 @@ impl<'src, 'dst> Syncer<'src, 'dst> {
250262
pub async fn sync_partial_digest(
251263
&self,
252264
partial: encoding::PartialDigest,
253-
) -> Result<SyncItemResult> {
265+
) -> SyncResult<SyncItemResult> {
254266
let res = self.src.resolve_full_digest(&partial).await;
255267
let found_digest = match res {
256268
Err(err) if self.policy.check_existing_objects() => {
@@ -268,7 +280,8 @@ impl<'src, 'dst> Syncer<'src, 'dst> {
268280
.map_err(|_| err)
269281
}
270282
res => res,
271-
}?;
283+
}
284+
.map_err(|err| SyncError::DigestResolveError(partial, err.into()))?;
272285
match found_digest {
273286
graph::FoundDigest::Object(digest) => {
274287
let obj_result = self.sync_object_digest(digest).await?;
@@ -281,7 +294,10 @@ impl<'src, 'dst> Syncer<'src, 'dst> {
281294
}
282295
}
283296

284-
pub async fn sync_object_digest(&self, digest: encoding::Digest) -> Result<SyncObjectResult> {
297+
pub async fn sync_object_digest(
298+
&self,
299+
digest: encoding::Digest,
300+
) -> SyncResult<SyncObjectResult> {
285301
// don't write the digest here, as that is the responsibility
286302
// of the function that actually handles the data copying.
287303
// a short-circuit is still nice when possible, though
@@ -293,7 +309,7 @@ impl<'src, 'dst> Syncer<'src, 'dst> {
293309
}
294310

295311
#[async_recursion::async_recursion]
296-
pub async fn sync_object(&self, obj: graph::Object) -> Result<SyncObjectResult> {
312+
pub async fn sync_object(&self, obj: graph::Object) -> SyncResult<SyncObjectResult> {
297313
use graph::object::Enum;
298314
self.reporter.visit_object(&obj);
299315
let res = match obj.into_enum() {
@@ -306,8 +322,10 @@ impl<'src, 'dst> Syncer<'src, 'dst> {
306322
Ok(res)
307323
}
308324

309-
pub async fn sync_platform(&self, platform: graph::Platform) -> Result<SyncPlatformResult> {
310-
let digest = platform.digest()?;
325+
pub async fn sync_platform(&self, platform: graph::Platform) -> SyncResult<SyncPlatformResult> {
326+
let digest = platform
327+
.digest()
328+
.map_err(|err| SyncError::ObjectDigestError(err.into()))?;
311329
if !self.processed_digests.insert(digest) {
312330
return Ok(SyncPlatformResult::Duplicate);
313331
}
@@ -325,15 +343,20 @@ impl<'src, 'dst> Syncer<'src, 'dst> {
325343
results.push(result);
326344
}
327345

328-
self.dest.write_object(&platform).await?;
346+
self.dest
347+
.write_object(&platform)
348+
.await
349+
.map_err(|err| SyncError::ObjectWriteError(err.into()))?;
329350

330351
let res = SyncPlatformResult::Synced { platform, results };
331352
self.reporter.synced_platform(&res);
332353
Ok(res)
333354
}
334355

335-
pub async fn sync_layer(&self, layer: graph::Layer) -> Result<SyncLayerResult> {
336-
let layer_digest = layer.digest()?;
356+
pub async fn sync_layer(&self, layer: graph::Layer) -> SyncResult<SyncLayerResult> {
357+
let layer_digest = layer
358+
.digest()
359+
.map_err(|err| SyncError::ObjectDigestError(err.into()))?;
337360
if !self.processed_digests.insert(layer_digest) {
338361
return Ok(SyncLayerResult::Duplicate);
339362
}
@@ -344,7 +367,11 @@ impl<'src, 'dst> Syncer<'src, 'dst> {
344367
self.reporter.visit_layer(&layer);
345368

346369
let manifest_result = if let Some(manifest_digest) = layer.manifest() {
347-
let manifest = self.src.read_manifest(*manifest_digest).await?;
370+
let manifest = self
371+
.src
372+
.read_manifest(*manifest_digest)
373+
.await
374+
.map_err(|err| SyncError::ManifestReadError(*manifest_digest, err.into()))?;
348375
self.sync_manifest(manifest).await?
349376
} else {
350377
SyncManifestResult::Skipped
@@ -364,7 +391,10 @@ impl<'src, 'dst> Syncer<'src, 'dst> {
364391
results
365392
};
366393

367-
self.dest.write_object(&layer).await?;
394+
self.dest
395+
.write_object(&layer)
396+
.await
397+
.map_err(|err| SyncError::ObjectWriteError(err.into()))?;
368398

369399
let mut results = vec![SyncObjectResult::Manifest(manifest_result)];
370400
results.extend(annotation_results);
@@ -374,8 +404,10 @@ impl<'src, 'dst> Syncer<'src, 'dst> {
374404
Ok(res)
375405
}
376406

377-
pub async fn sync_manifest(&self, manifest: graph::Manifest) -> Result<SyncManifestResult> {
378-
let manifest_digest = manifest.digest()?;
407+
pub async fn sync_manifest(&self, manifest: graph::Manifest) -> SyncResult<SyncManifestResult> {
408+
let manifest_digest = manifest
409+
.digest()
410+
.map_err(|err| SyncError::ObjectDigestError(err.into()))?;
379411
if !self.processed_digests.insert(manifest_digest) {
380412
return Ok(SyncManifestResult::Duplicate);
381413
}
@@ -402,7 +434,10 @@ impl<'src, 'dst> Syncer<'src, 'dst> {
402434
results.push(res);
403435
}
404436

405-
self.dest.write_object(&manifest).await?;
437+
self.dest
438+
.write_object(&manifest)
439+
.await
440+
.map_err(|err| SyncError::ObjectWriteError(err.into()))?;
406441

407442
drop(futures);
408443
let res = SyncManifestResult::Synced { manifest, results };
@@ -413,7 +448,7 @@ impl<'src, 'dst> Syncer<'src, 'dst> {
413448
async fn sync_annotation(
414449
&self,
415450
annotation: graph::Annotation<'_>,
416-
) -> Result<SyncAnnotationResult> {
451+
) -> SyncResult<SyncAnnotationResult> {
417452
match annotation.value() {
418453
AnnotationValue::String(_) => Ok(SyncAnnotationResult::InternalValue),
419454
AnnotationValue::Blob(digest) => {
@@ -435,7 +470,7 @@ impl<'src, 'dst> Syncer<'src, 'dst> {
435470
}
436471
}
437472

438-
async fn sync_entry(&self, entry: graph::Entry<'_>) -> Result<SyncEntryResult> {
473+
async fn sync_entry(&self, entry: graph::Entry<'_>) -> SyncResult<SyncEntryResult> {
439474
if !entry.kind().is_blob() {
440475
return Ok(SyncEntryResult::Skipped);
441476
}
@@ -450,15 +485,15 @@ impl<'src, 'dst> Syncer<'src, 'dst> {
450485
}
451486

452487
/// Sync the identified blob to the destination repository.
453-
pub async fn sync_blob(&self, blob: &graph::Blob) -> Result<SyncBlobResult> {
488+
pub async fn sync_blob(&self, blob: &graph::Blob) -> SyncResult<SyncBlobResult> {
454489
self.sync_blob_with_perms_opt(blob, None).await
455490
}
456491

457492
async fn sync_blob_with_perms_opt(
458493
&self,
459494
blob: &graph::Blob,
460495
perms: Option<u32>,
461-
) -> Result<SyncBlobResult> {
496+
) -> SyncResult<SyncBlobResult> {
462497
let digest = blob.digest();
463498
if self.processed_digests.contains(digest) {
464499
// do not insert here because blobs share a digest with payloads
@@ -484,7 +519,7 @@ impl<'src, 'dst> Syncer<'src, 'dst> {
484519
}
485520

486521
/// Sync a payload with the provided digest
487-
pub async fn sync_payload(&self, digest: encoding::Digest) -> Result<SyncPayloadResult> {
522+
pub async fn sync_payload(&self, digest: encoding::Digest) -> SyncResult<SyncPayloadResult> {
488523
self.sync_payload_with_perms_opt(digest, None).await
489524
}
490525

@@ -494,7 +529,7 @@ impl<'src, 'dst> Syncer<'src, 'dst> {
494529
&self,
495530
digest: encoding::Digest,
496531
perms: Option<u32>,
497-
) -> Result<SyncPayloadResult> {
532+
) -> SyncResult<SyncPayloadResult> {
498533
if self.processed_digests.contains(&digest) {
499534
return Ok(SyncPayloadResult::Duplicate);
500535
}
@@ -509,25 +544,42 @@ impl<'src, 'dst> Syncer<'src, 'dst> {
509544
_permit.is_ok(),
510545
"We never close the semaphore and so should never see errors"
511546
);
512-
let (mut payload, _) = self.src.open_payload(digest).await?;
547+
let (mut payload, _) = self
548+
.src
549+
.open_payload(digest)
550+
.await
551+
.map_err(|err| SyncError::PayloadReadError(digest, err.into()))?;
513552
if let Some(perms) = perms {
514553
payload = Box::pin(payload.with_permissions(perms));
515554
}
516555

517-
let (created_digest, size) = self.dest.write_data(payload).await?;
556+
let (created_digest, size) = self
557+
.dest
558+
.write_data(payload)
559+
.await
560+
.map_err(|err| SyncError::PayloadWriteError(digest, err.into()))?;
518561
if digest != created_digest {
519-
return Err(Error::String(format!(
520-
"Source repository provided payload that did not match the requested digest: wanted {digest}, got {created_digest}. wrote {size} bytes",
521-
)));
562+
return Err(SyncError::PayloadDigestMismatch(
563+
digest,
564+
created_digest,
565+
size,
566+
));
522567
}
523568

524569
let res = SyncPayloadResult::Synced { size };
525570
self.reporter.synced_payload(&res);
526571
Ok(res)
527572
}
528573

529-
async fn read_object_with_fallback(&self, digest: encoding::Digest) -> Result<graph::Object> {
530-
let res = self.src.read_object(digest).await;
574+
async fn read_object_with_fallback(
575+
&self,
576+
digest: encoding::Digest,
577+
) -> SyncResult<graph::Object> {
578+
let res = self
579+
.src
580+
.read_object(digest)
581+
.await
582+
.map_err(|err| SyncError::ObjectReadError(digest, err.into()));
531583
match res {
532584
Err(err) if self.policy.check_existing_objects() => {
533585
// since objects are unique by digest, we can recover

0 commit comments

Comments
 (0)