[AURON #1693] split one large batch to many small batch after sort merge join #1694
base: master
Conversation
Pull request overview
This PR addresses issue #1693 by introducing batch size normalization for sort merge join output. The sort merge join operator can produce very large batches that exceed the configured batch size, leading to memory and performance issues. The solution splits these large batches into smaller ones and then applies coalescing to ensure optimal batch sizes.
Key Changes:
- Added a `split_with_default_batch_size` method to ExecutionContext that splits large batches into chunks of the target batch size
- Modified SortMergeJoinExec to apply both split and coalesce operations to its output stream
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| native-engine/datafusion-ext-plans/src/common/execution_context.rs | Introduces split_with_default_batch_size method with internal SplitLargeBatchStream that slices large batches into smaller chunks using the configured batch size |
| native-engine/datafusion-ext-plans/src/sort_merge_join_exec.rs | Updates output_with_streams to wrap the output with both split and coalesce operations for batch size normalization |
```rust
pub fn split_with_default_batch_size(
    self: &Arc<Self>,
    input: SendableRecordBatchStream,
) -> SendableRecordBatchStream {
```
Copilot AI · Dec 3, 2025
The split_with_default_batch_size method lacks documentation. Consider adding a doc comment that explains:
- The purpose of this method (splitting large batches into smaller ones)
- The behavior when batches are already smaller than the target size
- The target batch size used (from `batch_size()`)
- The relationship with `coalesce_with_default_batch_size`
Example:
```rust
/// Splits large record batches into smaller batches with sizes not exceeding
/// the default batch size. Batches smaller than or equal to the target size
/// are passed through unchanged. Empty batches are filtered out.
///
/// This is typically used in combination with `coalesce_with_default_batch_size`
/// to normalize batch sizes in a stream.
```
```rust
pub fn split_with_default_batch_size(
    self: &Arc<Self>,
    input: SendableRecordBatchStream,
) -> SendableRecordBatchStream {
    struct SplitLargeBatchStream {
        input: SendableRecordBatchStream,
        current_batch: Option<RecordBatch>,
        current_offset: usize,
    }

    impl SplitLargeBatchStream {
        fn split_next_chunk(&mut self) -> Option<RecordBatch> {
            let batch = self.current_batch.as_ref()?;
            let target_batch_size = batch_size();
            let num_rows = batch.num_rows();

            if self.current_offset >= num_rows {
                self.current_batch = None;
                return None;
            }

            let chunk_size = std::cmp::min(target_batch_size, num_rows - self.current_offset);
            let chunk = batch.slice(self.current_offset, chunk_size);
            self.current_offset += chunk_size;

            if self.current_offset >= num_rows {
                self.current_batch = None;
            }

            Some(chunk)
        }
    }

    impl RecordBatchStream for SplitLargeBatchStream {
        fn schema(&self) -> SchemaRef {
            self.input.schema()
        }
    }

    impl Stream for SplitLargeBatchStream {
        type Item = Result<RecordBatch>;

        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
            loop {
                if let Some(chunk) = self.split_next_chunk() {
                    return Poll::Ready(Some(Ok(chunk)));
                }

                match ready!(self.input.as_mut().poll_next_unpin(cx)) {
                    Some(Ok(batch)) => {
                        if batch.is_empty() {
                            continue;
                        }

                        let target_batch_size = batch_size();
                        if target_batch_size == 0 {
                            return Poll::Ready(Some(Err(DataFusionError::Internal(
                                "Invalid batch size: 0".to_string(),
                            ))));
                        }

                        let num_rows = batch.num_rows();
                        if num_rows <= target_batch_size {
                            return Poll::Ready(Some(Ok(batch)));
                        } else {
                            self.current_batch = Some(batch);
                            self.current_offset = 0;
                        }
                    }
                    Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                    None => return Poll::Ready(None),
                }
            }
        }
    }

    Box::pin(SplitLargeBatchStream {
        input,
        current_batch: None,
        current_offset: 0,
    })
}
```
Copilot AI · Dec 3, 2025
The new split_with_default_batch_size method lacks test coverage. Consider adding unit tests to verify:
- Splitting large batches (e.g., 10000 rows) into multiple smaller batches
- Passing through batches already at or below target size
- Handling empty batches
- Error handling for zero batch size
- Edge cases like batches with exactly target_batch_size rows
This is important to ensure the splitting logic works correctly, especially for the error case at lines 199-202.
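As a starting point, the sketch below covers the first of those cases. It is only a sketch: it does not construct the real ExecutionContext or SplitLargeBatchStream (their constructors are not part of this diff), and the target size of 4096 is an assumed stand-in for `batch_size()`; it exercises the same slicing arithmetic the new stream relies on, directly against an arrow RecordBatch.

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

#[test]
fn splits_large_batch_into_target_sized_chunks() {
    // 10_000 rows of a single Int32 column, larger than the assumed target size.
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let column: ArrayRef = Arc::new(Int32Array::from_iter_values(0..10_000));
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    // Assumed target batch size standing in for batch_size().
    let target = 4096usize;

    // Slice the batch the same way split_next_chunk does: full chunks, then a remainder.
    let mut offset = 0;
    let mut chunk_sizes = vec![];
    while offset < batch.num_rows() {
        let len = target.min(batch.num_rows() - offset);
        chunk_sizes.push(batch.slice(offset, len).num_rows());
        offset += len;
    }

    // 10_000 rows should split into 4096 + 4096 + 1808.
    assert_eq!(chunk_sizes, vec![4096, 4096, 1808]);
}
```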
```rust
Ok(exec_ctx
    .coalesce_with_default_batch_size(exec_ctx.split_with_default_batch_size(output)))
```
Copilot AI · Dec 3, 2025
[nitpick] The composition coalesce_with_default_batch_size(split_with_default_batch_size(output)) may create unnecessary overhead. After splitting large batches to the target size, the coalesce operation will immediately try to merge them back if they are too small (less than 1/4 of batch_size, based on line 275 of execution_context.rs).
Consider whether both operations are needed here, or if just split_with_default_batch_size would suffice for the sort merge join output. The coalesce operation is typically used for small batches from sources like filters or unions, but after splitting, batches should already be close to the target size; a worked example follows the suggested change below.
Suggested change:
```diff
-    Ok(exec_ctx
-        .coalesce_with_default_batch_size(exec_ctx.split_with_default_batch_size(output)))
+    Ok(exec_ctx.split_with_default_batch_size(output))
```
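To make the overhead concern concrete (illustrative numbers only, using the 1/4 threshold cited above): with a batch_size of 8192, the split stream emits full 8192-row chunks plus at most one smaller remainder per oversized input batch, and only remainders below 8192 / 4 = 2048 rows are candidates for coalescing, so the extra coalesce pass would mostly forward batches unchanged.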
@XorSum I wonder why we need to do this manual batch split; the join algorithm itself already has batch_size handling logic, is there a bug in it? auron/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs, lines 55 to 57 in 1051790
Thanks for the comment! I do not yet understand how the join batch_size handling logic works; however, I did find a reproducible bug (#1693). An extremely large batch (100 million rows) is generated after the join operation, which leads to the failure described in that issue. We can verify this by printing num_rows for the join output batches.
In some extreme cases with a lot of duplicated keys on both join sides, the existing batch_size handling does not flush in time, so a single output batch can grow far beyond the configured batch size.
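As a rough worked example (not taken from the PR itself): if a single key value is repeated 10,000 times on each join side, that key alone contributes 10,000 × 10,000 = 100,000,000 joined rows, which is on the order of the 100-million-row batch reported in #1693.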
…1701)

# Which issue does this PR close?

Closes #1693.

# Rationale for this change

As discussed previously in #1693 and #1694, the join operation should check batch size and trigger flushing in a timely manner, to prevent extremely large batches.

Co-authored-by: Copilot <[email protected]>


Which issue does this PR close?
Closes #1693.
Rationale for this change
What changes are included in this PR?
- Added a `split_with_default_batch_size` method, which splits a large batch into many small batches with the default batch_size.

Are there any user-facing changes?
No
How was this patch tested?
Performed manual testing with the code specified in the issue.