Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 89 additions & 36 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1904,51 +1904,91 @@ impl ModuleHost {
Ok((tx, false))
}

/// Refreshes every view made stale by `tx`.
///
/// The returned transaction contains both the original writes and any backing-table
/// updates produced by re-materializing the affected views.
pub fn call_views_with_tx<I: WasmInstance>(
tx: MutTxId,
instance: &mut RefInstance<'_, I>,
caller: Identity,
) -> Result<(ViewCallResult, bool), ViewCallError> {
let mut out = ViewCallResult::default(tx);
) -> (ViewCallResult, bool) {
Self::call_views_with_tx_at(tx, instance, caller, Timestamp::now())
}

/// Refreshes every view made stale by `tx`.
///
/// This is the shared host-side path for finishing a transaction after writes have
/// invalidated one or more materialized views.
pub fn call_views_with_tx_at<I: WasmInstance>(
tx: MutTxId,
instance: &mut RefInstance<'_, I>,
caller: Identity,
timestamp: Timestamp,
) -> (ViewCallResult, bool) {
let mut tx = tx;
let module_def = &instance.common.info().module_def;
let mut outcome = ViewOutcome::Success;
let mut energy_used = FunctionBudget::ZERO;
let mut total_duration = Duration::ZERO;
let mut abi_duration = Duration::ZERO;
let mut trapped = false;
use FunctionArgs::Nullary;
for ViewCallInfo {
view_id,
table_id,
fn_ptr,
sender,
} in out.tx.views_for_refresh().cloned().collect::<Vec<_>>()
} in tx.views_for_refresh().cloned().collect::<Vec<_>>()
{
let view_def = module_def
.get_view_by_id(fn_ptr, sender.is_none())
.ok_or(ViewCallError::NoSuchView)?;
let Some(view_def) = module_def.get_view_by_id(fn_ptr, sender.is_none()) else {
outcome = ViewOutcome::Failed(format!("view with fn_ptr `{fn_ptr}` not found"));
break;
};
let args = match FunctionArgs::Nullary.into_tuple_for_def(module_def, view_def) {
Ok(args) => args,
Err(err) => {
outcome = ViewOutcome::Failed(format!("failed to build view args: {err}"));
break;
}
};

let (result, trap) = Self::call_view(
let (result, trap) = Self::call_view_inner(
instance,
out.tx,
tx,
&view_def.name,
view_id,
table_id,
Nullary,
view_def.fn_ptr,
caller,
sender,
)?;
args,
view_def.product_type_ref,
timestamp,
);

// Increment execution stats
out.tx = result.tx;
out.outcome = result.outcome;
out.energy_used += result.energy_used;
out.total_duration += result.total_duration;
out.abi_duration += result.abi_duration;
tx = result.tx;
outcome = result.outcome;
energy_used += result.energy_used;
total_duration += result.total_duration;
abi_duration += result.abi_duration;
trapped |= trap;

// Terminate early if execution failed
if !matches!(out.outcome, ViewOutcome::Success) || trapped {
if !matches!(outcome, ViewOutcome::Success) || trapped {
break;
}
}
Ok((out, trapped))
(
ViewCallResult {
outcome,
tx,
energy_used,
total_duration,
abi_duration,
},
trapped,
)
}

