Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,6 @@ justfile

# Lychee link checker cache
.lycheecache

# Development notes
notes
6 changes: 6 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8665,13 +8665,15 @@ dependencies = [
"ahash",
"anyhow",
"arrow",
"criterion",
"crossbeam",
"image",
"indexmap 2.11.4",
"insta",
"itertools 0.14.0",
"mcap",
"memmap2 0.9.8",
"mimalloc",
"notify",
"parking_lot",
"parquet",
Expand Down Expand Up @@ -8880,6 +8882,8 @@ version = "0.27.0-alpha.8+dev"
dependencies = [
"ahash",
"anyhow",
"arrow",
"crossbeam",
"document-features",
"emath",
"indexmap 2.11.4",
Expand All @@ -8900,6 +8904,7 @@ dependencies = [
"re_smart_channel",
"re_sorbet",
"re_tracing",
"re_types",
"re_types_core",
"re_uri",
"saturating_cast",
Expand Down Expand Up @@ -10264,6 +10269,7 @@ dependencies = [
"re_renderer",
"re_selection_panel",
"re_smart_channel",
"re_sorbet",
"re_test_context",
"re_test_viewport",
"re_time_panel",
Expand Down
6 changes: 6 additions & 0 deletions crates/store/re_data_loader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,9 @@ re_chunk_store.workspace = true
re_log_encoding = { workspace = true, features = ["decoder", "encoder"] }

insta = { workspace = true, features = ["glob"] }
criterion.workspace = true
mimalloc.workspace = true

[[bench]]
name = "parallel_ingestion_bench"
harness = false
102 changes: 102 additions & 0 deletions crates/store/re_data_loader/benches/parallel_ingestion_bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Allow unwrap() in benchmarks
#![expect(clippy::unwrap_used)]

#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

use criterion::{Criterion, criterion_group, criterion_main};
use re_chunk::{Chunk, RowId};
use re_log_encoding::Encoder;
use re_log_types::{
LogMsg, NonMinI64, StoreId, StoreKind, TimeInt, TimePoint, Timeline, entity_path,
};
use re_types::archetypes::Points2D;
use std::sync::mpsc;

use re_data_loader::{DataLoader as _, DataLoaderSettings, RrdLoader};

#[cfg(not(debug_assertions))]
const NUM_MESSAGES: usize = 10_000;

#[cfg(debug_assertions)]
const NUM_MESSAGES: usize = 100;

criterion_group!(benches, benchmark_load_from_file_contents);
criterion_main!(benches);

fn generate_messages(count: usize) -> Vec<LogMsg> {
let store_id = StoreId::random(StoreKind::Recording, "bench_app");
let mut messages = Vec::with_capacity(count);

for i in 0..count {
let chunk = Chunk::builder(entity_path!("points", i.to_string()))
.with_archetype(
RowId::new(),
TimePoint::default().with(
Timeline::new_sequence("log_time"),
TimeInt::from_millis(NonMinI64::new(i64::try_from(i).unwrap()).unwrap()),
),
&Points2D::new([(i as f32, i as f32), ((i + 1) as f32, (i + 1) as f32)]),
)
.build()
.unwrap();

messages.push(LogMsg::ArrowMsg(
store_id.clone(),
chunk.to_arrow_msg().unwrap(),
));
}

messages
}

fn encode_messages(messages: &[LogMsg]) -> Vec<u8> {
let mut bytes = Vec::new();
{
let mut encoder = Encoder::new_eager(
re_build_info::CrateVersion::LOCAL,
re_log_encoding::rrd::EncodingOptions::PROTOBUF_UNCOMPRESSED,
&mut bytes,
)
.unwrap();

for msg in messages {
encoder.append(msg).unwrap();
}
encoder.flush_blocking().unwrap();
encoder.finish().unwrap();
}
bytes
}

/// Benchmark loading from file (parallel processing)
fn benchmark_load_from_file_contents(c: &mut Criterion) {
let messages = generate_messages(NUM_MESSAGES);
let encoded = encode_messages(&messages);
let filepath = std::path::PathBuf::from("bench_data.rrd");
let settings = DataLoaderSettings::recommended(re_log_types::RecordingId::random());

let mut group = c.benchmark_group("load_from_file_contents");
group.throughput(criterion::Throughput::Elements(NUM_MESSAGES as u64));

group.bench_function("rrd_loader", |b| {
b.iter(|| {
let (tx, rx) = mpsc::channel();
let contents = std::borrow::Cow::Borrowed(encoded.as_slice());
let loader = RrdLoader;

loader
.load_from_file_contents(&settings, filepath.clone(), contents, tx)
.unwrap();

let mut count = 0;
while rx.try_recv().is_ok() {
count += 1;
}

assert_eq!(count, NUM_MESSAGES);
});
});

group.finish();
}
Loading