Skip to content

Commit 895abe1

Browse files
committed
Create an error type specific to payload operations
This error type is Clone and can be used in the upcoming change to open_payload that can return a cached result. Signed-off-by: J Robert Ray <[email protected]>
1 parent 9c78d98 commit 895abe1

File tree

25 files changed

+488
-187
lines changed

25 files changed

+488
-187
lines changed

crates/spfs-vfs/src/winfsp/mount.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ impl winfsp::filesystem::FileSystemContext for Mount {
311311
// TODO: are there attribute flags to identify this as a non-seekable file?
312312
return;
313313
}
314-
Err(spfs::Error::UnknownObject(_)) => continue,
314+
Err(spfs::PayloadError::UnknownPayload(_)) => continue,
315315
Err(err) => err!(send, err),
316316
},
317317
}

crates/spfs/src/clean.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::prelude::*;
2222
use crate::runtime::makedirs_with_perms;
2323
use crate::storage::fs::FsRepositoryOps;
2424
use crate::storage::{TagNamespace, TagNamespaceBuf};
25-
use crate::{Digest, Error, Result, encoding, graph, storage, tracking};
25+
use crate::{Digest, Error, PayloadError, Result, encoding, graph, storage, tracking};
2626

2727
#[cfg(test)]
2828
#[path = "./clean_test.rs"]
@@ -544,6 +544,11 @@ where
544544
/// objects has completed successfully and with no errors. Otherwise, it may
545545
/// remove data that is still being used
546546
async unsafe fn remove_unvisited_objects_and_payloads(&self) -> Result<CleanResult> {
547+
enum ErrorFlavor {
548+
Spfs(Error),
549+
Payload(PayloadError),
550+
}
551+
547552
let mut result = CleanResult::default();
548553
let mut stream = self
549554
.repo
@@ -571,17 +576,19 @@ where
571576
graph::DatabaseItem::Object(digest, _flat_object) => self
572577
.repo
573578
.remove_object_if_older_than(self.must_be_older_than, *digest)
579+
.map_err(ErrorFlavor::Spfs)
574580
.boxed(),
575581
graph::DatabaseItem::Payload(digest) => self
576582
.repo
577583
.remove_payload_if_older_than(self.must_be_older_than, *digest)
584+
.map_err(ErrorFlavor::Payload)
578585
.boxed(),
579586
})
580-
.map(|res| {
581-
if let Err(Error::UnknownObject(_)) = res {
582-
return Ok(true);
583-
}
584-
res
587+
.map(|res| match res {
588+
Err(ErrorFlavor::Spfs(Error::UnknownObject(_))) => Ok(true),
589+
Err(ErrorFlavor::Spfs(err)) => Err(err),
590+
Err(ErrorFlavor::Payload(err)) => Err(Error::String(err.to_string())),
591+
Ok(removed) => Ok(removed),
585592
})
586593
.map_ok(move |removed| (item, removed))
587594
.boxed();

crates/spfs/src/clean_test.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use tokio::time::sleep;
1313
use super::{Cleaner, TracingCleanReporter};
1414
use crate::encoding::prelude::*;
1515
use crate::fixtures::*;
16-
use crate::{Error, storage, tracking};
16+
use crate::{Error, PayloadError, storage, tracking};
1717

1818
#[rstest]
1919
#[tokio::test]
@@ -150,7 +150,7 @@ async fn test_clean_untagged_objects(#[future] tmprepo: TempRepo, tmpdir: tempfi
150150
continue;
151151
}
152152
let res = tmprepo.open_payload(node.entry.object).await;
153-
if let Err(Error::UnknownObject(_)) = res {
153+
if let Err(PayloadError::UnknownPayload(_)) = res {
154154
continue;
155155
}
156156
if let Err(err) = res {
@@ -264,7 +264,7 @@ async fn test_clean_on_repo_with_tag_namespace(
264264
continue;
265265
}
266266
let res = namespaced_tmprepo.open_payload(node.entry.object).await;
267-
if let Err(Error::UnknownObject(_)) = res {
267+
if let Err(PayloadError::UnknownPayload(_)) = res {
268268
continue;
269269
}
270270
if let Err(err) = res {
@@ -378,7 +378,7 @@ async fn test_clean_on_repo_with_tag_namespace_set(
378378
continue;
379379
}
380380
let res = namespaced_tmprepo.open_payload(node.entry.object).await;
381-
if let Err(Error::UnknownObject(_)) = res {
381+
if let Err(PayloadError::UnknownPayload(_)) = res {
382382
continue;
383383
}
384384
if let Err(err) = res {

crates/spfs/src/commit.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,10 @@ pub struct WriteToRepositoryBlobHasher<'repo> {
5353
#[tonic::async_trait]
5454
impl BlobHasher for WriteToRepositoryBlobHasher<'_> {
5555
async fn hash_blob(&self, reader: Pin<Box<dyn BlobRead>>) -> Result<encoding::Digest> {
56-
self.repo.commit_payload(reader).await
56+
self.repo
57+
.commit_payload(reader)
58+
.await
59+
.map_err(|err| Error::String(format!("commit_payload failed: {err}")))
5760
}
5861
}
5962

@@ -297,7 +300,7 @@ where
297300
.into_bytes();
298301
let reader =
299302
Box::pin(tokio::io::BufReader::new(std::io::Cursor::new(content)));
300-
self.repo.commit_payload(reader).await?
303+
self.repo.commit_payload(reader).await.map_err(|err| Error::String(format!("commit_payload failed: {err}")))?
301304
} else {
302305
let file = tokio::fs::File::open(&local_path).await.map_err(|err| {
303306
// TODO: add better message for file missing
@@ -308,7 +311,7 @@ where
308311
)
309312
})?;
310313
let reader = Box::pin(tokio::io::BufReader::new(file));
311-
self.repo.commit_payload(reader).await?
314+
self.repo.commit_payload(reader).await.map_err(|err| Error::String(format!("commit_payload failed: {err}")))?
312315
};
313316
if created != entry.object {
314317
return Err(Error::String(format!(

crates/spfs/src/error.rs

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,75 @@ impl OsError for std::io::Error {
388388

389389
pub type Result<T> = std::result::Result<T, Error>;
390390

391+
#[derive(Diagnostic, Debug, Clone, Error)]
392+
#[diagnostic(
393+
url(
394+
"https://spkenv.dev/error_codes#{}",
395+
self.code().unwrap_or_else(|| Box::new("spfs::generic"))
396+
)
397+
)]
398+
pub enum PayloadError {
399+
#[error("payload did not exist: {0}")]
400+
UnknownPayload(encoding::Digest),
401+
#[error("invalid digest: {0}")]
402+
InvalidDigest(String),
403+
#[error("storage read error from {0} at {1}: {2}")]
404+
StorageReadError(&'static str, std::path::PathBuf, #[source] Arc<io::Error>),
405+
#[error("storage write error from {0} at {1}: {2}")]
406+
StorageWriteError(&'static str, std::path::PathBuf, #[source] Arc<io::Error>),
407+
#[error("Cannot write to a repository which has been pinned in time")]
408+
RepositoryIsPinned,
409+
#[error("Error communicating with the server: {0:?}")]
410+
Tonic(Box<tonic::Status>),
411+
#[error("payload error: {0}")]
412+
String(String),
413+
}
414+
415+
impl PayloadError {
416+
/// Return true if this error suggests the call might succeed if retried on
417+
/// a secondary repository.
418+
#[inline]
419+
pub fn try_next_repo(&self) -> bool {
420+
matches!(self, PayloadError::UnknownPayload(_))
421+
}
422+
}
423+
424+
impl OsError for PayloadError {
425+
fn os_error(&self) -> Option<i32> {
426+
match self {
427+
#[cfg(unix)]
428+
Self::UnknownPayload(_) => Some(libc::ENOENT),
429+
#[cfg(windows)]
430+
Self::UnknownPayload(_) => {
431+
Some(windows::Win32::Foundation::ERROR_FILE_NOT_FOUND.0 as i32)
432+
}
433+
Self::StorageReadError(_, _, err) => err.os_error(),
434+
Self::StorageWriteError(_, _, err) => err.os_error(),
435+
_ => None,
436+
}
437+
}
438+
}
439+
440+
impl From<String> for PayloadError {
441+
fn from(err: String) -> Self {
442+
Self::String(err)
443+
}
444+
}
445+
446+
impl From<&str> for PayloadError {
447+
fn from(err: &str) -> Self {
448+
Self::String(err.to_string())
449+
}
450+
}
451+
452+
impl From<tonic::Status> for PayloadError {
453+
fn from(value: tonic::Status) -> Self {
454+
PayloadError::Tonic(Box::new(value))
455+
}
456+
}
457+
458+
pub type PayloadResult<T> = std::result::Result<T, PayloadError>;
459+
391460
#[derive(Diagnostic, Debug, Clone, Error)]
392461
#[diagnostic(
393462
url(
@@ -413,9 +482,9 @@ pub enum SyncError {
413482
#[error("manifest '{0}' could not be read: {1}")]
414483
ManifestReadError(Digest, Arc<Error>),
415484
#[error("payload '{0}' could not be read: {1}")]
416-
PayloadReadError(Digest, Arc<Error>),
485+
PayloadReadError(Digest, Arc<PayloadError>),
417486
#[error("payload '{0}' could not be written: {1}")]
418-
PayloadWriteError(Digest, Arc<Error>),
487+
PayloadWriteError(Digest, Arc<PayloadError>),
419488
#[error(
420489
"source repository provided payload that did not match the requested digest: wanted {0}, got {1}. wrote {2} bytes"
421490
)]

crates/spfs/src/lib.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,16 @@ pub use clean::Cleaner;
5050
pub use commit::Committer;
5151
pub use diff::{diff, diff_runtime_changes, runtime_active_changes};
5252
pub use encoding::Digest;
53-
pub use error::{Error, OsError, OsErrorExt, Result, SyncError, SyncResult};
53+
pub use error::{
54+
Error,
55+
OsError,
56+
OsErrorExt,
57+
PayloadError,
58+
PayloadResult,
59+
Result,
60+
SyncError,
61+
SyncResult,
62+
};
5463
pub use resolve::{
5564
RenderResult,
5665
compute_environment_manifest,

crates/spfs/src/proto/conversions.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
use std::convert::{TryFrom, TryInto};
66
use std::ops::Not;
77

8-
use crate::{Error, Result, encoding, graph, storage, tracking};
8+
use crate::{Error, PayloadError, PayloadResult, Result, encoding, graph, storage, tracking};
99

1010
pub(crate) fn convert_to_datetime(
1111
source: Option<super::DateTime>,
@@ -33,6 +33,10 @@ pub fn convert_digest(source: Option<super::Digest>) -> Result<encoding::Digest>
3333
Ok(*convert_digest_ref(source.as_ref())?)
3434
}
3535

36+
pub fn convert_payload_digest(source: Option<super::Digest>) -> PayloadResult<encoding::Digest> {
37+
convert_digest(source).map_err(|err| PayloadError::InvalidDigest(err.to_string()))
38+
}
39+
3640
impl TryFrom<super::Digest> for encoding::Digest {
3741
type Error = Error;
3842
fn try_from(source: super::Digest) -> Result<Self> {
@@ -180,6 +184,41 @@ impl From<super::Error> for Error {
180184
}
181185
}
182186

187+
impl From<PayloadError> for super::PayloadError {
188+
fn from(err: PayloadError) -> Self {
189+
let kind = Some(match err {
190+
PayloadError::UnknownPayload(digest) => {
191+
super::payload_error::Kind::UnknownPayload(super::UnknownPayloadError {
192+
message: digest.to_string(),
193+
})
194+
}
195+
PayloadError::String(message) => super::payload_error::Kind::Other(message),
196+
other => super::payload_error::Kind::Other(other.to_string()),
197+
});
198+
Self { kind }
199+
}
200+
}
201+
202+
impl From<super::PayloadError> for PayloadError {
203+
fn from(rpc: super::PayloadError) -> Self {
204+
match rpc.kind {
205+
Some(super::payload_error::Kind::UnknownPayload(rpc)) => {
206+
match crate::encoding::Digest::parse(&rpc.message) {
207+
Ok(digest) => PayloadError::UnknownPayload(digest),
208+
Err(_) => PayloadError::String(
209+
"Server reported UnknownPayload but did not provide a valid digest"
210+
.to_string(),
211+
),
212+
}
213+
}
214+
Some(super::payload_error::Kind::Other(message)) => PayloadError::String(message),
215+
None => {
216+
PayloadError::String("Server did not provide a payload error message".to_string())
217+
}
218+
}
219+
}
220+
}
221+
183222
impl<T: graph::ObjectProto> From<&graph::FlatObject<T>> for super::Object {
184223
fn from(value: &graph::FlatObject<T>) -> Self {
185224
Self {

crates/spfs/src/proto/defs/database.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,9 @@ message FindDigestsResponse {
4747
message IterDigestsRequest{}
4848
message IterDigestsResponse{
4949
oneof result {
50-
Error error = 1;
50+
// 1 was used
5151
Digest ok = 2;
52+
PayloadError error = 3;
5253
}
5354
}
5455

crates/spfs/src/proto/defs/error.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ message AmbiguousReferenceError {
1919
message InvalidReferenceError {
2020
string message = 1;
2121
}
22+
message UnknownPayloadError {
23+
string message = 1;
24+
}
2225

2326
message Error {
2427
oneof kind {
@@ -29,3 +32,10 @@ message Error {
2932
InvalidReferenceError InvalidReference = 5;
3033
}
3134
}
35+
36+
message PayloadError {
37+
oneof kind {
38+
string other = 1;
39+
UnknownPayloadError UnknownPayload = 2;
40+
}
41+
}

crates/spfs/src/proto/defs/payload.proto

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,17 @@ message HasPayloadResponse{
2020
// Error error = 1;
2121
// bool ok = 2;
2222
// }
23-
bool exists =3;
23+
bool exists = 3;
2424
}
2525

2626
message PayloadSizeRequest{
2727
Digest digest = 1;
2828
}
2929
message PayloadSizeResponse{
3030
oneof result {
31-
Error error = 1;
31+
// 1 was used
3232
uint64 ok = 2;
33+
PayloadError error = 3;
3334
}
3435
}
3536

@@ -46,13 +47,15 @@ message WritePayloadResponse{
4647
uint64 size = 2;
4748
}
4849
oneof result {
49-
Error error = 1;
50+
// 1 was used
5051
UploadResult ok = 2;
52+
PayloadError error = 3;
5153
}
5254
}
5355
oneof result {
54-
Error error = 1;
56+
// 1 was used
5557
UploadOption ok = 2;
58+
PayloadError error = 3;
5659
}
5760
}
5861

@@ -64,8 +67,9 @@ message OpenPayloadResponse{
6467
repeated string locations = 1;
6568
}
6669
oneof result {
67-
Error error = 1;
70+
// 1 was used
6871
DownloadOption ok = 2;
72+
PayloadError error = 3;
6973
}
7074
}
7175

@@ -74,8 +78,9 @@ message RemovePayloadRequest{
7478
}
7579
message RemovePayloadResponse{
7680
oneof result {
77-
Error error = 1;
81+
// 1 was used
7882
Ok ok = 2;
83+
PayloadError error = 3;
7984
}
8085
}
8186

@@ -85,8 +90,9 @@ message RemovePayloadIfOlderThanRequest{
8590
}
8691
message RemovePayloadIfOlderThanResponse{
8792
oneof result {
88-
Error error = 1;
93+
// 1 was used
8994
bool ok = 2;
95+
PayloadError error = 3;
9096
}
9197
}
9298

0 commit comments

Comments
 (0)