@@ -14,7 +14,11 @@ use anyhow::anyhow;
1414use bytes:: { Buf , BufMut , Bytes , BytesMut } ;
1515use bytestring:: ByteString ;
1616use prost:: encoding:: encoded_len_varint;
17+ use rocksdb:: MergeOperands ;
1718use strum:: EnumIter ;
19+ use tracing:: { error, trace} ;
20+
21+ use restate_types:: clock:: UniqueTimestamp ;
1822
1923/// Every table key needs to have a key kind. This allows to multiplex different keys in the same
2024/// column family and to evolve a key if necessary.
@@ -42,6 +46,17 @@ pub enum KeyKind {
4246 State ,
4347 Timers ,
4448 Promise ,
49+ // VQueues --> owned by restate-vqueues
50+ //
51+ // todo: split this into empty and non-empty, or add the status in the key prefix
52+ // for instance, make this VQueueStatus (S | VQueueId)
53+ // or have empty_vqueues that carry the empty_since in their key prefix. Note that
54+ // doing so would require that we know the empty_since when we attempt to delete it
55+ VQueueActive ,
56+ VQueueInbox ,
57+ VQueueMeta ,
58+ // Resources' canonical key(s)
59+ VQueueEntryState ,
4560}
4661
4762impl KeyKind {
@@ -86,6 +101,13 @@ impl KeyKind {
86101 KeyKind :: State => b"st" ,
87102 KeyKind :: Timers => b"ti" ,
88103 KeyKind :: Promise => b"pr" ,
104+ // ** VQueues ** //
105+ // VQueues own all keys that start with b"q".
106+ KeyKind :: VQueueActive => b"qa" ,
107+ KeyKind :: VQueueInbox => b"qi" ,
108+ KeyKind :: VQueueMeta => b"qm" ,
109+ // Queue Entry State (canonical state of vqueue entries)
110+ KeyKind :: VQueueEntryState => b"qe" ,
89111 }
90112 }
91113
@@ -115,6 +137,11 @@ impl KeyKind {
115137 b"st" => Some ( KeyKind :: State ) ,
116138 b"ti" => Some ( KeyKind :: Timers ) ,
117139 b"pr" => Some ( KeyKind :: Promise ) ,
140+ // VQueues own all keys that start with b"q"
141+ b"qa" => Some ( KeyKind :: VQueueActive ) ,
142+ b"qi" => Some ( KeyKind :: VQueueInbox ) ,
143+ b"qm" => Some ( KeyKind :: VQueueMeta ) ,
144+ b"qe" => Some ( KeyKind :: VQueueEntryState ) ,
118145 _ => None ,
119146 }
120147 }
@@ -134,6 +161,50 @@ impl KeyKind {
134161 Self :: from_bytes ( & bytes)
135162 . ok_or_else ( || StorageError :: Generic ( anyhow:: anyhow!( "unknown key kind: {:x?}" , bytes) ) )
136163 }
164+
165+ // Rocksdb merge operator function (full merge)
166+ pub fn full_merge (
167+ key : & [ u8 ] ,
168+ existing_val : Option < & [ u8 ] > ,
169+ operands : & MergeOperands ,
170+ ) -> Option < Vec < u8 > > {
171+ let mut kind_buf = key;
172+ let kind = match KeyKind :: deserialize ( & mut kind_buf) {
173+ Ok ( kind) => kind,
174+ Err ( e) => {
175+ error ! ( "Cannot apply merge operator; {e}" ) ;
176+ return None ;
177+ }
178+ } ;
179+ trace ! ( ?kind, "full merge" ) ;
180+
181+ match kind {
182+ KeyKind :: VQueueMeta => vqueue_meta_merge:: full_merge ( key, existing_val, operands) ,
183+ _ => None ,
184+ }
185+ }
186+
187+ // Rocksdb merge operator function (partial merge)
188+ pub fn partial_merge (
189+ key : & [ u8 ] ,
190+ _unused : Option < & [ u8 ] > ,
191+ operands : & MergeOperands ,
192+ ) -> Option < Vec < u8 > > {
193+ let mut kind_buf = key;
194+ let kind = match KeyKind :: deserialize ( & mut kind_buf) {
195+ Ok ( kind) => kind,
196+ Err ( e) => {
197+ error ! ( "Cannot apply merge operator; {e}" ) ;
198+ return None ;
199+ }
200+ } ;
201+ trace ! ( ?kind, "partial merge" ) ;
202+
203+ match kind {
204+ KeyKind :: VQueueMeta => vqueue_meta_merge:: partial_merge ( key, operands) ,
205+ _ => None ,
206+ }
207+ }
137208}
138209
139210pub trait TableKey : Sized + std:: fmt:: Debug + Send + ' static {
@@ -350,6 +421,7 @@ macro_rules! define_table_key {
350421
351422use crate :: PaddedPartitionId ;
352423use crate :: TableKind ;
424+ use crate :: vqueue_table:: vqueue_meta_merge;
353425pub ( crate ) use define_table_key;
354426use restate_storage_api:: StorageError ;
355427use restate_storage_api:: deduplication_table:: ProducerId ;
@@ -411,6 +483,41 @@ impl KeyCodec for PaddedPartitionId {
411483 }
412484}
413485
486+ impl KeyCodec for UniqueTimestamp {
487+ fn encode < B : BufMut > ( & self , target : & mut B ) {
488+ // store u64 in big-endian order to support byte-wise increment operation. See `crate::scan::try_increment`.
489+ target. put_u64 ( self . as_u64 ( ) ) ;
490+ }
491+
492+ fn decode < B : Buf > ( source : & mut B ) -> crate :: Result < Self > {
493+ UniqueTimestamp :: try_from ( source. get_u64 ( ) ) . map_err ( |e| StorageError :: Conversion ( e. into ( ) ) )
494+ }
495+
496+ fn serialized_length ( & self ) -> usize {
497+ std:: mem:: size_of :: < Self > ( )
498+ }
499+ }
500+
501+ impl < const L : usize > KeyCodec for [ u8 ; L ] {
502+ fn encode < B : BufMut > ( & self , target : & mut B ) {
503+ // stores the array as is.
504+ target. put_slice ( self . as_ref ( ) ) ;
505+ }
506+
507+ fn decode < B : Buf > ( source : & mut B ) -> crate :: Result < Self > {
508+ if source. remaining ( ) < L {
509+ return Err ( StorageError :: DataIntegrityError ) ;
510+ }
511+ let mut buf = [ 0u8 ; L ] ;
512+ source. copy_to_slice ( & mut buf) ;
513+ Ok ( buf)
514+ }
515+
516+ fn serialized_length ( & self ) -> usize {
517+ L
518+ }
519+ }
520+
414521impl KeyCodec for u64 {
415522 fn encode < B : BufMut > ( & self , target : & mut B ) {
416523 // store u64 in big-endian order to support byte-wise increment operation. See `crate::scan::try_increment`.
0 commit comments