Skip to content

Commit bcdecf0

Browse files
joelreymontclaude
authored andcommitted
Address PR review comments
This commit addresses feedback from PR rerun-io#11798: 1. Use get_or_insert_with pattern for worker initialization - Replace manual if-check with idiomatic Option::get_or_insert_with - Better expresses intent and reduces nesting 2. Encapsulate ingestion worker processing in EntityDb - Add on_frame_start() method to EntityDb for worker processing - Add on_frame_start() to StoreBundle to process all workers - Add on_frame_start() to StoreHub as public interface - Remove process_ingestion_worker_output() from App - Move all worker-related logic out of App into EntityDb layer - App now calls store_hub.on_frame_start() instead of manually polling workers and processing output 3. Introduce IngestionWorkerResults type alias - Simplify complex return types - Export from re_entity_db crate Benefits: - Cleaner separation of concerns - EntityDb fully encapsulates its ingestion worker - Easier to understand and maintain - App layer no longer needs to know about worker internals
1 parent 80566ea commit bcdecf0

File tree

5 files changed

+107
-110
lines changed

5 files changed

+107
-110
lines changed

crates/store/re_entity_db/src/entity_db.rs

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -680,30 +680,32 @@ impl EntityDb {
680680
re_tracing::profile_function!();
681681

682682
// Lazy-init worker on first use
683-
if self.ingestion_worker.is_none() {
683+
let worker = self.ingestion_worker.get_or_insert_with(|| {
684684
re_log::debug!("Creating ingestion worker for store {:?}", self.store_id);
685-
self.ingestion_worker = Some(crate::ingestion_worker::IngestionWorker::new());
686-
}
685+
crate::ingestion_worker::IngestionWorker::new()
686+
});
687687

688-
if let Some(worker) = &self.ingestion_worker {
689-
worker.submit_arrow_msg_blocking(
690-
self.store_id.clone(),
691-
arrow_msg,
692-
channel_source,
693-
msg_will_add_new_store,
694-
);
695-
}
688+
worker.submit_arrow_msg_blocking(
689+
self.store_id.clone(),
690+
arrow_msg,
691+
channel_source,
692+
msg_will_add_new_store,
693+
);
696694
}
697695

