Skip to content

Commit 5a4c0d2

Browse files
fix: Fix CSV select(len()) off by 1 with comment prefix (#25069)
1 parent 890fc7a commit 5a4c0d2

File tree

5 files changed

+118
-108
lines changed

5 files changed

+118
-108
lines changed

crates/polars-io/src/csv/read/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,15 @@ mod splitfields;
2626
mod utils;
2727

2828
pub use options::{CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, NullValues};
29-
pub use parser::{count_rows, count_rows_from_slice_par, count_rows_from_slice_raw};
29+
pub use parser::{count_rows, count_rows_from_slice_par};
3030
pub use read_impl::batched::{BatchedCsvReader, OwnedBatchedCsvReader};
3131
pub use reader::CsvReader;
3232
pub use schema_inference::infer_file_schema;
3333

3434
pub mod _csv_read_internal {
3535
pub use super::buffer::validate_utf8;
3636
pub use super::options::NullValuesCompiled;
37-
pub use super::parser::CountLines;
37+
pub use super::parser::{CountLines, is_comment_line};
3838
pub use super::read_impl::{cast_columns, find_starting_point, read_chunk};
3939
pub use super::reader::prepare_csv_schema;
4040
}

crates/polars-io/src/csv/read/parser.rs

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -113,16 +113,17 @@ pub fn count_rows_from_slice_par(
113113
// Ensure we start at the start of a line.
114114
if let Some(nl_off) = bytes[start_offset..next_start_offset]
115115
.iter()
116-
.position(|b| *b == b'\n')
116+
.position(|b| *b == eol_char)
117117
{
118118
start_offset += nl_off + 1;
119119
} else {
120120
return count.analyze_chunk(&[]);
121121
}
122122
}
123123

124-
let stop_offset = if let Some(nl_off) =
125-
bytes[next_start_offset..].iter().position(|b| *b == b'\n')
124+
let stop_offset = if let Some(nl_off) = bytes[next_start_offset..]
125+
.iter()
126+
.position(|b| *b == eol_char)
126127
{
127128
next_start_offset + nl_off + 1
128129
} else {
@@ -140,28 +141,21 @@ pub fn count_rows_from_slice_par(
140141
n += pair[in_string as usize].newline_count;
141142
in_string = pair[in_string as usize].end_inside_string;
142143
}
143-
if let Some(last) = bytes.last() {
144-
n += (*last != eol_char) as usize;
144+
if let Some(last) = bytes.last()
145+
&& *last != eol_char
146+
&& (comment_prefix.is_none()
147+
|| !is_comment_line(
148+
bytes.rsplit(|c| *c == eol_char).next().unwrap(),
149+
comment_prefix,
150+
))
151+
{
152+
n += 1
145153
}
154+
146155
Ok(n)
147156
})
148157
}
149158

