Skip to content

Commit 92253d3

Browse files
committed
[2/N][VQueues] Storage API
1 parent 21f4e1e commit 92253d3

File tree

9 files changed

+857
-9
lines changed

9 files changed

+857
-9
lines changed

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/storage-api/Cargo.toml

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,22 @@ restate-workspace-hack = { workspace = true }
1515

1616
restate-types = { workspace = true }
1717

18-
prost = { workspace = true }
19-
prost-types = { workspace = true }
18+
ahash = { workspace = true }
2019
anyhow = { workspace = true }
20+
bilrost = { workspace = true, features = ["smallvec"] }
2121
bytes = { workspace = true }
2222
bytestring = { workspace = true }
2323
derive_more = { workspace = true }
2424
futures = { workspace = true }
2525
num-traits = { workspace = true }
26+
opentelemetry = { workspace = true }
27+
prost = { workspace = true }
28+
prost-types = { workspace = true }
29+
rangemap = { workspace = true }
2630
serde = { workspace = true }
31+
smallvec = { workspace = true }
2732
strum = { workspace = true }
2833
thiserror = { workspace = true }
29-
rangemap = { workspace = true }
30-
opentelemetry = { workspace = true }
3134

3235
[build-dependencies]
33-
prost-build = { workspace = true }
36+
prost-build = { workspace = true }

