cpp/src/parquet/arrow/reader.cc (125 additions & 3 deletions)
@@ -20,6 +20,7 @@
#include <algorithm>
#include <cstring>
#include <memory>
#include <random>
#include <unordered_set>
#include <utility>
#include <vector>
@@ -40,11 +41,15 @@
#include "arrow/util/parallel.h"
#include "arrow/util/range.h"
#include "arrow/util/tracing_internal.h"

#include "parquet/arrow/reader_internal.h"
#include "parquet/bloom_filter.h"
#include "parquet/bloom_filter_reader.h"
#include "parquet/column_reader.h"
#include "parquet/exception.h"
#include "parquet/file_reader.h"
#include "parquet/metadata.h"
#include "parquet/page_index.h"
#include "parquet/properties.h"
#include "parquet/schema.h"

@@ -1411,6 +1416,73 @@ Status FuzzReader(std::unique_ptr<FileReader> reader) {
return st;
}

template <typename DType>
Status FuzzReadTypedColumnIndex(const TypedColumnIndex<DType>* index) {
index->min_values();
index->max_values();
return Status::OK();
}

Status FuzzReadColumnIndex(const ColumnIndex* index, const ColumnDescriptor* descr) {
Status st;
BEGIN_PARQUET_CATCH_EXCEPTIONS
index->definition_level_histograms();
index->repetition_level_histograms();
index->null_pages();
index->null_counts();
index->non_null_page_indices();
index->encoded_min_values();
index->encoded_max_values();
switch (descr->physical_type()) {
case Type::BOOLEAN:
st &= FuzzReadTypedColumnIndex(dynamic_cast<const BoolColumnIndex*>(index));
break;
case Type::INT32:
st &= FuzzReadTypedColumnIndex(dynamic_cast<const Int32ColumnIndex*>(index));
break;
case Type::INT64:
st &= FuzzReadTypedColumnIndex(dynamic_cast<const Int64ColumnIndex*>(index));
break;
case Type::INT96:
st &= FuzzReadTypedColumnIndex(
dynamic_cast<const TypedColumnIndex<Int96Type>*>(index));
break;
case Type::FLOAT:
st &= FuzzReadTypedColumnIndex(dynamic_cast<const FloatColumnIndex*>(index));
break;
case Type::DOUBLE:
st &= FuzzReadTypedColumnIndex(dynamic_cast<const DoubleColumnIndex*>(index));
break;
case Type::FIXED_LEN_BYTE_ARRAY:
st &= FuzzReadTypedColumnIndex(dynamic_cast<const FLBAColumnIndex*>(index));
break;
case Type::BYTE_ARRAY:
st &= FuzzReadTypedColumnIndex(dynamic_cast<const ByteArrayColumnIndex*>(index));
break;
case Type::UNDEFINED:
break;
}
END_PARQUET_CATCH_EXCEPTIONS
return st;
}

Status FuzzReadPageIndex(RowGroupPageIndexReader* reader, const SchemaDescriptor* schema,
int column) {
Status st;
BEGIN_PARQUET_CATCH_EXCEPTIONS
auto offset_index = reader->GetOffsetIndex(column);
if (offset_index) {
offset_index->page_locations();
offset_index->unencoded_byte_array_data_bytes();
}
auto col_index = reader->GetColumnIndex(column);
if (col_index) {
st &= FuzzReadColumnIndex(col_index.get(), schema->Column(column));
}
END_PARQUET_CATCH_EXCEPTIONS
return st;
}

} // namespace

Status FuzzReader(const uint8_t* data, int64_t size) {
@@ -1419,11 +1491,61 @@ Status FuzzReader(const uint8_t* data, int64_t size) {
auto buffer = std::make_shared<::arrow::Buffer>(data, size);
auto file = std::make_shared<::arrow::io::BufferReader>(buffer);
auto pool = ::arrow::default_memory_pool();
auto reader_properties = default_reader_properties();
std::default_random_engine rng(/*seed*/ 42);

// Read Parquet file metadata only once, which will reduce iteration time slightly
std::shared_ptr<FileMetaData> pq_md;
BEGIN_PARQUET_CATCH_EXCEPTIONS
pq_md = ParquetFileReader::Open(file)->metadata();
int num_row_groups, num_columns;
BEGIN_PARQUET_CATCH_EXCEPTIONS {
// Read some additional metadata (often lazy-decoded, such as statistics)
pq_md = ParquetFileReader::Open(file)->metadata();
num_row_groups = pq_md->num_row_groups();
num_columns = pq_md->num_columns();
for (int i = 0; i < num_row_groups; ++i) {
auto rg = pq_md->RowGroup(i);
rg->sorting_columns();
for (int j = 0; j < num_columns; ++j) {
auto col = rg->ColumnChunk(j);
col->encoded_statistics();
col->statistics();
col->geo_statistics();
col->size_statistics();
col->key_value_metadata();
col->encodings();
col->encoding_stats();
}
}
}
{
// Read and decode bloom filters
auto bloom_reader = BloomFilterReader::Make(file, pq_md, reader_properties);
std::uniform_int_distribution<uint64_t> hash_dist;
for (int i = 0; i < num_row_groups; ++i) {
auto bloom_rg = bloom_reader->RowGroup(i);
for (int j = 0; j < num_columns; ++j) {
auto bloom = bloom_rg->GetColumnBloomFilter(j);
// If the column has a bloom filter, find a bunch of random hashes
if (bloom != nullptr) {
for (int k = 0; k < 100; ++k) {
bloom->FindHash(hash_dist(rng));
}
}
}
}
}
{
// Read and decode page indexes
auto index_reader = PageIndexReader::Make(file.get(), pq_md, reader_properties);
for (int i = 0; i < num_row_groups; ++i) {
auto index_rg = index_reader->RowGroup(i);
if (index_rg) {
for (int j = 0; j < num_columns; ++j) {
st &= FuzzReadPageIndex(index_rg.get(), pq_md->schema(), j);
}
}
}
}
END_PARQUET_CATCH_EXCEPTIONS

// Note that very small batch sizes probably make fuzzing slower
@@ -1435,7 +1557,7 @@ Status FuzzReader(const uint8_t* data, int64_t size) {

std::unique_ptr<ParquetFileReader> pq_file_reader;
BEGIN_PARQUET_CATCH_EXCEPTIONS
- pq_file_reader = ParquetFileReader::Open(file, default_reader_properties(), pq_md);
+ pq_file_reader = ParquetFileReader::Open(file, reader_properties, pq_md);
END_PARQUET_CATCH_EXCEPTIONS

std::unique_ptr<FileReader> reader;
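Aside: the fuzz loop above probes bloom filters with random hashes purely to exercise decoding. In normal use the hash is derived from a column value through the filter's Hash() overloads. A minimal sketch of such a lookup, assuming the parquet::BloomFilter interface in parquet/bloom_filter.h (the helper name here is made up for illustration):

```cpp
#include <cstdint>

#include "parquet/bloom_filter.h"

// Hypothetical helper: probe a decoded bloom filter for a single INT64 value.
// FindHash() returning false means the value is definitely not in the column
// chunk; true only means it might be present.
bool MightContain(parquet::BloomFilter& bloom, int64_t value) {
  const uint64_t hash = bloom.Hash(value);  // hash the column value
  return bloom.FindHash(hash);
}
```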
cpp/src/parquet/bloom_filter_reader.cc (4 additions & 1 deletion)
@@ -60,6 +60,9 @@ std::unique_ptr<BloomFilter> RowGroupBloomFilterReaderImpl::GetColumnBloomFilter
return nullptr;
}
PARQUET_ASSIGN_OR_THROW(auto file_size, input_->GetSize());
if (*bloom_filter_offset < 0) {
throw ParquetException("bloom_filter_offset less than 0");
}
if (file_size <= *bloom_filter_offset) {
throw ParquetException("file size less or equal than bloom offset");
}
@@ -68,7 +71,7 @@ std::unique_ptr<BloomFilter> RowGroupBloomFilterReaderImpl::GetColumnBloomFilter
if (*bloom_filter_length < 0) {
throw ParquetException("bloom_filter_length less than 0");
}
- if (*bloom_filter_length + *bloom_filter_offset > file_size) {
+ if (*bloom_filter_length > file_size - *bloom_filter_offset) {
Member: Why this change? Would the addition overflow?

Member Author: Yes, it could. The alternative is to call AddWithOverflow.

throw ParquetException(
"bloom filter length + bloom filter offset greater than file size");
}
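As the review exchange above notes, the rewritten check avoids overflowing the addition; the alternative mentioned would be an explicit overflow-checked add. A rough sketch of that alternative (not what the PR actually does), assuming arrow::internal::AddWithOverflow from arrow/util/int_util_overflow.h and a simplified all-int64_t signature:

```cpp
#include <cstdint>

#include "arrow/util/int_util_overflow.h"
#include "parquet/exception.h"

// Hypothetical overflow-checked variant of the bounds check above:
// AddWithOverflow() returns true if offset + length overflowed int64_t.
void CheckBloomFilterBounds(int64_t offset, int64_t length, int64_t file_size) {
  int64_t end = 0;
  if (::arrow::internal::AddWithOverflow(offset, length, &end) || end > file_size) {
    throw parquet::ParquetException(
        "bloom filter length + bloom filter offset greater than file size");
  }
}
```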
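For completeness, a sketch of how a libFuzzer target might drive the FuzzReader entry point extended in reader.cc; the exact namespace and wiring of the real fuzz target are assumptions here:

```cpp
#include <cstddef>
#include <cstdint>

#include "parquet/arrow/reader.h"

// Hypothetical libFuzzer driver: feed arbitrary bytes to FuzzReader and
// discard the resulting Status (errors are expected on malformed input).
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
  auto status =
      parquet::arrow::internal::FuzzReader(data, static_cast<int64_t>(size));
  (void)status;
  return 0;
}
```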