150-
/// Read the number of rows without parsing columns, assuming bytes is at a
151-
/// newline starting point. Does not deal with start/header.
152-
pub fn count_rows_from_slice_raw(
153-
bytes: &[u8],
154-
quote_char: Option<u8>,
155-
comment_prefix: Option<&CommentPrefix>,
156-
eol_char: u8,
157-
) -> PolarsResult<usize> {
158-
Ok(
159-
CountLines::new(quote_char, eol_char, comment_prefix.cloned())
160-
.count(bytes)
161-
.0,
162-
)
163-
}
164-
165159
/// Skip the utf-8 Byte Order Mark.
166160
/// credits to csv-core
167161
pub(super) fn skip_bom(input: &[u8]) -> &[u8] {
@@ -176,7 +170,7 @@ pub(super) fn skip_bom(input: &[u8]) -> &[u8] {
176170
///
177171
/// This function is used during CSV parsing to determine whether a line should be ignored based on its starting characters.
178172
#[inline]
179-
pub(super) fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
173+
pub fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
180174
match comment_prefix {
181175
Some(CommentPrefix::Single(c)) => line.first() == Some(c),
182176
Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()),
@@ -770,7 +764,10 @@ impl CountLines {
770764
// Skip comment line if needed.
771765
while bytes[scan_offset..].starts_with(pre_s) {
772766
scan_offset += pre_s.len();
773-
let Some(nl_off) = bytes[scan_offset..].iter().position(|c| *c == b'\n') else {
767+
let Some(nl_off) = bytes[scan_offset..]
768+
.iter()
769+
.position(|c| *c == self.eol_char)
770+
else {
774771
break;
775772
};
776773
scan_offset += nl_off + 1;
@@ -799,13 +796,18 @@ impl CountLines {
799796
loop {
800797
let b = unsafe { bytes.get_unchecked(..(*chunk_size).min(bytes.len())) };
801798

802-
let (count, offset) = self.count(b);
799+
let (count, offset) = if self.comment_prefix.is_some() {
800+
let stats = self.analyze_chunk_with_comment(b, false);
801+
(stats.newline_count, stats.last_newline_offset)
802+
} else {
803+
self.count(b)
804+
};
803805

804806
if count > 0 || b.len() == bytes.len() {
805807
return (count, offset);
806808
}
807809

808-
*chunk_size *= 2;
810+
*chunk_size = chunk_size.saturating_mul(2);
809811
}
810812
}
811813

crates/polars-io/src/csv/read/read_impl.rs

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -407,13 +407,13 @@ impl<'a> CoreReader<'a> {
407407
std::ptr::eq(b.as_ptr().add(b.len()), bytes.as_ptr().add(bytes.len()))
408408
} {
409409
total_offset = bytes.len();
410-
(b, 1)
410+
let c = if is_comment_line(bytes, self.parse_options.comment_prefix.as_ref()) {
411+
0
412+
} else {
413+
1
414+
};
415+
(b, c)
411416
} else {
412-
if count == 0 {
413-
chunk_size *= 2;
414-
continue;
415-
}
416-
417417
let end = total_offset + position + 1;
418418
let b = unsafe { bytes.get_unchecked(total_offset..end) };
419419

@@ -440,12 +440,21 @@ impl<'a> CoreReader<'a> {
440440
let result = slf
441441
.read_chunk(b, projection, 0, count, Some(0), b.len())
442442
.and_then(|mut df| {
443-
444443
// Check malformed
445-
if df.height() > count || (df.height() < count && slf.parse_options.comment_prefix.is_none()) {
444+
if df.height() > count
445+
|| (df.height() < count
446+
&& slf.parse_options.comment_prefix.is_none())
447+
{
446448
// Note: in case data is malformed, df.height() is more likely to be correct than count.
447-
let msg = format!("CSV malformed: expected {} rows, actual {} rows, in chunk starting at byte offset {}, length {}",
448-
count, df.height(), previous_total_offset, b.len());
449+
let msg = format!(
450+
"CSV malformed: expected {} rows, \
451+
actual {} rows, in chunk starting at \
452+
byte offset {}, length {}",
453+
count,
454+
df.height(),
455+
previous_total_offset,
456+
b.len()
457+
);
449458
if slf.ignore_errors {
450459
polars_warn!(msg);
451460
} else {
@@ -482,9 +491,7 @@ impl<'a> CoreReader<'a> {
482491

483492
// Check just after we spawned a chunk. That means we processed all data up until
484493
// row count.
485-
if self.n_rows.is_some()
486-
&& total_line_count.load() > self.n_rows.unwrap()
487-
{
494+
if self.n_rows.is_some() && total_line_count.load() > self.n_rows.unwrap() {
488495
break;
489496
}
490497
}

crates/polars-stream/src/nodes/io_sources/csv.rs

Lines changed: 21 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,11 @@ use polars_error::{PolarsResult, polars_bail, polars_err, polars_warn};
88
use polars_io::RowIndex;
99
use polars_io::cloud::CloudOptions;
1010
use polars_io::prelude::_csv_read_internal::{
11-
CountLines, NullValuesCompiled, cast_columns, find_starting_point, prepare_csv_schema,
12-
read_chunk,
11+
CountLines, NullValuesCompiled, cast_columns, find_starting_point, is_comment_line,
12+
prepare_csv_schema, read_chunk,
1313
};
1414
use polars_io::prelude::buffer::validate_utf8;
15-
use polars_io::prelude::{
16-
CommentPrefix, CsvEncoding, CsvParseOptions, CsvReadOptions, count_rows_from_slice_raw,
17-
};
15+
use polars_io::prelude::{CsvEncoding, CsvParseOptions, CsvReadOptions};
1816
use polars_io::utils::compression::maybe_decompress_bytes;
1917
use polars_io::utils::slice::SplitSlicePosition;
2018
use polars_plan::dsl::ScanSource;
@@ -251,15 +249,11 @@ impl FileReader for CsvFileReader {
251249
)
252250
}
253251

254-
// Only used on empty projection, or if we need the exact row count.
255-
let alt_count_lines: Option<Arc<CountLinesWithComments>> =
256-
CountLinesWithComments::opt_new(&self.options.parse_options).map(Arc::new);
257252
let chunk_reader = Arc::new(ChunkReader::try_new(
258253
self.options.clone(),
259254
inferred_schema.clone(),
260255
projection,
261256
row_index,
262-
alt_count_lines.clone(),
263257
)?);
264258

265259
let needs_full_row_count = n_rows_in_file_tx.is_some();
@@ -304,7 +298,6 @@ impl FileReader for CsvFileReader {
304298
let chunk_reader = chunk_reader.clone();
305299
// Note: We don't use this (it is handled by the bridge). But morsels require a source token.
306300
let source_token = SourceToken::new();
307-
let alt_count_lines = alt_count_lines.clone();
308301

309302
AbortOnDropHandle::new(spawn(TaskPriority::Low, async move {
310303
while let Ok(LineBatch {
@@ -352,7 +345,7 @@ impl FileReader for CsvFileReader {
352345
}
353346

354347
while let Ok(LineBatch {
355-
bytes,
348+
bytes: _,
356349
n_lines,
357350
slice,
358351
row_offset: _,
@@ -361,12 +354,6 @@ impl FileReader for CsvFileReader {
361354
{
362355
assert_eq!(slice, SLICE_ENDED);
363356

364-
let n_lines = if let Some(v) = alt_count_lines.as_deref() {
365-
v.count_lines(bytes)?
366-
} else {
367-
n_lines
368-
};
369-
370357
n_rows_processed = n_rows_processed.saturating_add(n_lines);
371358
}
372359
}
@@ -473,17 +460,18 @@ impl LineBatchSource {
473460

474461
let global_bytes: &[u8] = memslice.as_ref();
475462
let global_bytes: &'static [u8] = unsafe { std::mem::transmute(global_bytes) };
463+
let comment_prefix = options.parse_options.comment_prefix.as_ref();
476464

477-
let i = {
478-
let parse_options = options.parse_options.as_ref();
465+
let parse_options = options.parse_options.as_ref();
466+
let eol_char = parse_options.eol_char;
479467

468+
let i = {
480469
let quote_char = parse_options.quote_char;
481-
let eol_char = parse_options.eol_char;
482470

483471
let skip_lines = options.skip_lines;
484472
let skip_rows_before_header = options.skip_rows;
485473
let skip_rows_after_header = options.skip_rows_after_header;
486-
let comment_prefix = parse_options.comment_prefix.clone();
474+
let comment_prefix = comment_prefix.cloned();
487475
let has_header = options.has_header;
488476

489477
find_starting_point(
@@ -524,7 +512,16 @@ impl LineBatchSource {
524512

525513
let (count, position) = line_counter.find_next(bytes, &mut chunk_size);
526514
let (count, position) = if count == 0 {
527-
(1, bytes.len())
515+
let c = if *bytes.last().unwrap() != eol_char
516+
&& !is_comment_line(
517+
bytes.rsplit(|c| *c == eol_char).next().unwrap(),
518+
comment_prefix,
519+
) {
520+
1
521+
} else {
522+
0
523+
};
524+
(c, bytes.len())
528525
} else {
529526
let pos = (position + 1).min(bytes.len()); // +1 for '\n'
530527
(count, pos)
@@ -596,8 +593,6 @@ struct ChunkReader {
596593
null_values: Option<NullValuesCompiled>,
597594
validate_utf8: bool,
598595
row_index: Option<RowIndex>,
599-
// Alternate line counter when there are comments. This is used on empty projection.
600-
alt_count_lines: Option<Arc<CountLinesWithComments>>,
601596
}
602597

603598
impl ChunkReader {
@@ -606,7 +601,6 @@ impl ChunkReader {
606601
mut reader_schema: SchemaRef,
607602
projection: Vec<usize>,
608603
row_index: Option<RowIndex>,
609-
alt_count_lines: Option<Arc<CountLinesWithComments>>,
610604
) -> PolarsResult<Self> {
611605
let mut fields_to_cast: Vec<Field> = options.fields_to_cast.clone();
612606
prepare_csv_schema(&mut reader_schema, &mut fields_to_cast)?;
@@ -633,7 +627,6 @@ impl ChunkReader {
633627
null_values,
634628
validate_utf8,
635629
row_index,
636-
alt_count_lines,
637630
})
638631
}
639632

@@ -652,13 +645,7 @@ impl ChunkReader {
652645

653646
// If projection is empty create a DataFrame with the correct height by counting the lines.
654647
let mut df = if self.projection.is_empty() {
655-
let h = if let Some(v) = &self.alt_count_lines {
656-
v.count_lines(chunk)?
657-
} else {
658-
n_lines
659-
};
660-
661-
DataFrame::empty_with_height(h)
648+
DataFrame::empty_with_height(n_lines)
662649
} else {
663650
read_chunk(
664651
chunk,
@@ -679,9 +666,7 @@ impl ChunkReader {
679666
let n_lines_is_correct = df.height() == n_lines;
680667

681668
// Check malformed
682-
if df.height() > n_lines
683-
|| (df.height() < n_lines && self.parse_options.comment_prefix.is_none())
684-
{
669+
if !n_lines_is_correct {
685670
// Note: in case data is malformed, df.height() is more likely to be correct than n_lines.
686671
let msg = format!(
687672
"CSV malformed: expected {} rows, actual {} rows, in chunk starting at row_offset {}, length {}",
@@ -722,31 +707,3 @@ impl ChunkReader {
722707
Ok((df, height))
723708
}
724709
}
725-
726-
struct CountLinesWithComments {
727-
quote_char: Option<u8>,
728-
eol_char: u8,
729-
comment_prefix: CommentPrefix,
730-
}
731-
732-
impl CountLinesWithComments {
733-
fn opt_new(parse_options: &CsvParseOptions) -> Option<Self> {
734-
parse_options
735-
.comment_prefix
736-
.clone()
737-
.map(|comment_prefix| CountLinesWithComments {
738-
quote_char: parse_options.quote_char,
739-
eol_char: parse_options.eol_char,
740-
comment_prefix,
741-
})
742-
}
743-
744-
fn count_lines(&self, bytes: &[u8]) -> PolarsResult<usize> {
745-
count_rows_from_slice_raw(
746-
bytes,
747-
self.quote_char,
748-
Some(&self.comment_prefix),
749-
self.eol_char,
750-
)
751-
}
752-
}

0 commit comments

Comments
 (0)