diff --git a/crates/partition-store/src/partition_db.rs b/crates/partition-store/src/partition_db.rs
index 09e6baaa33..501be266dd 100644
--- a/crates/partition-store/src/partition_db.rs
+++ b/crates/partition-store/src/partition_db.rs
@@ -30,7 +30,7 @@ use restate_types::partitions::{CfName, Partition};
 
 use crate::durable_lsn_tracking::{AppliedLsnCollectorFactory, DurableLsnEventListener};
 use crate::memory::MemoryBudget;
-use crate::snapshots::LocalPartitionSnapshot;
+use crate::snapshots::{ArchivedLsn, LocalPartitionSnapshot};
 
 type SmartString = smartstring::SmartString<smartstring::LazyCompact>;
 
@@ -38,7 +38,7 @@ type SmartString = smartstring::SmartString<smartstring::LazyCompact>;
 pub struct PartitionDb {
     meta: Arc<Partition>,
     durable_lsn: watch::Sender<Option<Lsn>>,
-    archived_lsn: watch::Sender<Option<Lsn>>,
+    archived_lsn: watch::Sender<Option<ArchivedLsn>>,
     // Note: Rust will drop the fields in the order they are declared in the struct.
     // It's crucial to keep the column family and the database in this exact order.
     cf: PartitionBoundCfHandle,
@@ -48,7 +48,7 @@ pub struct PartitionDb {
 impl PartitionDb {
     pub(crate) fn new(
         meta: Arc<Partition>,
-        archived_lsn: watch::Sender<Option<Lsn>>,
+        archived_lsn: watch::Sender<Option<ArchivedLsn>>,
         rocksdb: Arc<RocksDb>,
         cf: Arc<BoundColumnFamily<'static>>,
     ) -> Self {
@@ -94,10 +94,10 @@ impl PartitionDb {
             .await
     }
 
-    pub(crate) fn note_archived_lsn(&self, lsn: Lsn) -> bool {
+    pub(crate) fn note_archived_lsn(&self, archived_lsn: ArchivedLsn) -> bool {
         self.archived_lsn.send_if_modified(|current| {
-            if current.is_none_or(|c| lsn > c) {
-                *current = Some(lsn);
+            if current.is_none_or(|c| archived_lsn > c) {
+                *current = Some(archived_lsn);
                 true
             } else {
                 false
@@ -106,11 +106,11 @@ impl PartitionDb {
     }
 
     /// The last (locally) known archived LSN for this partition
-    pub fn get_archived_lsn(&self) -> Option<Lsn> {
+    pub fn get_archived_lsn(&self) -> Option<ArchivedLsn> {
         *self.archived_lsn.borrow()
     }
 
-    pub fn watch_archived_lsn(&self) -> watch::Receiver<Option<Lsn>> {
+    pub fn watch_archived_lsn(&self) -> watch::Receiver<Option<ArchivedLsn>> {
         self.archived_lsn.subscribe()
     }
@@ -171,7 +171,7 @@ impl PartitionBoundCfHandle {
 
 pub(crate) struct PartitionCell {
     meta: Arc<Partition>,
-    archived_lsn: watch::Sender<Option<Lsn>>,
+    archived_lsn: watch::Sender<Option<ArchivedLsn>>,
     durable_lsn: RwLock<Option<watch::Sender<Option<Lsn>>>>,
     pub(crate) inner: AsyncRwLock<State>,
 }
diff --git a/crates/partition-store/src/partition_store_manager.rs b/crates/partition-store/src/partition_store_manager.rs
index 2e67946191..6374a51823 100644
--- a/crates/partition-store/src/partition_store_manager.rs
+++ b/crates/partition-store/src/partition_store_manager.rs
@@ -26,7 +26,7 @@ use restate_types::partitions::Partition;
 use crate::SnapshotError;
 use crate::memory::MemoryController;
 use crate::partition_db::{AllDataCf, PartitionCell, PartitionDb, RocksConfigurator};
-use crate::snapshots::{LocalPartitionSnapshot, Snapshots};
+use crate::snapshots::{ArchivedLsn, LocalPartitionSnapshot, Snapshots};
 use crate::{BuildError, OpenError, PartitionStore, SnapshotErrorKind};
 
 const PARTITION_CF_PREFIX: &str = "data-";
@@ -183,7 +183,10 @@ impl PartitionStoreManager {
         self.snapshots.is_repository_configured()
     }
 
-    pub async fn refresh_latest_archived_lsn(&self, partition_id: PartitionId) -> Option<Lsn> {
+    pub async fn refresh_latest_archived_lsn(
+        &self,
+        partition_id: PartitionId,
+    ) -> Option<ArchivedLsn> {
         let db = self.get_partition_db(partition_id).await?;
         self.snapshots.refresh_latest_archived_lsn(db).await
     }
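
The `note_archived_lsn` change in `partition_db.rs` above leans on `watch::Sender::send_if_modified` to keep the channel monotonic: the stored value is only replaced, and subscribers only woken, when the archived LSN actually advances. A minimal standalone sketch of that pattern, with `ArchivedLsn` simplified to a plain `u64` (assumes tokio with the `sync`, `macros`, and `rt` features):

```rust
// Standalone sketch, not part of the patch: ArchivedLsn is simplified to u64.
use tokio::sync::watch;

fn note_archived_lsn(tx: &watch::Sender<Option<u64>>, archived_lsn: u64) -> bool {
    tx.send_if_modified(|current| {
        if current.is_none_or(|c| archived_lsn > c) {
            *current = Some(archived_lsn);
            true // value advanced; watchers are notified
        } else {
            false // stale or duplicate update; watchers are not woken
        }
    })
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(None);
    assert!(note_archived_lsn(&tx, 10));
    assert!(!note_archived_lsn(&tx, 5)); // regressions are ignored
    rx.changed().await.unwrap();
    assert_eq!(*rx.borrow(), Some(10));
}
```
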
diff --git a/crates/partition-store/src/snapshots.rs b/crates/partition-store/src/snapshots.rs
index c4902d1db1..5a7414d2d2 100644
--- a/crates/partition-store/src/snapshots.rs
+++ b/crates/partition-store/src/snapshots.rs
@@ -18,7 +18,7 @@ use std::sync::Arc;
 
 use crate::{PartitionDb, PartitionStore, SnapshotError, SnapshotErrorKind};
 
 pub use self::metadata::*;
-pub use self::repository::SnapshotRepository;
+pub use self::repository::{ArchivedLsn, SnapshotRepository};
 pub use self::snapshot_task::*;
 
 use tokio::sync::Semaphore;
@@ -26,7 +26,7 @@ use tracing::{debug, instrument, warn};
 
 use restate_types::config::Configuration;
 use restate_types::identifiers::{PartitionId, SnapshotId};
-use restate_types::logs::{Lsn, SequenceNumber};
+use restate_types::logs::Lsn;
 
 #[derive(Clone)]
 pub struct Snapshots {
@@ -85,7 +85,7 @@ impl Snapshots {
         })
     }
 
-    pub async fn refresh_latest_archived_lsn(&self, db: PartitionDb) -> Option<Lsn> {
+    pub async fn refresh_latest_archived_lsn(&self, db: PartitionDb) -> Option<ArchivedLsn> {
         let Some(repository) = &self.repository else {
             return None;
         };
@@ -94,10 +94,9 @@ impl Snapshots {
         let archived_lsn = repository
             .get_latest_archived_lsn(partition_id)
             .await
-            .inspect(|lsn| debug!(?partition_id, "Latest archived LSN: {}", lsn))
             .inspect_err(|err| warn!(?partition_id, "Unable to get latest archived LSN: {}", err))
             .ok()
-            .unwrap_or(Lsn::INVALID);
+            .unwrap_or(ArchivedLsn::None);
         db.note_archived_lsn(archived_lsn);
         Some(archived_lsn)
     }
diff --git a/crates/partition-store/src/snapshots/repository.rs b/crates/partition-store/src/snapshots/repository.rs
index 2a6e8a2b14..73a46ebb32 100644
--- a/crates/partition-store/src/snapshots/repository.rs
+++ b/crates/partition-store/src/snapshots/repository.rs
@@ -11,6 +11,7 @@ use std::collections::HashMap;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
+use std::time::{Duration, SystemTime};
 
 use anyhow::{Context, anyhow, bail};
 use bytes::BytesMut;
@@ -152,6 +153,54 @@ impl LatestSnapshot {
     }
 }
 
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
+pub enum ArchivedLsn {
+    None,
+    Snapshot {
+        // Ordering is intentional: LSN takes priority over elapsed wall clock time for comparisons
+        min_applied_lsn: Lsn,
+        created_at: SystemTime,
+    },
+}
+
+impl ArchivedLsn {
+    pub fn get_min_applied_lsn(&self) -> Lsn {
+        match self {
+            ArchivedLsn::None => Lsn::INVALID,
+            ArchivedLsn::Snapshot {
+                min_applied_lsn, ..
+            } => *min_applied_lsn,
+        }
+    }
+
+    pub fn get_age(&self) -> Duration {
+        match self {
+            ArchivedLsn::None => Duration::MAX,
+            ArchivedLsn::Snapshot { created_at, .. } => SystemTime::now()
+                .duration_since(*created_at)
+                .unwrap_or_default(), // zero if created_at is in the future of the local clock
+        }
+    }
+}
+
+impl From<&LatestSnapshot> for ArchivedLsn {
+    fn from(latest: &LatestSnapshot) -> Self {
+        ArchivedLsn::Snapshot {
+            min_applied_lsn: latest.min_applied_lsn,
+            created_at: latest.created_at.into(),
+        }
+    }
+}
+
+impl From<&PartitionSnapshotMetadata> for ArchivedLsn {
+    fn from(metadata: &PartitionSnapshotMetadata) -> Self {
+        ArchivedLsn::Snapshot {
+            min_applied_lsn: metadata.min_applied_lsn,
+            created_at: metadata.created_at.into(),
+        }
+    }
+}
+
 struct UniqueSnapshotKey {
     lsn: Lsn,
     snapshot_id: SnapshotId,
@@ -542,15 +591,17 @@ impl SnapshotRepository {
     }
 
     /// Retrieve the latest known LSN to be archived to the snapshot repository.
-    /// Response of `Ok(Lsn::INVALID)` indicates no existing snapshot for the partition.
-    pub async fn get_latest_archived_lsn(&self, partition_id: PartitionId) -> anyhow::Result<Lsn> {
+    pub async fn get_latest_archived_lsn(
+        &self,
+        partition_id: PartitionId,
+    ) -> anyhow::Result<ArchivedLsn> {
         let latest_path = self.get_latest_snapshot_pointer(partition_id);
 
         let latest = match self.object_store.get(&latest_path).await {
             Ok(result) => result,
             Err(object_store::Error::NotFound { .. }) => {
                 debug!("Latest snapshot data not found in repository");
-                return Ok(Lsn::INVALID);
+                return Ok(ArchivedLsn::None);
             }
             Err(e) => {
                 return Err(anyhow::Error::new(e).context(format!(
@@ -562,7 +613,7 @@
         let latest: LatestSnapshot = serde_json::from_slice(&latest.bytes().await?)?;
         debug!(partition_id = %partition_id, snapshot_id = %latest.snapshot_id, "Latest snapshot metadata: {:?}", latest);
 
-        Ok(latest.min_applied_lsn)
+        Ok(ArchivedLsn::from(&latest))
     }
 
     async fn get_latest_snapshot_metadata_for_update(
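
The derived `Ord` on `ArchivedLsn` above is load-bearing: variant order makes `None` compare below any snapshot, and field order makes `min_applied_lsn` dominate `created_at`, as the inline comment notes. A self-contained demonstration of these assumed semantics, with `u64` standing in for `Lsn`:

```rust
// Self-contained demo of the assumed ordering semantics; u64 stands in for Lsn.
use std::time::{Duration, SystemTime};

#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
enum ArchivedLsn {
    None,
    Snapshot {
        min_applied_lsn: u64,
        created_at: SystemTime,
    },
}

fn main() {
    let t0 = SystemTime::UNIX_EPOCH;
    let t1 = t0 + Duration::from_secs(60);

    let older_snapshot = ArchivedLsn::Snapshot { min_applied_lsn: 100, created_at: t1 };
    let newer_snapshot = ArchivedLsn::Snapshot { min_applied_lsn: 200, created_at: t0 };

    // Variant order: None sorts below every snapshot.
    assert!(ArchivedLsn::None < older_snapshot);
    // Field order: the higher LSN wins even with an older wall clock timestamp.
    assert!(newer_snapshot > older_snapshot);
}
```
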
diff --git a/crates/partition-store/src/snapshots/snapshot_task.rs b/crates/partition-store/src/snapshots/snapshot_task.rs
index 5988b40eb2..4a7069f49d 100644
--- a/crates/partition-store/src/snapshots/snapshot_task.rs
+++ b/crates/partition-store/src/snapshots/snapshot_task.rs
@@ -24,6 +24,7 @@ use super::{
     SnapshotFormatVersion, SnapshotRepository,
 };
 use crate::PartitionStoreManager;
+use crate::snapshots::ArchivedLsn;
 
 /// Creates a partition store snapshot along with Restate snapshot metadata.
 pub struct SnapshotPartitionTask {
@@ -91,7 +92,7 @@ impl SnapshotPartitionTask {
             .get_partition_db(self.partition_id)
             .await
         {
-            db.note_archived_lsn(metadata.min_applied_lsn);
+            db.note_archived_lsn(ArchivedLsn::from(&metadata));
         }
 
         Ok(metadata)
diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs
index d9e8a7db6a..b07701c0e0 100644
--- a/crates/types/src/config/worker.rs
+++ b/crates/types/src/config/worker.rs
@@ -434,11 +434,12 @@ impl Default for StorageOptions {
     }
 }
 
-/// # Snapshot options.
+/// # Snapshot options
 ///
-/// Partition store snapshotting settings. At a minimum, set `destination` and
-/// `snapshot-interval-num-records` to enable snapshotting. For a complete example, see
-/// [Snapshots](https://docs.restate.dev/operate/snapshots).
+/// Partition store object-store snapshotting settings. At a minimum, set `destination` to enable
+/// manual snapshotting via `restatectl`. Additionally, `snapshot-interval` and
+/// `snapshot-interval-num-records` can be used to configure automated periodic snapshots. For a
+/// complete example, see [Snapshots](https://docs.restate.dev/operate/snapshots).
 #[serde_as]
 #[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder)]
 #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
@@ -448,20 +449,35 @@ impl Default for StorageOptions {
 pub struct SnapshotsOptions {
     /// # Snapshot destination URL
     ///
-    /// Base URL for cluster snapshots. Supports `s3://` and `file://` protocol scheme.
+    /// Base URL for cluster snapshots. Currently only supports the `s3://` protocol scheme.
     /// S3-compatible object stores must support ETag-based conditional writes.
     ///
    /// Default: `None`
     pub destination: Option<String>,
 
-    /// # Automatic snapshot creation frequency
+    /// # Automatic snapshot time interval
+    ///
+    /// A time interval at which partition snapshots will be created. If
+    /// `snapshot-interval-num-records` is also set, it will be treated as an additional requirement
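
In `durability_tracker.rs` below, only the LSN component of an `ArchivedLsn` matters, so the watch value is projected through `get_min_applied_lsn()` and a missing value degrades to `Lsn::INVALID`. A simplified sketch of consuming such a watch as a stream, as `DurabilityTracker` does via `WatchStream`, with `u64` in place of the LSN and `0` in place of `Lsn::INVALID` (assumes the `tokio-stream` crate):

```rust
// Simplified sketch, not the DurabilityTracker itself: u64 plays the role of
// ArchivedLsn::get_min_applied_lsn()'s result, and 0 plays the role of Lsn::INVALID.
use tokio::sync::watch;
use tokio_stream::{StreamExt, wrappers::WatchStream};

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(None::<u64>);
    let mut archived_lsn_watch = WatchStream::new(rx);

    // WatchStream yields the current value on first poll: nothing archived yet.
    assert_eq!(archived_lsn_watch.next().await.unwrap().unwrap_or(0), 0);

    tx.send(Some(42)).unwrap();
    // Subsequent yields reflect updates to the watch.
    assert_eq!(archived_lsn_watch.next().await.unwrap().unwrap_or(0), 42);
}
```
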
+    /// before a snapshot is taken. Use both time-based and record-based intervals to reduce the
+    /// number of snapshots created during times of low activity.
+    ///
+    /// Snapshot intervals are calculated based on the wall clock timestamps reported by cluster
+    /// nodes, assuming a basic level of clock synchronization within the cluster.
+    ///
+    /// This setting does not influence explicitly requested snapshots triggered using `restatectl`.
+    ///
+    /// Default: `None` - automatic snapshots are disabled
+    #[serde(skip_serializing_if = "Option::is_none", default)]
+    pub snapshot_interval: Option<FriendlyDuration>,
+
+    /// # Automatic snapshot minimum records
     ///
     /// Number of log records that trigger a snapshot to be created.
     ///
     /// As snapshots are created asynchronously, the actual number of new records that will trigger
     /// a snapshot will vary. The counter for the subsequent snapshot begins from the LSN at which
-    /// the previous snapshot export was initiated. Only leader Partition Processors will take
-    /// snapshots for a given partition.
+    /// the previous snapshot export was initiated.
     ///
     /// This setting does not influence explicitly requested snapshots triggered using `restatectl`.
     ///
@@ -481,6 +497,7 @@ impl Default for SnapshotsOptions {
     fn default() -> Self {
         Self {
             destination: None,
+            snapshot_interval: None,
             snapshot_interval_num_records: None,
             object_store: Default::default(),
             object_store_retry_policy: Self::default_retry_policy(),
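
As the doc comments above describe, an unset option imposes no requirement, while every configured option must be satisfied before an automatic snapshot is taken. The `should_snapshot` helper below is purely illustrative (its name and signature do not appear in this change) but spells out that gating:

```rust
// Illustrative gating helper; the names `should_snapshot`, `records_since_last_snapshot`,
// and `age_of_last_snapshot` are hypothetical and not part of the patch.
use std::time::Duration;

fn should_snapshot(
    records_per_snapshot: Option<u64>,
    snapshot_interval: Option<Duration>,
    records_since_last_snapshot: u64,
    age_of_last_snapshot: Duration,
) -> bool {
    // At least one trigger must be configured at all...
    (records_per_snapshot.is_some() || snapshot_interval.is_some())
        // ...and every configured trigger must be satisfied.
        && records_per_snapshot.is_none_or(|n| records_since_last_snapshot >= n)
        && snapshot_interval.is_none_or(|i| age_of_last_snapshot > i)
}

fn main() {
    let hour = Duration::from_secs(3600);
    // Interval alone: enough elapsed time triggers a snapshot.
    assert!(should_snapshot(None, Some(hour), 0, 2 * hour));
    // Both set: the record threshold suppresses snapshots of an idle partition.
    assert!(!should_snapshot(Some(1000), Some(hour), 10, 2 * hour));
    // Both set and both satisfied.
    assert!(should_snapshot(Some(1000), Some(hour), 5000, 2 * hour));
}
```

diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs
index a38e6413cc..60b75c0ece 100644
--- a/crates/worker/src/lib.rs
+++ b/crates/worker/src/lib.rs
@@ -126,7 +126,8 @@ impl Worker {
             SubscriptionControllerHandle::new(ingress_kafka.create_command_sender());
 
         let snapshots_options = &config.worker.snapshots;
-        if snapshots_options.snapshot_interval_num_records.is_some()
+        if (snapshots_options.snapshot_interval.is_some()
+            || snapshots_options.snapshot_interval_num_records.is_some())
            && snapshots_options.destination.is_none()
         {
             return Err(BuildError::SnapshotRepository(anyhow::anyhow!(
diff --git a/crates/worker/src/partition/leadership/durability_tracker.rs b/crates/worker/src/partition/leadership/durability_tracker.rs
index 7a9a6b8a8c..a59200c4a8 100644
--- a/crates/worker/src/partition/leadership/durability_tracker.rs
+++ b/crates/worker/src/partition/leadership/durability_tracker.rs
@@ -13,6 +13,7 @@ use std::task::Poll;
 use std::time::Duration;
 
 use futures::{Stream, StreamExt};
+use restate_partition_store::snapshots::ArchivedLsn;
 use tokio::sync::watch;
 use tokio::time::{Instant, MissedTickBehavior};
 use tokio_stream::wrappers::{IntervalStream, WatchStream};
@@ -39,7 +40,7 @@ pub struct DurabilityTracker {
     partition_id: PartitionId,
     last_reported_durable_lsn: Lsn,
     replica_set_states: PartitionReplicaSetStates,
-    archived_lsn_watch: WatchStream<Option<Lsn>>,
+    archived_lsn_watch: WatchStream<Option<ArchivedLsn>>,
     check_timer: IntervalStream,
     last_warning_at: Instant,
     /// cache of the last archived_lsn
@@ -52,7 +53,7 @@ impl DurabilityTracker {
         partition_id: PartitionId,
         last_reported_durable_lsn: Option<Lsn>,
         replica_set_states: PartitionReplicaSetStates,
-        archived_lsn_watch: watch::Receiver<Option<Lsn>>,
+        archived_lsn_watch: watch::Receiver<Option<ArchivedLsn>>,
         check_interval: Duration,
     ) -> Self {
         let mut check_timer =
@@ -166,7 +167,9 @@ impl Stream for DurabilityTracker {
                     self.terminated = true;
                     return Poll::Ready(None);
                 }
-                (Poll::Ready(Some(archived)), _) => archived.unwrap_or(Lsn::INVALID),
+                (Poll::Ready(Some(archived)), _) => archived
+                    .map(|a| a.get_min_applied_lsn())
+                    .unwrap_or(Lsn::INVALID),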
                (_, Poll::Ready(_)) => self.last_archived,
                 (Poll::Pending, Poll::Pending) => return Poll::Pending,
             };
diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs
index fd20b75b68..13f67d797b 100644
--- a/crates/worker/src/partition_processor_manager.rs
+++ b/crates/worker/src/partition_processor_manager.rs
@@ -49,7 +49,7 @@ use restate_metadata_server::{MetadataStoreClient, ReadModifyWriteError};
 use restate_metadata_store::{ReadWriteError, RetryError, retry_on_retryable_error};
 use restate_partition_store::PartitionStoreManager;
 use restate_partition_store::snapshots::{
-    PartitionSnapshotMetadata, SnapshotPartitionTask, SnapshotRepository,
+    ArchivedLsn, PartitionSnapshotMetadata, SnapshotPartitionTask, SnapshotRepository,
 };
 use restate_partition_store::{SnapshotError, SnapshotErrorKind};
 use restate_time_util::DurationExt;
@@ -123,7 +123,7 @@ pub struct PartitionProcessorManager {
     replica_set_states: PartitionReplicaSetStates,
     target_tail_lsns: HashMap<PartitionId, Lsn>,
-    archived_lsns: HashMap<PartitionId, Lsn>,
+    archived_lsns: HashMap<PartitionId, ArchivedLsn>,
 
     invokers_status_reader: MultiplexedInvokerStatusReader,
     asynchronous_operations: JoinSet<AsynchronousEvent>,
@@ -806,7 +806,8 @@ impl PartitionProcessorManager {
         // it is a bit unfortunate that we share PartitionProcessorStatus between the
         // PP and the PPManager :-(. Maybe at some point we want to split the struct for it.
-        status.last_archived_log_lsn = self.archived_lsns.get(partition_id).cloned();
+        let archived_lsn = self.archived_lsns.get(partition_id);
+        status.last_archived_log_lsn = archived_lsn.map(|a| a.get_min_applied_lsn());
 
         let current_tail_lsn = self.target_tail_lsns.get(partition_id).cloned();
         let target_tail_lsn = if current_tail_lsn > status.target_tail_lsn {
@@ -937,7 +938,7 @@ impl PartitionProcessorManager {
         let (partition_id, response) = match result {
             Ok(metadata) => {
                 self.archived_lsns
-                    .insert(metadata.partition_id, metadata.min_applied_lsn);
+                    .insert(metadata.partition_id, ArchivedLsn::from(&metadata));
 
                 let response = SnapshotCreated::from(&metadata);
                 self.latest_snapshots
@@ -971,13 +972,10 @@ impl PartitionProcessorManager {
             return;
         };
 
-        let Some(records_per_snapshot) = self
-            .updateable_config
-            .live_load()
-            .worker
-            .snapshots
-            .snapshot_interval_num_records
-        else {
+        let snapshots_options = &self.updateable_config.live_load().worker.snapshots;
+        let snapshot_interval = snapshots_options.snapshot_interval;
+        let records_per_snapshot = snapshots_options.snapshot_interval_num_records;
+        if snapshot_interval.is_none() && records_per_snapshot.is_none() {
             return;
         };
@@ -1019,7 +1017,16 @@ impl PartitionProcessorManager {
         let snapshot_partitions = known_archived_lsn
             .into_iter()
             .filter_map(|(partition_id, applied_lsn, archived_lsn)| {
-                if applied_lsn >= archived_lsn.add(Lsn::from(records_per_snapshot.get())) {
+                // At this point, at least one of time-based or record-based interval is set; if
+                // both requirements are configured, then both must be met.
+                if records_per_snapshot.is_none_or(|num_records| {
+                    applied_lsn
+                        >= archived_lsn
+                            .get_min_applied_lsn()
+                            .add(Lsn::from(num_records.get()))
+                }) && snapshot_interval.is_none_or(|interval| {
+                    archived_lsn.get_age() > with_jitter(interval.into(), 0.1)
+                }) {
                     Some(partition_id)
                 } else {
                     None
@@ -1465,7 +1472,7 @@ enum EventKind {
         tail: Option<Lsn>,
     },
     NewArchivedLsn {
-        archived_lsn: Lsn,
+        archived_lsn: ArchivedLsn,
     },
 }
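
The `with_jitter(interval.into(), 0.1)` call above randomizes each partition's effective snapshot deadline so that time-based snapshots do not all fire in the same scheduling pass. The actual helper is provided by Restate's utilities; the sketch below is only an assumed implementation (a ±10% spread) built on the `rand` crate:

```rust
// Assumed implementation of a with_jitter-style helper; the real one is provided by
// Restate's utilities and may differ in range and rounding.
use std::time::Duration;

use rand::Rng;

fn with_jitter(base: Duration, factor: f64) -> Duration {
    // Scale the base duration by a random factor in [1 - factor, 1 + factor].
    let scale = 1.0 + rand::thread_rng().gen_range(-factor..=factor);
    base.mul_f64(scale)
}

fn main() {
    let interval = Duration::from_secs(3600);
    let jittered = with_jitter(interval, 0.1);
    assert!(jittered >= interval.mul_f64(0.9) && jittered <= interval.mul_f64(1.1));
    println!("effective snapshot deadline: {jittered:?}");
}
```
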