Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
948d472
refactor(cardano): shard EWRAP into prepare/shard/finalize work units
scarmuega Apr 24, 2026
9627c3a
refactor(cardano): rename EwrapPrepare → Ewrap, seed EpochState.end i…
scarmuega Apr 25, 2026
ae26079
refactor(cardano): rename EwrapShard → AccountShard
scarmuega Apr 25, 2026
eaf8e08
refactor(cardano): run AccountShard before Ewrap on the boundary pipe…
scarmuega Apr 25, 2026
a1e0d5c
refactor(cardano): merge EwrapFinalize into Ewrap; drop EpochEndInit …
scarmuega Apr 25, 2026
5122e07
fix(cardano): seed EpochState.end in Genesis
scarmuega Apr 25, 2026
2ff83b5
refactor(cardano): split AccountShard into its own `ashard` module
scarmuega Apr 25, 2026
4728e21
docs(cardano): fix stale references in EWRAP/AccountShard refactor co…
scarmuega Apr 25, 2026
772b53a
refactor(cardano): decouple shard naming from `ewrap`
scarmuega Apr 25, 2026
9a80080
refactor(cardano): rename AccountShard → AShard for structs and variants
scarmuega Apr 25, 2026
1a3722d
refactor(cardano): rename `ashard_total` → `account_shards`
scarmuega Apr 25, 2026
8174d2a
feat(cardano): persist shard count alongside ashard_progress
scarmuega Apr 25, 2026
bb3a195
fix(cardano): make EpochEndAccumulate::undo no-op on skipped apply
scarmuega Apr 26, 2026
f84792b
feat(cardano): validate account_shards at startup
scarmuega Apr 26, 2026
aa835bb
docs(skills): split AShard from Ewrap in debug-epoch-mismatch guide
scarmuega Apr 26, 2026
1a1afb3
feat(core): emit RSS deltas around each work-unit phase
scarmuega Apr 26, 2026
2294bb3
fix(cardano): shard accounts on entropic byte of credential key
scarmuega Apr 27, 2026
fd888b8
feat(cardano): shard ESTART account-snapshot transitions
scarmuega Apr 27, 2026
1b84936
docs(cardano): document EStartShard split in work_units.md
scarmuega Apr 27, 2026
07c3191
refactor(cardano): unify epoch boundary into Ewrap and Estart work units
scarmuega Apr 27, 2026
62031ea
refactor(cardano): align progress delta and field names with new boun…
scarmuega Apr 28, 2026
c7fce11
feat(cardano): shard RUPD reward computation by credential key
scarmuega Apr 28, 2026
317b676
refactor(core): drop RSS probe instrumentation around work-unit phases
scarmuega Apr 28, 2026
a36ccf6
refactor(cardano): replace account_shards config with ACCOUNT_SHARDS …
scarmuega Apr 28, 2026
3923753
refactor(cardano): version EpochWrapUp/EpochTransition for WAL compat
scarmuega Apr 28, 2026
3e50db6
fix(cardano): resume sharded work units at committed shard cursor
scarmuega Apr 28, 2026
28fe4c3
fix(cardano): refresh ending_state before archive write in ewrap fina…
scarmuega Apr 28, 2026
56b6dbb
fix(cardano): assert shard invariants in release builds
scarmuega Apr 28, 2026
bd08246
chore(cardano): housekeeping fixes from PR review
scarmuega Apr 28, 2026
0719956
chore(cardano): clear clippy warnings
scarmuega Apr 28, 2026
3492e5f
docs(skills): drop stale AShard/EpochEndAccumulate references
scarmuega Apr 28, 2026
118c6c7
fix(cardano): precompute per-pool live pledge for sharded rupd
scarmuega Apr 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 84 additions & 19 deletions crates/cardano/src/estart/commit.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
//! Commit logic for epoch start (estart) work unit.
//! Commit logic for the open half of the epoch boundary (per-shard runs
//! plus the finalize pass).
//!
//! This module uses a streaming pattern that processes entities one-by-one,
//! applying deltas and writing immediately without accumulating all entities
//! in memory.
//! Both code paths use the same streaming pattern: each entity namespace
//! is read one record at a time, deltas for that record are applied, and
//! the result is written immediately. Per-shard commits flush
//! `EpochState`'s `EStartProgress` and the shard's account-range
//! slice; the finalize commit flushes pool / drep / proposal transitions,
//! the closing `EpochTransition`, optional era-summary writes, archive
//! logs, and advances the cursor.

