Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 32 additions & 64 deletions crates/store/src/server/proof_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
//!
//! 1. Tracks `chain_tip` via a [`watch::Receiver<BlockNumber>`] and `latest_proven_block` locally.
//! 2. Maintains up to `max_concurrent_proofs` in-flight proving jobs via a [`JoinSet`].
//! 3. Blocks may be proven out of order since proving jobs run concurrently. When a block is marked
//! as proven, the database atomically advances the `proven_in_sequence` column for all blocks
//! that now form a contiguous proven sequence from genesis.
//! 3. Blocks may be proven out of order since proving jobs run concurrently. Completed proofs are
//! buffered and committed to the block store and database in ascending block-number order.
//! 4. On transient errors (DB reads, prover failures, timeouts), the failed block is retried
//! internally within its proving task, subject to an overall per-block time budget.
//! 5. On fatal errors (e.g. deserialization failures, missing proving inputs), the scheduler
//! returns the error to the caller for node shutdown.

use std::collections::BTreeMap;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -50,7 +50,7 @@ pub const DEFAULT_MAX_CONCURRENT_PROOFS: NonZeroUsize = NonZeroUsize::new(8).unw

/// A wrapper around [`JoinSet`] whose `join_next` returns [`std::future::pending`] when empty
/// instead of `None`, making it safe to use directly in `tokio::select!` without a special case.
struct ProofTaskJoinSet(JoinSet<anyhow::Result<()>>);
struct ProofTaskJoinSet(JoinSet<anyhow::Result<(BlockNumber, Vec<u8>)>>);

