Implement disk spilling for all grouping ordering modes in GroupedHashAggregateStream #19287
Conversation
Force-pushed from b8de4b9 to 8cdddb0.
Force-pushed from 8cdddb0 to deb73a9.
Extended test failures are related to the changes in this PR. Will review as soon as I can.
Force-pushed from 024f425 to d9af260.
I've extended this PR to include spill support for grouping by partially or fully sorted inputs. This fixed the previously failing tests.
    let mut new_requirements = new_sort_exprs
        .into_iter()
        .map(PhysicalSortRequirement::from)
Rather than creating new sort requirements with default options, this copies the sort options from the input instead.
    let group_ordering = GroupOrdering::try_new(&agg.input_order_mode)?;
    let oom_mode = match group_ordering {
        GroupOrdering::None => {
            if agg.mode == AggregateMode::Partial {
The intention here is to keep the existing emit-early behaviour for AggregateMode::Partial.
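To make the dispatch concrete, here is a hedged sketch of how the match above might select an out-of-memory mode. The OutOfMemoryMode::EmitEarly and OutOfMemoryMode::Spill variants are named elsewhere in this review, but the exact arms below are my assumption, not the PR's literal code:

    // Sketch only: the arm bodies are illustrative and may differ from the PR.
    let oom_mode = match group_ordering {
        GroupOrdering::None => {
            if agg.mode == AggregateMode::Partial {
                // Preserve the pre-existing behaviour: partial aggregation
                // emits accumulated groups early instead of spilling to disk.
                OutOfMemoryMode::EmitEarly
            } else {
                OutOfMemoryMode::Spill
            }
        }
        // Partially or fully sorted group orderings can spill, because the
        // spilled runs can later be merged back in group order.
        GroupOrdering::Partial(_) | GroupOrdering::Full(_) => OutOfMemoryMode::Spill,
    };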
    fn find_sort_options(
        output_ordering: Option<&LexOrdering>,
        expr: &dyn PhysicalExpr,
    ) -> SortOptions {
        if let Some(ordering) = output_ordering {
            for e in ordering {
                if e.expr.as_ref().dyn_eq(expr) {
                    return e.options;
                }
            }
        }

        SortOptions::default()
    }
I wasn't 100% sure about this implementation. Perhaps there's a better (more elegant) way to figure out what the correct SortOptions are.
Maybe we could add a method to LexOrdering -- so this could be like
LexOrdering::find_options(expr).unwrap_or_default()
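For illustration, a sketch of what such a helper could look like if it were added inside the crate that defines LexOrdering. The method name follows the suggestion above; the exact signature is an assumption and this is not an existing DataFusion API:

    impl LexOrdering {
        /// Returns the SortOptions of the first sort expression in this
        /// ordering whose expression matches `expr`, if any.
        /// (Hypothetical addition, mirroring the dyn_eq comparison used in
        /// the find_sort_options snippet quoted above.)
        pub fn find_options(&self, expr: &dyn PhysicalExpr) -> Option<SortOptions> {
            self.iter()
                .find(|sort_expr| sort_expr.expr.as_ref().dyn_eq(expr))
                .map(|sort_expr| sort_expr.options)
        }
    }

The call site would then become output_ordering.and_then(|o| o.find_options(expr)).unwrap_or_default().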
alamb left a comment:
Thank you @pepijnve -- this looks very close to me. I reviewed the code carefully and the only thing I have a concern about is spilling the extra batch. Otherwise this looks good to me.
I will kick off some benchmarks just to be sure.
cc @rluvaton and @2010YOUY01 who I think have also been working in this code recently
    }

    #[tokio::test]
    async fn test_grouped_aggregation_respects_memory_limit() -> Result<()> {
I verified that these tests cover the new code, as they fail without the code in this PR:
failures:
---- aggregates::tests::test_grouped_aggregation_respects_memory_limit stdout ----
thread 'aggregates::tests::test_grouped_aggregation_respects_memory_limit' (25899629) panicked at datafusion/physical-plan/src/aggregates/mod.rs:3384:17:
Expected spill but SpillCount metric not found or SpillCount was 0.
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
---- aggregates::tests::test_order_is_retained_when_spilling stdout ----
thread 'aggregates::tests::test_order_is_retained_when_spilling' (25899632) panicked at datafusion/physical-plan/src/aggregates/mod.rs:3384:17:
Expected spill but SpillCount metric not found or SpillCount was 0.
failures:
aggregates::tests::test_grouped_aggregation_respects_memory_limit
aggregates::tests::test_order_is_retained_when_spilling
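For readers who want to reproduce this kind of check, here is a hedged sketch of how a memory limit can be imposed so that a grouped aggregation is forced to spill. This is not the test from this PR; the table name, CSV path, and query are hypothetical, and the API names (RuntimeEnvBuilder, SessionContext::new_with_config_rt, MetricsSet::spill_count) reflect my understanding of recent DataFusion versions:

    use std::sync::Arc;
    use datafusion::error::Result;
    use datafusion::execution::runtime_env::RuntimeEnvBuilder;
    use datafusion::physical_plan::{collect, ExecutionPlan};
    use datafusion::prelude::*;

    /// Sums the SpillCount metric over an execution plan tree.
    fn total_spill_count(plan: &Arc<dyn ExecutionPlan>) -> usize {
        let own = plan.metrics().and_then(|m| m.spill_count()).unwrap_or(0);
        own + plan.children().into_iter().map(total_spill_count).sum::<usize>()
    }

    #[tokio::test]
    async fn grouped_aggregation_spills_under_memory_limit() -> Result<()> {
        // Constrain the memory pool so the group-by state cannot fit in memory.
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_limit(32 * 1024 * 1024, 1.0) // 32 MiB
            .build_arc()?;
        let ctx = SessionContext::new_with_config_rt(SessionConfig::new(), runtime);
        // Hypothetical input, assumed large enough to exceed the limit.
        ctx.register_csv("t", "data.csv", CsvReadOptions::new()).await?;

        let df = ctx.sql("SELECT a, count(*) FROM t GROUP BY a").await?;
        let plan = df.create_physical_plan().await?;
        let _batches = collect(Arc::clone(&plan), ctx.task_ctx()).await?;

        // At least one operator (the aggregation) should report a spill.
        assert!(total_spill_count(&plan) > 0, "expected the aggregation to spill");
        Ok(())
    }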
      // aggregation operator, we must use a schema that includes both the group
      // columns **and** the partial-state columns.
    - let partial_agg_schema = create_schema(
    + let spill_schema = Arc::new(create_schema(
Given that this is used for spilling (per the comments), renaming these variables makes sense to me.
    let output_expr = Column::new(field.name().as_str(), idx);

    // Try to use the sort options from the output ordering, if available.
    // This ensures that spilled state is emitted in the expected order.
👍
    }

    /// Determines if `spill_state_if_oom` can free up memory by spilling state to disk
    fn can_spill_on_oom(&self) -> bool {
nit: it would be nice to move this closer to can_emit_early_on_oom so the two functions sit together in the code.
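For context, a hedged guess at what the two predicates being discussed might look like side by side. The oom_mode field and OutOfMemoryMode variants are named elsewhere in this review, but the bodies below are assumptions:

    /// Can memory be freed by spilling accumulated state to disk? (sketch)
    fn can_spill_on_oom(&self) -> bool {
        matches!(self.oom_mode, OutOfMemoryMode::Spill)
    }

    /// Can memory be freed by emitting group values early? (sketch)
    fn can_emit_early_on_oom(&self) -> bool {
        matches!(self.oom_mode, OutOfMemoryMode::EmitEarly)
    }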
    // Spill the last remaining rows (if any) to free up as much memory as possible.
    // Since we're already spilling, we can be sure we're memory constrained.
    // Creating an extra spill file won't make much of a difference.
    self.spill()?;
It sort of depends on how large the spill file was -- like if we have 2GB of data in memory, writing it to a new spill file will likely make a measurable negative difference in performance.
I had my doubts about this one myself due to performance regression concerns. On the other hand, the previous code was kind of cheating in the sense that the last batch is held in memory and, unless I'm mistaken, is no longer accounted for in the memory reservation.
BTW, this is what SortExec does: it always spills what it has in memory when it's about to do an external sort.
Agreed -- this approach makes sense for now. We can try and optimize it in the future if needed
    // clear up memory for streaming_merge
    self.clear_all();
    self.update_memory_reservation()?;
    let mut streams: Vec<SendableRecordBatchStream> = vec![];
This change is because the code now forces a spill even if it had memory to hold the last batch in memory, right?
I feel like, while the code is now cleaner, we have lost some performance.
Maybe we could factor the creation of the stream into a method so it looks something like this
    if let Some(stream) = self.emit_to_stream(EmitTo::All, true) {
        builder = builder.with_streams(vec![stream]);
    }
Yes, that's correct. By spilling the last batch the special handling is no longer required. Clean code wasn't really the main driver though; accounting for the batch in the memory reservation was.
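Putting the thread together, a hedged sketch of the finalization flow being discussed. The methods called on `self` are the ones quoted in this review, while `read_spill_as_stream`, the `spill_state.spills`/`spill_schema` fields, and the overall structure are hypothetical stand-ins rather than the PR's actual code:

    fn set_input_done_and_produce_output(&mut self) -> Result<()> {
        // Spill whatever is still held in the aggregation state so that all
        // remaining data lives in spill files and the memory reservation can
        // be released before the merge starts.
        self.spill()?;

        // clear up memory for streaming_merge
        self.clear_all();
        self.update_memory_reservation()?;

        // Open one sorted stream per spill file; a streaming merge over these
        // yields output fully sorted on the group columns, so the remaining
        // aggregation can emit with EmitTo::First.
        let mut streams: Vec<SendableRecordBatchStream> = vec![];
        for spill_file in self.spill_state.spills.drain(..) {
            // `read_spill_as_stream` is a hypothetical helper name here.
            streams.push(read_spill_as_stream(spill_file, self.spill_state.spill_schema.clone())?);
        }
        // ...hand `streams` to the streaming merge and switch the operator
        // into its "reading merged spill data" state...
        Ok(())
    }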
run benchmarks
run benchmark external_aggr
🤖: Benchmark completed
Benchmark runner seems to be struggling.
It seems that the benchmark doesn't complete on main (I haven't tried on this branch).
Interesting. I'll have a look at that tomorrow.
Force-pushed from 4fdb8f0 to f99ba14.
@alamb 9f2f22e makes the […] Because I was setting […] Additionally, the 'emit early' code would only kick in if there was at least one full batch of group values to emit. I've modified the code to still try to do that if it can, but fall back to emitting a smaller batch if memory pressure requires it.
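To illustrate the fallback described in the comment above, a hedged sketch; EmitTo::First is an existing DataFusion identifier, while the `group_values`, `batch_size`, and `emit` members are assumed names for this sketch:

    // Sketch only: prefer emitting a full batch of leading group values, but
    // under memory pressure emit whatever is available instead of giving up.
    let group_count = self.group_values.len();      // assumed field
    if group_count == 0 {
        return Ok(());                              // nothing to emit
    }
    let emit_to = if group_count >= self.batch_size {
        // Enough groups accumulated: emit a full batch, as before.
        EmitTo::First(self.batch_size)
    } else {
        // Fewer groups than a full batch: emit them anyway to free memory.
        EmitTo::First(group_count)
    };
    let batch = self.emit(emit_to, false)?;         // assumed helper signature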
One remaining question I have is how […]. In […]. The same goes for […].
…eam when spilling is not possible
- Report 'can spill: true' for emit early mode
- Emit fewer than 'batch size' group values if necessary
…ut-of-memory error
Force-pushed from 0a3b96f to 1cc05a7.
kazuyukitanimura left a comment:
Thank you @pepijnve
    /// At this point, all the inputs are read and there are some spills.
    /// Emit the remaining rows and create a batch.
    /// Conduct a streaming merge sort between the batch and spilled data. Since the stream is fully
    /// sorted, set `self.group_ordering` to Full, then later we can read with [`EmitTo::First`].
Should we update this comment because we now always spill the remaining rows?
Also, is there any way to measure the impact of this change in a micro benchmark?
Good catch. Yes, I'll revise that comment a bit.
I've made some adjustments that allow benchmarks/src/bin/external_aggr.rs to run to completion again (it was broken on main). I haven't inspected the query plans it uses yet to see which code path it's actually covering.
Because it wasn't working on main, it's not possible to use it for comparative testing.
I decided to inline update_merged_stream into set_input_done_and_produce_output. I couldn't come up with a sensible name for it, and found it to be clearer if the two alternatives in set_input_done_and_produce_output are close together.
I've added additional comments in set_input_done_and_produce_output to explain what's being done there and why.
In external_aggr.rs, the query plan for Q1 is
    ProjectionExec: expr=[count(Int64(1))@0 as count(*)]
      AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
        CoalescePartitionsExec
          AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
            ProjectionExec: expr=[]
              AggregateExec: mode=FinalPartitioned, gby=[l_orderkey@0 as l_orderkey, l_suppkey@1 as l_suppkey], aggr=[]
                RepartitionExec: partitioning=Hash([l_orderkey@0, l_suppkey@1], 4), input_partitions=4
                  AggregateExec: mode=Partial, gby=[l_orderkey@0 as l_orderkey, l_suppkey@1 as l_suppkey], aggr=[]
                    DataSourceExec: ...
I've confirmed with the debugger that a mix of OutOfMemoryMode::EmitEarly and OutOfMemoryMode::Spill is used. So that benchmark is definitely useful to measure the performance impact of changes in this area of the code.
@alamb Is it worth trying to get it in working state again on main with only minimal changes first? My appetite for that work is rather limited unless it's considered a blocker for this PR.
@alamb Is it worth trying to get it in working state again on main with only minimal changes first? My appetite for that work is rather limited unless it's considered a blocker for this PR.
I don't think so. If we hit issues we can always make a new PR
I think this one is ready to go. I'll just update it once more and run benchmarks one more time to make me feel better, and plan to merge it in.
run benchmark clickbench_partitioned
run benchmark tpch
🤖: Benchmark completed
🤖: Benchmark completed
Thanks again @pepijnve and @kazuyukitanimura
Which issue does this PR close?
Rationale for this change
GroupedHashAggregateStream currently always reports that it can spill to the memory tracking subsystem even though this is dependent on the aggregation mode and the grouping order.
The optimistic logic in group_aggregate_batch does not correctly take the spilling preconditions into account, which can lead to excessive memory use. In order to resolve this, this PR implements disk spilling for all grouping modes.
What changes are included in this PR?
- MemoryConsumer::can_spill changed to reflect actual spilling behaviour
- group_aggregate_batch is aligned with the disk spilling or early emission logic

Are these changes tested?
Added an additional test case to demonstrate the problem.
Added a test case to check that output order is respected after spilling.
Are there any user-facing changes?
Yes, memory exhaustion may be reported much earlier in the query pipeline than is currently the case. In my local tests with a per-consumer memory limit of 32 MiB, grouped aggregation would consume 480 MiB in practice. This was then reported by ExternalSortExec, which choked on trying to reserve that much memory.