diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 1b46b12af08..6ba6c58e430 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1904,51 +1904,91 @@ impl ModuleHost { Ok((tx, false)) } + /// Refreshes every view made stale by `tx`. + /// + /// The returned transaction contains both the original writes and any backing-table + /// updates produced by re-materializing the affected views. pub fn call_views_with_tx( tx: MutTxId, instance: &mut RefInstance<'_, I>, caller: Identity, - ) -> Result<(ViewCallResult, bool), ViewCallError> { - let mut out = ViewCallResult::default(tx); + ) -> (ViewCallResult, bool) { + Self::call_views_with_tx_at(tx, instance, caller, Timestamp::now()) + } + + /// Refreshes every view made stale by `tx`. + /// + /// This is the shared host-side path for finishing a transaction after writes have + /// invalidated one or more materialized views. + pub fn call_views_with_tx_at( + tx: MutTxId, + instance: &mut RefInstance<'_, I>, + caller: Identity, + timestamp: Timestamp, + ) -> (ViewCallResult, bool) { + let mut tx = tx; let module_def = &instance.common.info().module_def; + let mut outcome = ViewOutcome::Success; + let mut energy_used = FunctionBudget::ZERO; + let mut total_duration = Duration::ZERO; + let mut abi_duration = Duration::ZERO; let mut trapped = false; - use FunctionArgs::Nullary; for ViewCallInfo { view_id, table_id, fn_ptr, sender, - } in out.tx.views_for_refresh().cloned().collect::>() + } in tx.views_for_refresh().cloned().collect::>() { - let view_def = module_def - .get_view_by_id(fn_ptr, sender.is_none()) - .ok_or(ViewCallError::NoSuchView)?; + let Some(view_def) = module_def.get_view_by_id(fn_ptr, sender.is_none()) else { + outcome = ViewOutcome::Failed(format!("view with fn_ptr `{fn_ptr}` not found")); + break; + }; + let args = match FunctionArgs::Nullary.into_tuple_for_def(module_def, view_def) { + Ok(args) => args, + Err(err) => { + outcome = ViewOutcome::Failed(format!("failed to build view args: {err}")); + break; + } + }; - let (result, trap) = Self::call_view( + let (result, trap) = Self::call_view_inner( instance, - out.tx, + tx, &view_def.name, view_id, table_id, - Nullary, + view_def.fn_ptr, caller, sender, - )?; + args, + view_def.product_type_ref, + timestamp, + ); // Increment execution stats - out.tx = result.tx; - out.outcome = result.outcome; - out.energy_used += result.energy_used; - out.total_duration += result.total_duration; - out.abi_duration += result.abi_duration; + tx = result.tx; + outcome = result.outcome; + energy_used += result.energy_used; + total_duration += result.total_duration; + abi_duration += result.abi_duration; trapped |= trap; // Terminate early if execution failed - if !matches!(out.outcome, ViewOutcome::Success) || trapped { + if !matches!(outcome, ViewOutcome::Success) || trapped { break; } } - Ok((out, trapped)) + ( + ViewCallResult { + outcome, + tx, + energy_used, + total_duration, + abi_duration, + }, + trapped, + ) } fn call_view( @@ -1960,6 +2000,30 @@ impl ModuleHost { args: FunctionArgs, caller: Identity, sender: Option, + ) -> Result<(ViewCallResult, bool), ViewCallError> { + Self::call_view_at( + instance, + tx, + view_name, + view_id, + table_id, + args, + caller, + sender, + Timestamp::now(), + ) + } + + fn call_view_at( + instance: &mut RefInstance<'_, I>, + tx: MutTxId, + view_name: &Identifier, + view_id: ViewId, + table_id: TableId, + args: FunctionArgs, + caller: Identity, + sender: Option, + timestamp: Timestamp, ) -> Result<(ViewCallResult, bool), ViewCallError> { let module_def = &instance.common.info().module_def; let view_def = module_def.view(view_name).ok_or(ViewCallError::NoSuchView)?; @@ -1969,21 +2033,9 @@ impl ModuleHost { .into_tuple_for_def(module_def, view_def) .map_err(InvalidViewArguments)?; - match Self::call_view_inner( - instance, tx, view_name, view_id, table_id, fn_ptr, caller, sender, args, row_type, - ) { - err @ Err(ViewCallError::NoSuchView) => { - let _log_message = no_such_function_log_message("view", view_name); - // self.inject_logs(LogLevel::Error, view_name, &log_message); - err - } - err @ Err(ViewCallError::Args(_)) => { - let _log_message = args_error_log_message("view", view_name); - // self.inject_logs(LogLevel::Error, view_name, &log_message); - err - } - res => res, - } + Ok(Self::call_view_inner( + instance, tx, view_name, view_id, table_id, fn_ptr, caller, sender, args, row_type, timestamp, + )) } fn call_view_inner( @@ -1997,10 +2049,11 @@ impl ModuleHost { sender: Option, args: ArgsTuple, row_type: AlgebraicTypeRef, - ) -> Result<(ViewCallResult, bool), ViewCallError> { + timestamp: Timestamp, + ) -> (ViewCallResult, bool) { let view_name = name.clone(); let params = CallViewParams { - timestamp: Timestamp::now(), + timestamp, view_name, view_id, table_id, @@ -2011,7 +2064,7 @@ impl ModuleHost { row_type, }; - Ok(instance.common.call_view_with_tx(tx, params, instance.instance)) + instance.common.call_view_with_tx(tx, params, instance.instance) } pub async fn init_database(&self, program: Program) -> Result, InitDatabaseError> { diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index 75917b10c34..7328376c0f0 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -379,21 +379,6 @@ pub(super) async fn call_scheduled_function( }; let db = &**module_info.relational_db(); - let delete_scheduled_function_row = |tx: Option, timestamp: Option<_>| { - id.and_then(|id| { - let (timestamp, instant) = timestamp.unwrap_or_else(|| (Timestamp::now(), Instant::now())); - let tx = tx.unwrap_or_else(|| db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal)); - let schedule_at = delete_scheduled_function_row_with_tx(module_info, db, tx, id)?; - let ScheduleAt::Interval(dur) = schedule_at else { - return None; - }; - Some(Reschedule { - at_ts: schedule_at.to_timestamp_from(timestamp), - at_real: instant + dur.to_duration_abs(), - }) - }) - }; - enum Function { Reducer(CallScheduledFunctionResult, bool), Procedure { @@ -416,7 +401,7 @@ pub(super) async fn call_scheduled_function( Err(err) => { // All we can do here is log an error. log::error!("could not determine scheduled function or its parameters: {err:#}"); - let reschedule = delete_scheduled_function_row(Some(tx), None); + let reschedule = delete_scheduled_function_row(module_info, db, id, Some(tx), None, inst_common, inst); return (CallScheduledFunctionResult { reschedule }, false); } }; @@ -452,7 +437,7 @@ pub(super) async fn call_scheduled_function( let result = panic::catch_unwind(panic::AssertUnwindSafe(|| { inst_common.call_reducer_with_tx(Some(tx), params, inst) })); - let reschedule = delete_scheduled_function_row(None, None); + let reschedule = delete_scheduled_function_row(module_info, db, id, None, None, inst_common, inst); // Currently, we drop the return value from the function call. In the future, // we might want to handle it somehow. let trapped = match result { @@ -463,7 +448,15 @@ pub(super) async fn call_scheduled_function( } CallParams::Procedure(params) => { // Delete scheduled row. - let reschedule = delete_scheduled_function_row(Some(tx), Some((timestamp, instant))); + let reschedule = delete_scheduled_function_row( + module_info, + db, + id, + Some(tx), + Some((timestamp, instant)), + inst_common, + inst, + ); Function::Procedure { params, reschedule } } } @@ -492,11 +485,47 @@ pub(super) async fn call_scheduled_function( } } +/// Deletes a scheduled-row entry after its function runs, reusing `tx` when one is already +/// open and otherwise creating an internal transaction for the cleanup. +/// +/// One-shot schedules are deleted and committed immediately. Interval schedules are not +/// deleted here; instead their `ScheduleAt` is returned to the caller as a `Reschedule`. +fn delete_scheduled_function_row( + module_info: &ModuleInfo, + db: &RelationalDB, + id: Option, + tx: Option, + timestamp: Option<(Timestamp, Instant)>, + inst_common: &mut InstanceCommon, + inst: &mut impl WasmInstance, +) -> Option { + id.and_then(|id| { + let (timestamp, instant) = timestamp.unwrap_or_else(|| (Timestamp::now(), Instant::now())); + let tx = tx.unwrap_or_else(|| db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal)); + let schedule_at = delete_scheduled_function_row_with_tx(module_info, db, tx, id, inst_common, inst)?; + let ScheduleAt::Interval(dur) = schedule_at else { + return None; + }; + Some(Reschedule { + at_ts: schedule_at.to_timestamp_from(timestamp), + at_real: instant + dur.to_duration_abs(), + }) + }) +} + +/// Deletes a scheduled-row entry inside an existing mutable transaction. +/// +/// If the row describes a one-shot schedule, this also refreshes any stale views and +/// broadcasts the resulting delete in the same transaction. +/// +/// Interval schedules are left in place and returned to the caller for rescheduling. fn delete_scheduled_function_row_with_tx( module_info: &ModuleInfo, db: &RelationalDB, mut tx: MutTxId, id: ScheduledFunctionId, + inst_common: &mut InstanceCommon, + inst: &mut impl WasmInstance, ) -> Option { if let Ok(Some(schedule_row)) = get_schedule_row_mut(&tx, db, id) { match read_schedule_at(&schedule_row, id.at_column) { @@ -508,7 +537,7 @@ fn delete_scheduled_function_row_with_tx( let row_ptr = schedule_row.pointer(); db.delete(&mut tx, id.table_id, [row_ptr]); - commit_and_broadcast_deletion_event(tx, module_info); + refresh_views_then_commit_and_broadcast(tx, module_info, inst_common, inst); } _ => { log::debug!( @@ -522,13 +551,30 @@ fn delete_scheduled_function_row_with_tx( None } -fn commit_and_broadcast_deletion_event(tx: MutTxId, module_info: &ModuleInfo) { +/// Refreshes stale views, commits transaction, and broadcasts subscription updates. +fn refresh_views_then_commit_and_broadcast( + tx: MutTxId, + module_info: &ModuleInfo, + inst_common: &mut InstanceCommon, + inst: &mut impl WasmInstance, +) { + let timestamp = Timestamp::now(); + let (view_result, trapped) = inst_common.call_views_with_tx(tx, module_info.database_identity, inst, timestamp); + let mut status = match view_result.outcome { + crate::host::module_host::ViewOutcome::Success => EventStatus::Committed(DatabaseUpdate::default()), + crate::host::module_host::ViewOutcome::Failed(err) => EventStatus::FailedInternal(err), + crate::host::module_host::ViewOutcome::BudgetExceeded => EventStatus::OutOfEnergy, + }; + if trapped && matches!(status, EventStatus::Committed(_)) { + status = EventStatus::FailedInternal("The instance encountered a fatal error.".into()); + } + let event = ModuleEvent { - timestamp: Timestamp::now(), + timestamp, caller_identity: module_info.database_identity, caller_connection_id: None, function_call: ModuleFunctionCall::default(), - status: EventStatus::Committed(DatabaseUpdate::default()), + status, reducer_return_value: None, //Keeping them 0 as it is internal transaction, not by reducer energy_quanta_used: EnergyQuanta { quanta: 0 }, @@ -537,7 +583,10 @@ fn commit_and_broadcast_deletion_event(tx: MutTxId, module_info: &ModuleInfo) { timer: None, }; - if let Err(e) = module_info.subscriptions.commit_and_broadcast_event(None, event, tx) { + if let Err(e) = module_info + .subscriptions + .commit_and_broadcast_event(None, event, view_result.tx) + { log::error!("Failed to broadcast deletion event: {e:#}"); } } diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 654c081c4c6..6932777bb15 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -947,7 +947,7 @@ impl InstanceCommon { // Only re-evaluate and update views if the reducer's execution was successful let (out, trapped) = if !trapped && matches!(status, EventStatus::Committed(_)) { - self.call_views_with_tx(tx, caller_identity, &info.module_def, inst, timestamp) + self.call_views_with_tx(tx, caller_identity, inst, timestamp) } else { (ViewCallResult::default(tx), trapped) }; @@ -1315,32 +1315,14 @@ impl InstanceCommon { &mut self, tx: MutTxId, caller: Identity, - module_def: &ModuleDef, inst: &mut I, timestamp: Timestamp, ) -> (ViewCallResult, bool) { - let view_calls = tx - .views_for_refresh() - .map(|info| { - let view_def = module_def - .get_view_by_id(info.fn_ptr, info.sender.is_none()) - .unwrap_or_else(|| panic!("view with fn_ptr `{}` not found", info.fn_ptr)); - - CallViewParams { - view_name: view_def.name.clone(), - view_id: info.view_id, - table_id: info.table_id, - fn_ptr: view_def.fn_ptr, - caller, - sender: info.sender, - args: ArgsTuple::nullary(), - row_type: view_def.product_type_ref, - timestamp, - } - }) - .collect::>(); - - self.execute_view_calls(tx, view_calls, inst) + let mut instance = RefInstance { + common: self, + instance: inst, + }; + ModuleHost::call_views_with_tx_at(tx, &mut instance, caller, timestamp) } /// Executes view calls and accumulate results. diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index f42b0c10a83..abb3692f6de 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -299,7 +299,7 @@ fn run_inner( // Update views let (result, trapped) = match instance { - Some(instance) => ModuleHost::call_views_with_tx(tx, instance, auth.caller())?, + Some(instance) => ModuleHost::call_views_with_tx(tx, instance, auth.caller()), None => (ViewCallResult::default(tx), false), }; diff --git a/crates/smoketests/modules/schedule-procedure/src/lib.rs b/crates/smoketests/modules/schedule-procedure/src/lib.rs index 7c49de6e215..0b8ae6f5a97 100644 --- a/crates/smoketests/modules/schedule-procedure/src/lib.rs +++ b/crates/smoketests/modules/schedule-procedure/src/lib.rs @@ -1,4 +1,4 @@ -use spacetimedb::{duration, log, ProcedureContext, ReducerContext, Table, Timestamp}; +use spacetimedb::{duration, log, ProcedureContext, Query, ReducerContext, Table, Timestamp, ViewContext}; #[spacetimedb::table(accessor = scheduled_table, public, scheduled(my_procedure, at = sched_at))] pub struct ScheduledTable { @@ -9,6 +9,11 @@ pub struct ScheduledTable { prev: Timestamp, } +#[spacetimedb::view(accessor = scheduled_view, public)] +fn scheduled_view(ctx: &ViewContext) -> impl Query { + ctx.from.scheduled_table().build() +} + #[spacetimedb::reducer] fn schedule_procedure(ctx: &ReducerContext) { ctx.db.scheduled_table().insert(ScheduledTable { diff --git a/crates/smoketests/modules/schedule-subscribe/src/lib.rs b/crates/smoketests/modules/schedule-subscribe/src/lib.rs index 990c83467aa..6a7f63a5a74 100644 --- a/crates/smoketests/modules/schedule-subscribe/src/lib.rs +++ b/crates/smoketests/modules/schedule-subscribe/src/lib.rs @@ -1,4 +1,4 @@ -use spacetimedb::{duration, log, ReducerContext, Table, Timestamp}; +use spacetimedb::{duration, log, Identity, Query, ReducerContext, Table, Timestamp, ViewContext}; #[spacetimedb::table(accessor = scheduled_table, public, scheduled(my_reducer, at = sched_at))] pub struct ScheduledTable { @@ -9,6 +9,47 @@ pub struct ScheduledTable { prev: Timestamp, } +#[spacetimedb::table(accessor = failing_scheduled_table, public, scheduled(failing_reducer, at = sched_at))] +pub struct FailingScheduledTable { + #[primary_key] + #[auto_inc] + scheduled_id: u64, + sched_at: spacetimedb::ScheduleAt, + prev: Timestamp, +} + +#[spacetimedb::table(accessor = player_entity, public)] +pub struct PlayerEntity { + #[primary_key] + entity_id: u64, + owner: Identity, +} + +#[spacetimedb::view(accessor = scheduled_view, public)] +fn scheduled_view(ctx: &ViewContext) -> impl Query { + ctx.from.scheduled_table().build() +} + +#[spacetimedb::view(accessor = scheduled_sender_view, public)] +fn scheduled_sender_view(ctx: &ViewContext) -> impl Query { + ctx.from + .player_entity() + .r#where(|pe| pe.owner.eq(ctx.sender())) + .right_semijoin(ctx.from.scheduled_table(), |pe, st| pe.entity_id.eq(st.scheduled_id)) + .build() +} + +#[spacetimedb::view(accessor = failing_scheduled_sender_view, public)] +fn failing_scheduled_sender_view(ctx: &ViewContext) -> impl Query { + ctx.from + .player_entity() + .r#where(|pe| pe.owner.eq(ctx.sender())) + .right_semijoin(ctx.from.failing_scheduled_table(), |pe, st| { + pe.entity_id.eq(st.scheduled_id) + }) + .build() +} + #[spacetimedb::reducer] fn schedule_reducer(ctx: &ReducerContext) { ctx.db.scheduled_table().insert(ScheduledTable { @@ -18,6 +59,15 @@ fn schedule_reducer(ctx: &ReducerContext) { }); } +#[spacetimedb::reducer] +fn schedule_failing_reducer(ctx: &ReducerContext) { + ctx.db.failing_scheduled_table().insert(FailingScheduledTable { + prev: Timestamp::from_micros_since_unix_epoch(0), + scheduled_id: 3, + sched_at: Timestamp::from_micros_since_unix_epoch(0).into(), + }); +} + #[spacetimedb::reducer] fn schedule_repeated_reducer(ctx: &ReducerContext) { ctx.db.scheduled_table().insert(ScheduledTable { @@ -27,6 +77,15 @@ fn schedule_repeated_reducer(ctx: &ReducerContext) { }); } +#[spacetimedb::reducer] +fn seed_player_entity(ctx: &ReducerContext, entity_id: u64) { + ctx.db.player_entity().entity_id().delete(&entity_id); + ctx.db.player_entity().insert(PlayerEntity { + entity_id, + owner: ctx.sender(), + }); +} + #[spacetimedb::reducer] pub fn my_reducer(ctx: &ReducerContext, arg: ScheduledTable) { log::info!( @@ -35,3 +94,8 @@ pub fn my_reducer(ctx: &ReducerContext, arg: ScheduledTable) { ctx.timestamp.duration_since(arg.prev) ); } + +#[spacetimedb::reducer] +pub fn failing_reducer(_ctx: &ReducerContext, _arg: FailingScheduledTable) -> Result<(), String> { + Err("scheduled reducer failed".into()) +} diff --git a/crates/smoketests/tests/smoketests/schedule_reducer.rs b/crates/smoketests/tests/smoketests/schedule_reducer.rs index 27fce9baaf3..29172047ae6 100644 --- a/crates/smoketests/tests/smoketests/schedule_reducer.rs +++ b/crates/smoketests/tests/smoketests/schedule_reducer.rs @@ -1,8 +1,62 @@ use serde_json::json; +use serde_json::Value; use spacetimedb_smoketests::Smoketest; use std::thread; use std::time::Duration; +fn time_row_entry(scheduled_id: u64) -> Value { + json!({ + "prev": {"__timestamp_micros_since_unix_epoch__": 0}, + "scheduled_id": scheduled_id, + "sched_at": {"Time": {"__timestamp_micros_since_unix_epoch__": 0}}, + }) +} + +fn interval_row_entry(scheduled_id: u64, duration_micros: u64) -> Value { + json!({ + "prev": {"__timestamp_micros_since_unix_epoch__": 0}, + "scheduled_id": scheduled_id, + "sched_at": {"Interval": {"__time_duration_micros__": duration_micros}}, + }) +} + +fn collect_updates_after_call(test: &Smoketest, queries: &[&str], reducer_or_procedure: &str) -> Vec { + let sub = test.subscribe_background(queries, 2).unwrap(); + test.call(reducer_or_procedure, &[]).unwrap(); + sub.collect().unwrap() +} + +fn assert_table_and_view_insert_delete_updates( + updates: Vec, + table_name: &str, + view_name: &str, + row_entry: Value, +) { + assert_eq!( + serde_json::json!(updates), + serde_json::json!([ + { + table_name: {"deletes": [], "inserts": [row_entry.clone()]}, + view_name: {"deletes": [], "inserts": [row_entry.clone()]}, + }, + { + table_name: {"deletes": [row_entry.clone()], "inserts": []}, + view_name: {"deletes": [row_entry], "inserts": []}, + }, + ]) + ); +} + +fn assert_table_insert_only_updates(updates: Vec, table_name: &str, first_row: Value, second_row: Value) { + assert_eq!( + serde_json::json!(updates), + serde_json::json!([ + {table_name: {"deletes": [], "inserts": [first_row]}}, + {table_name: {"deletes": [], "inserts": [second_row]}}, + ]) + ); +} + /// Ensure cancelling a reducer works #[test] fn test_cancel_reducer() { @@ -20,47 +74,21 @@ fn test_cancel_reducer() { ); } -/// Test deploying a module with a scheduled reducer and check if client receives -/// subscription update for scheduled table entry and deletion of reducer once it ran +/// Test deploying a module with a scheduled reducer and check that the automatic cleanup +/// transaction updates both the scheduled table and its dependent view together. #[test] -fn test_scheduled_table_subscription() { +fn test_scheduled_table_and_view_subscription() { let test = Smoketest::builder().precompiled_module("schedule-subscribe").build(); - // Subscribe to empty scheduled_table. - let sub = test - .subscribe_background(&["SELECT * FROM scheduled_table"], 2) - .unwrap(); - - // Call a reducer to schedule a reducer (runs immediately since timestamp is 0) - test.call("schedule_reducer", &[]).unwrap(); - - // Wait for the scheduled reducer to run - thread::sleep(Duration::from_secs(2)); - - let logs = test.logs(100).unwrap(); - let invoked_count = logs.iter().filter(|line| line.contains("Invoked:")).count(); - assert_eq!( - invoked_count, 1, - "Expected scheduled reducer to run exactly once, but it ran {} times. Logs: {:?}", - invoked_count, logs + let updates = collect_updates_after_call( + &test, + &["SELECT * FROM scheduled_table", "SELECT * FROM scheduled_view"], + "schedule_reducer", ); - let updates = sub.collect().unwrap(); - - let row_entry = json!({ - "prev": {"__timestamp_micros_since_unix_epoch__": 0}, - "scheduled_id": 2, - "sched_at": {"Time": {"__timestamp_micros_since_unix_epoch__": 0}}, - }); - - // subscription should have 2 updates, first for row insert in scheduled table and second for row deletion. - assert_eq!( - serde_json::json!(updates), - serde_json::json!([ - {"scheduled_table": {"deletes": [], "inserts": [row_entry.clone()]}}, - {"scheduled_table": {"deletes": [row_entry], "inserts": []}}, - ]) - ); + // The insert and delete should update the scheduled table and its view in the same + // subscription transactions. + assert_table_and_view_insert_delete_updates(updates, "scheduled_table", "scheduled_view", time_row_entry(2)); } /// Test that repeated reducers run multiple times @@ -93,64 +121,74 @@ fn test_scheduled_table_subscription_repeated_reducer() { let updates = sub.collect().unwrap(); - let repeated_row_entry = json!({ - "prev": {"__timestamp_micros_since_unix_epoch__": 0}, - "scheduled_id": 1, - "sched_at": {"Interval": {"__time_duration_micros__": 100000}}, - }); - - let row_entry = json!({ - "prev": {"__timestamp_micros_since_unix_epoch__": 0}, - "scheduled_id": 2, - "sched_at": {"Time": {"__timestamp_micros_since_unix_epoch__": 0}}, - }); - // subscription should have 2 updates and should not have any deletes - assert_eq!( - serde_json::json!(updates), - serde_json::json!([ - {"scheduled_table": {"deletes": [], "inserts": [repeated_row_entry]}}, - {"scheduled_table": {"deletes": [], "inserts": [row_entry]}}, - ]) + assert_table_insert_only_updates( + updates, + "scheduled_table", + interval_row_entry(1, 100_000), + time_row_entry(2), ); } -/// Scheduled *procedure* subscription: expect insert + delete. +/// Scheduled *procedure* subscription: expect insert + delete for both table and view. #[test] -fn test_scheduled_procedure_table_subscription() { +fn test_scheduled_procedure_table_and_view_subscription() { let test = Smoketest::builder().precompiled_module("schedule-procedure").build(); - // Subscribe to empty table. - let sub = test - .subscribe_background(&["SELECT * FROM scheduled_table"], 2) - .unwrap(); + let updates = collect_updates_after_call( + &test, + &["SELECT * FROM scheduled_table", "SELECT * FROM scheduled_view"], + "schedule_procedure", + ); - test.call("schedule_procedure", &[]).unwrap(); + assert_table_and_view_insert_delete_updates(updates, "scheduled_table", "scheduled_view", time_row_entry(2)); +} - thread::sleep(Duration::from_secs(2)); +/// Test that scheduled reducers refresh views for scheduled tables. +#[test] +fn test_view_refresh_for_scheduled_reducer() { + let test = Smoketest::builder().precompiled_module("schedule-subscribe").build(); - let logs = test.logs(100).unwrap(); - let invoked_count = logs.iter().filter(|line| line.contains("Invoked:")).count(); - assert_eq!( - invoked_count, 1, - "Expected scheduled procedure to run exactly once, but it ran {} times. Logs: {:?}", - invoked_count, logs + test.call("seed_player_entity", &["2"]).unwrap(); + + let updates = collect_updates_after_call( + &test, + &["SELECT * FROM scheduled_table", "SELECT * FROM scheduled_sender_view"], + "schedule_reducer", ); - let updates = sub.collect().unwrap(); + assert_table_and_view_insert_delete_updates(updates, "scheduled_table", "scheduled_sender_view", time_row_entry(2)); +} - let row_entry = json!({ - "prev": {"__timestamp_micros_since_unix_epoch__": 0}, - "scheduled_id": 2, - "sched_at": {"Time": {"__timestamp_micros_since_unix_epoch__": 0}}, - }); +/// Test that cleanup still refreshes a view when the scheduled reducer fails. +#[test] +fn test_view_refresh_on_failed_scheduled_reducer() { + let test = Smoketest::builder().precompiled_module("schedule-subscribe").build(); - assert_eq!( - serde_json::json!(updates), - serde_json::json!([ - {"scheduled_table": {"deletes": [], "inserts": [row_entry.clone()]}}, - {"scheduled_table": {"deletes": [row_entry], "inserts": []}}, - ]) + test.call("seed_player_entity", &["3"]).unwrap(); + + let updates = collect_updates_after_call( + &test, + &[ + "SELECT * FROM failing_scheduled_table", + "SELECT * FROM failing_scheduled_sender_view", + ], + "schedule_failing_reducer", + ); + + assert_table_and_view_insert_delete_updates( + updates, + "failing_scheduled_table", + "failing_scheduled_sender_view", + time_row_entry(3), + ); + + let logs = test.logs(100).unwrap(); + let logs_str = logs.join("\n"); + assert!( + logs_str.contains("scheduled reducer failed"), + "Expected scheduled reducer failure to be logged, got: {:?}", + logs ); } @@ -181,24 +219,11 @@ fn test_scheduled_procedure_table_subscription_repeated_procedure() { let updates = sub.collect().unwrap(); - let repeated_row_entry = json!({ - "prev": {"__timestamp_micros_since_unix_epoch__": 0}, - "scheduled_id": 1, - "sched_at": {"Interval": {"__time_duration_micros__": 100000}}, - }); - - let row_entry = json!({ - "prev": {"__timestamp_micros_since_unix_epoch__": 0}, - "scheduled_id": 2, - "sched_at": {"Time": {"__timestamp_micros_since_unix_epoch__": 0}}, - }); - - assert_eq!( - serde_json::json!(updates), - serde_json::json!([ - {"scheduled_table": {"deletes": [], "inserts": [repeated_row_entry]}}, - {"scheduled_table": {"deletes": [], "inserts": [row_entry]}}, - ]) + assert_table_insert_only_updates( + updates, + "scheduled_table", + interval_row_entry(1, 100_000), + time_row_entry(2), ); }