Skip to content

Commit 7d5d966

Browse files
feat: Rewrite IR::Scan to IR::DataFrameScan in expand_datasets when applicable (#25106)
1 parent 5a4c0d2 commit 7d5d966

File tree

17 files changed

+121
-38
lines changed

17 files changed

+121
-38
lines changed

crates/polars-core/src/utils/mod.rs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1320,22 +1320,6 @@ pub fn coalesce_nulls_columns(a: &Column, b: &Column) -> (Column, Column) {
13201320
}
13211321
}
13221322

1323-
pub fn operation_exceeded_idxsize_msg(operation: &str) -> String {
1324-
if size_of::<IdxSize>() == size_of::<u32>() {
1325-
format!(
1326-
"{} exceeded the maximum supported limit of {} rows. Consider installing 'polars[rt64]'.",
1327-
operation,
1328-
IdxSize::MAX,
1329-
)
1330-
} else {
1331-
format!(
1332-
"{} exceeded the maximum supported limit of {} rows.",
1333-
operation,
1334-
IdxSize::MAX,
1335-
)
1336-
}
1337-
}
1338-
13391323
#[cfg(test)]
13401324
mod test {
13411325
use super::*;

crates/polars-plan/src/dsl/file_scan/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,9 @@ pub struct UnifiedScanArgs {
297297
pub table_statistics: Option<TableStatistics>,
298298
/// Stores (physical, deleted) row counts of the table if known upfront (e.g. for Iceberg).
299299
/// This allows for row-count queries to succeed without scanning all files.
300-
pub row_count: Option<(IdxSize, IdxSize)>,
300+
///
301+
/// Note, intentionally store u64 instead of IdxSize to avoid erroring if it's unused.
302+
pub row_count: Option<(u64, u64)>,
301303
}
302304

303305
impl UnifiedScanArgs {

crates/polars-plan/src/plans/optimizer/expand_datasets.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ use std::sync::Arc;
33

44
use polars_core::config;
55
use polars_core::error::{PolarsResult, polars_bail};
6+
use polars_core::frame::DataFrame;
67
use polars_utils::arena::{Arena, Node};
78
use polars_utils::pl_str::PlSmallStr;
89
#[cfg(feature = "python")]
910
use polars_utils::python_function::PythonObject;
11+
use polars_utils::row_counter::RowCounter;
1012
use polars_utils::slice_enum::Slice;
1113
use polars_utils::{format_pl_smallstr, unitvec};
1214

@@ -330,6 +332,64 @@ pub(super) fn expand_datasets(
330332
}
331333

332334
apply_scan_predicate_to_scan_ir(node, ir_arena, expr_arena)?;
335+
336+
let output_schema = ir_arena.get(node).schema(ir_arena);
337+
338+
let IR::Scan {
339+
sources,
340+
file_info: _,
341+
hive_parts: _,
342+
predicate,
343+
predicate_file_skip_applied: _,
344+
output_schema: _,
345+
scan_type,
346+
unified_scan_args,
347+
} = ir_arena.get(node)
348+
else {
349+
unreachable!()
350+
};
351+
352+
let df: DataFrame = if (sources.is_empty()
353+
&& !matches!(scan_type.as_ref(), FileScanIR::Anonymous { .. }))
354+
|| unified_scan_args
355+
.pre_slice
356+
.as_ref()
357+
.is_some_and(|slice| slice.len() == 0)
358+
{
359+
if config::verbose() {
360+
eprintln!("expand_datasets: scan IR replaced with empty DataFrameScan")
361+
}
362+
363+
DataFrame::empty_with_schema(output_schema.as_ref())
364+
} else if output_schema.is_empty()
365+
&& let Some((physical_rows, deleted_rows)) = unified_scan_args.row_count
366+
&& unified_scan_args.pre_slice.is_none()
367+
&& predicate.is_none()
368+
{
369+
let row_counter = RowCounter::new(physical_rows, deleted_rows);
370+
row_counter.num_rows_idxsize()?;
371+
let num_rows = row_counter.num_rows()?;
372+
373+
if config::verbose() {
374+
eprintln!(
375+
"expand_datasets: scan IR replaced with 0-width DataFrameScan with height {} ({:?})",
376+
num_rows, &row_counter
377+
)
378+
}
379+
380+
DataFrame::empty_with_height(num_rows)
381+
} else {
382+
continue;
383+
};
384+
385+
let schema = df.schema().clone();
386+
let new_ir = IR::DataFrameScan {
387+
df: Arc::new(df),
388+
schema: schema.clone(),
389+
output_schema: Some(schema),
390+
};
391+
392+
ir_arena.replace(node, new_ir);
333393
}
334394

335395
Ok(())

crates/polars-python/src/io/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ impl PyScanOptions<'_> {
6666
retries: usize,
6767
deletion_files: Option<Wrap<DeletionFilesList>>,
6868
table_statistics: Option<Wrap<TableStatistics>>,
69-
row_count: Option<(IdxSize, IdxSize)>,
69+
row_count: Option<(u64, u64)>,
7070
}
7171