698-
/// Poll the background ingestion worker for processed chunks (native only).
696+
/// Process pending work from the background ingestion worker (native only).
697+
///
698+
/// This should be called once per frame to process chunks that have been
699+
/// converted by the background worker. The processed chunks are automatically
700+
/// added to this `EntityDb`'s store.
699701
///
700-
/// Returns a vector of (`ProcessedChunk`, `was_empty_before`, `StoreEvents`) tuples.
701-
/// Each chunk has been successfully added to the store.
702+
/// Returns a vector of (`ProcessedChunk`, `was_empty_before`, `StoreEvents`) tuples
703+
/// for each chunk that was successfully added.
702704
///
703-
/// On native: Returns chunks processed by the background worker
705+
/// On native: Polls and processes chunks from the background worker
704706
/// On Wasm: This method doesn't exist (compile error if called)
705707
#[cfg(not(target_arch = "wasm32"))]
706-
pub fn poll_worker_output(
708+
pub fn on_frame_start(
707709
&mut self,
708710
) -> Vec<(
709711
crate::ingestion_worker::ProcessedChunk,

crates/store/re_entity_db/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ pub use self::{
2626
versioned_instance_path::{VersionedInstancePath, VersionedInstancePathHash},
2727
};
2828

29+
#[cfg(not(target_arch = "wasm32"))]
30+
pub use self::store_bundle::IngestionWorkerResults;
31+
2932
#[doc(no_inline)]
3033
pub use re_log_types::{EntityPath, EntityPathPart, TimeInt, Timeline};
3134

crates/store/re_entity_db/src/store_bundle.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,17 @@ use re_log_types::{StoreId, StoreKind};
44

55
use crate::EntityDb;
66

7+
/// Result type for ingestion worker output processing.
8+
#[cfg(not(target_arch = "wasm32"))]
9+
pub type IngestionWorkerResults = indexmap::IndexMap<
10+
StoreId,
11+
Vec<(
12+
crate::ingestion_worker::ProcessedChunk,
13+
bool,
14+
Result<Vec<re_chunk_store::ChunkStoreEvent>, crate::Error>,
15+
)>,
16+
>;
17+
718
#[derive(thiserror::Error, Debug)]
819
pub enum StoreLoadError {
920
#[error(transparent)]
@@ -145,4 +156,29 @@ impl StoreBundle {
145156

146157
entity_dbs.first().map(|db| db.store_id().clone())
147158
}
159+
160+
/// Process pending work from background ingestion workers for all stores (native only).
161+
///
162+
/// This should be called once per frame to process chunks from all `EntityDb` workers.
163+
/// Each `EntityDb` polls its worker and adds processed chunks to its store.
164+
///
165+
/// Returns a map of [`StoreId`] to a vector of processing results for that store.
166+
///
167+
/// On native: Processes worker output for all `EntityDb`s
168+
/// On Wasm: This method doesn't exist (compile error if called)
169+
#[cfg(not(target_arch = "wasm32"))]
170+
pub fn on_frame_start(&mut self) -> IngestionWorkerResults {
171+
re_tracing::profile_function!();
172+
173+
let mut all_results = indexmap::IndexMap::new();
174+
175+
for (store_id, entity_db) in &mut self.recording_store {
176+
let results = entity_db.on_frame_start();
177+
if !results.is_empty() {
178+
all_results.insert(store_id.clone(), results);
179+
}
180+
}
181+
182+
all_results
183+
}
148184
}

crates/viewer/re_viewer/src/app.rs

Lines changed: 36 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -2497,7 +2497,42 @@ impl App {
24972497
});
24982498

24992499
#[cfg(not(target_arch = "wasm32"))]
2500-
self.process_ingestion_worker_output(store_hub, egui_ctx);
2500+
{
2501+
// Process background ingestion workers for all stores
2502+
let all_results = store_hub.on_frame_start();
2503+
2504+
for (_store_id, results) in all_results {
2505+
for (processed, was_empty_before, entity_db_add_result) in results {
2506+
self.finalize_arrow_chunk_ingestion(
2507+
store_hub,
2508+
egui_ctx,
2509+
processed.channel_source.as_ref(),
2510+
&processed.store_id,
2511+
processed.msg_will_add_new_store,
2512+
was_empty_before,
2513+
entity_db_add_result,
2514+
);
2515+
}
2516+
}
2517+
2518+
// Check if all messages have been ingested after processing worker output
2519+
if self.rrd_loading_metrics.is_tracking {
2520+
let all_ingested = self.rrd_loading_metrics.total_messages > 0
2521+
&& self.rrd_loading_metrics.total_messages
2522+
== self.rrd_loading_metrics.total_ingested;
2523+
2524+
if all_ingested
2525+
&& self.rrd_loading_metrics.all_messages_ingested.is_none()
2526+
&& self.rx_log.sources().is_empty()
2527+
{
2528+
re_log::info!(
2529+
"🎉 All {} messages have been ingested!",
2530+
self.rrd_loading_metrics.total_messages
2531+
);
2532+
self.rrd_loading_metrics.mark_all_ingested();
2533+
}
2534+
}
2535+
}
25012536

25022537
let start = web_time::Instant::now();
25032538

@@ -2765,99 +2800,6 @@ impl App {
27652800
}
27662801
}
27672802

