Skip to content

Commit 0b25438

Browse files
committed
Redesign ingestion using a dedicated ingestion worker
Addresses #4298 by moving CPU-intensive Arrow message processing off the UI thread. Key changes: 1. Ingestion Worker (crates/viewer/re_viewer/src/ingestion_worker.rs): - Dedicated background thread for processing Arrow messages into chunks - Bounded channel (capacity: 2000) provides natural backpressure 2. Viewer Integration (crates/viewer/re_viewer/src/app.rs): - Arrow messages submitted to worker queue instead of blocking UI thread - Poll worker output during update() and add chunks to EntityDb
1 parent 0a0c451 commit 0b25438

File tree

8 files changed

+652
-143
lines changed

8 files changed

+652
-143
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8622,13 +8622,15 @@ dependencies = [
86228622
"ahash",
86238623
"anyhow",
86248624
"arrow",
8625+
"criterion",
86258626
"crossbeam",
86268627
"image",
86278628
"indexmap 2.11.4",
86288629
"insta",
86298630
"itertools 0.14.0",
86308631
"mcap",
86318632
"memmap2 0.9.8",
8633+
"mimalloc",
86328634
"notify",
86338635
"parking_lot",
86348636
"parquet",
@@ -10222,6 +10224,7 @@ dependencies = [
1022210224
"re_renderer",
1022310225
"re_selection_panel",
1022410226
"re_smart_channel",
10227+
"re_sorbet",
1022510228
"re_test_context",
1022610229
"re_test_viewport",
1022710230
"re_time_panel",

crates/store/re_data_loader/benches/parallel_ingestion_bench.rs

Lines changed: 3 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use criterion::{Criterion, criterion_group, criterion_main};
88
use re_chunk::{Chunk, RowId};
99
use re_log_encoding::Encoder;
1010
use re_log_types::{
11-
entity_path, LogMsg, NonMinI64, StoreId, StoreKind, TimeInt, TimePoint, Timeline,
11+
LogMsg, NonMinI64, StoreId, StoreKind, TimeInt, TimePoint, Timeline, entity_path,
1212
};
1313
use re_types::archetypes::Points2D;
1414
use std::sync::mpsc;
@@ -21,11 +21,7 @@ const NUM_MESSAGES: usize = 10_000;
2121
#[cfg(debug_assertions)]
2222
const NUM_MESSAGES: usize = 100;
2323

24-
criterion_group!(
25-
benches,
26-
benchmark_load_from_file_contents,
27-
benchmark_message_processing_batches,
28-
);
24+
criterion_group!(benches, benchmark_load_from_file_contents);
2925
criterion_main!(benches);
3026

3127
fn generate_messages(count: usize) -> Vec<LogMsg> {
@@ -70,7 +66,7 @@ fn encode_messages(messages: &[LogMsg]) -> Vec<u8> {
7066
bytes
7167
}
7268

73-
/// Benchmark loading from file contents (parallel processing)
69+
/// Benchmark loading from file (parallel processing)
7470
fn benchmark_load_from_file_contents(c: &mut Criterion) {
7571
let messages = generate_messages(NUM_MESSAGES);
7672
let encoded = encode_messages(&messages);
@@ -101,71 +97,3 @@ fn benchmark_load_from_file_contents(c: &mut Criterion) {
10197

10298
group.finish();
10399
}
104-
105-
/// Try to find optimal batch size
106-
fn benchmark_message_processing_batches(c: &mut Criterion) {
107-
let messages = generate_messages(NUM_MESSAGES);
108-
let (tx, rx) = mpsc::channel();
109-
110-
let mut group = c.benchmark_group("message_processing_batches");
111-
group.throughput(criterion::Throughput::Elements(NUM_MESSAGES as u64));
112-
113-
// Sequential processing (baseline)
114-
group.bench_function("sequential", |b| {
115-
b.iter(|| {
116-
let mut batch = messages.clone();
117-
let loader = RrdLoader;
118-
for msg in batch.drain(..) {
119-
let data = re_data_loader::LoadedData::LogMsg(
120-
loader.name(),
121-
msg,
122-
);
123-
let _ = tx.send(data);
124-
}
125-
while rx.try_recv().is_ok() {}
126-
});
127-
});
128-
129-
// Parallel processing with different batch sizes
130-
#[cfg(not(target_arch = "wasm32"))]
131-
{
132-
use rayon::prelude::*;
133-
134-
for batch_size in [10, 50, 100, 500, 1000] {
135-
group.bench_function(format!("parallel_batch_{}", batch_size), |b| {
136-
b.iter(|| {
137-
let mut messages = messages.clone();
138-
let mut processed_count = 0;
139-
140-
while processed_count < messages.len() {
141-
let remaining = messages.len() - processed_count;
142-
let current_batch_size = batch_size.min(remaining);
143-
let batch: Vec<_> = messages
144-
.drain(processed_count..processed_count + current_batch_size)
145-
.collect();
146-
147-
let processed: Vec<_> = batch
148-
.into_par_iter()
149-
.map(|msg| msg) // Simulate transform_message
150-
.collect();
151-
152-
let loader = RrdLoader;
153-
for msg in processed {
154-
let data = re_data_loader::LoadedData::LogMsg(
155-
loader.name(),
156-
msg,
157-
);
158-
let _ = tx.send(data);
159-
}
160-
161-
processed_count += current_batch_size;
162-
}
163-
164-
while rx.try_recv().is_ok() {}
165-
});
166-
});
167-
}
168-
}
169-
170-
group.finish();
171-
}

0 commit comments

Comments
 (0)