Commit c0057c0

Redesign parallel RRD ingestion with per-EntityDb workers
- Move the ingestion worker from the App level to a per-EntityDb architecture
- Each EntityDb now has its own dedicated ingestion worker, with separate channels for commands and message batches
- Add benchmark infrastructure for ingestion performance testing
- Refactor loader_rrd.rs to support parallel message distribution
1 parent 8b8e342 commit c0057c0
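
The commit message describes the new shape: each EntityDb owns a dedicated ingestion worker that listens on two channels, one for control commands and one for batches of decoded messages, so loaders can distribute work in parallel instead of funneling everything through a single App-level worker. The sketch below is only an illustration of that shape, not the actual re_entity_db code: the names (IngestionWorker, IngestionCommand, MessageBatch, worker_loop) are invented for the example, crossbeam channels are assumed because the commit adds a crossbeam dependency, and the "ingestion" is reduced to counting messages rather than writing chunks into a store.

use std::thread::JoinHandle;

use crossbeam::channel::{Receiver, Sender, unbounded};
use crossbeam::select;

/// Control messages get their own channel so they never queue behind data.
enum IngestionCommand {
    Flush,
    Shutdown,
}

/// Stand-in for a batch of decoded log messages destined for one EntityDb.
type MessageBatch = Vec<String>;

/// One of these would hang off each EntityDb.
struct IngestionWorker {
    cmd_tx: Sender<IngestionCommand>,
    batch_tx: Sender<MessageBatch>,
    handle: Option<JoinHandle<()>>,
}

impl IngestionWorker {
    /// Spawn a dedicated worker thread for one store.
    fn spawn(db_name: String) -> Self {
        let (cmd_tx, cmd_rx) = unbounded::<IngestionCommand>();
        let (batch_tx, batch_rx) = unbounded::<MessageBatch>();

        let handle = std::thread::spawn(move || worker_loop(&db_name, cmd_rx, batch_rx));

        Self {
            cmd_tx,
            batch_tx,
            handle: Some(handle),
        }
    }
}

impl Drop for IngestionWorker {
    fn drop(&mut self) {
        // Ask the worker to stop, then wait for it to exit.
        self.cmd_tx.send(IngestionCommand::Shutdown).ok();
        if let Some(handle) = self.handle.take() {
            handle.join().ok();
        }
    }
}

fn worker_loop(
    db_name: &str,
    cmd_rx: Receiver<IngestionCommand>,
    batch_rx: Receiver<MessageBatch>,
) {
    let mut ingested = 0usize;
    loop {
        select! {
            recv(cmd_rx) -> cmd => match cmd {
                // select! gives no ordering guarantee across the two channels,
                // so a Flush may be observed before batches that were sent earlier.
                Ok(IngestionCommand::Flush) => println!("{db_name}: flush, {ingested} messages so far"),
                // Explicit shutdown, or every command sender was dropped.
                Ok(IngestionCommand::Shutdown) | Err(_) => break,
            },
            recv(batch_rx) -> batch => match batch {
                // The real worker would apply the batch to the store here.
                Ok(batch) => ingested += batch.len(),
                // Every batch sender was dropped: no more data will arrive.
                Err(_) => break,
            },
        }
    }
    println!("{db_name}: worker exiting after {ingested} messages");
}

fn main() {
    // One worker per EntityDb; loaders can fan message batches out to them in parallel.
    let worker = IngestionWorker::spawn("recording_a".to_owned());
    worker
        .batch_tx
        .send(vec!["msg 1".to_owned(), "msg 2".to_owned()])
        .unwrap();
    worker.cmd_tx.send(IngestionCommand::Flush).unwrap();
    // Dropping `worker` sends Shutdown and joins the thread.
}

Keeping commands on a separate channel means a Flush or Shutdown never has to wait behind a backlog of data batches, which appears to be the point of the two-channel split described in the commit message.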

File tree

12 files changed: +1344 -66 lines

.gitignore

Lines changed: 3 additions & 0 deletions

@@ -92,3 +92,6 @@ justfile
 
 # Lychee link checker cache
 .lycheecache
+
+# Development notes
+notes

Cargo.lock

Lines changed: 6 additions & 0 deletions

@@ -8659,13 +8659,15 @@ dependencies = [
  "ahash",
  "anyhow",
  "arrow",
+ "criterion",
  "crossbeam",
  "image",
  "indexmap 2.11.4",
  "insta",
  "itertools 0.14.0",
  "mcap",
  "memmap2 0.9.8",
+ "mimalloc",
  "notify",
  "parking_lot",
  "parquet",

@@ -8874,6 +8876,8 @@ version = "0.27.0-alpha.8+dev"
 dependencies = [
  "ahash",
  "anyhow",
+ "arrow",
+ "crossbeam",
  "document-features",
  "emath",
  "indexmap 2.11.4",

@@ -8894,6 +8898,7 @@ dependencies = [
  "re_smart_channel",
  "re_sorbet",
  "re_tracing",
+ "re_types",
  "re_types_core",
  "re_uri",
  "saturating_cast",

@@ -10250,6 +10255,7 @@ dependencies = [
  "re_renderer",
  "re_selection_panel",
  "re_smart_channel",
+ "re_sorbet",
  "re_test_context",
  "re_test_viewport",
  "re_time_panel",

crates/store/re_data_loader/Cargo.toml

Lines changed: 6 additions & 0 deletions

@@ -63,3 +63,9 @@ re_chunk_store.workspace = true
 re_log_encoding = { workspace = true, features = ["decoder", "encoder"] }
 
 insta = { workspace = true, features = ["glob"] }
+criterion.workspace = true
+mimalloc.workspace = true
+
+[[bench]]
+name = "parallel_ingestion_bench"
+harness = false
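
Because the bench target is declared with harness = false, the default libtest harness is disabled and Criterion's criterion_main! provides the entry point (visible in the benchmark file below). With no explicit path field, Cargo resolves a target named parallel_ingestion_bench to benches/parallel_ingestion_bench.rs inside the crate, so it can be run on its own with, for example, cargo bench -p re_data_loader --bench parallel_ingestion_bench.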
Lines changed: 102 additions & 0 deletions (new file)

// Allow unwrap() in benchmarks
#![expect(clippy::unwrap_used)]

#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

use criterion::{Criterion, criterion_group, criterion_main};
use re_chunk::{Chunk, RowId};
use re_log_encoding::Encoder;
use re_log_types::{
    LogMsg, NonMinI64, StoreId, StoreKind, TimeInt, TimePoint, Timeline, entity_path,
};
use re_types::archetypes::Points2D;
use std::sync::mpsc;

use re_data_loader::{DataLoader as _, DataLoaderSettings, RrdLoader};

#[cfg(not(debug_assertions))]
const NUM_MESSAGES: usize = 10_000;

#[cfg(debug_assertions)]
const NUM_MESSAGES: usize = 100;

criterion_group!(benches, benchmark_load_from_file_contents);
criterion_main!(benches);

fn generate_messages(count: usize) -> Vec<LogMsg> {
    let store_id = StoreId::random(StoreKind::Recording, "bench_app");
    let mut messages = Vec::with_capacity(count);

    for i in 0..count {
        let chunk = Chunk::builder(entity_path!("points", i.to_string()))
            .with_archetype(
                RowId::new(),
                TimePoint::default().with(
                    Timeline::new_sequence("log_time"),
                    TimeInt::from_millis(NonMinI64::new(i64::try_from(i).unwrap()).unwrap()),
                ),
                &Points2D::new([(i as f32, i as f32), ((i + 1) as f32, (i + 1) as f32)]),
            )
            .build()
            .unwrap();

        messages.push(LogMsg::ArrowMsg(
            store_id.clone(),
            chunk.to_arrow_msg().unwrap(),
        ));
    }

    messages
}

fn encode_messages(messages: &[LogMsg]) -> Vec<u8> {
    let mut bytes = Vec::new();
    {
        let mut encoder = Encoder::new_eager(
            re_build_info::CrateVersion::LOCAL,
            re_log_encoding::rrd::EncodingOptions::PROTOBUF_UNCOMPRESSED,
            &mut bytes,
        )
        .unwrap();

        for msg in messages {
            encoder.append(msg).unwrap();
        }
        encoder.flush_blocking().unwrap();
        encoder.finish().unwrap();
    }
    bytes
}

/// Benchmark loading from file (parallel processing)
fn benchmark_load_from_file_contents(c: &mut Criterion) {
    let messages = generate_messages(NUM_MESSAGES);
    let encoded = encode_messages(&messages);
    let filepath = std::path::PathBuf::from("bench_data.rrd");
    let settings = DataLoaderSettings::recommended(re_log_types::RecordingId::random());

    let mut group = c.benchmark_group("load_from_file_contents");
    group.throughput(criterion::Throughput::Elements(NUM_MESSAGES as u64));

    group.bench_function("rrd_loader", |b| {
        b.iter(|| {
            let (tx, rx) = mpsc::channel();
            let contents = std::borrow::Cow::Borrowed(encoded.as_slice());
            let loader = RrdLoader;

            loader
                .load_from_file_contents(&settings, filepath.clone(), contents, tx)
                .unwrap();

            let mut count = 0;
            while rx.try_recv().is_ok() {
                count += 1;
            }

            assert_eq!(count, NUM_MESSAGES);
        });
    });

    group.finish();
}
