-
Notifications
You must be signed in to change notification settings - Fork 1.8k
feat: Support sliding window queries for MedianAccumulator by implementing retract_batch
#19278
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
2010YOUY01
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is great, thank you! Left some suggestions.
This PR adds a simple working solution, and it’s quite interesting to figure out how to retract efficiently for large windows 🤔
| median(value) OVER ( | ||
| PARTITION BY tags | ||
| ORDER BY timestamp | ||
| ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recommend to test different window frames like UNBOUNDED PRECEDING/FOLLOWING
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wasn't familiar with these before, but this was a great idea! It helped me find and understand a bug.
| # median_non_sliding_window | ||
| query ITRRRR | ||
| SELECT | ||
| timestamp, | ||
| tags, | ||
| value, | ||
| median(value) OVER ( | ||
| PARTITION BY tags | ||
| ORDER BY timestamp | ||
| ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW | ||
| ) AS value_median_unbounded_preceding, | ||
| median(value) OVER ( | ||
| PARTITION BY tags | ||
| ORDER BY timestamp | ||
| ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING | ||
| ) AS value_median_unbounded_both, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For UNBOUNDED FOLLOWING, an error is raised when retract_batch() isn't implemented. I found that queries with UNBOUNDED PRECEDING do not trigger this and instead return incorrect results. I assume this is a bug, right? If so, I can file a ticket.
For example, if you remove the UNBOUNDED FOLLOWING case right below my comment here, and try the query on main, I get this diff instead of an error.
Results Diff
``` [Diff] (-expected|+actual) 1 tag1 10 10 30 - 2 tag1 20 15 30 - 3 tag1 30 20 30 - 4 tag1 40 25 30 - 5 tag1 50 30 30 + 2 tag1 20 20 30 + 3 tag1 30 30 30 + 4 tag1 40 40 30 + 5 tag1 50 50 30 1 tag2 60 60 80 - 2 tag2 70 65 80 - 3 tag2 80 70 80 - 4 tag2 90 75 80 - 5 tag2 100 80 80 + 2 tag2 70 70 80 + 3 tag2 80 80 80 + 4 tag2 90 90 80 + 5 tag2 100 100 80 ```There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is this quite informative comment which seems to explain why this is the case:
datafusion/datafusion/physical-expr/src/aggregate.rs
Lines 490 to 538 in befaf93
| // Accumulators that have window frame startings different | |
| // than `UNBOUNDED PRECEDING`, such as `1 PRECEDING`, need to | |
| // implement retract_batch method in order to run correctly | |
| // currently in DataFusion. | |
| // | |
| // If this `retract_batches` is not present, there is no way | |
| // to calculate result correctly. For example, the query | |
| // | |
| // ```sql | |
| // SELECT | |
| // SUM(a) OVER(ORDER BY a ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS sum_a | |
| // FROM | |
| // t | |
| // ``` | |
| // | |
| // 1. First sum value will be the sum of rows between `[0, 1)`, | |
| // | |
| // 2. Second sum value will be the sum of rows between `[0, 2)` | |
| // | |
| // 3. Third sum value will be the sum of rows between `[1, 3)`, etc. | |
| // | |
| // Since the accumulator keeps the running sum: | |
| // | |
| // 1. First sum we add to the state sum value between `[0, 1)` | |
| // | |
| // 2. Second sum we add to the state sum value between `[1, 2)` | |
| // (`[0, 1)` is already in the state sum, hence running sum will | |
| // cover `[0, 2)` range) | |
| // | |
| // 3. Third sum we add to the state sum value between `[2, 3)` | |
| // (`[0, 2)` is already in the state sum). Also we need to | |
| // retract values between `[0, 1)` by this way we can obtain sum | |
| // between [1, 3) which is indeed the appropriate range. | |
| // | |
| // When we use `UNBOUNDED PRECEDING` in the query starting | |
| // index will always be 0 for the desired range, and hence the | |
| // `retract_batch` method will not be called. In this case | |
| // having retract_batch is not a requirement. | |
| // | |
| // This approach is a a bit different than window function | |
| // approach. In window function (when they use a window frame) | |
| // they get all the desired range during evaluation. | |
| if !accumulator.supports_retract_batch() { | |
| return not_impl_err!( | |
| "Aggregate can not be used as a sliding accumulator because \ | |
| `retract_batch` is not implemented: {}", | |
| self.name | |
| ); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should file a ticket, the previous impl should be able to handle unbounded preceding as @Jefffrey explained, and the inconsistent results is likely to indicate a bug.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, so you're saying unbounded preceding is supposed to work even without retract_batch() implemented. I was originally under the impression that it wasn't, but no that makes total sense now.
In that case, I think this PR is already fixes the bug, so there's no need to submit an issue for that. I mentioned in this comment that passing mut instead of clearing state with take() (81ced74) fixes the results in the mod.rs test. I've verified this by copying that change (81ced74) over to main and testing it, and the results for that test change. It's completely unrelated to the new support for retract_batch(). We just have an integer overflow issue remaining, which I've submitted an issue for.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this makes sense. I realized that the root cause is already known and it's not possible to cause issue else where.
| median(value) OVER ( | ||
| PARTITION BY tags | ||
| ORDER BY timestamp | ||
| ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wasn't familiar with these before, but this was a great idea! It helped me find and understand a bug.
| | -85 | -101 | 14 | -12 | -12 | 83 | -101 | 4 | -54 | | ||
| | -85 | -101 | 17 | -25 | -25 | 83 | -101 | 5 | -31 | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found that this test was returning incorrect results due to the bug I explained in another comment, instead of raising an error. The results here were fixed by updating evaluate() to pass a &mut instead of consuming the state with std::mem::take().
retract_batch
Jefffrey
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me, just one minor question on some of the updated test results
| | -85 | -48 | 6 | -35 | -36 | 83 | -85 | 2 | -43 | | ||
| | -85 | -5 | 4 | -37 | -40 | -5 | -85 | 1 | 83 | | ||
| | -85 | -54 | 15 | -17 | -18 | 83 | -101 | 4 | -38 | | ||
| | -85 | -56 | 2 | -70 | 57 | -56 | -85 | 1 | -25 | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find this interesting, how we have -70 for the approx median but 57 for median 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great catch. I looked into it, and it seems like it's wrapping around due to integer overflow while taking the average of the middle two values (since the count is even).
low: [-85], high: -56, median: 57 datatype: Int8
-85 + -56 = -141 -> wraparound to 115
Then 115 / 2 -> 57.5 -> 57 (truncated due to integer type)
What's our desired behavior in this case? We could promote to a larger datatype to perform the calculation. Also is it intentional to return the value as a truncated integer instead of a float?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding overflow, perhaps we should raise a separate issue to discuss/track this, as it does seem like incorrect behaviour.
We could do similar for the truncated integer behaviour; there was a recent issue asking about this for reference: #18867 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Filed: #19322
| # median_non_sliding_window | ||
| query ITRRRR | ||
| SELECT | ||
| timestamp, | ||
| tags, | ||
| value, | ||
| median(value) OVER ( | ||
| PARTITION BY tags | ||
| ORDER BY timestamp | ||
| ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW | ||
| ) AS value_median_unbounded_preceding, | ||
| median(value) OVER ( | ||
| PARTITION BY tags | ||
| ORDER BY timestamp | ||
| ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING | ||
| ) AS value_median_unbounded_both, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is this quite informative comment which seems to explain why this is the case:
datafusion/datafusion/physical-expr/src/aggregate.rs
Lines 490 to 538 in befaf93
| // Accumulators that have window frame startings different | |
| // than `UNBOUNDED PRECEDING`, such as `1 PRECEDING`, need to | |
| // implement retract_batch method in order to run correctly | |
| // currently in DataFusion. | |
| // | |
| // If this `retract_batches` is not present, there is no way | |
| // to calculate result correctly. For example, the query | |
| // | |
| // ```sql | |
| // SELECT | |
| // SUM(a) OVER(ORDER BY a ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS sum_a | |
| // FROM | |
| // t | |
| // ``` | |
| // | |
| // 1. First sum value will be the sum of rows between `[0, 1)`, | |
| // | |
| // 2. Second sum value will be the sum of rows between `[0, 2)` | |
| // | |
| // 3. Third sum value will be the sum of rows between `[1, 3)`, etc. | |
| // | |
| // Since the accumulator keeps the running sum: | |
| // | |
| // 1. First sum we add to the state sum value between `[0, 1)` | |
| // | |
| // 2. Second sum we add to the state sum value between `[1, 2)` | |
| // (`[0, 1)` is already in the state sum, hence running sum will | |
| // cover `[0, 2)` range) | |
| // | |
| // 3. Third sum we add to the state sum value between `[2, 3)` | |
| // (`[0, 2)` is already in the state sum). Also we need to | |
| // retract values between `[0, 1)` by this way we can obtain sum | |
| // between [1, 3) which is indeed the appropriate range. | |
| // | |
| // When we use `UNBOUNDED PRECEDING` in the query starting | |
| // index will always be 0 for the desired range, and hence the | |
| // `retract_batch` method will not be called. In this case | |
| // having retract_batch is not a requirement. | |
| // | |
| // This approach is a a bit different than window function | |
| // approach. In window function (when they use a window frame) | |
| // they get all the desired range during evaluation. | |
| if !accumulator.supports_retract_batch() { | |
| return not_impl_err!( | |
| "Aggregate can not be used as a sliding accumulator because \ | |
| `retract_batch` is not implemented: {}", | |
| self.name | |
| ); | |
| } |
| fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { | ||
| let values = values[0].as_primitive::<T>(); | ||
| for v in values.iter().flatten() { | ||
| if let Some(idx) = self.all_values.iter().position(|x| *x == v) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems this could be very slow?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I improved it using a hashmap in 1b710cc
| # median_non_sliding_window | ||
| query ITRRRR | ||
| SELECT | ||
| timestamp, | ||
| tags, | ||
| value, | ||
| median(value) OVER ( | ||
| PARTITION BY tags | ||
| ORDER BY timestamp | ||
| ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW | ||
| ) AS value_median_unbounded_preceding, | ||
| median(value) OVER ( | ||
| PARTITION BY tags | ||
| ORDER BY timestamp | ||
| ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING | ||
| ) AS value_median_unbounded_both, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should file a ticket, the previous impl should be able to handle unbounded preceding as @Jefffrey explained, and the inconsistent results is likely to indicate a bug.
| } | ||
|
|
||
| fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { | ||
| let mut to_remove: HashMap<ScalarValue, usize> = HashMap::new(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like a good optimization with minimal added complexity.
Which issue does this PR close?
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Added tests
Are there any user-facing changes?
Computing the median() window is now supported instead of throwing an error