crates/storage-api/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@ use std::future::Future;
1515
pub enum StorageError {
1616
#[error("generic storage error: {0}")]
1717
Generic(#[from] anyhow::Error),
18-
#[error("failed to convert Rust objects to/from protobuf: {0}")]
18+
#[error("failed to convert Rust objects to/from serialized format: {0}")]
1919
Conversion(anyhow::Error),
20+
#[error("cannot decode bilrost-encoded payload: {0}")]
21+
BilrostDecode(#[from] bilrost::DecodeError),
2022
#[error("integrity constraint is violated")]
2123
DataIntegrityError,
2224
#[error("operational error that can be caused during a graceful shutdown")]
@@ -27,7 +29,7 @@ pub enum StorageError {
2729
PreconditionFailed(anyhow::Error),
2830
}
2931

30-
pub type Result<T> = std::result::Result<T, StorageError>;
32+
pub type Result<T, E = StorageError> = std::result::Result<T, E>;
3133

3234
pub mod deduplication_table;
3335
pub mod fsm_table;
@@ -43,6 +45,7 @@ pub mod protobuf_types;
4345
pub mod service_status_table;
4446
pub mod state_table;
4547
pub mod timer_table;
48+
pub mod vqueue_table;
4649

4750
/// Isolation level of a storage transaction
4851
#[derive(Debug, Default)]
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
use bytes::{Buf, BufMut};
12+
13+
use restate_types::clock::UniqueTimestamp;
14+
use restate_types::identifiers::InvocationId;
15+
use restate_types::vqueue::{
16+
EffectivePriority, NewEntryPriority, VQueueId, VQueueInstance, VQueueParent,
17+
};
18+
19+
use crate::StorageError;
20+
21+
use super::{Stage, VisibleAt};
22+
23+
thread_local! {
24+
// arbitrary seeds, safe to change since we don't use hashes in storage
25+
static HASHER: ahash::RandomState = const { ahash::RandomState::with_seeds(1232134512, 14, 82334, 988889) };
26+
}
27+
28+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, strum::FromRepr)]
29+
#[repr(u8)]
30+
pub enum EntryKind {
31+
Unknown = 0,
32+
Invocation = b'i', // 0x69
33+
StateMutation = b's', // 0x73
34+
}
35+
36+
// Using u128 would have added an extra unnecessary 8 bytes due to alignment
37+
// requirements (u128 is 0x10 aligned and it forces the struct to be 0x10 aligned)
38+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
39+
pub struct EntryId([u8; 16]);
40+
41+
impl EntryId {
42+
#[inline]
43+
pub const fn new(id: [u8; 16]) -> Self {
44+
Self(id)
45+
}
46+
47+
#[inline]
48+
pub const fn as_bytes(&self) -> &[u8; 16] {
49+
&self.0
50+
}
51+
52+
#[inline]
53+
pub const fn to_bytes(self) -> [u8; 16] {
54+
self.0
55+
}
56+
57+
#[inline]
58+
pub const fn from_bytes(bytes: [u8; 16]) -> Self {
59+
Self(bytes)
60+
}
61+
}
62+
63+
impl From<&InvocationId> for EntryId {
64+
#[inline]
65+
fn from(id: &InvocationId) -> Self {
66+
Self::from_bytes(id.invocation_uuid().to_bytes())
67+
}
68+
}
69+
70+
impl From<InvocationId> for EntryId {
71+
#[inline]
72+
fn from(id: InvocationId) -> Self {
73+
Self::from_bytes(id.invocation_uuid().to_bytes())
74+
}
75+
}
76+
77+
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
78+
pub struct EntryCard {
79+
pub priority: EffectivePriority,
80+
pub visible_at: VisibleAt,
81+
/// The unique timestamp of the initial creation of the entry.
82+
pub created_at: UniqueTimestamp,
83+
pub kind: EntryKind,
84+
pub id: EntryId,
85+
}
86+
87+
impl EntryCard {
88+
pub const fn serialized_length() -> usize {
89+
// priority
90+
std::mem::size_of::<EffectivePriority>()
91+
// visible at
92+
+ std::mem::size_of::<VisibleAt>()
93+
// created_at
94+
+ std::mem::size_of::<UniqueTimestamp>()
95+
// entry kind
96+
+ std::mem::size_of::<EntryKind>()
97+
// entry id
98+
+ std::mem::size_of::<EntryId>()
99+
}
100+
101+
/// A unique hash of the entry card.
102+
///
103+
/// Do not use this for any stored data as it changes across version/restarts.
104+
#[inline(always)]
105+
pub fn unique_hash(&self) -> u64 {
106+
HASHER.with(|hasher| hasher.hash_one(self))
107+
}
108+
109+
pub fn new(
110+
priority: NewEntryPriority,
111+
visible_at: VisibleAt,
112+
created_at: UniqueTimestamp,
113+
kind: EntryKind,
114+
id: EntryId,
115+
) -> Self {
116+
Self {
117+
priority: EffectivePriority::from(priority),
118+
visible_at,
119+
created_at,
120+
kind,
121+
id,
122+
}
123+
}
124+
125+
pub fn encode<B: BufMut>(&self, target: &mut B) {
126+
target.put_u8(self.priority as u8);
127+
target.put_u64(self.visible_at.as_u64());
128+
target.put_u64(self.created_at.as_u64());
129+
target.put_u8(self.kind as u8);
130+
target.put_slice(&self.id.0);
131+
}
132+
133+
pub fn decode<B: Buf>(source: &mut B) -> crate::Result<Self> {
134+
if source.remaining() < Self::serialized_length() {
135+
return Err(StorageError::Generic(anyhow::anyhow!(
136+
"Not enough bytes to decode an EntryCard"
137+
)));
138+
}
139+
140+
let p = source.get_u8();
141+
let priority = EffectivePriority::from_repr(p).ok_or_else(|| {
142+
StorageError::Conversion(anyhow::anyhow!("Wrong value for EffectivePriority: {p}"))
143+
})?;
144+
let visible_at = VisibleAt::from_raw(source.get_u64());
145+
let created_at = UniqueTimestamp::try_from(source.get_u64())
146+
.map_err(|e| StorageError::Conversion(e.into()))?;
147+
let k = source.get_u8();
148+
let kind = EntryKind::from_repr(k).ok_or_else(|| {
149+
StorageError::Generic(anyhow::anyhow!("Wrong value for EntryKind: {k}"))
150+
})?;
151+
152+
let mut buf = [0u8; 16];
153+
source.copy_to_slice(&mut buf);
154+
let entry_id = EntryId::from_bytes(buf);
155+
156+
Ok(Self {
157+
priority,
158+
visible_at,
159+
created_at,
160+
kind,
161+
id: entry_id,
162+
})
163+
}
164+
}
165+
166+
pub trait AsEntryStateHeader {
167+
fn kind(&self) -> EntryKind;
168+
fn stage(&self) -> Stage;
169+
fn queue_parent(&self) -> VQueueParent;
170+
fn queue_instance(&self) -> VQueueInstance;
171+
fn vqueue_id(&self) -> VQueueId;
172+
fn current_entry_card(&self) -> EntryCard;
173+
}
174+
175+
pub trait AsEntryState: AsEntryStateHeader {
176+
type State;
177+
178+
fn state(&self) -> &Self::State;
179+
}
180+
181+
pub trait EntryStateKind: Send {
182+
const KIND: EntryKind;
183+
}
184+
185+
impl EntryStateKind for () {
186+
const KIND: EntryKind = EntryKind::Unknown;
187+
}

0 commit comments

Comments
 (0)