Skip to content
Merged
11 changes: 11 additions & 0 deletions datafusion/physical-expr-common/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,17 @@ impl LexOrdering {
req.expr.eq(&cur.expr) && is_reversed_sort_options(&req.options, &cur.options)
})
}

/// Looks up the sort options associated with `expr` in this `LexOrdering`.
///
/// Scans the ordering's sort expressions in order and returns the
/// [`SortOptions`] of the first entry whose expression is equal to `expr`,
/// or `None` when `expr` does not participate in this ordering.
pub fn get_sort_options(&self, expr: &dyn PhysicalExpr) -> Option<SortOptions> {
    self.into_iter()
        .find(|sort_expr| sort_expr.expr.as_ref().dyn_eq(expr))
        .map(|sort_expr| sort_expr.options)
}
}

/// Check if two SortOptions represent reversed orderings.
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/aggregates/group_values/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use arrow::array::types::{
Time64MicrosecondType, Time64NanosecondType, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
};
use arrow::array::{ArrayRef, RecordBatch, downcast_primitive};
use arrow::array::{ArrayRef, downcast_primitive};
use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use datafusion_common::Result;

Expand Down Expand Up @@ -112,7 +112,7 @@ pub trait GroupValues: Send {
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;

/// Clear the contents and shrink the capacity to the size of the batch (free up memory usage)
fn clear_shrink(&mut self, num_rows: usize);
}

/// Return a specialized implementation of [`GroupValues`] for the given schema.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::aggregates::group_values::multi_group_by::{
bytes_view::ByteViewGroupValueBuilder, primitive::PrimitiveGroupValueBuilder,
};
use ahash::RandomState;
use arrow::array::{Array, ArrayRef, RecordBatch};
use arrow::array::{Array, ArrayRef};
use arrow::compute::cast;
use arrow::datatypes::{
BinaryViewType, DataType, Date32Type, Date64Type, Decimal128Type, Float32Type,
Expand Down Expand Up @@ -1181,14 +1181,13 @@ impl<const STREAMING: bool> GroupValues for GroupValuesColumn<STREAMING> {
Ok(output)
}

fn clear_shrink(&mut self, batch: &RecordBatch) {
let count = batch.num_rows();
fn clear_shrink(&mut self, num_rows: usize) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice cleanup

self.group_values.clear();
self.map.clear();
self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared
self.map.shrink_to(num_rows, |_| 0); // hasher does not matter since the map is cleared
self.map_size = self.map.capacity() * size_of::<(u64, usize)>();
self.hashes_buffer.clear();
self.hashes_buffer.shrink_to(count);
self.hashes_buffer.shrink_to(num_rows);

// Such structures are only used in `non-streaming` case
if !STREAMING {
Expand Down
9 changes: 4 additions & 5 deletions datafusion/physical-plan/src/aggregates/group_values/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use crate::aggregates::group_values::GroupValues;
use ahash::RandomState;
use arrow::array::{Array, ArrayRef, ListArray, RecordBatch, StructArray};
use arrow::array::{Array, ArrayRef, ListArray, StructArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, SchemaRef};
use arrow::row::{RowConverter, Rows, SortField};
Expand Down Expand Up @@ -243,17 +243,16 @@ impl GroupValues for GroupValuesRows {
Ok(output)
}

fn clear_shrink(&mut self, batch: &RecordBatch) {
let count = batch.num_rows();
fn clear_shrink(&mut self, num_rows: usize) {
self.group_values = self.group_values.take().map(|mut rows| {
rows.clear();
rows
});
self.map.clear();
self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared
self.map.shrink_to(num_rows, |_| 0); // hasher does not matter since the map is cleared
self.map_size = self.map.capacity() * size_of::<(u64, usize)>();
self.hashes_buffer.clear();
self.hashes_buffer.shrink_to(count);
self.hashes_buffer.shrink_to(num_rows);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use crate::aggregates::group_values::GroupValues;

use arrow::array::{
ArrayRef, AsArray as _, BooleanArray, BooleanBufferBuilder, NullBufferBuilder,
RecordBatch,
};
use datafusion_common::Result;
use datafusion_expr::EmitTo;
Expand Down Expand Up @@ -146,7 +145,7 @@ impl GroupValues for GroupValuesBoolean {
Ok(vec![Arc::new(BooleanArray::new(values, nulls)) as _])
}

fn clear_shrink(&mut self, _batch: &RecordBatch) {
fn clear_shrink(&mut self, _num_rows: usize) {
self.false_group = None;
self.true_group = None;
self.null_group = None;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::mem::size_of;

use crate::aggregates::group_values::GroupValues;

use arrow::array::{Array, ArrayRef, OffsetSizeTrait, RecordBatch};
use arrow::array::{Array, ArrayRef, OffsetSizeTrait};
use datafusion_common::Result;
use datafusion_expr::EmitTo;
use datafusion_physical_expr_common::binary_map::{ArrowBytesMap, OutputType};
Expand Down Expand Up @@ -120,7 +120,7 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesBytes<O> {
Ok(vec![group_values])
}

fn clear_shrink(&mut self, _batch: &RecordBatch) {
fn clear_shrink(&mut self, _num_rows: usize) {
// in theory we could potentially avoid this reallocation and clear the
// contents of the maps, but for now we just reset the map from the beginning
self.map.take();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::aggregates::group_values::GroupValues;
use arrow::array::{Array, ArrayRef, RecordBatch};
use arrow::array::{Array, ArrayRef};
use datafusion_expr::EmitTo;
use datafusion_physical_expr::binary_map::OutputType;
use datafusion_physical_expr_common::binary_view_map::ArrowBytesViewMap;
Expand Down Expand Up @@ -122,7 +122,7 @@ impl GroupValues for GroupValuesBytesView {
Ok(vec![group_values])
}

fn clear_shrink(&mut self, _batch: &RecordBatch) {
fn clear_shrink(&mut self, _num_rows: usize) {
// in theory we could potentially avoid this reallocation and clear the
// contents of the maps, but for now we just reset the map from the beginning
self.map.take();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use arrow::array::{
cast::AsArray,
};
use arrow::datatypes::{DataType, i256};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::memory_pool::proxy::VecAllocExt;
use datafusion_expr::EmitTo;
Expand Down Expand Up @@ -213,11 +212,10 @@ where
Ok(vec![Arc::new(array.with_data_type(self.data_type.clone()))])
}

fn clear_shrink(&mut self, batch: &RecordBatch) {
let count = batch.num_rows();
fn clear_shrink(&mut self, num_rows: usize) {
self.values.clear();
self.values.shrink_to(count);
self.values.shrink_to(num_rows);
self.map.clear();
self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared
self.map.shrink_to(num_rows, |_| 0); // hasher does not matter since the map is cleared
}
}
Loading