Skip to content

Commit f0de549

Browse files
committed
Use ioCounter for pageLoadTimeNs
1 parent e078023 commit f0de549

File tree

9 files changed

+42
-17
lines changed

9 files changed

+42
-17
lines changed

velox/common/io/IoStatistics.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,13 @@ class IoCounter {
7373
casLoop(max_, other.max_, std::less());
7474
}
7575

76+
void clear() {
77+
count_ = 0;
78+
sum_ = 0;
79+
min_ = std::numeric_limits<uint64_t>::max();
80+
max_ = 0;
81+
}
82+
7683
private:
7784
template <typename Compare>
7885
static void

velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ int IcebergSplitReaderBenchmark::read(
233233
const RowTypePtr& rowType,
234234
uint32_t nextSize,
235235
std::unique_ptr<IcebergSplitReader> icebergSplitReader) {
236-
runtimeStats_ = RuntimeStatistics();
236+
runtimeStats_.clear();
237237
icebergSplitReader->resetFilterCaches();
238238
int resultSize = 0;
239239
auto result = BaseVector::create(rowType, 0, leafPool_.get());

velox/dwio/common/Statistics.h

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
#include "velox/common/base/Exceptions.h"
2424
#include "velox/common/base/RuntimeMetrics.h"
25+
#include "velox/common/io/IoStatistics.h"
2526
#include "velox/dwio/common/exception/Exception.h"
2627

2728
namespace facebook::velox::dwio::common {
@@ -540,7 +541,12 @@ struct ColumnReaderStatistics {
540541
int64_t flattenStringDictionaryValues{0};
541542

542543
// Total time spent in loading pages, in nanoseconds.
543-
uint64_t pageLoadTimeNs{0};
544+
io::IoCounter pageLoadTimeNs;
545+
546+
void clear() {
547+
flattenStringDictionaryValues = 0;
548+
pageLoadTimeNs.clear();
549+
}
544550
};
545551

546552
struct RuntimeStatistics {
@@ -566,6 +572,18 @@ struct RuntimeStatistics {
566572
UnitLoaderStats unitLoaderStats;
567573
ColumnReaderStatistics columnReaderStatistics;
568574

575+
void clear() {
576+
skippedSplits = 0;
577+
processedSplits = 0;
578+
skippedSplitBytes = 0;
579+
skippedStrides = 0;
580+
processedStrides = 0;
581+
footerBufferOverread = 0;
582+
numStripes = 0;
583+
unitLoaderStats = UnitLoaderStats();
584+
columnReaderStatistics.clear();
585+
}
586+
569587
std::unordered_map<std::string, RuntimeMetric> toRuntimeMetricMap() {
570588
std::unordered_map<std::string, RuntimeMetric> result;
571589
for (const auto& [name, metric] : unitLoaderStats.stats()) {
@@ -601,10 +619,10 @@ struct RuntimeStatistics {
601619
"flattenStringDictionaryValues",
602620
RuntimeMetric(columnReaderStatistics.flattenStringDictionaryValues));
603621
}
604-
if (columnReaderStatistics.pageLoadTimeNs > 0) {
622+
if (columnReaderStatistics.pageLoadTimeNs.sum() > 0) {
605623
result.emplace(
606624
"pageLoadTimeNs",
607-
RuntimeMetric(columnReaderStatistics.pageLoadTimeNs));
625+
RuntimeMetric(columnReaderStatistics.pageLoadTimeNs.sum()));
608626
}
609627
return result;
610628
}

velox/dwio/common/tests/utils/E2EFilterTestBase.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ void E2EFilterTestBase::readWithFilter(
163163
}
164164
OwnershipChecker ownershipChecker;
165165
auto rowReader = reader->createRowReader(rowReaderOpts);
166-
runtimeStats_ = dwio::common::RuntimeStatistics();
166+
runtimeStats_.clear();
167167
auto rowIndex = 0;
168168
auto resultBatch = BaseVector::create(rowType_, 1, leafPool_.get());
169169
resetReadBatchSizes();

velox/dwio/dwrf/test/ReaderTest.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2585,7 +2585,7 @@ TEST_F(TestReader, readStringDictionaryAsFlat) {
25852585
ASSERT_EQ(rowReader->next(20, actual), 20);
25862586
ASSERT_EQ(actual->size(), 1);
25872587
ASSERT_TRUE(actual->as<RowVector>()->childAt(0)->isFlatEncoding());
2588-
stats = {};
2588+
stats.clear();
25892589
rowReader->updateRuntimeStats(stats);
25902590
ASSERT_EQ(stats.columnReaderStatistics.flattenStringDictionaryValues, 1);
25912591
}

velox/dwio/parquet/reader/PageReader.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ PageHeader PageReader::readPageHeader() {
9292
MicrosecondTimer timer(&readUs);
9393
inputStream_->Next(&buffer, &size);
9494
}
95-
stats_.pageLoadTimeNs += readUs * 1'000;
95+
stats_.pageLoadTimeNs.increment(readUs * 1'000);
9696
bufferStart_ = reinterpret_cast<const char*>(buffer);
9797
bufferEnd_ = bufferStart_ + size;
9898
}
@@ -135,7 +135,7 @@ const char* PageReader::readBytes(int32_t size, BufferPtr& copy) {
135135
bufferStart_,
136136
bufferEnd_);
137137
}
138-
stats_.pageLoadTimeNs += readUs * 1'000;
138+
stats_.pageLoadTimeNs.increment(readUs * 1'000);
139139
return copy->as<char>();
140140
}
141141

@@ -388,7 +388,7 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
388388
bufferStart_,
389389
bufferEnd_);
390390
}
391-
stats_.pageLoadTimeNs += readUs * 1'000;
391+
stats_.pageLoadTimeNs.increment(readUs * 1'000);
392392
}
393393
if (type_->type()->isShortDecimal() &&
394394
parquetType == thrift::Type::INT32) {
@@ -428,7 +428,7 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
428428
bufferStart_,
429429
bufferEnd_);
430430
}
431-
stats_.pageLoadTimeNs += readUs * 1'000;
431+
stats_.pageLoadTimeNs.increment(readUs * 1'000);
432432
}
433433
// Expand the Parquet type length values to Velox type length.
434434
// We start from the end to allow in-place expansion.
@@ -461,7 +461,7 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
461461
dwio::common::readBytes(
462462
numBytes, inputStream_.get(), strings, bufferStart_, bufferEnd_);
463463
}
464-
stats_.pageLoadTimeNs += readUs * 1'000;
464+
stats_.pageLoadTimeNs.increment(readUs * 1'000);
465465
}
466466
auto header = strings;
467467
for (auto i = 0; i < dictionary_.numValues; ++i) {
@@ -493,7 +493,7 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
493493
bufferStart_,
494494
bufferEnd_);
495495
}
496-
stats_.pageLoadTimeNs += readUs * 1'000;
496+
stats_.pageLoadTimeNs.increment(readUs * 1'000);
497497
}
498498
if (type_->type()->isShortDecimal()) {
499499
// Parquet decimal values have a fixed typeLength_ and are in big-endian

velox/dwio/parquet/reader/ParquetReader.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1336,8 +1336,8 @@ class ParquetRowReader::Impl {
13361336
void updateRuntimeStats(dwio::common::RuntimeStatistics& stats) const {
13371337
stats.skippedStrides += skippedStrides_;
13381338
stats.processedStrides += rowGroupIds_.size();
1339-
stats.columnReaderStatistics.pageLoadTimeNs +=
1340-
columnReaderStats_.pageLoadTimeNs;
1339+
stats.columnReaderStatistics.pageLoadTimeNs.increment(
1340+
columnReaderStats_.pageLoadTimeNs.sum());
13411341
}
13421342

13431343
void resetFilterCaches() {

velox/dwio/parquet/tests/reader/ParquetPageReaderTest.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ TEST_F(ParquetPageReaderTest, smallPage) {
5252
auto maxValue = header.data_page_header.statistics.max_value;
5353
EXPECT_EQ(minValue, expectedMinValue);
5454
EXPECT_EQ(maxValue, expectedMaxValue);
55-
EXPECT_GT(stats.pageLoadTimeNs, 0);
55+
EXPECT_GT(stats.pageLoadTimeNs.sum(), 0);
5656
}
5757

5858
TEST_F(ParquetPageReaderTest, largePage) {
@@ -84,7 +84,7 @@ TEST_F(ParquetPageReaderTest, largePage) {
8484
auto maxValue = header.data_page_header.statistics.max_value;
8585
EXPECT_EQ(minValue, expectedMinValue);
8686
EXPECT_EQ(maxValue, expectedMaxValue);
87-
EXPECT_GT(stats.pageLoadTimeNs, 0);
87+
EXPECT_GT(stats.pageLoadTimeNs.sum(), 0);
8888
}
8989

9090
TEST_F(ParquetPageReaderTest, corruptedPageHeader) {

velox/dwio/parquet/tests/reader/ParquetReaderBenchmark.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ int ParquetReaderBenchmark::read(
124124
std::shared_ptr<ScanSpec> scanSpec,
125125
uint32_t nextSize) {
126126
auto rowReader = createReader(scanSpec, rowType);
127-
runtimeStats_ = dwio::common::RuntimeStatistics();
127+
runtimeStats_.clear();
128128

129129
rowReader->resetFilterCaches();
130130
auto result = BaseVector::create(rowType, 1, leafPool_.get());

0 commit comments

Comments
 (0)