impl ProofTaskJoinSet {
fn new() -> Self {
Expand All @@ -62,26 +62,14 @@ impl ProofTaskJoinSet {
}

/// Spawns a new task to prove a block.
fn spawn(
&mut self,
db: &Arc<Db>,
block_prover: &Arc<BlockProver>,
block_store: &Arc<BlockStore>,
proven_tip: &Arc<ProvenTipWriter>,
block_num: BlockNumber,
proof_cache: ProofCache,
) {
fn spawn(&mut self, db: &Arc<Db>, block_prover: &Arc<BlockProver>, block_num: BlockNumber) {
let db = Arc::clone(db);
let block_prover = Arc::clone(block_prover);
let block_store = Arc::clone(block_store);
let proven_tip = Arc::clone(proven_tip);
self.0.spawn(async move {
prove_block(&db, &block_prover, &block_store, &proven_tip, block_num, proof_cache).await
});
self.0.spawn(async move { prove_block(&db, &block_prover, block_num).await });
}

/// Returns the result of the next completed task, or pends forever if the set is empty.
async fn join_next(&mut self) -> anyhow::Result<()> {
async fn join_next(&mut self) -> anyhow::Result<(BlockNumber, Vec<u8>)> {
if self.0.is_empty() {
std::future::pending().await
} else {
Expand Down Expand Up @@ -115,7 +103,6 @@ pub fn spawn(
max_concurrent_proofs: NonZeroUsize,
proof_cache: ProofCache,
) -> JoinHandle<anyhow::Result<()>> {
let proven_tip = Arc::new(proven_tip);
tokio::spawn(run(
db,
block_prover,
Expand All @@ -141,23 +128,26 @@ async fn run(
block_prover: Arc<BlockProver>,
block_store: Arc<BlockStore>,
mut chain_tip_rx: watch::Receiver<BlockNumber>,
proven_tip: Arc<ProvenTipWriter>,
proven_tip: ProvenTipWriter,
max_concurrent_proofs: NonZeroUsize,
proof_cache: ProofCache,
) -> anyhow::Result<()> {
info!(target: COMPONENT, "Proof scheduler started");

// In-flight proving tasks.
let mut join_set = ProofTaskJoinSet::new();
let mut proving_tasks = ProofTaskJoinSet::new();

// Highest block number that is in-flight or has been proven. Used to avoid re-querying
// blocks we've already scheduled. Initialized from the in-sequence tip so we skip
// already-proven blocks on restart.
let mut highest_scheduled = db.proven_chain_tip().await?;

// Completed proofs waiting to be committed in order.
let mut pending: BTreeMap<BlockNumber, Vec<u8>> = BTreeMap::new();

loop {
// Query the DB for unproven blocks beyond what we've already scheduled.
let capacity = max_concurrent_proofs.get() - join_set.len();
let capacity = max_concurrent_proofs.get() - proving_tasks.len();
if capacity > 0 {
let unproven = db.select_unproven_blocks(highest_scheduled, capacity).await?;

Expand All @@ -166,23 +156,27 @@ async fn run(
}

for block_num in unproven {
join_set.spawn(
&db,
&block_prover,
&block_store,
&proven_tip,
block_num,
proof_cache.clone(),
);
proving_tasks.spawn(&db, &block_prover, block_num);
}
}

// Wait for either a job to complete or the chain tip to advance.
tokio::select! {
result = join_set.join_next() => {
result?;
// Proving a block has completed — cache and commit the proof.
proving_result = proving_tasks.join_next() => {
let (block_num, proof_bytes) = proving_result?;
pending.insert(block_num, proof_bytes);

// Commit all consecutive proofs in ascending order.
let mut next = proven_tip.read().child();
while let Some(proof_bytes) = pending.remove(&next) {
block_store.save_proof(next, &proof_bytes).await?;
let tip = db.mark_proven_and_advance_sequence(next).await?;
proof_cache.push(next, ProofNotification::new(next, proof_bytes));
proven_tip.advance(tip);
next = next.child();
}
},

// New chain tip received — re-query for unproven blocks on next iteration.
result = chain_tip_rx.changed() => {
if result.is_err() {
Expand All @@ -197,25 +191,17 @@ async fn run(
// PROVE BLOCK
// ================================================================================================

/// Proves a single block, saves the proof to the block store, marks the block as proven in the
/// DB, and advances the proven-in-sequence tip.
/// Proves a single block and returns the proof bytes on success.
#[instrument(target = COMPONENT, name = "prove_block", skip_all,
fields(
block.number=block_num.as_u32(),
proven_chain_tip = tracing::field::Empty
), err)]
fields(block.number=block_num.as_u32()), err)]
async fn prove_block(
db: &Db,
block_prover: &BlockProver,
block_store: &BlockStore,
proven_tip: &ProvenTipWriter,
block_num: BlockNumber,
proof_cache: ProofCache,
) -> anyhow::Result<()> {
) -> anyhow::Result<(BlockNumber, Vec<u8>)> {
tokio::time::timeout(BLOCK_PROVE_OVERALL_TIMEOUT, async {
let mut attempt: u32 = 0;
loop {
// Create a span for each attempt.
attempt += 1;
let attempt_span = tracing::info_span!(
target: COMPONENT,
Expand All @@ -225,7 +211,6 @@ async fn prove_block(
timed_out = tracing::field::Empty,
);

// Generate block proof with timeout.
let result = tokio::time::timeout(
BLOCK_PROVE_ATTEMPT_TIMEOUT,
generate_block_proof(db, block_prover, block_num),
Expand All @@ -234,24 +219,7 @@ async fn prove_block(
.await;

match result {
Ok(Ok(proof)) => {
let proof_bytes = proof.to_bytes();

// Save the block proof to file.
block_store.save_proof(block_num, &proof_bytes).await?;

// Mark the block as proven and advance the sequence in the database.
let tip = db.mark_proven_and_advance_sequence(block_num).await?;
tracing::Span::current().record("proven_chain_tip", tip.as_u32());

// Cache the proof bytes for replica subscriptions.
proof_cache.push(block_num, ProofNotification::new(block_num, proof_bytes));

// Advance the proven tip (this also notifies replica watch subscribers).
proven_tip.advance(tip);

return Ok(());
},
Ok(Ok(proof)) => return Ok((block_num, proof.to_bytes())),
Ok(Err(ProveBlockError::Fatal(err))) => Err(err).context("fatal error")?,
Ok(Err(ProveBlockError::Transient(err))) => {
attempt_span.record("error", tracing::field::display(&err));
Expand Down
Loading