Skip to content

Commit 43697df

Browse files
zhli1142015zhejiangxiaomai
authored andcommitted
Avoid duplicate reading for small parquet files (#333)
1 parent 482e4b3 commit 43697df

File tree

3 files changed

+41
-14
lines changed

3 files changed

+41
-14
lines changed

velox/dwio/common/BufferedInput.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,14 @@ class BufferedInput {
122122
return std::make_unique<BufferedInput>(input_, pool_);
123123
}
124124

125+
std::unique_ptr<SeekableInputStream> readFile(
126+
uint64_t length,
127+
LogType logType) {
128+
enqueue({0, length});
129+
load(logType);
130+
return readBuffer(0, length);
131+
}
132+
125133
const std::shared_ptr<ReadFile>& getReadFile() const {
126134
return input_->getReadFile();
127135
}

velox/dwio/parquet/reader/ParquetReader.cpp

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,18 @@ ReaderBase::ReaderBase(
4747
}
4848

4949
void ReaderBase::loadFileMetaData() {
50-
bool preloadFile_ = fileLength_ <= filePreloadThreshold_;
50+
preloadFile_ = fileLength_ <= filePreloadThreshold_ ||
51+
fileLength_ <= directorySizeGuess_;
5152
uint64_t readSize =
5253
preloadFile_ ? fileLength_ : std::min(fileLength_, directorySizeGuess_);
5354

54-
auto stream = input_->read(
55-
fileLength_ - readSize, readSize, dwio::common::LogType::FOOTER);
55+
std::unique_ptr<dwio::common::SeekableInputStream> stream = nullptr;
56+
if (preloadFile_) {
57+
stream = input_->readFile(fileLength_, dwio::common::LogType::FOOTER);
58+
} else {
59+
stream = input_->read(
60+
fileLength_ - readSize, readSize, dwio::common::LogType::FOOTER);
61+
}
5662

5763
std::vector<char> copy(readSize);
5864
const char* bufferStart = nullptr;
@@ -465,19 +471,30 @@ void ReaderBase::scheduleRowGroups(
465471
currentGroup + 1 < rowGroupIds.size() ? rowGroupIds[currentGroup + 1] : 0;
466472
auto input = inputs_[thisGroup].get();
467473
if (!input) {
468-
auto newInput = input_->clone();
469-
reader.enqueueRowGroup(thisGroup, *newInput);
470-
newInput->load(dwio::common::LogType::STRIPE);
471-
inputs_[thisGroup] = std::move(newInput);
474+
if (preloadFile_) {
475+
// Read data from buffer directly.
476+
reader.enqueueRowGroup(thisGroup, *input_);
477+
inputs_[thisGroup] = input_;
478+
} else {
479+
auto newInput = input_->clone();
480+
reader.enqueueRowGroup(thisGroup, *newInput);
481+
newInput->load(dwio::common::LogType::STRIPE);
482+
inputs_[thisGroup] = std::move(newInput);
483+
}
472484
}
473485
for (auto counter = 0; counter < FLAGS_parquet_prefetch_rowgroups;
474486
++counter) {
475487
if (nextGroup) {
476-
if (inputs_.count(nextGroup) != 0) {
477-
auto newInput = input_->clone();
478-
reader.enqueueRowGroup(nextGroup, *newInput);
479-
newInput->load(dwio::common::LogType::STRIPE);
480-
inputs_[nextGroup] = std::move(newInput);
488+
if (inputs_.count(nextGroup) == 0) {
489+
if (preloadFile_) {
490+
reader.enqueueRowGroup(nextGroup, *input_);
491+
inputs_[nextGroup] = input_;
492+
} else {
493+
auto newInput = input_->clone();
494+
reader.enqueueRowGroup(nextGroup, *newInput);
495+
newInput->load(dwio::common::LogType::STRIPE);
496+
inputs_[nextGroup] = std::move(newInput);
497+
}
481498
}
482499
} else {
483500
break;

velox/dwio/parquet/reader/ParquetReader.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,16 +108,18 @@ class ReaderBase {
108108
const uint64_t directorySizeGuess_;
109109
const uint64_t filePreloadThreshold_;
110110
const dwio::common::ReaderOptions& options_;
111-
std::unique_ptr<velox::dwio::common::BufferedInput> input_;
111+
std::shared_ptr<velox::dwio::common::BufferedInput> input_;
112112
uint64_t fileLength_;
113113
std::unique_ptr<thrift::FileMetaData> fileMetaData_;
114114
RowTypePtr schema_;
115115
std::shared_ptr<const dwio::common::TypeWithId> schemaWithId_;
116116

117117
const bool binaryAsString = false;
118118

119+
bool preloadFile_ = false;
120+
119121
// Map from row group index to pre-created loading BufferedInput.
120-
std::unordered_map<uint32_t, std::unique_ptr<dwio::common::BufferedInput>>
122+
std::unordered_map<uint32_t, std::shared_ptr<dwio::common::BufferedInput>>
121123
inputs_;
122124
};
123125

0 commit comments

Comments
 (0)