Parallel ingestion #11798
Conversation
Force-pushed from 0b25438 to 87bf607
Pull Request Overview
This PR introduces parallel Arrow message ingestion by moving CPU-intensive processing off the UI thread. The implementation adds a dedicated background worker thread that converts Arrow messages to chunks asynchronously, with bounded channels providing natural backpressure.
Key changes:
- Created a new ingestion worker module that processes Arrow messages in a background thread
- Modified the viewer app to submit Arrow messages to the worker queue instead of processing them synchronously
- Made `add_chunk_with_timestamp_metadata` public in EntityDb to support the new parallel ingestion path
Reviewed Changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 3 comments.
Summary per file:
| File | Description |
|---|---|
| crates/viewer/re_viewer/src/lib.rs | Conditionally includes the new ingestion_worker module for non-WASM targets |
| crates/viewer/re_viewer/src/ingestion_worker.rs | Implements the background worker with bounded input queue and unbounded output channel |
| crates/viewer/re_viewer/src/app.rs | Integrates the ingestion worker into the app, routing Arrow messages to the worker and polling for processed chunks |
| crates/viewer/re_viewer/Cargo.toml | Adds re_sorbet dependency needed by the ingestion worker |
| crates/store/re_entity_db/src/entity_db.rs | Refactors Arrow message processing and makes chunk insertion method public |
| crates/store/re_data_loader/src/loader_rrd.rs | Extracts message transformation logic into a separate function and adds comprehensive ordering tests |
| crates/store/re_data_loader/benches/parallel_ingestion_bench.rs | Adds benchmark for measuring parallel ingestion performance |
| crates/store/re_data_loader/Cargo.toml | Configures the new benchmark with criterion and mimalloc dependencies |
Can you please rebase your PR? Seems it's based on a rather old commit from …
Force-pushed from 87bf607 to 47dd6eb
Done!
I'm still seeing your fork's …
Force-pushed from 47dd6eb to 39c063d
Mea culpa! Should be good now. Also, I'm available on Discord if you have any questions.
I'm observing a fairly large reduction of the "time to fully loaded" with a 2h air traffic data RRD: 30s -> 9s.
Based on the above benchmark, could you provide a more thorough analysis of the performance profile of this PR with Puffin? You should be able to produce the rrd using …
abey79 left a comment
Thanks for contributing! First, I'm extremely excited by the performance gain observed so far. Seems significant, and in an area that is very relevant. I've only partially reviewed this PR, but I think it needs some more massaging to be landable.
Our current stream model (in particular the LogMsg enum, which mixes data and commands) is flawed in many ways and makes this hard to reason about. In particular, I'm quite worried about message ordering here, since a given stream can contain multiple SetStoreInfo messages with their corresponding ArrowMsgs in between.
Also, a single, global worker seems odd. After discussing with @Wumpf, we think that each EntityDb should have its own worker. This would make everything easier to reason about, because once a (sub-)stream is specific to a given store, the relative order of ArrowMsgs across stores no longer matters. This could possibly enable downstream performance improvements by using multiple parallel workers per store (but let's keep that as future work).
On the performance side, as discussed, we need more thorough profiling with Puffin to understand better the performance profile of this PR. The 2h air traffic data is imo a good benchmark to start with. Also, the queue capacity (currently set to 2000) needs to be better understood and benchmarked.
Finally, more on the style side, it would be nice if the ingestion worker abstracted over the wasm target, instead of having conditional compilation all over the place.
The compressed Puffin file is here.
I'm working on it! |
Force-pushed from 39c063d to c3fb549
This is what the old ingestion performance looks like, using the floating metrics window from PR #11820 against the sequential baseline (video: sequential.mp4). Time elapsed is ~65s to load the 2h air traffic data set. Throughput is 2017 msg/s.
And this is what the new ingestion performance looks like, using the same floating metrics window, adapted to parallel processing and included in this PR (video: parallel.mp4). You can see that ingestion time went down to ~12s and throughput went up >5x.
@abey79 This confirms your findings from yesterday. It also confirms that performance did not decrease with the redesign.
Force-pushed from 80c348f to 49d79d7
abey79 left a comment
Thanks for continuing work on that. Partial review again, the main points being:
- we need to separate the rrd loading metrics stuff from this PR
- we need to better encapsulate ingestion stuff in entity db
This commit addresses feedback from PR rerun-io#11798:

1. Use the `get_or_insert_with` pattern for worker initialization
   - Replace the manual if-check with the idiomatic `Option::get_or_insert_with`
   - Better expresses intent and reduces nesting
2. Encapsulate ingestion worker processing in EntityDb
   - Add an `on_frame_start()` method to EntityDb for worker processing
   - Add `on_frame_start()` to StoreBundle to process all workers
   - Add `on_frame_start()` to StoreHub as the public interface
   - Remove `process_ingestion_worker_output()` from App
   - Move all worker-related logic out of App into the EntityDb layer
   - App now calls `store_hub.on_frame_start()` instead of manually polling workers and processing output
3. Introduce an `IngestionWorkerResults` type alias
   - Simplify complex return types
   - Export from the re_entity_db crate

Benefits:
- Cleaner separation of concerns
- EntityDb fully encapsulates its ingestion worker
- Easier to understand and maintain
- The App layer no longer needs to know about worker internals
Force-pushed from 49d79d7 to 190c28e
(Before/after screenshots.) The metrics are key to the whole thing, in my opinion, as is performance tracking. I don't know how to tell if the PR provides true performance enhancements without metrics! I kept the metrics as the …
I'm not saying metrics are not useful, and it's great to have them to document the performance increase. But I cannot review this PR, let alone land it, unless the diff only covers the actual ingestion part. Using git, you should be able to rebase the metrics UI stuff on top of this branch, such that you can use it locally without it appearing here.
Force-pushed from 190c28e to 2d8ffef
Cut the head off, please check!
- Each EntityDb now has its own dedicated ingestion worker with separate channels for commands and message batches
- Add benchmarks for ingestion performance testing
Force-pushed from 2d8ffef to bfca894
Should be ready to go!




Redesign ingestion with dedicated ingestion worker
Addresses #4298 by moving CPU-intensive Arrow message processing off the UI thread.
Key changes:
Ingestion Worker (crates/viewer/re_viewer/src/ingestion_worker.rs):
Viewer Integration (crates/viewer/re_viewer/src/app.rs):