2768-
#[cfg(not(target_arch = "wasm32"))]
2769-
fn process_ingestion_worker_output(
2770-
&mut self,
2771-
store_hub: &mut StoreHub,
2772-
egui_ctx: &egui::Context,
2773-
) {
2774-
re_tracing::profile_function!();
2775-
2776-
// Debug: Log every time this function is called
2777-
static CALL_COUNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
2778-
let count = CALL_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2779-
if count % 100 == 0 || self.rrd_loading_metrics.total_messages > 100000 {
2780-
re_log::debug!(
2781-
"[process_ingestion_worker_output] Called #{}, is_tracking={}, all_ingested={:?}, total_msg={}, total_ingested={}",
2782-
count,
2783-
self.rrd_loading_metrics.is_tracking,
2784-
self.rrd_loading_metrics.all_messages_ingested.is_some(),
2785-
self.rrd_loading_metrics.total_messages,
2786-
self.rrd_loading_metrics.total_ingested
2787-
);
2788-
}
2789-
2790-
// Collect store IDs first to avoid borrowing issues
2791-
let store_ids: Vec<StoreId> = store_hub
2792-
.store_bundle()
2793-
.entity_dbs()
2794-
.map(|entity_db| entity_db.store_id().clone())
2795-
.collect();
2796-
2797-
// Poll each EntityDb's worker for processed chunks
2798-
for store_id in store_ids {
2799-
let results = {
2800-
let entity_db = store_hub.entity_db_mut(&store_id);
2801-
entity_db.poll_worker_output()
2802-
};
2803-
2804-
for (processed, was_empty_before, entity_db_add_result) in results {
2805-
self.finalize_arrow_chunk_ingestion(
2806-
store_hub,
2807-
egui_ctx,
2808-
processed.channel_source.as_ref(),
2809-
&processed.store_id,
2810-
processed.msg_will_add_new_store,
2811-
was_empty_before,
2812-
entity_db_add_result,
2813-
);
2814-
}
2815-
}
2816-
2817-
// Check if all messages have been ingested after processing worker output
2818-
// Debug: Always evaluate and log guard conditions
2819-
let is_tracking = self.rrd_loading_metrics.is_tracking;
2820-
let all_ingested_is_none = self.rrd_loading_metrics.all_messages_ingested.is_none();
2821-
let all_channels_closed = self.rx_log.sources().is_empty();
2822-
2823-
if self.rrd_loading_metrics.total_messages > 100000 {
2824-
re_log::debug!(
2825-
"[process_ingestion_worker_output] Detection guards: is_tracking={}, all_ingested_is_none={}, channels_closed={}, total_msg={}, total_ingested={}",
2826-
is_tracking,
2827-
all_ingested_is_none,
2828-
all_channels_closed,
2829-
self.rrd_loading_metrics.total_messages,
2830-
self.rrd_loading_metrics.total_ingested
2831-
);
2832-
}
2833-
2834-
if is_tracking && all_ingested_is_none {
2835-
if all_channels_closed && self.rrd_loading_metrics.total_messages > 0 {
2836-
re_log::debug!(
2837-
"Ingestion check: total_msg={}, total_ingested={}",
2838-
self.rrd_loading_metrics.total_messages,
2839-
self.rrd_loading_metrics.total_ingested
2840-
);
2841-
}
2842-
2843-
if all_channels_closed
2844-
&& self.rrd_loading_metrics.total_messages > 0
2845-
&& self.rrd_loading_metrics.total_messages
2846-
== self.rrd_loading_metrics.total_ingested
2847-
{
2848-
re_log::info!(
2849-
"✓ All {} messages ingested! Marking ingestion complete.",
2850-
self.rrd_loading_metrics.total_ingested
2851-
);
2852-
self.rrd_loading_metrics.mark_all_ingested();
2853-
}
2854-
} else if self.rrd_loading_metrics.total_messages > 100000 {
2855-
re_log::debug!(
2856-
"[process_ingestion_worker_output] Guard conditions failed - skipping detection"
2857-
);
2858-
}
2859-
}
2860-
28612803
#[cfg(not(target_arch = "wasm32"))]
28622804
#[expect(clippy::too_many_arguments)]
28632805
fn finalize_arrow_chunk_ingestion(

crates/viewer/re_viewer_context/src/store_hub.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,20 @@ impl StoreHub {
295295
self.store_bundle.get(store_id)
296296
}
297297

298+
/// Process pending work from background ingestion workers for all stores (native only).
299+
///
300+
/// This should be called once per frame to process chunks from all `EntityDb` workers.
301+
/// Each `EntityDb` polls its worker and adds processed chunks to its store.
302+
///
303+
/// Returns a map of [`StoreId`] to a vector of processing results for that store.
304+
///
305+
/// On native: Processes worker output for all `EntityDb`s
306+
/// On Wasm: This method doesn't exist (compile error if called)
307+
#[cfg(not(target_arch = "wasm32"))]
308+
pub fn on_frame_start(&mut self) -> re_entity_db::IngestionWorkerResults {
309+
self.store_bundle.on_frame_start()
310+
}
311+
298312
// ---------------------
299313
// Add and remove stores
300314

0 commit comments

Comments
 (0)