7272
let Extract {

crates/polars-stream/src/nodes/io_sources/multi_scan/components/apply_extra_ops.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@ use polars_io::RowIndex;
1010
use polars_io::predicates::ScanIOPredicate;
1111
use polars_plan::dsl::{CastColumnsPolicy, MissingColumnsPolicy, ScanSource};
1212
use polars_plan::plans::hive::HivePartitionsDf;
13+
use polars_utils::row_counter::RowCounter;
1314
use polars_utils::slice_enum::Slice;
1415

1516
use crate::nodes::io_sources::multi_scan::components::column_selector::ColumnSelector;
1617
use crate::nodes::io_sources::multi_scan::components::column_selector::builder::ColumnSelectorBuilder;
1718
use crate::nodes::io_sources::multi_scan::components::errors::missing_column_err;
1819
use crate::nodes::io_sources::multi_scan::components::projection::Projection;
19-
use crate::nodes::io_sources::multi_scan::components::row_counter::RowCounter;
2020
use crate::nodes::io_sources::multi_scan::components::row_deletions::ExternalFilterMask;
2121
use crate::nodes::io_sources::multi_scan::pipeline::models::ExtraOperations;
2222

crates/polars-stream/src/nodes/io_sources/multi_scan/components/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,4 @@ pub mod forbid_extra_columns;
88
pub mod physical_slice;
99
pub mod projection;
1010
pub mod reader_operation_pushdown;
11-
pub mod row_counter;
1211
pub mod row_deletions;

crates/polars-stream/src/nodes/io_sources/multi_scan/components/physical_slice.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1+
use polars_utils::row_counter::RowCounter;
12
use polars_utils::slice_enum::Slice;
23

3-
use crate::nodes::io_sources::multi_scan::components::row_counter::RowCounter;
44
use crate::nodes::io_sources::multi_scan::components::row_deletions::ExternalFilterMask;
55

66
/// Represents a [`Slice`] that has been potentially adjusted to account for deleted rows.

crates/polars-stream/src/nodes/io_sources/multi_scan/functions/resolve_slice.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ use components::row_deletions::DeletionFilesProvider;
44
use futures::StreamExt;
55
use polars_core::prelude::{InitHashMaps, PlHashMap};
66
use polars_error::PolarsResult;
7+
use polars_utils::row_counter::RowCounter;
78
use polars_utils::slice_enum::Slice;
89

910
use crate::async_executor::{self, AbortOnDropHandle, TaskPriority};
10-
use crate::nodes::io_sources::multi_scan::components::row_counter::RowCounter;
1111
use crate::nodes::io_sources::multi_scan::pipeline::models::ResolvedSliceInfo;
1212
use crate::nodes::io_sources::multi_scan::{MultiScanConfig, components};
1313

crates/polars-stream/src/nodes/io_sources/multi_scan/pipeline/initialization.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ use futures::StreamExt;
55
use polars_core::prelude::PlHashMap;
66
use polars_error::PolarsResult;
77
use polars_mem_engine::scan_predicate::initialize_scan_predicate;
8+
use polars_utils::row_counter::RowCounter;
89
use polars_utils::slice_enum::Slice;
910

1011
use crate::async_executor::{self, AbortOnDropHandle, TaskPriority};
1112
use crate::async_primitives::connector::{self};
1213
use crate::nodes::io_sources::multi_scan::components::bridge::{BridgeRecvPort, BridgeState};
13-
use crate::nodes::io_sources::multi_scan::components::row_counter::RowCounter;
1414
use crate::nodes::io_sources::multi_scan::components::row_deletions::{
1515
DeletionFilesProvider, ExternalFilterMask, RowDeletionsInit,
1616
};

crates/polars-stream/src/nodes/io_sources/multi_scan/pipeline/models.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
use std::collections::VecDeque;
33
use std::sync::{Arc, Mutex};
44

5-
use components::row_counter::RowCounter;
65
use components::row_deletions::ExternalFilterMask;
76
use polars_core::prelude::PlHashMap;
87
use polars_core::schema::SchemaRef;
@@ -12,6 +11,7 @@ use polars_io::predicates::ScanIOPredicate;
1211
use polars_plan::dsl::{CastColumnsPolicy, MissingColumnsPolicy, ScanSource};
1312
use polars_plan::plans::hive::HivePartitionsDf;
1413
use polars_utils::pl_str::PlSmallStr;
14+
use polars_utils::row_counter::RowCounter;
1515
use polars_utils::slice_enum::Slice;
1616

1717
use crate::async_executor::AbortOnDropHandle;

0 commit comments

Comments
 (0)