use dolos_core::{
ArchiveStore, ArchiveWriter, BlockSlot, BrokenInvariant, ChainError, ChainPoint, Domain,
Expand Down Expand Up @@ -71,16 +76,17 @@ impl super::WorkContext {
///
/// Processes entities one at a time without accumulating them in memory,
/// reducing peak memory usage during epoch boundary commits.
fn stream_and_apply_namespace<D, E>(
pub(crate) fn stream_and_apply_namespace<D, E>(
&mut self,
state: &D::State,
writer: &<D::State as StateStore>::Writer,
range: Option<std::ops::Range<dolos_core::EntityKey>>,
) -> Result<(), ChainError>
where
D: Domain,
E: Entity + FixedNamespace + Into<CardanoEntity>,
{
let records = state.iter_entities_typed::<E>(E::NS, None)?;
let records = state.iter_entities_typed::<E>(E::NS, range)?;

for record in records {
let (entity_id, entity) = record?;
Expand Down Expand Up @@ -108,40 +114,99 @@ impl super::WorkContext {
Ok(())
}

/// Persist the results of one per-shard run.
///
/// Steps, in order:
/// 1. open a state writer and an archive writer;
/// 2. stream-and-apply `AccountState` for every key range in `ranges`;
/// 3. stream-and-apply `EpochState` (the `EStartProgress` delta lands
///    here);
/// 4. drain `self.logs` into the archive, keyed by the start-of-epoch
///    temporal key so every shard writes under the same key;
/// 5. warn if any deltas remain unapplied, then commit the state writer
///    followed by the archive writer.
///
/// **The cursor is not touched here.** Advancing it is left to
/// `commit_finalize`.
///
/// # Errors
///
/// Propagates any failure from opening the writers, streaming a
/// namespace, writing a log entry, or committing either writer.
#[instrument(skip(self, state, archive))]
pub fn commit_shard<D: Domain>(
    &mut self,
    state: &D::State,
    archive: &D::Archive,
    ranges: Vec<std::ops::Range<EntityKey>>,
) -> Result<(), ChainError> {
    debug!("committing estart changes");

    let state_writer = state.start_writer()?;
    let log_writer = archive.start_writer()?;

    // Accounts: restricted to this shard's ranges (one per
    // StakeCredential variant). Deltas keyed outside the range being
    // streamed are expected to stay queued until their own range comes up.
    ranges.into_iter().try_for_each(|shard_range| {
        self.stream_and_apply_namespace::<D, AccountState>(
            state,
            &state_writer,
            Some(shard_range),
        )
    })?;

    // EpochState is a single entity; it receives the EStartProgress delta.
    self.stream_and_apply_namespace::<D, EpochState>(state, &state_writer, None)?;

    // Every shard files its logs under the same start-of-epoch key.
    let temporal_key = TemporalKey::from(&ChainPoint::Slot(
        self.chain_summary.epoch_start(self.starting_epoch_no()),
    ));

    debug!(log_count = self.logs.len(), "writing shard archive logs");
    for (logged_entity, entry) in self.logs.drain(..) {
        let archive_key = LogKey::from((temporal_key.clone(), logged_entity));
        log_writer.write_log_typed(&archive_key, &entry)?;
    }

    // Anything still queued at this point was never matched to a record.
    match self.deltas.entities.len() {
        0 => {}
        leftover => warn!(quantity = %leftover, "uncommitted shard deltas"),
    }

    state_writer.commit()?;
    log_writer.commit()?;

    debug!("estart commit complete");
    Ok(())
}

/// Commit the finalize half: pool / drep / proposal transitions + the
/// closing `EpochTransition` + (optional) era-summary writes + archive
/// logs + cursor advance.
///
/// `AccountState` is intentionally **not** streamed here — per-account
/// snapshot transitions were committed by the preceding per-shard
/// runs. The cursor is set only here, so a crash mid-shard restarts
/// from the boundary block and the pre-finalize state stays at the
/// previous-epoch cursor.
#[instrument(skip_all)]
pub fn commit<D: Domain>(
pub fn commit_finalize<D: Domain>(
&mut self,
state: &D::State,
archive: &D::Archive,
slot: BlockSlot,
) -> Result<(), ChainError> {
debug!("committing estart changes");
debug!("committing estart finalize changes");

// Collect era transition data first (only 1-2 entities, not a memory concern)
let era_transition = self.collect_era_transition(state)?;

// Prepare archive logs (still accumulated during compute_deltas)
// Prepare archive logs
let start_of_epoch = self.chain_summary.epoch_start(self.starting_epoch_no());
let temporal_key = TemporalKey::from(&ChainPoint::Slot(start_of_epoch));

let writer = state.start_writer()?;
let archive_writer = archive.start_writer()?;

// Stream each namespace - entities are read, processed, and written one at a time
debug!("streaming account entities");
self.stream_and_apply_namespace::<D, AccountState>(state, &writer)?;
// Skip AccountState — committed earlier by per-shard runs.

debug!("streaming pool entities");
self.stream_and_apply_namespace::<D, PoolState>(state, &writer)?;
self.stream_and_apply_namespace::<D, PoolState>(state, &writer, None)?;

debug!("streaming drep entities");
self.stream_and_apply_namespace::<D, DRepState>(state, &writer)?;
self.stream_and_apply_namespace::<D, DRepState>(state, &writer, None)?;

debug!("streaming proposal entities");
self.stream_and_apply_namespace::<D, ProposalState>(state, &writer)?;
self.stream_and_apply_namespace::<D, ProposalState>(state, &writer, None)?;

debug!("streaming epoch entities");
self.stream_and_apply_namespace::<D, EpochState>(state, &writer)?;
self.stream_and_apply_namespace::<D, EpochState>(state, &writer, None)?;

// Write era transition if needed (only 2 entities)
if let Some(transition) = era_transition {
Expand All @@ -151,7 +216,7 @@ impl super::WorkContext {
.write_entity_typed::<EraSummary>(&transition.new_key, &transition.new_summary)?;
}

// Write archive logs (still accumulated during compute_deltas, but much smaller than entities)
// Write archive logs (accumulated during compute_global_deltas, much smaller than entities)
debug!(log_count = self.logs.len(), "writing archive logs");
for (entity_key, log) in self.logs.drain(..) {
let log_key = LogKey::from((temporal_key.clone(), entity_key));
Expand All @@ -163,14 +228,14 @@ impl super::WorkContext {
warn!(quantity = %self.deltas.entities.len(), "uncommitted deltas");
}

// Set cursor
// Set cursor — only in finalize, never in shards.
writer.set_cursor(ChainPoint::Slot(slot))?;

// Commit both writers atomically
writer.commit()?;
archive_writer.commit()?;

debug!("estart commit complete");
debug!("estart finalize commit complete");

Ok(())
}
Expand Down
164 changes: 130 additions & 34 deletions crates/cardano/src/estart/loading.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,40 @@
use std::sync::Arc;
//! Load + compute helpers for `EstartWorkUnit`.
//!
//! Adds methods to `WorkContext` covering both halves of the open
//! pipeline: per-shard account transitions (`load_shard` /
//! `compute_shard_deltas`) and the finalize-time global pass
//! (`compute_global_deltas` / `load_finalize`). The shared boundary state
//! (ended_state + chain summary + AVVM reclamation) is built by
//! `new_empty_with_avvm`; `compute_boundary_avvm` exposes the once-per-
//! boundary AVVM lookup so the work unit can hoist it into `initialize`.

use dolos_core::{ChainError, Domain, Genesis, StateStore, TxoRef};
use std::{ops::Range, sync::Arc};

use dolos_core::{ChainError, Domain, EntityKey, Genesis, StateStore, TxoRef};

use crate::{
estart::BoundaryVisitor, load_era_summary, roll::WorkDeltas, AccountState, DRepState,
EraProtocol, FixedNamespace as _, PoolState, ProposalState,
estart::{BoundaryVisitor as _, WorkContext},
load_era_summary,
roll::WorkDeltas,
AccountState, DRepState, EStartProgress, EraProtocol, FixedNamespace as _, PoolState,
ProposalState,
};

impl super::WorkContext {
pub fn compute_deltas<D: Domain>(&mut self, state: &D::State) -> Result<(), ChainError> {
impl WorkContext {
/// Iterate the global (non-account-keyed) entity classes of the
/// finalize pass — pools, dreps, proposals — and emit transition
/// deltas for each. Closes by emitting the single `EpochTransition`
/// delta that advances the epoch number, recomputes pots, and
/// (optionally) migrates pparams across an era boundary.
///
/// Account-keyed transitions are handled by `compute_shard_deltas`,
/// which the executor runs once per shard ahead of this finalize pass.
pub fn compute_global_deltas<D: Domain>(
&mut self,
state: &D::State,
) -> Result<(), ChainError> {
let mut visitor_nonces = super::nonces::BoundaryVisitor;
let mut visitor_reset = super::reset::BoundaryVisitor::default();
let mut visitor_reset = super::reset::BoundaryVisitor;

let pools = state.iter_entities_typed::<PoolState>(PoolState::NS, None)?;

Expand All @@ -30,15 +54,6 @@ impl super::WorkContext {
visitor_reset.visit_drep(self, &drep_id, &drep)?;
}

let accounts = state.iter_entities_typed::<AccountState>(AccountState::NS, None)?;

for account in accounts {
let (account_id, account) = account?;

visitor_nonces.visit_account(self, &account_id, &account)?;
visitor_reset.visit_account(self, &account_id, &account)?;
}

let proposals = state.iter_entities_typed::<ProposalState>(ProposalState::NS, None)?;

for proposal in proposals {
Expand All @@ -51,6 +66,10 @@ impl super::WorkContext {
visitor_nonces.flush(self)?;
visitor_reset.flush(self)?;

// Closing global delta — emitted once per epoch boundary, after
// all per-entity transitions have been queued.
super::reset::emit_epoch_transition(self);

Ok(())
}

Expand Down Expand Up @@ -88,36 +107,113 @@ impl super::WorkContext {
Ok(total)
}

pub fn load<D: Domain>(state: &D::State, genesis: Arc<Genesis>) -> Result<Self, ChainError> {
/// Compute the AVVM reclamation total for the boundary closing the
/// current epoch. Returns `0` outside the Shelley→Allegra transition.
/// Exposed so the work unit can hoist this once-per-boundary state read
/// out of the per-shard `load` calls.
pub(crate) fn compute_boundary_avvm<D: Domain>(
state: &D::State,
genesis: &Genesis,
) -> Result<u64, ChainError> {
let ended_state = crate::load_epoch::<D>(state)?;
let chain_summary = load_era_summary::<D>(state)?;
let active_protocol = EraProtocol::from(chain_summary.edge().protocol);

// Check for AVVM reclamation at Shelley→Allegra boundary
let avvm_reclamation = if let Some(transition) = ended_state.pparams.era_transition() {
if let Some(transition) = ended_state.pparams.era_transition() {
if transition.entering_allegra() {
Self::compute_avvm_reclamation::<D>(state, &genesis)?
} else {
0
return Self::compute_avvm_reclamation::<D>(state, genesis);
}
} else {
0
};
}
Ok(0)
}

/// Construct a `WorkContext` with no computed deltas.
///
/// Looks up the AVVM reclamation total through `compute_boundary_avvm`
/// and hands it, together with `state` and `genesis`, to
/// `new_empty_with_avvm`. This is the entry point used by the
/// finalize-phase loader. Per-shard loading should call
/// `new_empty_with_avvm` directly, so the AVVM lookup is paid once per
/// boundary instead of once per shard.
///
/// # Errors
///
/// Returns whatever error the AVVM lookup or the context construction
/// produces.
pub fn new_empty<D: Domain>(
    state: &D::State,
    genesis: Arc<Genesis>,
) -> Result<Self, ChainError> {
    Self::compute_boundary_avvm::<D>(state, &genesis)
        .and_then(|reclaimed| Self::new_empty_with_avvm::<D>(state, genesis, reclaimed))
}

let mut boundary = Self {
/// Variant of `new_empty` that takes a precomputed AVVM reclamation
/// total. Used by the per-shard loader so the AVVM state read happens
/// once per boundary (in `initialize`) rather than once per shard.
pub(crate) fn new_empty_with_avvm<D: Domain>(
state: &D::State,
genesis: Arc<Genesis>,
avvm_reclamation: u64,
) -> Result<Self, ChainError> {
let ended_state = crate::load_epoch::<D>(state)?;
let chain_summary = load_era_summary::<D>(state)?;
let active_protocol = EraProtocol::from(chain_summary.edge().protocol);

Ok(Self {
ended_state,
chain_summary,
active_protocol,
genesis,
avvm_reclamation,

// empty until computed
deltas: WorkDeltas::default(),
logs: Default::default(),
};
})
}

/// Build the context for the finalize phase and populate its deltas.
///
/// Creates an empty context via `new_empty`, then runs
/// `compute_global_deltas` on it. Per-account transitions are not
/// computed here; they are expected to have been applied by the
/// per-shard runs that precede finalize.
///
/// # Errors
///
/// Propagates failures from context construction or from the global
/// delta computation.
pub fn load_finalize<D: Domain>(
    state: &D::State,
    genesis: Arc<Genesis>,
) -> Result<Self, ChainError> {
    Self::new_empty::<D>(state, genesis).and_then(|mut finalize_ctx| {
        finalize_ctx.compute_global_deltas::<D>(state)?;
        Ok(finalize_ctx)
    })
}

/// Iterate accounts in this shard's two ranges and emit
/// `AccountTransition` deltas via the snapshot-rotation visitor.
/// Closes by emitting `EStartProgress` to advance
/// `EpochState.estart_progress`.
fn compute_shard_deltas<D: Domain>(
&mut self,
state: &D::State,
ranges: Vec<Range<EntityKey>>,
shard_index: u32,
total_shards: u32,
) -> Result<(), ChainError> {
let mut visitor_reset = super::reset::BoundaryVisitor;

for range in ranges {
let accounts =
state.iter_entities_typed::<AccountState>(AccountState::NS, Some(range))?;

for record in accounts {
let (account_id, account) = record?;
visitor_reset.visit_account(self, &account_id, &account)?;
}
}

boundary.compute_deltas::<D>(state)?;
self.add_delta(EStartProgress::new(shard_index, total_shards));

Ok(boundary)
Ok(())
}

/// Build the context for one per-shard Estart run and populate its
/// deltas.
///
/// The context is created with `new_empty_with_avvm`, reusing the
/// caller-supplied `avvm_reclamation` (computed once per boundary by the
/// work unit) rather than re-reading it. It then runs
/// `compute_shard_deltas` over `ranges`, passing `shard_index` and
/// `total_shards` through unchanged. No global (pool / drep / proposal)
/// iteration happens on this path.
///
/// # Errors
///
/// Propagates failures from context construction or from the per-shard
/// delta computation.
pub fn load_shard<D: Domain>(
    state: &D::State,
    genesis: Arc<Genesis>,
    avvm_reclamation: u64,
    shard_index: u32,
    total_shards: u32,
    ranges: Vec<Range<EntityKey>>,
) -> Result<Self, ChainError> {
    Self::new_empty_with_avvm::<D>(state, genesis, avvm_reclamation).and_then(|mut shard_ctx| {
        shard_ctx.compute_shard_deltas::<D>(state, ranges, shard_index, total_shards)?;
        Ok(shard_ctx)
    })
}
}
14 changes: 14 additions & 0 deletions crates/cardano/src/estart/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
//! Estart work unit — open half of the epoch boundary.
//!
//! Single sharded work unit covering the entire open-half pipeline. The
//! per-shard body iterates accounts in a key-range slice and emits
//! `AccountTransition` deltas; `finalize()` runs the global Estart pass
//! (pool / drep / proposal transitions, nonce, `EpochTransition`, era
//! transitions) and is the only phase that advances the cursor.
//!
//! `WorkContext` and the `BoundaryVisitor` trait live here and are shared
//! between the shard body and the finalize pass. Both code paths build
//! deltas onto the same `WorkDeltas` accumulator. The `nonces` and `reset`
//! visitors run in finalize; the `reset` visitor's `visit_account` arm is
//! reused by the shard body.

use std::sync::Arc;

use dolos_core::{ChainError, EntityKey, Genesis};
Expand Down
Loading
Loading