diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index cd9ab91fb59e..1a659df964a1 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -175,6 +175,8 @@ class DataSource { // Returns the number of input bytes processed so far. virtual uint64_t getCompletedBytes() = 0; + virtual uint64_t getFileSize() { return 0; } + // Returns the number of input rows processed so far. virtual uint64_t getCompletedRows() = 0; diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index a3f8308488e0..af00ad01faed 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -414,9 +414,17 @@ HiveDataSource::HiveDataSource( SubfieldFilters filters; core::TypedExprPtr remainingFilter; if (hiveTableHandle->isFilterPushdownEnabled()) { + std::ostringstream oss; + oss << "collect "; for (auto& [k, v] : hiveTableHandle->subfieldFilters()) { + oss << k.toString() << " -> " << v->toString() << "\n"; filters.emplace(k.clone(), v->clone()); } + + std::cout << "[zcw] " << oss.str(); + std::cout << "[zcw] remainingFilter:" + << hiveTableHandle->remainingFilter()->toString() << std::endl; + remainingFilter = extractFiltersFromRemainingFilter( hiveTableHandle->remainingFilter(), expressionEvaluator_, @@ -434,6 +442,11 @@ HiveDataSource::HiveDataSource( } } + for (auto& [k, v] : filters) { + std::cout << "[zcw] iterator " << k.toString() << " -> " << v->toString() + << "\n"; + } + auto outputTypes = outputType_->children(); readerOutputType_ = ROW(std::move(columnNames), std::move(outputTypes)); scanSpec_ = makeScanSpec( @@ -443,6 +456,9 @@ HiveDataSource::HiveDataSource( remainingFilterInputs, pool_); + std::cout << "[zcw] readerOutputType_:" << readerOutputType_->toString() + << std::endl; + if (remainingFilter) { metadataFilter_ = std::make_shared( *scanSpec_, *remainingFilter, expressionEvaluator_); @@ -474,6 +490,12 @@ HiveDataSource::HiveDataSource( rowReaderOpts_.setScanSpec(scanSpec_); rowReaderOpts_.setMetadataFilter(metadataFilter_); + std::cout << "[zcw] fileSchema:" << readerOpts_.getFileSchema()->toString() + << " readerOutputType_:" << readerOutputType_->toString() + << " remainingFilterInputs size:" << remainingFilterInputs.size() + << " filter.size:" << filters.size() << " filterPushdownEnabled:" + << hiveTableHandle->isFilterPushdownEnabled() << std::endl; + ioStats_ = std::make_shared(); } @@ -486,6 +508,8 @@ void HiveDataSource::addSplit(std::shared_ptr split) { VLOG(1) << "Adding split " << split_->toString(); + std::cout << "[zcw] add split:" << split_->filePath << std::endl; + fileHandle_ = fileHandleFactory_->generate(split_->filePath).second; auto input = createBufferedInput(*fileHandle_, readerOpts_); @@ -499,6 +523,8 @@ void HiveDataSource::addSplit(std::shared_ptr split) { readerOpts_.setFileFormat(split_->fileFormat); } + std::cout << "[zcw] " << readerOpts_.toString() << std::endl; + reader_ = dwio::common::getReaderFactory(readerOpts_.getFileFormat()) ->createReader(std::move(input), readerOpts_); @@ -781,6 +807,7 @@ HiveDataSource::createBufferedInput( const FileHandle& fileHandle, const dwio::common::ReaderOptions& readerOpts) { if (auto* asyncCache = dynamic_cast(allocator_)) { + std::cout << "[zcw] AsyncDataCache CachedBufferedInput" << std::endl; return std::make_unique( fileHandle.file, readerOpts.getMemoryPool(), @@ -794,6 +821,9 @@ HiveDataSource::createBufferedInput( readerOpts.loadQuantum(), readerOpts.maxCoalesceDistance()); } + + std::cout << "[zcw] BufferedInput, name:" << fileHandle.file->getName() + << std::endl; return std::make_unique( fileHandle.file, readerOpts.getMemoryPool(), diff --git a/velox/connectors/hive/HiveDataSource.h b/velox/connectors/hive/HiveDataSource.h index 2dc448d13699..806a244fe263 100644 --- a/velox/connectors/hive/HiveDataSource.h +++ b/velox/connectors/hive/HiveDataSource.h @@ -56,6 +56,10 @@ class HiveDataSource : public DataSource { return ioStats_->rawBytesRead(); } + uint64_t getFileSize() override { + return ioStats_->fileSize_; + } + uint64_t getCompletedRows() override { return completedRows_; } diff --git a/velox/dwio/common/InputStream.cpp b/velox/dwio/common/InputStream.cpp index 65a040742cd8..d492694c0216 100644 --- a/velox/dwio/common/InputStream.cpp +++ b/velox/dwio/common/InputStream.cpp @@ -60,7 +60,9 @@ ReadFileInputStream::ReadFileInputStream( const MetricsLogPtr& metricsLog, IoStatistics* stats) : InputStream(readFile->getName(), metricsLog, stats), - readFile_(std::move(readFile)) {} + readFile_(std::move(readFile)) { + stats->fileSize_ += getLength(); +} void ReadFileInputStream::read( void* buf, diff --git a/velox/dwio/common/IoStatistics.cpp b/velox/dwio/common/IoStatistics.cpp index 8774900d247c..2bab81db4cb0 100644 --- a/velox/dwio/common/IoStatistics.cpp +++ b/velox/dwio/common/IoStatistics.cpp @@ -95,6 +95,7 @@ IoStatistics::operationStats() const { } void IoStatistics::merge(const IoStatistics& other) { + fileSize_ += other.fileSize_; rawBytesRead_ += other.rawBytesRead_; rawBytesWritten_ += other.rawBytesWritten_; totalScanTime_ += other.totalScanTime_; diff --git a/velox/dwio/common/IoStatistics.h b/velox/dwio/common/IoStatistics.h index d4508eb53a08..9bab7660c703 100644 --- a/velox/dwio/common/IoStatistics.h +++ b/velox/dwio/common/IoStatistics.h @@ -114,7 +114,8 @@ class IoStatistics { folly::dynamic getOperationStatsSnapshot() const; - private: + //private: + std::atomic fileSize_{0}; std::atomic rawBytesRead_{0}; std::atomic rawBytesWritten_{0}; std::atomic inputBatchSize_{0}; diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h index 523f136e6e70..c31b97f087fa 100644 --- a/velox/dwio/common/Options.h +++ b/velox/dwio/common/Options.h @@ -376,6 +376,16 @@ class ReaderOptions { uint64_t filePreloadThreshold{kDefaultFilePreloadThreshold}; bool fileColumnNamesReadAsLowerCase = false; + std::string toString() const { + std::ostringstream oss; + oss << "[ReaderOptions] autoPreloadLength:" << autoPreloadLength + << " prefetchMode:" << (int)prefetchMode + << " loadQuantum_:" << (int)loadQuantum_ + << " directorySizeGuess:" << directorySizeGuess + << " filePreloadThreshold:" << filePreloadThreshold; + return oss.str(); + } + public: static constexpr int32_t kDefaultLoadQuantum = 8 << 20; // 8MB static constexpr int32_t kDefaultCoalesceDistance = 512 << 10; // 512K diff --git a/velox/dwio/dwrf/reader/ReaderBase.cpp b/velox/dwio/dwrf/reader/ReaderBase.cpp index 640842d79504..8ce64b0e20b7 100644 --- a/velox/dwio/dwrf/reader/ReaderBase.cpp +++ b/velox/dwio/dwrf/reader/ReaderBase.cpp @@ -207,6 +207,9 @@ ReaderBase::ReaderBase( if (!cache_ && input_->shouldPrefetchStripes()) { auto numStripes = getFooter().stripesSize(); + std::cout << "[zcw] shouldPrefetchStripes, numStripes:" << numStripes + << std::endl; + for (auto i = 0; i < numStripes; i++) { const auto stripe = getFooter().stripes(i); input_->enqueue( @@ -219,6 +222,12 @@ ReaderBase::ReaderBase( } } + std::cout << "[zcw] fileLength:" << fileLength_ << " psLength:" << psLength_ + << " footerSize:" << footerSize << " cacheSize:" << cacheSize + << " tailSize:" << tailSize + << " shouldPreftchStripes:" << input_->shouldPrefetchStripes() + << " readSize:" << readSize << std::endl; + // initialize file decrypter handler_ = DecryptionHandler::create(*footer_, decryptorFactory_.get()); } diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 2614a1b63857..36c62cb15041 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -350,6 +350,8 @@ void OperatorStats::add(const OperatorStats& other) { inputPositions += other.inputPositions; inputVectors += other.inputVectors; + fileSize += other.fileSize; + getOutputTiming.add(other.getOutputTiming); outputBytes += other.outputBytes; outputPositions += other.outputPositions; diff --git a/velox/exec/Operator.h b/velox/exec/Operator.h index 4dcf9b9ea480..0ec68974666d 100644 --- a/velox/exec/Operator.h +++ b/velox/exec/Operator.h @@ -98,6 +98,8 @@ struct OperatorStats { uint64_t rawInputBytes = 0; uint64_t rawInputPositions = 0; + uint64_t fileSize = 0; + CpuWallTiming addInputTiming; /// Bytes of input in terms of retained size of input vectors. diff --git a/velox/exec/PlanNodeStats.cpp b/velox/exec/PlanNodeStats.cpp index 053604c05cce..1153ea496458 100644 --- a/velox/exec/PlanNodeStats.cpp +++ b/velox/exec/PlanNodeStats.cpp @@ -39,6 +39,8 @@ void PlanNodeStats::addTotals(const OperatorStats& stats) { rawInputRows += stats.rawInputPositions; rawInputBytes += stats.rawInputBytes; + fileSize += stats.fileSize; + outputRows += stats.outputPositions; outputBytes += stats.outputBytes; outputVectors += stats.outputVectors; diff --git a/velox/exec/PlanNodeStats.h b/velox/exec/PlanNodeStats.h index 11de5f127e5c..43465ab69122 100644 --- a/velox/exec/PlanNodeStats.h +++ b/velox/exec/PlanNodeStats.h @@ -62,6 +62,8 @@ struct PlanNodeStats { /// Sum of raw input bytes for all corresponding operators. uint64_t rawInputBytes{0}; + uint64_t fileSize{0}; + /// Sum of output rows for all corresponding operators. When /// plan node corresponds to multiple operator types, operators of only one of /// these types report non-zero output rows. diff --git a/velox/exec/TableScan.cpp b/velox/exec/TableScan.cpp index 089468907e03..6a92b6fef4bc 100644 --- a/velox/exec/TableScan.cpp +++ b/velox/exec/TableScan.cpp @@ -47,6 +47,8 @@ TableScan::TableScan( ->queryConfig() .preferredOutputBatchRows()) { connector_ = connector::getConnector(tableHandle_->connectorId()); + + std::cout << "[zcw] ==== " << toString() << " ====" << std::endl; } RowVectorPtr TableScan::getOutput() { @@ -107,6 +109,8 @@ RowVectorPtr TableScan::getOutput() { tableHandle_, columnHandles_, connectorQueryCtx_.get()); + std::cout << "[zcw] pendingDynamicFilters_.size=" + << pendingDynamicFilters_.size() << std::endl; for (const auto& entry : pendingDynamicFilters_) { dataSource_->addDynamicFilter(entry.first, entry.second); } @@ -135,6 +139,7 @@ RowVectorPtr TableScan::getOutput() { VELOX_CHECK(operatorCtx_->task()->isCancelled()); return nullptr; } + std::cout << "[zcw] setFromDataSource\n"; dataSource_->setFromDataSource(std::move(preparedDataSource)); } else { dataSource_->addSplit(connectorSplit); @@ -178,6 +183,11 @@ RowVectorPtr TableScan::getOutput() { lockedStats->rawInputPositions = dataSource_->getCompletedRows(); lockedStats->rawInputBytes = dataSource_->getCompletedBytes(); + lockedStats->fileSize = dataSource_->getFileSize(); + + std::cout << "[zcw] fileSize:" << lockedStats->fileSize + << " rawInputBytes:" << lockedStats->rawInputBytes << std::endl; + auto data = dataOptional.value(); if (data) { if (data->size() > 0) {