Skip to content

Commit 3cf456f

Browse files
rui-mometa-codesync[bot]
authored andcommitted
misc: Use ioCounter for pageLoadTimeNs (#15407)
Summary: This PR uses `ioCounter` for `pageLoadTimeNs` to collect its min, max, count stats. Pull Request resolved: #15407 Reviewed By: mbasmanova Differential Revision: D86894092 Pulled By: Yuhta fbshipit-source-id: da80213f821aadb24124d16b912cf41d28bc87d7
1 parent 2ebb9cf commit 3cf456f

File tree

5 files changed

+35
-13
lines changed

5 files changed

+35
-13
lines changed

velox/common/io/IoStatistics.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,24 @@ struct OperationCounters {
4343

4444
class IoCounter {
4545
public:
46+
IoCounter& operator=(const IoCounter& other) noexcept {
47+
if (this != &other) {
48+
count_.store(
49+
other.count_.load(std::memory_order_relaxed),
50+
std::memory_order_relaxed);
51+
sum_.store(
52+
other.sum_.load(std::memory_order_relaxed),
53+
std::memory_order_relaxed);
54+
min_.store(
55+
other.min_.load(std::memory_order_relaxed),
56+
std::memory_order_relaxed);
57+
max_.store(
58+
other.max_.load(std::memory_order_relaxed),
59+
std::memory_order_relaxed);
60+
}
61+
return *this;
62+
}
63+
4664
uint64_t count() const {
4765
return count_;
4866
}

velox/dwio/common/Statistics.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
#include "velox/common/base/Exceptions.h"
2424
#include "velox/common/base/RuntimeMetrics.h"
25+
#include "velox/common/io/IoStatistics.h"
2526
#include "velox/dwio/common/exception/Exception.h"
2627

2728
namespace facebook::velox::dwio::common {
@@ -540,7 +541,7 @@ struct ColumnReaderStatistics {
540541
int64_t flattenStringDictionaryValues{0};
541542

542543
// Total time spent in loading pages, in nanoseconds.
543-
uint64_t pageLoadTimeNs{0};
544+
io::IoCounter pageLoadTimeNs;
544545
};
545546

546547
struct RuntimeStatistics {
@@ -601,11 +602,14 @@ struct RuntimeStatistics {
601602
"flattenStringDictionaryValues",
602603
RuntimeMetric(columnReaderStatistics.flattenStringDictionaryValues));
603604
}
604-
if (columnReaderStatistics.pageLoadTimeNs > 0) {
605+
if (columnReaderStatistics.pageLoadTimeNs.sum() > 0) {
605606
result.emplace(
606607
"pageLoadTimeNs",
607608
RuntimeMetric(
608-
columnReaderStatistics.pageLoadTimeNs,
609+
columnReaderStatistics.pageLoadTimeNs.sum(),
610+
columnReaderStatistics.pageLoadTimeNs.count(),
611+
columnReaderStatistics.pageLoadTimeNs.min(),
612+
columnReaderStatistics.pageLoadTimeNs.max(),
609613
RuntimeCounter::Unit::kNanos));
610614
}
611615
return result;

velox/dwio/parquet/reader/PageReader.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ PageHeader PageReader::readPageHeader() {
9292
MicrosecondTimer timer(&readUs);
9393
inputStream_->Next(&buffer, &size);
9494
}
95-
stats_.pageLoadTimeNs += readUs * 1'000;
95+
stats_.pageLoadTimeNs.increment(readUs * 1'000);
9696
bufferStart_ = reinterpret_cast<const char*>(buffer);
9797
bufferEnd_ = bufferStart_ + size;
9898
}
@@ -135,7 +135,7 @@ const char* PageReader::readBytes(int32_t size, BufferPtr& copy) {
135135
bufferStart_,
136136
bufferEnd_);
137137
}
138-
stats_.pageLoadTimeNs += readUs * 1'000;
138+
stats_.pageLoadTimeNs.increment(readUs * 1'000);
139139
return copy->as<char>();
140140
}
141141

@@ -388,7 +388,7 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
388388
bufferStart_,
389389
bufferEnd_);
390390
}
391-
stats_.pageLoadTimeNs += readUs * 1'000;
391+
stats_.pageLoadTimeNs.increment(readUs * 1'000);
392392
}
393393
if (type_->type()->isShortDecimal() &&
394394
parquetType == thrift::Type::INT32) {
@@ -428,7 +428,7 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
428428
bufferStart_,
429429
bufferEnd_);
430430
}
431-
stats_.pageLoadTimeNs += readUs * 1'000;
431+
stats_.pageLoadTimeNs.increment(readUs * 1'000);
432432
}
433433
// Expand the Parquet type length values to Velox type length.
434434
// We start from the end to allow in-place expansion.
@@ -461,7 +461,7 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
461461
dwio::common::readBytes(
462462
numBytes, inputStream_.get(), strings, bufferStart_, bufferEnd_);
463463
}
464-
stats_.pageLoadTimeNs += readUs * 1'000;
464+
stats_.pageLoadTimeNs.increment(readUs * 1'000);
465465
}
466466
auto header = strings;
467467
for (auto i = 0; i < dictionary_.numValues; ++i) {
@@ -493,7 +493,7 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
493493
bufferStart_,
494494
bufferEnd_);
495495
}
496-
stats_.pageLoadTimeNs += readUs * 1'000;
496+
stats_.pageLoadTimeNs.increment(readUs * 1'000);
497497
}
498498
if (type_->type()->isShortDecimal()) {
499499
// Parquet decimal values have a fixed typeLength_ and are in big-endian

velox/dwio/parquet/reader/ParquetReader.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1336,8 +1336,8 @@ class ParquetRowReader::Impl {
13361336
void updateRuntimeStats(dwio::common::RuntimeStatistics& stats) const {
13371337
stats.skippedStrides += skippedStrides_;
13381338
stats.processedStrides += rowGroupIds_.size();
1339-
stats.columnReaderStatistics.pageLoadTimeNs +=
1340-
columnReaderStats_.pageLoadTimeNs;
1339+
stats.columnReaderStatistics.pageLoadTimeNs.merge(
1340+
columnReaderStats_.pageLoadTimeNs);
13411341
}
13421342

13431343
void resetFilterCaches() {

velox/dwio/parquet/tests/reader/ParquetPageReaderTest.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ TEST_F(ParquetPageReaderTest, smallPage) {
5252
auto maxValue = header.data_page_header.statistics.max_value;
5353
EXPECT_EQ(minValue, expectedMinValue);
5454
EXPECT_EQ(maxValue, expectedMaxValue);
55-
EXPECT_GT(stats.pageLoadTimeNs, 0);
55+
EXPECT_GT(stats.pageLoadTimeNs.sum(), 0);
5656
}
5757

5858
TEST_F(ParquetPageReaderTest, largePage) {
@@ -84,7 +84,7 @@ TEST_F(ParquetPageReaderTest, largePage) {
8484
auto maxValue = header.data_page_header.statistics.max_value;
8585
EXPECT_EQ(minValue, expectedMinValue);
8686
EXPECT_EQ(maxValue, expectedMaxValue);
87-
EXPECT_GT(stats.pageLoadTimeNs, 0);
87+
EXPECT_GT(stats.pageLoadTimeNs.sum(), 0);
8888
}
8989

9090
TEST_F(ParquetPageReaderTest, corruptedPageHeader) {

0 commit comments

Comments
 (0)