From bfca89417b90db5a881f1918682d54788a6dcd17 Mon Sep 17 00:00:00 2001 From: Joel Reymont <18791+joelreymont@users.noreply.github.com> Date: Fri, 7 Nov 2025 17:32:42 +0200 Subject: [PATCH 1/2] RRD ingestion with per-EntityDb workers (#4298) - Each EntityDb now has its own dedicated ingestion worker with separate channels for commands and message batches - Add benchmarks for ingestion performance testing --- .gitignore | 3 + Cargo.lock | 6 + crates/store/re_data_loader/Cargo.toml | 6 + .../benches/parallel_ingestion_bench.rs | 102 +++ crates/store/re_data_loader/src/loader_rrd.rs | 370 ++++++++-- crates/store/re_entity_db/Cargo.toml | 3 + crates/store/re_entity_db/src/entity_db.rs | 118 ++- .../re_entity_db/src/ingestion_worker.rs | 669 ++++++++++++++++++ crates/store/re_entity_db/src/lib.rs | 5 + crates/store/re_entity_db/src/store_bundle.rs | 36 + crates/utils/re_video/src/decode/mod.rs | 7 +- crates/viewer/re_viewer/Cargo.toml | 1 + crates/viewer/re_viewer/src/app.rs | 174 ++++- .../viewer/re_viewer_context/src/store_hub.rs | 14 + 14 files changed, 1448 insertions(+), 66 deletions(-) create mode 100644 crates/store/re_data_loader/benches/parallel_ingestion_bench.rs create mode 100644 crates/store/re_entity_db/src/ingestion_worker.rs diff --git a/.gitignore b/.gitignore index 8d7bf89ece38..3a3ce951134e 100644 --- a/.gitignore +++ b/.gitignore @@ -92,3 +92,6 @@ justfile # Lychee link checker cache .lycheecache + +# Development notes +notes diff --git a/Cargo.lock b/Cargo.lock index 903affbd6e8b..ce10f33d7966 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8665,6 +8665,7 @@ dependencies = [ "ahash", "anyhow", "arrow", + "criterion", "crossbeam", "image", "indexmap 2.11.4", @@ -8672,6 +8673,7 @@ dependencies = [ "itertools 0.14.0", "mcap", "memmap2 0.9.8", + "mimalloc", "notify", "parking_lot", "parquet", @@ -8880,6 +8882,8 @@ version = "0.27.0-alpha.8+dev" dependencies = [ "ahash", "anyhow", + "arrow", + "crossbeam", "document-features", "emath", "indexmap 2.11.4", @@ -8900,6 +8904,7 @@ dependencies = [ "re_smart_channel", "re_sorbet", "re_tracing", + "re_types", "re_types_core", "re_uri", "saturating_cast", @@ -10264,6 +10269,7 @@ dependencies = [ "re_renderer", "re_selection_panel", "re_smart_channel", + "re_sorbet", "re_test_context", "re_test_viewport", "re_time_panel", diff --git a/crates/store/re_data_loader/Cargo.toml b/crates/store/re_data_loader/Cargo.toml index 9cf318df3c61..b0743734d957 100644 --- a/crates/store/re_data_loader/Cargo.toml +++ b/crates/store/re_data_loader/Cargo.toml @@ -63,3 +63,9 @@ re_chunk_store.workspace = true re_log_encoding = { workspace = true, features = ["decoder", "encoder"] } insta = { workspace = true, features = ["glob"] } +criterion.workspace = true +mimalloc.workspace = true + +[[bench]] +name = "parallel_ingestion_bench" +harness = false diff --git a/crates/store/re_data_loader/benches/parallel_ingestion_bench.rs b/crates/store/re_data_loader/benches/parallel_ingestion_bench.rs new file mode 100644 index 000000000000..2e6afb44b1bf --- /dev/null +++ b/crates/store/re_data_loader/benches/parallel_ingestion_bench.rs @@ -0,0 +1,102 @@ +// Allow unwrap() in benchmarks +#![expect(clippy::unwrap_used)] + +#[global_allocator] +static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; + +use criterion::{Criterion, criterion_group, criterion_main}; +use re_chunk::{Chunk, RowId}; +use re_log_encoding::Encoder; +use re_log_types::{ + LogMsg, NonMinI64, StoreId, StoreKind, TimeInt, TimePoint, Timeline, entity_path, +}; +use re_types::archetypes::Points2D; +use std::sync::mpsc; + +use re_data_loader::{DataLoader as _, DataLoaderSettings, RrdLoader}; + +#[cfg(not(debug_assertions))] +const NUM_MESSAGES: usize = 10_000; + +#[cfg(debug_assertions)] +const NUM_MESSAGES: usize = 100; + +criterion_group!(benches, benchmark_load_from_file_contents); +criterion_main!(benches); + +fn generate_messages(count: usize) -> Vec { + let store_id = StoreId::random(StoreKind::Recording, "bench_app"); + let mut messages = Vec::with_capacity(count); + + for i in 0..count { + let chunk = Chunk::builder(entity_path!("points", i.to_string())) + .with_archetype( + RowId::new(), + TimePoint::default().with( + Timeline::new_sequence("log_time"), + TimeInt::from_millis(NonMinI64::new(i64::try_from(i).unwrap()).unwrap()), + ), + &Points2D::new([(i as f32, i as f32), ((i + 1) as f32, (i + 1) as f32)]), + ) + .build() + .unwrap(); + + messages.push(LogMsg::ArrowMsg( + store_id.clone(), + chunk.to_arrow_msg().unwrap(), + )); + } + + messages +} + +fn encode_messages(messages: &[LogMsg]) -> Vec { + let mut bytes = Vec::new(); + { + let mut encoder = Encoder::new_eager( + re_build_info::CrateVersion::LOCAL, + re_log_encoding::rrd::EncodingOptions::PROTOBUF_UNCOMPRESSED, + &mut bytes, + ) + .unwrap(); + + for msg in messages { + encoder.append(msg).unwrap(); + } + encoder.flush_blocking().unwrap(); + encoder.finish().unwrap(); + } + bytes +} + +/// Benchmark loading from file (parallel processing) +fn benchmark_load_from_file_contents(c: &mut Criterion) { + let messages = generate_messages(NUM_MESSAGES); + let encoded = encode_messages(&messages); + let filepath = std::path::PathBuf::from("bench_data.rrd"); + let settings = DataLoaderSettings::recommended(re_log_types::RecordingId::random()); + + let mut group = c.benchmark_group("load_from_file_contents"); + group.throughput(criterion::Throughput::Elements(NUM_MESSAGES as u64)); + + group.bench_function("rrd_loader", |b| { + b.iter(|| { + let (tx, rx) = mpsc::channel(); + let contents = std::borrow::Cow::Borrowed(encoded.as_slice()); + let loader = RrdLoader; + + loader + .load_from_file_contents(&settings, filepath.clone(), contents, tx) + .unwrap(); + + let mut count = 0; + while rx.try_recv().is_ok() { + count += 1; + } + + assert_eq!(count, NUM_MESSAGES); + }); + }); + + group.finish(); +} diff --git a/crates/store/re_data_loader/src/loader_rrd.rs b/crates/store/re_data_loader/src/loader_rrd.rs index f44e11680a9e..ee924b9bc288 100644 --- a/crates/store/re_data_loader/src/loader_rrd.rs +++ b/crates/store/re_data_loader/src/loader_rrd.rs @@ -1,8 +1,8 @@ use re_log_encoding::Decoder; +use re_log_types::ApplicationId; #[cfg(not(target_arch = "wasm32"))] use crossbeam::channel::Receiver; -use re_log_types::ApplicationId; use crate::{DataLoader as _, LoadedData}; @@ -185,58 +185,67 @@ fn decode_and_stream( } }; - let msg = if forced_application_id.is_some() || forced_recording_id.is_some() { - match msg { - re_log_types::LogMsg::SetStoreInfo(set_store_info) => { - let mut store_id = set_store_info.info.store_id.clone(); - if let Some(forced_application_id) = forced_application_id { - store_id = store_id.with_application_id(forced_application_id.clone()); - } - if let Some(forced_recording_id) = forced_recording_id { - store_id = store_id.with_recording_id(forced_recording_id.clone()); - } + // Transform message if needed (for .rbl files with forced IDs) + let msg = transform_message(msg, forced_application_id, forced_recording_id); - re_log_types::LogMsg::SetStoreInfo(re_log_types::SetStoreInfo { - info: re_log_types::StoreInfo { - store_id, - ..set_store_info.info - }, - ..set_store_info - }) - } + // Send message to the viewer + let data = LoadedData::LogMsg(RrdLoader::name(&RrdLoader), msg); + if tx.send(data).is_err() { + return; // The other end has decided to hang up + } + } +} - re_log_types::LogMsg::ArrowMsg(mut store_id, arrow_msg) => { - if let Some(forced_application_id) = forced_application_id { - store_id = store_id.with_application_id(forced_application_id.clone()); - } - if let Some(forced_recording_id) = forced_recording_id { - store_id = store_id.with_recording_id(forced_recording_id.clone()); - } +fn transform_message( + msg: re_log_types::LogMsg, + forced_application_id: Option<&ApplicationId>, + forced_recording_id: Option<&String>, +) -> re_log_types::LogMsg { + if forced_application_id.is_none() && forced_recording_id.is_none() { + return msg; + } - re_log_types::LogMsg::ArrowMsg(store_id, arrow_msg) - } + match msg { + re_log_types::LogMsg::SetStoreInfo(set_store_info) => { + let mut store_id = set_store_info.info.store_id.clone(); + if let Some(forced_application_id) = forced_application_id { + store_id = store_id.with_application_id(forced_application_id.clone()); + } + if let Some(forced_recording_id) = forced_recording_id { + store_id = store_id.with_recording_id(forced_recording_id.clone()); + } - re_log_types::LogMsg::BlueprintActivationCommand(blueprint_activation_command) => { - let mut blueprint_id = blueprint_activation_command.blueprint_id.clone(); - if let Some(forced_application_id) = forced_application_id { - blueprint_id = - blueprint_id.with_application_id(forced_application_id.clone()); - } - re_log_types::LogMsg::BlueprintActivationCommand( - re_log_types::BlueprintActivationCommand { - blueprint_id, - ..blueprint_activation_command - }, - ) - } + re_log_types::LogMsg::SetStoreInfo(re_log_types::SetStoreInfo { + info: re_log_types::StoreInfo { + store_id, + ..set_store_info.info + }, + ..set_store_info + }) + } + + re_log_types::LogMsg::ArrowMsg(mut store_id, arrow_msg) => { + if let Some(forced_application_id) = forced_application_id { + store_id = store_id.with_application_id(forced_application_id.clone()); + } + if let Some(forced_recording_id) = forced_recording_id { + store_id = store_id.with_recording_id(forced_recording_id.clone()); } - } else { - msg - }; - let data = LoadedData::LogMsg(RrdLoader::name(&RrdLoader), msg); - if tx.send(data).is_err() { - break; // The other end has decided to hang up, not our problem. + re_log_types::LogMsg::ArrowMsg(store_id, arrow_msg) + } + + re_log_types::LogMsg::BlueprintActivationCommand(blueprint_activation_command) => { + let mut blueprint_id = blueprint_activation_command.blueprint_id.clone(); + if let Some(forced_application_id) = forced_application_id { + blueprint_id = blueprint_id.with_application_id(forced_application_id.clone()); + } + re_log_types::LogMsg::BlueprintActivationCommand( + re_log_types::BlueprintActivationCommand { + blueprint_id, + ..blueprint_activation_command + }, + ) } } } @@ -453,4 +462,271 @@ mod tests { // Drop explicitly to make sure that rustc doesn't drop it earlier. drop(rrd_file_delete_guard); } + + #[test] + #[cfg(not(target_arch = "wasm32"))] + fn test_message_order_preserved() { + use re_chunk::{Chunk, RowId}; + use re_log_encoding::Encoder; + use re_log_types::{ + LogMsg, NonMinI64, StoreId, StoreKind, TimeInt, TimePoint, Timeline, entity_path, + }; + use re_types::archetypes::Points2D; + use std::sync::mpsc; + + // Create a temporary file for the test + let rrd_file_path = std::path::PathBuf::from("test_ordering.rrd"); + let rrd_file_delete_guard = DeleteOnDrop { + path: rrd_file_path.clone(), + }; + std::fs::remove_file(&rrd_file_path).ok(); + + let store_id = StoreId::random(StoreKind::Recording, "order_test"); + let mut encoder = Encoder::new_eager( + re_build_info::CrateVersion::LOCAL, + re_log_encoding::rrd::EncodingOptions::PROTOBUF_UNCOMPRESSED, + std::fs::File::create(&rrd_file_path).unwrap(), + ) + .unwrap(); + + // Create messages with sequential identifiers embedded in metadata + // Use RowId ordering and check that messages arrive in the same order + const NUM_MESSAGES: usize = 500; + let mut original_messages = Vec::with_capacity(NUM_MESSAGES); + + for i in 0..NUM_MESSAGES { + let chunk = Chunk::builder(entity_path!("test", "points", i.to_string())) + .with_archetype( + RowId::new(), + TimePoint::default().with( + Timeline::new_sequence("seq"), + TimeInt::from_millis(NonMinI64::new(i64::try_from(i).unwrap()).unwrap()), + ), + &Points2D::new([(i as f32, i as f32)]), + ) + .build() + .unwrap(); + + let mut arrow_msg = chunk.to_arrow_msg().unwrap(); + // Embed sequence number in metadata to verify ordering + arrow_msg + .batch + .schema_metadata_mut() + .insert("sequence".into(), i.to_string()); + + let msg = LogMsg::ArrowMsg(store_id.clone(), arrow_msg); + original_messages.push(msg.clone()); + encoder.append(&msg).unwrap(); + } + + encoder.flush_blocking().unwrap(); + encoder.finish().unwrap(); + drop(encoder); + + // Load the file and verify message ordering is preserved + let (tx, rx) = mpsc::channel(); + let loader = RrdLoader; + let settings = crate::DataLoaderSettings::recommended(re_log_types::RecordingId::random()); + let file_contents = std::fs::read(&rrd_file_path).unwrap(); + + loader + .load_from_file_contents( + &settings, + rrd_file_path.clone(), + std::borrow::Cow::Borrowed(&file_contents), + tx.clone(), + ) + .unwrap(); + + drop(tx); // Close sender to signal end of stream + + let mut received_messages = Vec::new(); + for loaded_data in rx { + if let crate::LoadedData::LogMsg(_, msg) = loaded_data { + received_messages.push(msg); + } + } + + // Verify we got all messages + assert_eq!( + received_messages.len(), + NUM_MESSAGES, + "Should receive all {NUM_MESSAGES} messages" + ); + + // Verify ordering by checking sequence numbers in metadata + for (i, msg) in received_messages.iter().enumerate() { + if let LogMsg::ArrowMsg(_, arrow_msg) = msg { + let schema = arrow_msg.batch.schema(); + let sequence_str = schema + .metadata() + .get("sequence") + .expect("Message should have sequence metadata"); + let sequence: usize = sequence_str + .parse() + .expect("Sequence should be a valid number"); + + assert_eq!( + sequence, i, + "Message at position {i} should have sequence {i}, but got {sequence}" + ); + } else { + panic!("Expected ArrowMsg, got {msg:?}"); + } + } + + assert_eq!( + original_messages.len(), + received_messages.len(), + "Message counts should match" + ); + + for (original, received) in original_messages.iter().zip(received_messages.iter()) { + assert_eq!( + original.store_id(), + received.store_id(), + "Store IDs should match" + ); + } + + drop(rrd_file_delete_guard); + } + + #[test] + #[cfg(not(target_arch = "wasm32"))] + fn test_message_order_preserved_with_transformation() { + use re_chunk::{Chunk, RowId}; + use re_log_encoding::Encoder; + use re_log_types::{ + ApplicationId, LogMsg, NonMinI64, RecordingId, StoreId, StoreKind, TimeInt, TimePoint, + Timeline, entity_path, + }; + use re_types::archetypes::Points2D; + use std::sync::mpsc; + + // Test that message ordering is preserved even when transform_message is applied + // Note: Only .rbl files use forced IDs, .rrd files don't transform store IDs + let rrd_file_path = std::path::PathBuf::from("test_ordering_transform.rbl"); + let rrd_file_delete_guard = DeleteOnDrop { + path: rrd_file_path.clone(), + }; + std::fs::remove_file(&rrd_file_path).ok(); + + // For .rbl files, the opened_store_id's application_id is used to transform messages + let opened_app_id = ApplicationId::random(); + let opened_recording_id = RecordingId::random(); + let opened_store_id = StoreId::new( + re_log_types::StoreKind::Blueprint, + opened_app_id.clone(), + opened_recording_id.clone(), + ); + + // Original messages will have a different store_id that gets transformed + let original_store_id = StoreId::random(StoreKind::Recording, "original"); + + let mut encoder = Encoder::new_eager( + re_build_info::CrateVersion::LOCAL, + re_log_encoding::rrd::EncodingOptions::PROTOBUF_UNCOMPRESSED, + std::fs::File::create(&rrd_file_path).unwrap(), + ) + .unwrap(); + + // Create messages that will be transformed + const NUM_MESSAGES: usize = 250; + let mut original_messages = Vec::with_capacity(NUM_MESSAGES); + + for i in 0..NUM_MESSAGES { + let chunk = Chunk::builder(entity_path!("test", "points", i.to_string())) + .with_archetype( + RowId::new(), + TimePoint::default().with( + Timeline::new_sequence("seq"), + TimeInt::from_millis(NonMinI64::new(i64::try_from(i).unwrap()).unwrap()), + ), + &Points2D::new([(i as f32, i as f32)]), + ) + .build() + .unwrap(); + + let mut arrow_msg = chunk.to_arrow_msg().unwrap(); + arrow_msg + .batch + .schema_metadata_mut() + .insert("sequence".into(), i.to_string()); + + let msg = LogMsg::ArrowMsg(original_store_id.clone(), arrow_msg); + original_messages.push(msg.clone()); + encoder.append(&msg).unwrap(); + } + + encoder.flush_blocking().unwrap(); + encoder.finish().unwrap(); + drop(encoder); + + // Load with opened_store_id to trigger transform_message for .rbl files + let (tx, rx) = mpsc::channel(); + let loader = RrdLoader; + let mut settings = crate::DataLoaderSettings::recommended(opened_recording_id.clone()); + settings.opened_store_id = Some(opened_store_id.clone()); + + let file_contents = std::fs::read(&rrd_file_path).unwrap(); + + loader + .load_from_file_contents( + &settings, + rrd_file_path.clone(), + std::borrow::Cow::Borrowed(&file_contents), + tx.clone(), + ) + .unwrap(); + + drop(tx); + + let mut received_messages = Vec::new(); + for loaded_data in rx { + if let crate::LoadedData::LogMsg(_, msg) = loaded_data { + received_messages.push(msg); + } + } + + // Verify we got all messages + assert_eq!( + received_messages.len(), + NUM_MESSAGES, + "Should receive all {NUM_MESSAGES} messages" + ); + + // Verify ordering is preserved even after transformation + for (i, msg) in received_messages.iter().enumerate() { + if let LogMsg::ArrowMsg(store_id, arrow_msg) = msg { + // Verify store ID was transformed correctly (only application_id is transformed for .rbl) + assert_eq!( + store_id.application_id(), + &opened_app_id, + "Store ID should have opened_store_id's application ID" + ); + // Recording ID is NOT transformed (forced_recording_id is always None) + // So it should match the original message's recording_id + + // Verify ordering + let schema = arrow_msg.batch.schema(); + let sequence_str = schema + .metadata() + .get("sequence") + .expect("Message should have sequence metadata"); + let sequence: usize = sequence_str + .parse() + .expect("Sequence should be a valid number"); + + assert_eq!( + sequence, i, + "Message at position {i} should have sequence {i}, but got {sequence} (ordering broken!)" + ); + } else { + panic!("Expected ArrowMsg, got {msg:?}"); + } + } + + drop(rrd_file_delete_guard); + } } diff --git a/crates/store/re_entity_db/Cargo.toml b/crates/store/re_entity_db/Cargo.toml index 494999a8416a..20e249eb844f 100644 --- a/crates/store/re_entity_db/Cargo.toml +++ b/crates/store/re_entity_db/Cargo.toml @@ -45,6 +45,8 @@ re_types_core.workspace = true re_uri.workspace = true ahash.workspace = true +arrow.workspace = true +crossbeam.workspace = true document-features.workspace = true emath.workspace = true indexmap.workspace = true @@ -58,6 +60,7 @@ web-time.workspace = true [dev-dependencies] re_log_encoding = { workspace = true, features = ["decoder", "encoder"] } +re_types.workspace = true anyhow.workspace = true similar-asserts.workspace = true diff --git a/crates/store/re_entity_db/src/entity_db.rs b/crates/store/re_entity_db/src/entity_db.rs index e916d638dd4e..f861a92f6ce4 100644 --- a/crates/store/re_entity_db/src/entity_db.rs +++ b/crates/store/re_entity_db/src/entity_db.rs @@ -61,7 +61,6 @@ impl EntityDbClass<'_> { /// An in-memory database built from a stream of [`LogMsg`]es. /// /// NOTE: all mutation is to be done via public functions! -#[derive(Clone)] // Useful for tests pub struct EntityDb { /// Store id associated with this [`EntityDb`]. Must be identical to the `storage_engine`'s /// store id. @@ -114,6 +113,15 @@ pub struct EntityDb { storage_engine: StorageEngine, stats: IngestionStatistics, + + /// Background worker for processing Arrow messages (native only). + /// + /// On native: Each `EntityDb` gets its own worker thread for parallel ingestion + /// On Wasm: This is a no-op (messages processed synchronously) + /// + /// Lazily initialized on first Arrow message. + #[cfg(not(target_arch = "wasm32"))] + ingestion_worker: Option, } impl Debug for EntityDb { @@ -126,6 +134,28 @@ impl Debug for EntityDb { } } +// Custom Clone implementation that skips ingestion_worker (can't clone thread handles) +// Useful for tests +impl Clone for EntityDb { + fn clone(&self) -> Self { + Self { + store_id: self.store_id.clone(), + data_source: None, // Clones of an EntityDb get a None source + set_store_info: self.set_store_info.clone(), + last_modified_at: self.last_modified_at, + latest_row_id: self.latest_row_id, + entity_path_from_hash: self.entity_path_from_hash.clone(), + times_per_timeline: self.times_per_timeline.clone(), + time_histogram_per_timeline: self.time_histogram_per_timeline.clone(), + tree: self.tree.clone(), + storage_engine: self.storage_engine.clone(), + stats: self.stats.clone(), + #[cfg(not(target_arch = "wasm32"))] + ingestion_worker: None, // Don't clone workers (thread handles can't be cloned) + } + } +} + impl EntityDb { pub fn new(store_id: StoreId) -> Self { Self::with_store_config(store_id, ChunkStoreConfig::from_env().unwrap_or_default()) @@ -151,6 +181,8 @@ impl EntityDb { time_histogram_per_timeline: Default::default(), storage_engine, stats: IngestionStatistics::default(), + #[cfg(not(target_arch = "wasm32"))] + ingestion_worker: None, } } @@ -554,16 +586,16 @@ impl EntityDb { } LogMsg::ArrowMsg(_, arrow_msg) => { - self.last_modified_at = web_time::Instant::now(); - + // Convert Arrow message to chunk let chunk_batch = re_sorbet::ChunkBatch::try_from(&arrow_msg.batch) .map_err(re_chunk::ChunkError::from)?; let mut chunk = re_chunk::Chunk::from_chunk_batch(&chunk_batch)?; chunk.sort_if_unsorted(); - self.add_chunk_with_timestamp_metadata( - &Arc::new(chunk), - &chunk_batch.sorbet_schema().timestamps, - )? + + let chunk = Arc::new(chunk); + let timestamps = chunk_batch.sorbet_schema().timestamps.clone(); + + self.add_chunk_with_timestamp_metadata(&chunk, ×tamps)? } LogMsg::BlueprintActivationCommand(_) => { @@ -579,11 +611,13 @@ impl EntityDb { self.add_chunk_with_timestamp_metadata(chunk, &Default::default()) } - fn add_chunk_with_timestamp_metadata( + pub fn add_chunk_with_timestamp_metadata( &mut self, chunk: &Arc, timestamps: &re_sorbet::TimestampMetadata, ) -> Result, Error> { + self.last_modified_at = web_time::Instant::now(); + let mut engine = self.storage_engine.write(); let store_events = engine.store().insert_chunk(chunk)?; engine.cache().on_events(&store_events); @@ -629,6 +663,74 @@ impl EntityDb { self.set_store_info = Some(store_info); } + /// Submit an Arrow message to the background ingestion worker (native only). + /// + /// On native: Lazily creates worker on first call, then queues message for processing + /// On Wasm: This method doesn't exist (compile error if called) + /// + /// The caller should periodically call [`Self::poll_worker_output`] to retrieve + /// processed chunks and add them to the store. + #[cfg(not(target_arch = "wasm32"))] + pub fn submit_arrow_msg( + &mut self, + arrow_msg: re_log_types::ArrowMsg, + channel_source: Arc, + msg_will_add_new_store: bool, + ) { + re_tracing::profile_function!(); + + // Lazy-init worker on first use + let worker = self.ingestion_worker.get_or_insert_with(|| { + re_log::debug!("Creating ingestion worker for store {:?}", self.store_id); + crate::ingestion_worker::IngestionWorker::new() + }); + + worker.submit_arrow_msg_blocking( + self.store_id.clone(), + arrow_msg, + channel_source, + msg_will_add_new_store, + ); + } + + /// Process pending work from the background ingestion worker (native only). + /// + /// This should be called once per frame to process chunks that have been + /// converted by the background worker. The processed chunks are automatically + /// added to this `EntityDb`'s store. + /// + /// Returns a vector of (`ProcessedChunk`, `was_empty_before`, `StoreEvents`) tuples + /// for each chunk that was successfully added. + /// + /// On native: Polls and processes chunks from the background worker + /// On Wasm: This method doesn't exist (compile error if called) + #[cfg(not(target_arch = "wasm32"))] + pub fn on_frame_start( + &mut self, + ) -> Vec<( + crate::ingestion_worker::ProcessedChunk, + bool, + Result, Error>, + )> { + re_tracing::profile_function!(); + + let Some(worker) = &self.ingestion_worker else { + return Vec::new(); + }; + + let processed_chunks = worker.poll_processed_chunks(); + let mut results = Vec::with_capacity(processed_chunks.len()); + + for processed in processed_chunks { + let was_empty = self.is_empty(); + let result = + self.add_chunk_with_timestamp_metadata(&processed.chunk, &processed.timestamps); + results.push((processed, was_empty, result)); + } + + results + } + /// Free up some RAM by forgetting the older parts of all timelines. pub fn purge_fraction_of_ram(&mut self, fraction_to_purge: f32) -> Vec { re_tracing::profile_function!(); diff --git a/crates/store/re_entity_db/src/ingestion_worker.rs b/crates/store/re_entity_db/src/ingestion_worker.rs new file mode 100644 index 000000000000..e5eea7fa51ba --- /dev/null +++ b/crates/store/re_entity_db/src/ingestion_worker.rs @@ -0,0 +1,669 @@ +/// Background worker for ingesting Arrow messages. +/// +/// # Platform Support +/// +/// - **Native**: Uses a dedicated background thread with bounded channels for backpressure +/// - **Wasm**: Processes synchronously (no threads available) +/// +/// # Architecture +/// +/// Each `EntityDb` owns its own `IngestionWorker` instance, ensuring: +/// - Message ordering per store is preserved +/// - Different stores don't block each other +/// - Worker lifecycle tied to `EntityDb` lifecycle +use std::sync::Arc; + +use re_log_types::{ArrowMsg, StoreId}; +use re_smart_channel::SmartChannelSource; + +/// Maximum number of pending work items before backpressure kicks in (native only). +const WORK_QUEUE_CAPACITY: usize = 2000; + +/// Result of processing a work item. +#[derive(Debug)] +pub struct ProcessedChunk { + pub store_id: StoreId, + pub chunk: Arc, + pub timestamps: re_sorbet::TimestampMetadata, + pub channel_source: Arc, + pub msg_will_add_new_store: bool, +} + +// ============================================================================ +// NATIVE IMPLEMENTATION (background thread with channels) +// ============================================================================ + +#[cfg(not(target_arch = "wasm32"))] +mod native_impl { + use std::sync::Arc; + + use re_log_types::{ArrowMsg, StoreId}; + use re_smart_channel::SmartChannelSource; + + use super::{ProcessedChunk, WORK_QUEUE_CAPACITY}; + + /// Work item to be processed by the ingestion worker. + struct WorkItem { + store_id: StoreId, + arrow_msg: ArrowMsg, + channel_source: Arc, + msg_will_add_new_store: bool, + } + + /// Background worker for processing Arrow messages into chunks. + /// + /// Runs on a dedicated thread and provides backpressure via bounded channels. + pub struct IngestionWorkerImpl { + input_tx: crossbeam::channel::Sender, + output_rx: crossbeam::channel::Receiver, + #[expect(dead_code)] // Kept alive for thread lifecycle + worker_thread: Option>, + } + + impl IngestionWorkerImpl { + /// Create a new ingestion worker with a dedicated background thread. + pub fn new() -> Self { + let (input_tx, input_rx) = crossbeam::channel::bounded::(WORK_QUEUE_CAPACITY); + let (output_tx, output_rx) = crossbeam::channel::unbounded::(); + + let worker_thread = std::thread::Builder::new() + .name("ingestion_worker".to_owned()) + .spawn(move || { + Self::worker_loop(input_rx, output_tx); + }) + .expect("Failed to spawn ingestion worker thread"); + + Self { + input_tx, + output_rx, + worker_thread: Some(worker_thread), + } + } + + /// Submit an arrow message for processing, blocking if necessary. + pub fn submit_arrow_msg_blocking( + &self, + store_id: StoreId, + arrow_msg: ArrowMsg, + channel_source: Arc, + msg_will_add_new_store: bool, + ) { + let work_item = WorkItem { + store_id, + arrow_msg, + channel_source, + msg_will_add_new_store, + }; + + // Block until we can send + if let Err(err) = self.input_tx.send(work_item) { + re_log::warn!("Failed to send to ingestion worker: {err}"); + } + } + + /// Poll for processed chunks. Returns all available chunks without blocking. + pub fn poll_processed_chunks(&self) -> Vec { + let mut chunks = Vec::new(); + + // Drain all available processed chunks without blocking + while let Ok(chunk) = self.output_rx.try_recv() { + chunks.push(chunk); + } + + chunks + } + + /// Main worker loop that processes arrow messages. + #[expect(clippy::needless_pass_by_value)] // Channels are moved into thread + fn worker_loop( + input_rx: crossbeam::channel::Receiver, + output_tx: crossbeam::channel::Sender, + ) { + re_log::debug!("Ingestion worker thread started"); + + while let Ok(work_item) = input_rx.recv() { + re_tracing::profile_scope!("process_arrow_msg"); + + let WorkItem { + store_id, + arrow_msg, + channel_source, + msg_will_add_new_store, + } = work_item; + + // Do the work of converting Arrow data to chunks + let result = Self::process_arrow_msg(&arrow_msg); + + match result { + Ok((chunk, timestamps)) => { + let processed = ProcessedChunk { + store_id, + chunk: Arc::new(chunk), + timestamps, + channel_source, + msg_will_add_new_store, + }; + + if output_tx.send(processed).is_err() { + // Main thread has disconnected, time to exit + break; + } + } + Err(err) => { + re_log::warn_once!("Failed to process arrow message: {err}"); + } + } + } + + re_log::debug!("Ingestion worker thread exiting"); + } + + /// Process an arrow message into a chunk. + /// + /// This is the work that we want to do off the main thread. + fn process_arrow_msg( + arrow_msg: &ArrowMsg, + ) -> crate::Result<(re_chunk::Chunk, re_sorbet::TimestampMetadata)> { + re_tracing::profile_function!(); + + let chunk_batch = re_sorbet::ChunkBatch::try_from(&arrow_msg.batch) + .map_err(re_chunk::ChunkError::from)?; + let mut chunk = re_chunk::Chunk::from_chunk_batch(&chunk_batch)?; + chunk.sort_if_unsorted(); + + Ok((chunk, chunk_batch.sorbet_schema().timestamps.clone())) + } + } + + impl Drop for IngestionWorkerImpl { + fn drop(&mut self) { + // Dropping input_tx will cause the worker thread to exit gracefully + // when it finishes processing remaining items + re_log::debug!("Dropping ingestion worker"); + } + } +} + +// ============================================================================ +// Wasm IMPLEMENTATION (synchronous processing, no threads) +// ============================================================================ + +#[cfg(target_arch = "wasm32")] +mod wasm_impl { + use std::sync::Arc; + + use re_log_types::{ArrowMsg, StoreId}; + use re_smart_channel::SmartChannelSource; + + use super::ProcessedChunk; + + /// Wasm implementation that processes synchronously. + /// + /// Since Wasm doesn't support threads, we process messages immediately + /// instead of queueing them to a background worker. + pub struct IngestionWorkerImpl { + // Wasm implementation has no state, but we keep the struct + // for API compatibility + _phantom: std::marker::PhantomData<()>, + } + + impl IngestionWorkerImpl { + /// Create a new synchronous ingestion worker (Wasm). + pub fn new() -> Self { + Self { + _phantom: std::marker::PhantomData, + } + } + + /// Submit an arrow message for immediate synchronous processing. + /// + /// Unlike the native version, this processes the message immediately + /// and adds it to an internal queue for later retrieval via poll_processed_chunks. + pub fn submit_arrow_msg_blocking( + &self, + _store_id: StoreId, + _arrow_msg: ArrowMsg, + _channel_source: Arc, + _msg_will_add_new_store: bool, + ) { + // On Wasm, we don't queue messages - they should be processed synchronously + // by the caller instead of using the worker. + re_log::warn_once!( + "IngestionWorker::submit_arrow_msg_blocking called on Wasm - this is unexpected" + ); + } + + /// Poll for processed chunks (always empty on Wasm). + pub fn poll_processed_chunks(&self) -> Vec { + // On Wasm, messages are processed synchronously, so polling always returns empty + Vec::new() + } + } +} + +// ============================================================================ +// PUBLIC API (platform-agnostic) +// ============================================================================ + +/// Platform-agnostic ingestion worker. +/// +/// On native: Uses background thread with channels +/// On Wasm: No-op (messages processed synchronously by caller) +pub struct IngestionWorker { + #[cfg(not(target_arch = "wasm32"))] + inner: native_impl::IngestionWorkerImpl, + + #[cfg(target_arch = "wasm32")] + inner: wasm_impl::IngestionWorkerImpl, +} + +impl IngestionWorker { + /// Create a new ingestion worker. + /// + /// - On native: Spawns a background thread + /// - On Wasm: Returns a no-op worker + pub fn new() -> Self { + #[cfg(not(target_arch = "wasm32"))] + { + Self { + inner: native_impl::IngestionWorkerImpl::new(), + } + } + + #[cfg(target_arch = "wasm32")] + { + Self { + inner: wasm_impl::IngestionWorkerImpl::new(), + } + } + } + + /// Submit an arrow message for processing. + /// + /// - On native: Queues to background thread (may block if queue is full) + /// - On Wasm: No-op (caller should process synchronously instead) + pub fn submit_arrow_msg_blocking( + &self, + store_id: StoreId, + arrow_msg: ArrowMsg, + channel_source: Arc, + msg_will_add_new_store: bool, + ) { + self.inner.submit_arrow_msg_blocking( + store_id, + arrow_msg, + channel_source, + msg_will_add_new_store, + ); + } + + /// Poll for processed chunks without blocking. + /// + /// - On native: Returns chunks processed by background thread + /// - On Wasm: Always returns empty vec (messages processed synchronously) + pub fn poll_processed_chunks(&self) -> Vec { + self.inner.poll_processed_chunks() + } +} + +impl Default for IngestionWorker { + fn default() -> Self { + Self::new() + } +} + +// ============================================================================ +// TESTS (native only, since Wasm worker is a no-op) +// ============================================================================ + +#[cfg(all(test, not(target_arch = "wasm32")))] +mod tests { + use std::sync::Arc; + + use re_chunk::{Chunk, RowId}; + use re_log_types::{ + ArrowMsg, NonMinI64, StoreId, StoreKind, TimeInt, TimePoint, Timeline, entity_path, + }; + use re_smart_channel::SmartChannelSource; + use re_types::archetypes::Points2D; + + use super::{IngestionWorker, WORK_QUEUE_CAPACITY}; + + /// Helper to create a test arrow message + fn create_test_arrow_msg(index: usize) -> ArrowMsg { + let index_i64 = i64::try_from(index).expect("test index should fit in i64"); + let chunk = Chunk::builder(entity_path!("test", "points", index.to_string())) + .with_archetype( + RowId::new(), + TimePoint::default().with( + Timeline::new_sequence("seq"), + TimeInt::from_millis( + NonMinI64::new(index_i64).expect("test index should not be i64::MIN"), + ), + ), + &Points2D::new([(index as f32, index as f32)]), + ) + .build() + .expect("test chunk should build successfully"); + + chunk + .to_arrow_msg() + .expect("test chunk should convert to arrow msg") + } + + #[test] + fn test_worker_lifecycle() { + // Test that worker starts and can be dropped gracefully + let worker = IngestionWorker::new(); + + // Worker should be ready to receive messages + assert_eq!(worker.poll_processed_chunks().len(), 0); + + // Drop worker - should exit gracefully + drop(worker); + } + + #[test] + fn test_basic_message_processing() { + let worker = IngestionWorker::new(); + let store_id = StoreId::random(StoreKind::Recording, "test"); + let channel_source = Arc::new(SmartChannelSource::RrdHttpStream { + follow: false, + url: "http://test".into(), + }); + + // Submit a single message + let arrow_msg = create_test_arrow_msg(0); + worker.submit_arrow_msg_blocking( + store_id.clone(), + arrow_msg, + channel_source.clone(), + false, + ); + + // Give worker time to process + std::thread::sleep(std::time::Duration::from_millis(100)); + + // Poll for results + let chunks = worker.poll_processed_chunks(); + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].store_id, store_id); + } + + #[test] + fn test_multiple_messages_in_sequence() { + let worker = IngestionWorker::new(); + let store_id = StoreId::random(StoreKind::Recording, "test"); + let channel_source = Arc::new(SmartChannelSource::RrdHttpStream { + follow: false, + url: "http://test".into(), + }); + + const NUM_MESSAGES: usize = 100; + + // Submit multiple messages + for i in 0..NUM_MESSAGES { + let arrow_msg = create_test_arrow_msg(i); + worker.submit_arrow_msg_blocking( + store_id.clone(), + arrow_msg, + channel_source.clone(), + false, + ); + } + + // Give worker time to process all messages + std::thread::sleep(std::time::Duration::from_millis(500)); + + // Poll for all results + let chunks = worker.poll_processed_chunks(); + assert_eq!( + chunks.len(), + NUM_MESSAGES, + "Should process all {NUM_MESSAGES} messages" + ); + + // Verify all chunks are for the correct store + for chunk in &chunks { + assert_eq!(chunk.store_id, store_id); + } + } + + #[test] + fn test_backpressure_behavior() { + let worker = IngestionWorker::new(); + let store_id = StoreId::random(StoreKind::Recording, "test"); + let channel_source = Arc::new(SmartChannelSource::RrdHttpStream { + follow: false, + url: "http://test".into(), + }); + + // Submit messages beyond queue capacity to test backpressure + // This should block but not panic or lose messages + const NUM_MESSAGES: usize = WORK_QUEUE_CAPACITY + 100; + + let worker_clone = std::sync::Arc::new(worker); + let worker_ref = worker_clone.clone(); + let store_id_clone = store_id.clone(); + let channel_source_clone = channel_source.clone(); + + // Submit in a separate thread to avoid blocking test + let submit_handle = std::thread::Builder::new() + .name("test-backpressure-submitter".to_owned()) + .spawn(move || { + for i in 0..NUM_MESSAGES { + let arrow_msg = create_test_arrow_msg(i); + worker_ref.submit_arrow_msg_blocking( + store_id_clone.clone(), + arrow_msg, + channel_source_clone.clone(), + false, + ); + } + }) + .expect("failed to spawn test thread"); + + // Poll periodically to drain the queue + let mut total_chunks = 0; + let start = std::time::Instant::now(); + let timeout = std::time::Duration::from_secs(10); + + while total_chunks < NUM_MESSAGES && start.elapsed() < timeout { + let chunks = worker_clone.poll_processed_chunks(); + total_chunks += chunks.len(); + std::thread::sleep(std::time::Duration::from_millis(10)); + } + + // Wait for submission thread to complete + submit_handle.join().unwrap(); + + // Poll any remaining chunks + std::thread::sleep(std::time::Duration::from_millis(100)); + let remaining = worker_clone.poll_processed_chunks(); + total_chunks += remaining.len(); + + assert_eq!( + total_chunks, NUM_MESSAGES, + "Should process all messages despite backpressure" + ); + } + + #[test] + fn test_invalid_arrow_data_handling() { + let worker = IngestionWorker::new(); + let store_id = StoreId::random(StoreKind::Recording, "test"); + let channel_source = Arc::new(SmartChannelSource::RrdHttpStream { + follow: false, + url: "http://test".into(), + }); + + // Create an invalid arrow message (empty batch with incorrect schema) + let schema = arrow::datatypes::Schema::new_with_metadata( + vec![] as Vec, + Default::default(), + ); + let batch = arrow::array::RecordBatch::new_empty(Arc::new(schema)); + let invalid_msg = ArrowMsg { + chunk_id: *re_chunk::ChunkId::new(), + batch, + on_release: None, + }; + + // Submit invalid message - should not crash worker + worker.submit_arrow_msg_blocking( + store_id.clone(), + invalid_msg, + channel_source.clone(), + false, + ); + + // Submit a valid message after + let valid_msg = create_test_arrow_msg(0); + worker.submit_arrow_msg_blocking(store_id.clone(), valid_msg, channel_source, false); + + // Give worker time to process + std::thread::sleep(std::time::Duration::from_millis(200)); + + // Should get only the valid message + let chunks = worker.poll_processed_chunks(); + assert_eq!( + chunks.len(), + 1, + "Worker should skip invalid message and continue" + ); + } + + #[test] + fn test_concurrent_submission_and_polling() { + let worker = Arc::new(IngestionWorker::new()); + let store_id = StoreId::random(StoreKind::Recording, "test"); + let channel_source = Arc::new(SmartChannelSource::RrdHttpStream { + follow: false, + url: "http://test".into(), + }); + + const NUM_MESSAGES: usize = 500; + + // Spawn submitter thread + let worker_submit = worker.clone(); + let store_id_submit = store_id.clone(); + let channel_source_submit = channel_source.clone(); + + let submit_handle = std::thread::Builder::new() + .name("test-concurrent-submitter".to_owned()) + .spawn(move || { + for i in 0..NUM_MESSAGES { + let arrow_msg = create_test_arrow_msg(i); + worker_submit.submit_arrow_msg_blocking( + store_id_submit.clone(), + arrow_msg, + channel_source_submit.clone(), + false, + ); + // Small delay to simulate realistic submission pattern + if i % 50 == 0 { + std::thread::sleep(std::time::Duration::from_millis(1)); + } + } + }) + .expect("failed to spawn test thread"); + + // Poll concurrently from main thread + let mut total_chunks = 0; + let start = std::time::Instant::now(); + let timeout = std::time::Duration::from_secs(10); + + while total_chunks < NUM_MESSAGES && start.elapsed() < timeout { + let chunks = worker.poll_processed_chunks(); + total_chunks += chunks.len(); + std::thread::sleep(std::time::Duration::from_millis(5)); + } + + // Wait for submission to complete + submit_handle.join().unwrap(); + + // Final poll to get any remaining chunks + std::thread::sleep(std::time::Duration::from_millis(100)); + let remaining = worker.poll_processed_chunks(); + total_chunks += remaining.len(); + + assert_eq!( + total_chunks, NUM_MESSAGES, + "Should handle concurrent submission and polling" + ); + } + + #[test] + fn test_worker_thread_exits_on_drop() { + let worker = IngestionWorker::new(); + let store_id = StoreId::random(StoreKind::Recording, "test"); + let channel_source = Arc::new(SmartChannelSource::RrdHttpStream { + follow: false, + url: "http://test".into(), + }); + + // Submit some messages + for i in 0..10 { + let arrow_msg = create_test_arrow_msg(i); + worker.submit_arrow_msg_blocking( + store_id.clone(), + arrow_msg, + channel_source.clone(), + false, + ); + } + + // Drop worker - thread should exit gracefully + drop(worker); + + // If thread doesn't exit properly, this test will hang or leak threads + // Successful completion means thread exited + } + + #[test] + fn test_empty_poll_returns_empty_vec() { + let worker = IngestionWorker::new(); + + // Poll without submitting anything + let chunks = worker.poll_processed_chunks(); + assert_eq!(chunks.len(), 0); + + // Multiple polls should all return empty + let chunks2 = worker.poll_processed_chunks(); + assert_eq!(chunks2.len(), 0); + } + + #[test] + fn test_poll_drains_all_available_chunks() { + let worker = IngestionWorker::new(); + let store_id = StoreId::random(StoreKind::Recording, "test"); + let channel_source = Arc::new(SmartChannelSource::RrdHttpStream { + follow: false, + url: "http://test".into(), + }); + + const NUM_MESSAGES: usize = 50; + + // Submit multiple messages + for i in 0..NUM_MESSAGES { + let arrow_msg = create_test_arrow_msg(i); + worker.submit_arrow_msg_blocking( + store_id.clone(), + arrow_msg, + channel_source.clone(), + false, + ); + } + + // Wait for all to be processed + std::thread::sleep(std::time::Duration::from_millis(300)); + + // Single poll should drain all available chunks + let chunks = worker.poll_processed_chunks(); + assert_eq!(chunks.len(), NUM_MESSAGES); + + // Next poll should be empty + let chunks2 = worker.poll_processed_chunks(); + assert_eq!(chunks2.len(), 0); + } +} diff --git a/crates/store/re_entity_db/src/lib.rs b/crates/store/re_entity_db/src/lib.rs index 557fa21cff2a..3eaf93290323 100644 --- a/crates/store/re_entity_db/src/lib.rs +++ b/crates/store/re_entity_db/src/lib.rs @@ -7,6 +7,7 @@ pub mod entity_db; pub mod entity_tree; mod ingestion_statistics; +pub mod ingestion_worker; mod instance_path; mod store_bundle; mod time_histogram_per_timeline; @@ -17,6 +18,7 @@ pub use self::{ entity_db::{DEFAULT_GC_TIME_BUDGET, EntityDb}, entity_tree::EntityTree, ingestion_statistics::{IngestionStatistics, LatencySnapshot, LatencyStats}, + ingestion_worker::{IngestionWorker, ProcessedChunk}, instance_path::{InstancePath, InstancePathHash}, store_bundle::{StoreBundle, StoreLoadError}, time_histogram_per_timeline::{TimeHistogram, TimeHistogramPerTimeline}, @@ -24,6 +26,9 @@ pub use self::{ versioned_instance_path::{VersionedInstancePath, VersionedInstancePathHash}, }; +#[cfg(not(target_arch = "wasm32"))] +pub use self::store_bundle::IngestionWorkerResults; + #[doc(no_inline)] pub use re_log_types::{EntityPath, EntityPathPart, TimeInt, Timeline}; diff --git a/crates/store/re_entity_db/src/store_bundle.rs b/crates/store/re_entity_db/src/store_bundle.rs index 16c143b4e991..969c0e76f4e8 100644 --- a/crates/store/re_entity_db/src/store_bundle.rs +++ b/crates/store/re_entity_db/src/store_bundle.rs @@ -4,6 +4,17 @@ use re_log_types::{StoreId, StoreKind}; use crate::EntityDb; +/// Result type for ingestion worker output processing. +#[cfg(not(target_arch = "wasm32"))] +pub type IngestionWorkerResults = indexmap::IndexMap< + StoreId, + Vec<( + crate::ingestion_worker::ProcessedChunk, + bool, + Result, crate::Error>, + )>, +>; + #[derive(thiserror::Error, Debug)] pub enum StoreLoadError { #[error(transparent)] @@ -145,4 +156,29 @@ impl StoreBundle { entity_dbs.first().map(|db| db.store_id().clone()) } + + /// Process pending work from background ingestion workers for all stores (native only). + /// + /// This should be called once per frame to process chunks from all `EntityDb` workers. + /// Each `EntityDb` polls its worker and adds processed chunks to its store. + /// + /// Returns a map of [`StoreId`] to a vector of processing results for that store. + /// + /// On native: Processes worker output for all `EntityDb`s + /// On Wasm: This method doesn't exist (compile error if called) + #[cfg(not(target_arch = "wasm32"))] + pub fn on_frame_start(&mut self) -> IngestionWorkerResults { + re_tracing::profile_function!(); + + let mut all_results = indexmap::IndexMap::new(); + + for (store_id, entity_db) in &mut self.recording_store { + let results = entity_db.on_frame_start(); + if !results.is_empty() { + all_results.insert(store_id.clone(), results); + } + } + + all_results + } } diff --git a/crates/utils/re_video/src/decode/mod.rs b/crates/utils/re_video/src/decode/mod.rs index 295ca7807c25..24efb573e97f 100644 --- a/crates/utils/re_video/src/decode/mod.rs +++ b/crates/utils/re_video/src/decode/mod.rs @@ -209,7 +209,12 @@ pub fn new_decoder( decode_settings: &DecodeSettings, output_sender: crossbeam::channel::Sender, ) -> Result> { - #![allow(clippy::allow_attributes, unused_variables, clippy::needless_return)] // With some feature flags + #![allow( + clippy::allow_attributes, + unused_variables, + clippy::needless_return, + clippy::needless_pass_by_value + )] // With some feature flags re_tracing::profile_function!(); diff --git a/crates/viewer/re_viewer/Cargo.toml b/crates/viewer/re_viewer/Cargo.toml index b3a84ea71123..ede7e990e857 100644 --- a/crates/viewer/re_viewer/Cargo.toml +++ b/crates/viewer/re_viewer/Cargo.toml @@ -79,6 +79,7 @@ re_recording_panel.workspace = true re_renderer = { workspace = true, default-features = false } re_selection_panel.workspace = true re_smart_channel.workspace = true +re_sorbet.workspace = true re_time_panel.workspace = true re_tracing = { workspace = true, features = ["server"] } re_types_core.workspace = true diff --git a/crates/viewer/re_viewer/src/app.rs b/crates/viewer/re_viewer/src/app.rs index fcb5c467a788..1ecfdce06186 100644 --- a/crates/viewer/re_viewer/src/app.rs +++ b/crates/viewer/re_viewer/src/app.rs @@ -2117,6 +2117,44 @@ impl App { Err(_) => false, }); + #[cfg(not(target_arch = "wasm32"))] + { + // Process background ingestion workers for all stores + let all_results = store_hub.on_frame_start(); + + for (_store_id, results) in all_results { + for (processed, was_empty_before, entity_db_add_result) in results { + self.finalize_arrow_chunk_ingestion( + store_hub, + egui_ctx, + processed.channel_source.as_ref(), + &processed.store_id, + processed.msg_will_add_new_store, + was_empty_before, + entity_db_add_result, + ); + } + } + + // Check if all messages have been ingested after processing worker output + if self.rrd_loading_metrics.is_tracking { + let all_ingested = self.rrd_loading_metrics.total_messages > 0 + && self.rrd_loading_metrics.total_messages + == self.rrd_loading_metrics.total_ingested; + + if all_ingested + && self.rrd_loading_metrics.all_messages_ingested.is_none() + && self.rx_log.sources().is_empty() + { + re_log::info!( + "🎉 All {} messages have been ingested!", + self.rrd_loading_metrics.total_messages + ); + self.rrd_loading_metrics.mark_all_ingested(); + } + } + } + let start = web_time::Instant::now(); while let Some((channel_source, msg)) = self.rx_log.try_recv() { @@ -2142,7 +2180,7 @@ impl App { match msg { DataSourceMessage::LogMsg(msg) => { - self.receive_log_msg(&msg, store_hub, egui_ctx, &channel_source); + self.receive_log_msg(&msg, store_hub, egui_ctx, channel_source.clone()); } DataSourceMessage::UiCommand(ui_command) => { @@ -2166,7 +2204,7 @@ impl App { msg: &LogMsg, store_hub: &mut StoreHub, egui_ctx: &egui::Context, - channel_source: &SmartChannelSource, + channel_source: Arc, ) { let store_id = msg.store_id(); @@ -2180,14 +2218,53 @@ impl App { // Note that the `SetStoreInfo` message might be missing. It's not strictly necessary to add a new store. let msg_will_add_new_store = !store_hub.store_bundle().contains(store_id); - let entity_db = store_hub.entity_db_mut(store_id); - if entity_db.data_source.is_none() { - entity_db.data_source = Some((*channel_source).clone()); + { + let entity_db = store_hub.entity_db_mut(store_id); + if entity_db.data_source.is_none() { + entity_db.data_source = Some((*channel_source).clone()); + } + } + + // On native platforms, use the EntityDb's ingestion worker for Arrow messages + #[cfg(not(target_arch = "wasm32"))] + if let LogMsg::ArrowMsg(_, arrow_msg) = msg { + let entity_db = store_hub.entity_db_mut(store_id); + entity_db.submit_arrow_msg(arrow_msg.clone(), channel_source, msg_will_add_new_store); + return; } - let was_empty = entity_db.is_empty(); - let entity_db_add_result = entity_db.add(msg); + // For non-Arrow messages (or on WASM), process synchronously + let (was_empty, entity_db_add_result) = { + let entity_db = store_hub.entity_db_mut(store_id); + let was_empty = entity_db.is_empty(); + let result = entity_db.add(msg); + (was_empty, result) + }; + + self.finalize_log_msg_processing( + store_hub, + egui_ctx, + channel_source.as_ref(), + store_id, + msg, + msg_will_add_new_store, + was_empty, + entity_db_add_result, + ); + } + #[expect(clippy::too_many_arguments)] + fn finalize_log_msg_processing( + &self, + store_hub: &mut StoreHub, + egui_ctx: &egui::Context, + channel_source: &SmartChannelSource, + store_id: &StoreId, + msg: &LogMsg, + msg_will_add_new_store: bool, + was_empty_before: bool, + entity_db_add_result: re_entity_db::Result>, + ) { // Downgrade to read-only, so we can access caches. let entity_db = store_hub .entity_db(store_id) @@ -2207,7 +2284,7 @@ impl App { } } - if was_empty && !entity_db.is_empty() { + if was_empty_before && !entity_db.is_empty() { // Hack: we cannot go to a specific timeline or entity until we know about it. // Now we _hopefully_ do. if let SmartChannelSource::RedapGrpcStream { uri, .. } = channel_source { @@ -2216,13 +2293,13 @@ impl App { } #[expect(clippy::match_same_arms)] - match &msg { + match msg { LogMsg::SetStoreInfo(_) => { // Causes a new store typically. But that's handled below via `on_new_store`. } LogMsg::ArrowMsg(_, _) => { - // Handled by `EntityDb::add`. + // Handled during ingestion. } LogMsg::BlueprintActivationCommand(cmd) => match store_id.kind() { @@ -2276,6 +2353,83 @@ impl App { } } + #[cfg(not(target_arch = "wasm32"))] + fn process_ingestion_worker_output(&self, store_hub: &mut StoreHub, egui_ctx: &egui::Context) { + re_tracing::profile_function!(); + + // Collect store IDs first to avoid borrowing issues + let store_ids: Vec = store_hub + .store_bundle() + .entity_dbs() + .map(|entity_db| entity_db.store_id().clone()) + .collect(); + + // Poll each EntityDb's worker for processed chunks + for store_id in store_ids { + let results = { + let entity_db = store_hub.entity_db_mut(&store_id); + entity_db.poll_worker_output() + }; + + for (processed, was_empty_before, entity_db_add_result) in results { + self.finalize_arrow_chunk_ingestion( + store_hub, + egui_ctx, + processed.channel_source.as_ref(), + &processed.store_id, + processed.msg_will_add_new_store, + was_empty_before, + entity_db_add_result, + ); + } + } + } + + #[cfg(not(target_arch = "wasm32"))] + #[expect(clippy::too_many_arguments)] + fn finalize_arrow_chunk_ingestion( + &self, + store_hub: &mut StoreHub, + egui_ctx: &egui::Context, + channel_source: &SmartChannelSource, + store_id: &StoreId, + msg_will_add_new_store: bool, + was_empty_before: bool, + entity_db_add_result: re_entity_db::Result>, + ) { + // Downgrade to read-only, so we can access caches. + let entity_db = store_hub + .entity_db(store_id) + .expect("Just queried it mutable and that was fine."); + + match entity_db_add_result { + Ok(store_events) => { + if let Some(caches) = store_hub.active_caches() { + caches.on_store_events(&store_events, entity_db); + } + + self.validate_loaded_events(&store_events); + } + + Err(err) => { + re_log::error_once!("Failed to add incoming chunk: {err}"); + } + } + + if was_empty_before && !entity_db.is_empty() { + // Hack: we cannot go to a specific timeline or entity until we know about it. + // Now we _hopefully_ do. + if let SmartChannelSource::RedapGrpcStream { uri, .. } = channel_source { + self.go_to_dataset_data(uri.store_id(), uri.fragment.clone()); + } + } + + // Handle any action that is triggered by a new store _after_ processing the message that caused it. + if msg_will_add_new_store { + self.on_new_store(egui_ctx, store_id, channel_source, store_hub); + } + } + fn on_new_store( &self, egui_ctx: &egui::Context, diff --git a/crates/viewer/re_viewer_context/src/store_hub.rs b/crates/viewer/re_viewer_context/src/store_hub.rs index d171280f437a..eebb15a1da18 100644 --- a/crates/viewer/re_viewer_context/src/store_hub.rs +++ b/crates/viewer/re_viewer_context/src/store_hub.rs @@ -295,6 +295,20 @@ impl StoreHub { self.store_bundle.get(store_id) } + /// Process pending work from background ingestion workers for all stores (native only). + /// + /// This should be called once per frame to process chunks from all `EntityDb` workers. + /// Each `EntityDb` polls its worker and adds processed chunks to its store. + /// + /// Returns a map of [`StoreId`] to a vector of processing results for that store. + /// + /// On native: Processes worker output for all `EntityDb`s + /// On Wasm: This method doesn't exist (compile error if called) + #[cfg(not(target_arch = "wasm32"))] + pub fn on_frame_start(&mut self) -> re_entity_db::IngestionWorkerResults { + self.store_bundle.on_frame_start() + } + // --------------------- // Add and remove stores From 874e770ae90de6d0c4afd8efd3c0102e1b098bbb Mon Sep 17 00:00:00 2001 From: Joel Reymont <18791+joelreymont@users.noreply.github.com> Date: Tue, 25 Nov 2025 09:15:44 +0200 Subject: [PATCH 2/2] Fix doc comment: reference correct method name on_frame_start --- crates/store/re_entity_db/src/entity_db.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/store/re_entity_db/src/entity_db.rs b/crates/store/re_entity_db/src/entity_db.rs index f861a92f6ce4..65a844e1c3a0 100644 --- a/crates/store/re_entity_db/src/entity_db.rs +++ b/crates/store/re_entity_db/src/entity_db.rs @@ -668,7 +668,7 @@ impl EntityDb { /// On native: Lazily creates worker on first call, then queues message for processing /// On Wasm: This method doesn't exist (compile error if called) /// - /// The caller should periodically call [`Self::poll_worker_output`] to retrieve + /// The caller should periodically call [`Self::on_frame_start`] to retrieve /// processed chunks and add them to the store. #[cfg(not(target_arch = "wasm32"))] pub fn submit_arrow_msg(