Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ class DataSource {
// Returns the number of input bytes processed so far.
virtual uint64_t getCompletedBytes() = 0;

// Returns the size in bytes of the file backing this data source, when the
// connector tracks it. The default returns 0 for connectors that do not.
virtual uint64_t getFileSize() { return 0; }

// Returns the number of input rows processed so far.
virtual uint64_t getCompletedRows() = 0;

Expand Down
30 changes: 30 additions & 0 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,17 @@ HiveDataSource::HiveDataSource(
SubfieldFilters filters;
core::TypedExprPtr remainingFilter;
if (hiveTableHandle->isFilterPushdownEnabled()) {
std::ostringstream oss;
oss << "collect ";
for (auto& [k, v] : hiveTableHandle->subfieldFilters()) {
oss << k.toString() << " -> " << v->toString() << "\n";
filters.emplace(k.clone(), v->clone());
}

std::cout << "[zcw] " << oss.str();
std::cout << "[zcw] remainingFilter:"
<< hiveTableHandle->remainingFilter()->toString() << std::endl;

remainingFilter = extractFiltersFromRemainingFilter(
hiveTableHandle->remainingFilter(),
expressionEvaluator_,
Expand All @@ -434,6 +442,11 @@ HiveDataSource::HiveDataSource(
}
}

for (auto& [k, v] : filters) {
std::cout << "[zcw] iterator " << k.toString() << " -> " << v->toString()
<< "\n";
}

auto outputTypes = outputType_->children();
readerOutputType_ = ROW(std::move(columnNames), std::move(outputTypes));
scanSpec_ = makeScanSpec(
Expand All @@ -443,6 +456,9 @@ HiveDataSource::HiveDataSource(
remainingFilterInputs,
pool_);

std::cout << "[zcw] readerOutputType_:" << readerOutputType_->toString()
<< std::endl;

if (remainingFilter) {
metadataFilter_ = std::make_shared<common::MetadataFilter>(
*scanSpec_, *remainingFilter, expressionEvaluator_);
Expand Down Expand Up @@ -474,6 +490,12 @@ HiveDataSource::HiveDataSource(
rowReaderOpts_.setScanSpec(scanSpec_);
rowReaderOpts_.setMetadataFilter(metadataFilter_);

std::cout << "[zcw] fileSchema:" << readerOpts_.getFileSchema()->toString()
<< " readerOutputType_:" << readerOutputType_->toString()
<< " remainingFilterInputs size:" << remainingFilterInputs.size()
<< " filter.size:" << filters.size() << " filterPushdownEnabled:"
<< hiveTableHandle->isFilterPushdownEnabled() << std::endl;

ioStats_ = std::make_shared<dwio::common::IoStatistics>();
}

Expand All @@ -486,6 +508,8 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {

VLOG(1) << "Adding split " << split_->toString();

std::cout << "[zcw] add split:" << split_->filePath << std::endl;

fileHandle_ = fileHandleFactory_->generate(split_->filePath).second;
auto input = createBufferedInput(*fileHandle_, readerOpts_);

Expand All @@ -499,6 +523,8 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
readerOpts_.setFileFormat(split_->fileFormat);
}

std::cout << "[zcw] " << readerOpts_.toString() << std::endl;

reader_ = dwio::common::getReaderFactory(readerOpts_.getFileFormat())
->createReader(std::move(input), readerOpts_);

Expand Down Expand Up @@ -781,6 +807,7 @@ HiveDataSource::createBufferedInput(
const FileHandle& fileHandle,
const dwio::common::ReaderOptions& readerOpts) {
if (auto* asyncCache = dynamic_cast<cache::AsyncDataCache*>(allocator_)) {
std::cout << "[zcw] AsyncDataCache CachedBufferedInput" << std::endl;
return std::make_unique<dwio::common::CachedBufferedInput>(
fileHandle.file,
readerOpts.getMemoryPool(),
Expand All @@ -794,6 +821,9 @@ HiveDataSource::createBufferedInput(
readerOpts.loadQuantum(),
readerOpts.maxCoalesceDistance());
}

std::cout << "[zcw] BufferedInput, name:" << fileHandle.file->getName()
<< std::endl;
return std::make_unique<dwio::common::BufferedInput>(
fileHandle.file,
readerOpts.getMemoryPool(),
Expand Down
4 changes: 4 additions & 0 deletions velox/connectors/hive/HiveDataSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ class HiveDataSource : public DataSource {
return ioStats_->rawBytesRead();
}

// Returns the file size accumulated in the I/O statistics for this data
// source. NOTE(review): reads IoStatistics::fileSize_ directly — presumably
// a public accessor should be used instead once one exists; verify.
uint64_t getFileSize() override {
  const uint64_t totalFileSize = ioStats_->fileSize_;
  return totalFileSize;
}

uint64_t getCompletedRows() override {
return completedRows_;
}
Expand Down
4 changes: 3 additions & 1 deletion velox/dwio/common/InputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ ReadFileInputStream::ReadFileInputStream(
const MetricsLogPtr& metricsLog,
IoStatistics* stats)
: InputStream(readFile->getName(), metricsLog, stats),
readFile_(std::move(readFile)) {}
readFile_(std::move(readFile)) {
stats->fileSize_ += getLength();
}

void ReadFileInputStream::read(
void* buf,
Expand Down
1 change: 1 addition & 0 deletions velox/dwio/common/IoStatistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ IoStatistics::operationStats() const {
}

void IoStatistics::merge(const IoStatistics& other) {
fileSize_ += other.fileSize_;
rawBytesRead_ += other.rawBytesRead_;
rawBytesWritten_ += other.rawBytesWritten_;
totalScanTime_ += other.totalScanTime_;
Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/common/IoStatistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ class IoStatistics {

folly::dynamic getOperationStatsSnapshot() const;

private:
//private:
std::atomic<uint64_t> fileSize_{0};
std::atomic<uint64_t> rawBytesRead_{0};
std::atomic<uint64_t> rawBytesWritten_{0};
std::atomic<uint64_t> inputBatchSize_{0};
Expand Down
10 changes: 10 additions & 0 deletions velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,16 @@ class ReaderOptions {
uint64_t filePreloadThreshold{kDefaultFilePreloadThreshold};
bool fileColumnNamesReadAsLowerCase = false;

// Returns a one-line, human-readable summary of the I/O-related reader
// options (preload length, prefetch mode, load quantum, directory size
// guess, preload threshold). Intended for debug logging only; the output
// format is not a stable contract.
std::string toString() const {
  std::ostringstream oss;
  // static_cast (not C-style casts) for the enum/int members rendered as
  // integers, per named-cast convention.
  oss << "[ReaderOptions] autoPreloadLength:" << autoPreloadLength
      << " prefetchMode:" << static_cast<int>(prefetchMode)
      << " loadQuantum_:" << static_cast<int>(loadQuantum_)
      << " directorySizeGuess:" << directorySizeGuess
      << " filePreloadThreshold:" << filePreloadThreshold;
  return oss.str();
}

public:
static constexpr int32_t kDefaultLoadQuantum = 8 << 20; // 8MB
static constexpr int32_t kDefaultCoalesceDistance = 512 << 10; // 512K
Expand Down
9 changes: 9 additions & 0 deletions velox/dwio/dwrf/reader/ReaderBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ ReaderBase::ReaderBase(

if (!cache_ && input_->shouldPrefetchStripes()) {
auto numStripes = getFooter().stripesSize();
std::cout << "[zcw] shouldPrefetchStripes, numStripes:" << numStripes
<< std::endl;

for (auto i = 0; i < numStripes; i++) {
const auto stripe = getFooter().stripes(i);
input_->enqueue(
Expand All @@ -219,6 +222,12 @@ ReaderBase::ReaderBase(
}
}

std::cout << "[zcw] fileLength:" << fileLength_ << " psLength:" << psLength_
<< " footerSize:" << footerSize << " cacheSize:" << cacheSize
<< " tailSize:" << tailSize
<< " shouldPreftchStripes:" << input_->shouldPrefetchStripes()
<< " readSize:" << readSize << std::endl;

// initialize file decrypter
handler_ = DecryptionHandler::create(*footer_, decryptorFactory_.get());
}
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@ void OperatorStats::add(const OperatorStats& other) {
inputPositions += other.inputPositions;
inputVectors += other.inputVectors;

fileSize += other.fileSize;

getOutputTiming.add(other.getOutputTiming);
outputBytes += other.outputBytes;
outputPositions += other.outputPositions;
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ struct OperatorStats {
uint64_t rawInputBytes = 0;
uint64_t rawInputPositions = 0;

uint64_t fileSize = 0;

CpuWallTiming addInputTiming;

/// Bytes of input in terms of retained size of input vectors.
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/PlanNodeStats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ void PlanNodeStats::addTotals(const OperatorStats& stats) {
rawInputRows += stats.rawInputPositions;
rawInputBytes += stats.rawInputBytes;

fileSize += stats.fileSize;

outputRows += stats.outputPositions;
outputBytes += stats.outputBytes;
outputVectors += stats.outputVectors;
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/PlanNodeStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ struct PlanNodeStats {
/// Sum of raw input bytes for all corresponding operators.
uint64_t rawInputBytes{0};

uint64_t fileSize{0};

/// Sum of output rows for all corresponding operators. When
/// plan node corresponds to multiple operator types, operators of only one of
/// these types report non-zero output rows.
Expand Down
10 changes: 10 additions & 0 deletions velox/exec/TableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ TableScan::TableScan(
->queryConfig()
.preferredOutputBatchRows()) {
connector_ = connector::getConnector(tableHandle_->connectorId());

std::cout << "[zcw] ==== " << toString() << " ====" << std::endl;
}

RowVectorPtr TableScan::getOutput() {
Expand Down Expand Up @@ -107,6 +109,8 @@ RowVectorPtr TableScan::getOutput() {
tableHandle_,
columnHandles_,
connectorQueryCtx_.get());
std::cout << "[zcw] pendingDynamicFilters_.size="
<< pendingDynamicFilters_.size() << std::endl;
for (const auto& entry : pendingDynamicFilters_) {
dataSource_->addDynamicFilter(entry.first, entry.second);
}
Expand Down Expand Up @@ -135,6 +139,7 @@ RowVectorPtr TableScan::getOutput() {
VELOX_CHECK(operatorCtx_->task()->isCancelled());
return nullptr;
}
std::cout << "[zcw] setFromDataSource\n";
dataSource_->setFromDataSource(std::move(preparedDataSource));
} else {
dataSource_->addSplit(connectorSplit);
Expand Down Expand Up @@ -178,6 +183,11 @@ RowVectorPtr TableScan::getOutput() {

lockedStats->rawInputPositions = dataSource_->getCompletedRows();
lockedStats->rawInputBytes = dataSource_->getCompletedBytes();
lockedStats->fileSize = dataSource_->getFileSize();

std::cout << "[zcw] fileSize:" << lockedStats->fileSize
<< " rawInputBytes:" << lockedStats->rawInputBytes << std::endl;

auto data = dataOptional.value();
if (data) {
if (data->size() > 0) {
Expand Down