Skip to content

Commit 7ea387f

Browse files
committed
[1/N][VQueues] VQueue Identifier types
1 parent 3369991 commit 7ea387f

File tree

3 files changed

+168
-0
lines changed

3 files changed

+168
-0
lines changed

crates/types/src/clock/unique_timestamp.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,12 @@ impl UniqueTimestamp {
112112
// extract the logical clock
113113
self.0.get() & LC_MAX
114114
}
115+
116+
/// Calculates the number of milliseconds by which this timestamp is ahead of the other,
117+
/// or return 0 if the other timestamp is ahead.
118+
pub fn milliseconds_since(&self, other: Self) -> u64 {
119+
self.physical_raw().saturating_sub(other.physical_raw())
120+
}
115121
}
116122

117123
impl From<SystemTime> for UniqueTimestamp {
@@ -175,6 +181,8 @@ mod bilrost_encoding {
175181
}
176182
}
177183

184+
bilrost::empty_state_via_for_overwrite!(UniqueTimestamp);
185+
178186
bilrost::delegate_proxied_encoding!(
179187
use encoding (bilrost::encoding::Varint)
180188
to encode proxied type (UniqueTimestamp)

crates/types/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ pub mod state_mut;
5757
pub mod storage;
5858
pub mod time;
5959
pub mod timer;
60+
pub mod vqueue;
6061

6162
pub use id_util::IdResourceType;
6263
pub use node_id::*;

crates/types/src/vqueue.rs

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
use std::num::NonZero;
12+
13+
use crate::identifiers::PartitionKey;
14+
15+
/// Queue parent identifies which configuration to use for a particular vqueue.
16+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
17+
#[repr(transparent)]
18+
pub struct VQueueParent(u32);
19+
20+
impl VQueueParent {
21+
#[inline]
22+
pub const fn from_raw(raw: u32) -> Self {
23+
Self(raw)
24+
}
25+
26+
#[inline]
27+
pub const fn as_u32(self) -> u32 {
28+
self.0
29+
}
30+
}
31+
32+
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
33+
pub enum VQueueInstance {
34+
/// The default instance is used when the queue is not sharded or when the shard
35+
/// is not defined.
36+
#[default]
37+
Default,
38+
Specific(NonZero<u32>),
39+
}
40+
41+
impl VQueueInstance {
42+
// cannot be const because map_or is not const in stable rust yet.
43+
#[inline]
44+
pub fn from_raw(raw: u32) -> Self {
45+
NonZero::new(raw).map_or(VQueueInstance::Default, VQueueInstance::Specific)
46+
}
47+
48+
#[inline]
49+
pub const fn as_u32(self) -> u32 {
50+
match self {
51+
Self::Default => 0,
52+
Self::Specific(n) => n.get(),
53+
}
54+
}
55+
}
56+
57+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
58+
pub struct VQueueId {
59+
// Identifies the configuration/parent of the vqueue
60+
pub parent: VQueueParent,
61+
// Key+Instance identify the individual queue.
62+
pub partition_key: PartitionKey,
63+
pub instance: VQueueInstance,
64+
}
65+
66+
impl VQueueId {
67+
#[inline]
68+
pub fn new(
69+
parent: VQueueParent,
70+
partition_key: PartitionKey,
71+
instance: VQueueInstance,
72+
) -> Self {
73+
Self {
74+
parent,
75+
partition_key,
76+
instance,
77+
}
78+
}
79+
}
80+
81+
// needed when using hashbrown's entry_ref API to convert the key reference to a value
82+
// lazily when inserting into the map.
83+
impl From<&VQueueId> for VQueueId {
84+
fn from(value: &VQueueId) -> Self {
85+
*value
86+
}
87+
}
88+
89+
#[derive(
90+
Debug,
91+
Default,
92+
Clone,
93+
Copy,
94+
Ord,
95+
PartialOrd,
96+
PartialEq,
97+
Eq,
98+
Hash,
99+
bilrost::Enumeration,
100+
strum::FromRepr,
101+
)]
102+
#[repr(u8)]
103+
pub enum EffectivePriority {
104+
/// Exclusively for wake-ups that hold tokens already. All other wake-ups will
105+
/// continue to run with their original priority.
106+
///
107+
/// This is crucial to ensure that when we release our token back to the pool that it gets
108+
/// picked up again by the scheduler and we can re-acquire it.
109+
TokenHeld = 0, // Resuming with held concurrency token
110+
/// High priority
111+
Started = 1, // Resuming (started before) with no concurrency token
112+
/// System high priority (new)
113+
System = 2,
114+
/// User-defined high-priority
115+
UserHigh = 3,
116+
/// User-defined low priority
117+
#[default]
118+
UserDefault = 4,
119+
}
120+
121+
impl EffectivePriority {
122+
pub const NUM_PRIORITIES: usize = 5;
123+
124+
/// Whether this entry has never been started or not
125+
pub fn is_new(&self) -> bool {
126+
*self > EffectivePriority::Started
127+
}
128+
129+
pub fn token_held(&self) -> bool {
130+
*self == EffectivePriority::TokenHeld
131+
}
132+
133+
pub fn has_started(&self) -> bool {
134+
*self < EffectivePriority::System
135+
}
136+
}
137+
138+
/// Priorities for entries in the vqueue when inserting new entries
139+
#[derive(Debug, Default, Clone, Copy, Ord, PartialOrd, PartialEq, Eq)]
140+
#[repr(u8)]
141+
pub enum NewEntryPriority {
142+
/// System high priority
143+
System = 2,
144+
/// Default priority
145+
UserHigh = 3,
146+
#[default]
147+
UserDefault = 4,
148+
}
149+
150+
impl From<NewEntryPriority> for EffectivePriority {
151+
#[inline(always)]
152+
fn from(value: NewEntryPriority) -> Self {
153+
match value {
154+
NewEntryPriority::System => EffectivePriority::System,
155+
NewEntryPriority::UserHigh => EffectivePriority::UserHigh,
156+
NewEntryPriority::UserDefault => EffectivePriority::UserDefault,
157+
}
158+
}
159+
}

0 commit comments

Comments
 (0)