fn call_view<I: WasmInstance>(
Expand All @@ -1960,6 +2000,30 @@ impl ModuleHost {
args: FunctionArgs,
caller: Identity,
sender: Option<Identity>,
) -> Result<(ViewCallResult, bool), ViewCallError> {
Self::call_view_at(
instance,
tx,
view_name,
view_id,
table_id,
args,
caller,
sender,
Timestamp::now(),
)
}

fn call_view_at<I: WasmInstance>(
instance: &mut RefInstance<'_, I>,
tx: MutTxId,
view_name: &Identifier,
view_id: ViewId,
table_id: TableId,
args: FunctionArgs,
caller: Identity,
sender: Option<Identity>,
timestamp: Timestamp,
) -> Result<(ViewCallResult, bool), ViewCallError> {
let module_def = &instance.common.info().module_def;
let view_def = module_def.view(view_name).ok_or(ViewCallError::NoSuchView)?;
Expand All @@ -1969,21 +2033,9 @@ impl ModuleHost {
.into_tuple_for_def(module_def, view_def)
.map_err(InvalidViewArguments)?;

match Self::call_view_inner(
instance, tx, view_name, view_id, table_id, fn_ptr, caller, sender, args, row_type,
) {
err @ Err(ViewCallError::NoSuchView) => {
let _log_message = no_such_function_log_message("view", view_name);
// self.inject_logs(LogLevel::Error, view_name, &log_message);
err
}
err @ Err(ViewCallError::Args(_)) => {
let _log_message = args_error_log_message("view", view_name);
// self.inject_logs(LogLevel::Error, view_name, &log_message);
err
}
res => res,
}
Ok(Self::call_view_inner(
instance, tx, view_name, view_id, table_id, fn_ptr, caller, sender, args, row_type, timestamp,
))
}

fn call_view_inner<I: WasmInstance>(
Expand All @@ -1997,10 +2049,11 @@ impl ModuleHost {
sender: Option<Identity>,
args: ArgsTuple,
row_type: AlgebraicTypeRef,
) -> Result<(ViewCallResult, bool), ViewCallError> {
timestamp: Timestamp,
) -> (ViewCallResult, bool) {
let view_name = name.clone();
let params = CallViewParams {
timestamp: Timestamp::now(),
timestamp,
view_name,
view_id,
table_id,
Expand All @@ -2011,7 +2064,7 @@ impl ModuleHost {
row_type,
};

Ok(instance.common.call_view_with_tx(tx, params, instance.instance))
instance.common.call_view_with_tx(tx, params, instance.instance)
}

pub async fn init_database(&self, program: Program) -> Result<Option<ReducerCallResult>, InitDatabaseError> {
Expand Down
95 changes: 72 additions & 23 deletions crates/core/src/host/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,21 +379,6 @@ pub(super) async fn call_scheduled_function(
};
let db = &**module_info.relational_db();

let delete_scheduled_function_row = |tx: Option<MutTxId>, timestamp: Option<_>| {
id.and_then(|id| {
let (timestamp, instant) = timestamp.unwrap_or_else(|| (Timestamp::now(), Instant::now()));
let tx = tx.unwrap_or_else(|| db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal));
let schedule_at = delete_scheduled_function_row_with_tx(module_info, db, tx, id)?;
let ScheduleAt::Interval(dur) = schedule_at else {
return None;
};
Some(Reschedule {
at_ts: schedule_at.to_timestamp_from(timestamp),
at_real: instant + dur.to_duration_abs(),
})
})
};

enum Function {
Reducer(CallScheduledFunctionResult, bool),
Procedure {
Expand All @@ -416,7 +401,7 @@ pub(super) async fn call_scheduled_function(
Err(err) => {
// All we can do here is log an error.
log::error!("could not determine scheduled function or its parameters: {err:#}");
let reschedule = delete_scheduled_function_row(Some(tx), None);
let reschedule = delete_scheduled_function_row(module_info, db, id, Some(tx), None, inst_common, inst);
return (CallScheduledFunctionResult { reschedule }, false);
}
};
Expand Down Expand Up @@ -452,7 +437,7 @@ pub(super) async fn call_scheduled_function(
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
inst_common.call_reducer_with_tx(Some(tx), params, inst)
}));
let reschedule = delete_scheduled_function_row(None, None);
let reschedule = delete_scheduled_function_row(module_info, db, id, None, None, inst_common, inst);
// Currently, we drop the return value from the function call. In the future,
// we might want to handle it somehow.
let trapped = match result {
Expand All @@ -463,7 +448,15 @@ pub(super) async fn call_scheduled_function(
}
CallParams::Procedure(params) => {
// Delete scheduled row.
let reschedule = delete_scheduled_function_row(Some(tx), Some((timestamp, instant)));
let reschedule = delete_scheduled_function_row(
module_info,
db,
id,
Some(tx),
Some((timestamp, instant)),
inst_common,
inst,
);
Function::Procedure { params, reschedule }
}
}
Expand Down Expand Up @@ -492,11 +485,47 @@ pub(super) async fn call_scheduled_function(
}
}

/// Deletes a scheduled-row entry after its function runs, reusing `tx` when one is already
/// open and otherwise creating an internal transaction for the cleanup.
///
/// One-shot schedules are deleted and committed immediately. Interval schedules are not
/// deleted here; instead their `ScheduleAt` is returned to the caller as a `Reschedule`.
fn delete_scheduled_function_row(
module_info: &ModuleInfo,
db: &RelationalDB,
id: Option<ScheduledFunctionId>,
tx: Option<MutTxId>,
timestamp: Option<(Timestamp, Instant)>,
inst_common: &mut InstanceCommon,
inst: &mut impl WasmInstance,
) -> Option<Reschedule> {
id.and_then(|id| {
let (timestamp, instant) = timestamp.unwrap_or_else(|| (Timestamp::now(), Instant::now()));
let tx = tx.unwrap_or_else(|| db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal));
let schedule_at = delete_scheduled_function_row_with_tx(module_info, db, tx, id, inst_common, inst)?;
let ScheduleAt::Interval(dur) = schedule_at else {
return None;
};
Some(Reschedule {
at_ts: schedule_at.to_timestamp_from(timestamp),
at_real: instant + dur.to_duration_abs(),
})
})
}

/// Deletes a scheduled-row entry inside an existing mutable transaction.
///
/// If the row describes a one-shot schedule, this also refreshes any stale views and
/// broadcasts the resulting delete in the same transaction.
///
/// Interval schedules are left in place and returned to the caller for rescheduling.
fn delete_scheduled_function_row_with_tx(
module_info: &ModuleInfo,
db: &RelationalDB,
mut tx: MutTxId,
id: ScheduledFunctionId,
inst_common: &mut InstanceCommon,
inst: &mut impl WasmInstance,
) -> Option<ScheduleAt> {
if let Ok(Some(schedule_row)) = get_schedule_row_mut(&tx, db, id) {
match read_schedule_at(&schedule_row, id.at_column) {
Expand All @@ -508,7 +537,7 @@ fn delete_scheduled_function_row_with_tx(
let row_ptr = schedule_row.pointer();
db.delete(&mut tx, id.table_id, [row_ptr]);

commit_and_broadcast_deletion_event(tx, module_info);
refresh_views_then_commit_and_broadcast(tx, module_info, inst_common, inst);
}
_ => {
log::debug!(
Expand All @@ -522,13 +551,30 @@ fn delete_scheduled_function_row_with_tx(
None
}

fn commit_and_broadcast_deletion_event(tx: MutTxId, module_info: &ModuleInfo) {
/// Refreshes stale views, commits transaction, and broadcasts subscription updates.
fn refresh_views_then_commit_and_broadcast(
tx: MutTxId,
module_info: &ModuleInfo,
inst_common: &mut InstanceCommon,
inst: &mut impl WasmInstance,
) {
let timestamp = Timestamp::now();
let (view_result, trapped) = inst_common.call_views_with_tx(tx, module_info.database_identity, inst, timestamp);
let mut status = match view_result.outcome {
crate::host::module_host::ViewOutcome::Success => EventStatus::Committed(DatabaseUpdate::default()),
crate::host::module_host::ViewOutcome::Failed(err) => EventStatus::FailedInternal(err),
crate::host::module_host::ViewOutcome::BudgetExceeded => EventStatus::OutOfEnergy,
};
if trapped && matches!(status, EventStatus::Committed(_)) {
status = EventStatus::FailedInternal("The instance encountered a fatal error.".into());
}

let event = ModuleEvent {
timestamp: Timestamp::now(),
timestamp,
caller_identity: module_info.database_identity,
caller_connection_id: None,
function_call: ModuleFunctionCall::default(),
status: EventStatus::Committed(DatabaseUpdate::default()),
status,
reducer_return_value: None,
//Keeping them 0 as it is internal transaction, not by reducer
energy_quanta_used: EnergyQuanta { quanta: 0 },
Expand All @@ -537,7 +583,10 @@ fn commit_and_broadcast_deletion_event(tx: MutTxId, module_info: &ModuleInfo) {
timer: None,
};

if let Err(e) = module_info.subscriptions.commit_and_broadcast_event(None, event, tx) {
if let Err(e) = module_info
.subscriptions
.commit_and_broadcast_event(None, event, view_result.tx)
{
log::error!("Failed to broadcast deletion event: {e:#}");
}
}
Expand Down
30 changes: 6 additions & 24 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -947,7 +947,7 @@ impl InstanceCommon {

// Only re-evaluate and update views if the reducer's execution was successful
let (out, trapped) = if !trapped && matches!(status, EventStatus::Committed(_)) {
self.call_views_with_tx(tx, caller_identity, &info.module_def, inst, timestamp)
self.call_views_with_tx(tx, caller_identity, inst, timestamp)
} else {
(ViewCallResult::default(tx), trapped)
};
Expand Down Expand Up @@ -1315,32 +1315,14 @@ impl InstanceCommon {
&mut self,
tx: MutTxId,
caller: Identity,
module_def: &ModuleDef,
inst: &mut I,
timestamp: Timestamp,
) -> (ViewCallResult, bool) {
let view_calls = tx
.views_for_refresh()
.map(|info| {
let view_def = module_def
.get_view_by_id(info.fn_ptr, info.sender.is_none())
.unwrap_or_else(|| panic!("view with fn_ptr `{}` not found", info.fn_ptr));

CallViewParams {
view_name: view_def.name.clone(),
view_id: info.view_id,
table_id: info.table_id,
fn_ptr: view_def.fn_ptr,
caller,
sender: info.sender,
args: ArgsTuple::nullary(),
row_type: view_def.product_type_ref,
timestamp,
}
})
.collect::<Vec<_>>();

self.execute_view_calls(tx, view_calls, inst)
let mut instance = RefInstance {
common: self,
instance: inst,
};
ModuleHost::call_views_with_tx_at(tx, &mut instance, caller, timestamp)
}

/// Executes view calls and accumulate results.
Expand Down
Loading
Loading