diff --git a/velox/common/memory/tests/SharedArbitratorTest.cpp b/velox/common/memory/tests/SharedArbitratorTest.cpp
index f01cc043da5c..5f7043144b93 100644
--- a/velox/common/memory/tests/SharedArbitratorTest.cpp
+++ b/velox/common/memory/tests/SharedArbitratorTest.cpp
@@ -755,7 +755,7 @@ DEBUG_ONLY_TEST_P(
   folly::EventCount taskPauseWait;
   auto taskPauseWaitKey = taskPauseWait.prepareWait();
 
-  const auto fakeAllocationSize = kMemoryCapacity - (32L << 20);
+  const auto fakeAllocationSize = kMemoryCapacity - (2L << 20);
 
   std::atomic<bool> injectAllocationOnce{true};
   fakeOperatorFactory_->setAllocationCallback([&](Operator* op) {
diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h
index 12ffa8358216..4fcef7895f2a 100644
--- a/velox/core/PlanNode.h
+++ b/velox/core/PlanNode.h
@@ -3004,6 +3004,15 @@ class AbstractJoinNode : public PlanNode {
     return isInnerJoin() || isLeftJoin() || isAntiJoin();
   }
 
+  // Indicates if this join node can drop duplicate rows with the same join
+  // key. For left semi and anti joins, it is not necessary to store them.
+  bool canDropDuplicates() const {
+    // Left semi and anti joins with no extra filter only need to know whether
+    // there is a match. Hence, no need to store entries with duplicate keys.
+    return !filter() &&
+        (isLeftSemiFilterJoin() || isLeftSemiProjectJoin() || isAntiJoin());
+  }
+
   const std::vector<FieldAccessTypedExprPtr>& leftKeys() const {
     return leftKeys_;
   }
diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h
index 14636f52f91a..96c0b1af13ad 100644
--- a/velox/core/QueryConfig.h
+++ b/velox/core/QueryConfig.h
@@ -181,6 +181,12 @@ class QueryConfig {
   static constexpr const char* kAbandonPartialTopNRowNumberMinPct =
       "abandon_partial_topn_row_number_min_pct";
 
+  static constexpr const char* kAbandonBuildNoDupHashMinRows =
+      "abandon_build_no_dup_hash_min_rows";
+
+  static constexpr const char* kAbandonBuildNoDupHashMinPct =
+      "abandon_build_no_dup_hash_min_pct";
+
   static constexpr const char* kMaxElementsSizeInRepeatAndSequence =
       "max_elements_size_in_repeat_and_sequence";
 
@@ -760,6 +766,14 @@ class QueryConfig {
     return get<int32_t>(kAbandonPartialTopNRowNumberMinPct, 80);
   }
 
+  int32_t abandonBuildNoDupHashMinRows() const {
+    return get<int32_t>(kAbandonBuildNoDupHashMinRows, 100'000);
+  }
+
+  int32_t abandonBuildNoDupHashMinPct() const {
+    return get<int32_t>(kAbandonBuildNoDupHashMinPct, 0);
+  }
+
   int32_t maxElementsSizeInRepeatAndSequence() const {
     return get<int32_t>(kMaxElementsSizeInRepeatAndSequence, 10'000);
   }
diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst
index 605ae3266bcf..9bfb6381ff41 100644
--- a/velox/docs/configs.rst
+++ b/velox/docs/configs.rst
@@ -43,6 +43,16 @@ Generic Configuration
      - integer
      - 80
      - Abandons partial TopNRowNumber if number of output rows equals or exceeds this percentage of the number of input rows.
+   * - abandon_build_no_dup_hash_min_rows
+     - integer
+     - 100,000
+     - Number of input rows to receive before starting to check whether to abandon building a hash table without
+       duplicates in HashBuild for left semi/anti joins.
+   * - abandon_build_no_dup_hash_min_pct
+     - integer
+     - 0
+     - Abandons building a hash table without duplicates in HashBuild for left semi/anti joins if the percentage of
+       distinct keys in the hash table equals or exceeds this threshold. Zero disables the optimization.
   * - session_timezone
     - string
     -
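A minimal standalone sketch of the abandon heuristic configured above (names are
illustrative, not Velox API). With the defaults (min_rows = 100'000, min_pct = 0)
the optimization is disabled; with min_pct = 80 the build stops deduplicating once
enough rows have been seen and at least 80% of them carry distinct keys:

#include <cstdint>

// Returns true once 'inputRows' exceeds 'minRows' and the observed share of
// distinct keys reaches 'minPct' percent.
bool shouldAbandonDedup(
    int64_t inputRows,
    int64_t distinctRows,
    int32_t minRows,
    int32_t minPct) {
  // Multiply before dividing so integer division does not truncate to 0.
  return inputRows > minRows && 100 * distinctRows / inputRows >= minPct;
}

// shouldAbandonDedup(200'000, 180'000, 100'000, 80) -> true  (90% distinct)
// shouldAbandonDedup(200'000,  20'000, 100'000, 80) -> false (10% distinct)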
diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp
index 4e44d6f71e7a..3cc8075c7ed0 100644
--- a/velox/exec/HashBuild.cpp
+++ b/velox/exec/HashBuild.cpp
@@ -66,7 +66,12 @@ HashBuild::HashBuild(
       joinBridge_(operatorCtx_->task()->getHashJoinBridgeLocked(
           operatorCtx_->driverCtx()->splitGroupId,
           planNodeId())),
-      keyChannelMap_(joinNode_->rightKeys().size()) {
+      dropDuplicates_(joinNode_->canDropDuplicates()),
+      keyChannelMap_(joinNode_->rightKeys().size()),
+      abandonBuildNoDupHashMinRows_(
+          driverCtx->queryConfig().abandonBuildNoDupHashMinRows()),
+      abandonBuildNoDupHashMinPct_(
+          driverCtx->queryConfig().abandonBuildNoDupHashMinPct()) {
   VELOX_CHECK(pool()->trackUsage());
   VELOX_CHECK_NOT_NULL(joinBridge_);
 
@@ -86,19 +91,22 @@ HashBuild::HashBuild(
 
   // Identify the non-key build side columns and make a decoder for each.
   const int32_t numDependents = inputType->size() - numKeys;
-  if (numDependents > 0) {
-    // Number of join keys (numKeys) may be less than the number of input
-    // columns (inputType->size()). In this case numDependents is negative and
-    // cannot be used to call 'reserve'. This happens when we join different
-    // probe side keys with the same build side key: SELECT * FROM t LEFT JOIN
-    // u ON t.k1 = u.k AND t.k2 = u.k.
-    dependentChannels_.reserve(numDependents);
-    decoders_.reserve(numDependents);
-  }
-  for (auto i = 0; i < inputType->size(); ++i) {
-    if (keyChannelMap_.find(i) == keyChannelMap_.end()) {
-      dependentChannels_.emplace_back(i);
-      decoders_.emplace_back(std::make_unique<DecodedVector>());
+  if (!dropDuplicates_) {
+    if (numDependents > 0) {
+      // Number of join keys (numKeys) may be less than the number of input
+      // columns (inputType->size()). In that case numDependents is negative
+      // and cannot be used to call 'reserve'. This happens when we join
+      // different probe side keys with the same build side key: SELECT * FROM
+      // t LEFT JOIN u ON t.k1 = u.k AND t.k2 = u.k.
+      dependentChannels_.reserve(numDependents);
+      decoders_.reserve(numDependents);
+    }
+
+    for (auto i = 0; i < inputType->size(); ++i) {
+      if (keyChannelMap_.find(i) == keyChannelMap_.end()) {
+        dependentChannels_.emplace_back(i);
+        decoders_.emplace_back(std::make_unique<DecodedVector>());
+      }
     }
   }
 
@@ -146,11 +154,6 @@ void HashBuild::setupTable() {
             .minTableRowsForParallelJoinBuild(),
         pool());
   } else {
-    // (Left) semi and anti join with no extra filter only needs to know whether
-    // there is a match. Hence, no need to store entries with duplicate keys.
-    const bool dropDuplicates = !joinNode_->filter() &&
-        (joinNode_->isLeftSemiFilterJoin() ||
-         joinNode_->isLeftSemiProjectJoin() || isAntiJoin(joinType_));
     // Right semi join needs to tag build rows that were probed.
     const bool needProbedFlag = joinNode_->isRightSemiFilterJoin();
     if (isLeftNullAwareJoinWithFilter(joinNode_)) {
@@ -159,7 +162,7 @@ void HashBuild::setupTable() {
       table_ = HashTable<false>::createForJoin(
           std::move(keyHashers),
           dependentTypes,
-          !dropDuplicates, // allowDuplicates
+          !dropDuplicates_, // allowDuplicates
           needProbedFlag, // hasProbedFlag
           operatorCtx_->driverCtx()
               ->queryConfig()
@@ -170,7 +173,7 @@ void HashBuild::setupTable() {
       table_ = HashTable<true>::createForJoin(
           std::move(keyHashers),
           dependentTypes,
-          !dropDuplicates, // allowDuplicates
+          !dropDuplicates_, // allowDuplicates
           needProbedFlag, // hasProbedFlag
           operatorCtx_->driverCtx()
               ->queryConfig()
@@ -178,7 +181,14 @@ void HashBuild::setupTable() {
           pool());
     }
   }
+  lookup_ = std::make_unique<HashLookup>(table_->hashers(), pool());
   analyzeKeys_ = table_->hashMode() != BaseHashTable::HashMode::kHash;
+  if (abandonBuildNoDupHashMinPct_ == 0) {
+    // Building a hash table without duplicates is disabled if
+    // 'abandonBuildNoDupHashMinPct_' is 0.
+    abandonBuildNoDupHash_ = true;
+    table_->joinTableMayHaveDuplicates();
+  }
 }
 
 void HashBuild::setupSpiller(SpillPartition* spillPartition) {
@@ -377,6 +387,31 @@ void HashBuild::addInput(RowVectorPtr input) {
     return;
   }
 
+  if (dropDuplicates_ && !abandonBuildNoDupHash_) {
+    const bool abandonEarly = abandonBuildNoDupHashEarly(table_->numDistinct());
+    numHashInputRows_ += activeRows_.countSelected();
+    if (abandonEarly) {
+      // The hash table is no longer directly constructed in addInput. The
+      // data that was previously inserted into the hash table is already in
+      // the RowContainer.
+      addRuntimeStat("abandonBuildNoDupHash", RuntimeCounter(1));
+      abandonBuildNoDupHash_ = true;
+      table_->joinTableMayHaveDuplicates();
+    } else {
+      table_->prepareForGroupProbe(
+          *lookup_,
+          input,
+          activeRows_,
+          BaseHashTable::kNoSpillInputStartPartitionBit);
+      if (lookup_->rows.empty()) {
+        return;
+      }
+      table_->groupProbe(
+          *lookup_, BaseHashTable::kNoSpillInputStartPartitionBit);
+      return;
+    }
+  }
+
   if (analyzeKeys_ && hashes_.size() < activeRows_.end()) {
     hashes_.resize(activeRows_.end());
   }
@@ -756,7 +791,8 @@ bool HashBuild::finishHashBuild() {
         isInputFromSpill() ? spillConfig()->startPartitionBit
                            : BaseHashTable::kNoSpillInputStartPartitionBit,
         allowParallelJoinBuild ? operatorCtx_->task()->queryCtx()->executor()
-                               : nullptr);
+                               : nullptr,
+        dropDuplicates_);
   }
   stats_.wlock()->addRuntimeStat(
       BaseHashTable::kBuildWallNanos,
@@ -879,6 +915,7 @@ void HashBuild::setupSpillInput(HashJoinBridge::SpillInput spillInput) {
   setupTable();
   setupSpiller(spillInput.spillPartition.get());
   stateCleared_ = false;
+  numHashInputRows_ = 0;
 
   // Start to process spill input.
   processSpillInput();
@@ -1240,4 +1277,10 @@ void HashBuildSpiller::extractSpill(
         rows.data(), rows.size(), false, false, result->childAt(types.size()));
   }
 }
+
+bool HashBuild::abandonBuildNoDupHashEarly(int64_t numDistinct) const {
+  VELOX_CHECK(dropDuplicates_);
+  return numHashInputRows_ > abandonBuildNoDupHashMinRows_ &&
+      100 * numDistinct / numHashInputRows_ >= abandonBuildNoDupHashMinPct_;
+}
 } // namespace facebook::velox::exec
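Why dropping duplicates is safe here: a left semi or anti join probe only asks
whether a key exists on the build side, so one table entry per key suffices. A
standalone illustration, with std::unordered_set standing in for the Velox hash
table:

#include <cstdint>
#include <unordered_set>
#include <vector>

// Computes the left-semi-project 'match' column; an anti join keeps exactly
// the rows where 'match' is false.
std::vector<bool> leftSemiProjectMatches(
    const std::vector<int64_t>& probeKeys,
    const std::vector<int64_t>& buildKeys) {
  // Duplicate build keys collapse into one entry; only existence is retained.
  std::unordered_set<int64_t> table(buildKeys.begin(), buildKeys.end());
  std::vector<bool> match;
  match.reserve(probeKeys.size());
  for (auto key : probeKeys) {
    match.push_back(table.count(key) > 0);
  }
  return match;
}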
more + // than 'abandonBuildNoDuplicatesHashMinRows_' rows and more than + // 'abandonBuildNoDuplicatesHashMinPct_' % of rows are unique. + bool abandonBuildNoDupHashEarly(int64_t numDistinct) const; + const std::shared_ptr joinNode_; const core::JoinType joinType_; @@ -242,6 +247,9 @@ class HashBuild final : public Operator { // Container for the rows being accumulated. std::unique_ptr table_; + // Used for building hash table while adding input rows. + std::unique_ptr lookup_; + // Key channels in 'input_' std::vector keyChannels_; @@ -269,6 +277,14 @@ class HashBuild final : public Operator { // at least one entry with null join keys. bool joinHasNullKeys_{false}; + // Indicates whether drop duplicate rows. Rows containing duplicate keys + // can be removed for left semi and anti join. + const bool dropDuplicates_; + + // Whether to abandon building a HashTable without duplicates in HashBuild + // addInput phase for left semi/anti join. + bool abandonBuildNoDupHash_{false}; + // The type used to spill hash table which might attach a boolean column to // record the probed flag if 'needProbedFlagSpill_' is true. RowTypePtr spillType_; @@ -310,6 +326,19 @@ class HashBuild final : public Operator { // Maps key channel in 'input_' to channel in key. folly::F14FastMap keyChannelMap_; + + // Count the number of hash table input rows for building no duplicates + // hash table. It will not be updated after abandonBuildNoDupHash_ is true. + int64_t numHashInputRows_ = 0; + + // Minimum number of rows to see before deciding to give up build no + // duplicates hash table. + const int32_t abandonBuildNoDupHashMinRows_; + + // Min unique rows pct for give up build no duplicates hash table. If more + // than this many rows are unique, build hash table in addInput phase is not + // worthwhile. + const int32_t abandonBuildNoDupHashMinPct_; }; inline std::ostream& operator<<(std::ostream& os, HashBuild::State state) { diff --git a/velox/exec/HashJoinBridge.cpp b/velox/exec/HashJoinBridge.cpp index c9dc19948a00..ac8ac75408ff 100644 --- a/velox/exec/HashJoinBridge.cpp +++ b/velox/exec/HashJoinBridge.cpp @@ -42,13 +42,18 @@ RowTypePtr hashJoinTableType( types.emplace_back(inputType->childAt(channel)); } + if (joinNode->canDropDuplicates()) { + // For left semi and anti join with no extra filter, hash table does not + // store dependent columns. + return ROW(std::move(names), std::move(types)); + } + for (auto i = 0; i < inputType->size(); ++i) { if (keyChannelSet.find(i) == keyChannelSet.end()) { names.emplace_back(inputType->nameOf(i)); types.emplace_back(inputType->childAt(i)); } } - return ROW(std::move(names), std::move(types)); } diff --git a/velox/exec/HashTable.cpp b/velox/exec/HashTable.cpp index 4c4a8c46382e..cdff1d50a2b3 100644 --- a/velox/exec/HashTable.cpp +++ b/velox/exec/HashTable.cpp @@ -57,6 +57,7 @@ HashTable::HashTable( pool_(pool), minTableSizeForParallelJoinBuild_(minTableSizeForParallelJoinBuild), isJoinBuild_(isJoinBuild), + joinBuildNoDuplicates_(!allowDuplicates), buildPartitionBounds_(raw_vector(pool)) { std::vector keys; for (auto& hasher : hashers_) { @@ -1487,7 +1488,9 @@ void HashTable::decideHashMode( return; } disableRangeArrayHash_ |= disableRangeArrayHash; - if (numDistinct_ && !isJoinBuild_) { + if (numDistinct_ && (!isJoinBuild_ || joinBuildNoDuplicates_)) { + // If the join type is left semi and anti, allowDuplicates_ will be false, + // and join build is building hash table while adding input rows. 
diff --git a/velox/exec/HashTable.cpp b/velox/exec/HashTable.cpp
index 4c4a8c46382e..cdff1d50a2b3 100644
--- a/velox/exec/HashTable.cpp
+++ b/velox/exec/HashTable.cpp
@@ -57,6 +57,7 @@ HashTable<ignoreNullKeys>::HashTable(
       pool_(pool),
       minTableSizeForParallelJoinBuild_(minTableSizeForParallelJoinBuild),
       isJoinBuild_(isJoinBuild),
+      joinBuildNoDuplicates_(!allowDuplicates),
       buildPartitionBounds_(raw_vector<PartitionBoundIndexType>(pool)) {
   std::vector<TypePtr> keys;
   for (auto& hasher : hashers_) {
@@ -1487,7 +1488,9 @@ void HashTable<ignoreNullKeys>::decideHashMode(
     return;
   }
   disableRangeArrayHash_ |= disableRangeArrayHash;
-  if (numDistinct_ && !isJoinBuild_) {
+  if (numDistinct_ && (!isJoinBuild_ || joinBuildNoDuplicates_)) {
+    // For left semi and anti joins 'allowDuplicates_' is false and the join
+    // build constructs the hash table while adding input rows.
     if (!analyze()) {
       setHashMode(HashMode::kHash, numNew, spillInputStartPartitionBit);
       return;
    }
@@ -1709,8 +1712,20 @@ template <bool ignoreNullKeys>
 void HashTable<ignoreNullKeys>::prepareJoinTable(
     std::vector<std::unique_ptr<BaseHashTable>> tables,
     int8_t spillInputStartPartitionBit,
-    folly::Executor* executor) {
+    folly::Executor* executor,
+    bool dropDuplicates) {
   buildExecutor_ = executor;
+  if (dropDuplicates) {
+    if (table_ != nullptr) {
+      // Reset 'table_' and 'capacity_' to trigger a rehash.
+      rows_->pool()->freeContiguous(tableAllocation_);
+      table_ = nullptr;
+      capacity_ = 0;
+    }
+    // Call analyze() to insert all unique values in the row container into
+    // the table hashers' 'uniqueValues_'.
+    analyze();
+  }
   otherTables_.reserve(tables.size());
   for (auto& table : tables) {
     otherTables_.emplace_back(std::unique_ptr<HashTable<ignoreNullKeys>>(
@@ -1740,6 +1755,11 @@ void HashTable<ignoreNullKeys>::prepareJoinTable(
   }
   if (useValueIds) {
     for (auto& other : otherTables_) {
+      if (dropDuplicates) {
+        // Before merging with the current hashers, all values in the row
+        // containers of the other tables must be inserted into 'uniqueValues_'.
+        other->analyze();
+      }
       for (auto i = 0; i < hashers_.size(); ++i) {
         hashers_[i]->merge(*other->hashers_[i]);
         if (!hashers_[i]->mayUseValueIds()) {
diff --git a/velox/exec/HashTable.h b/velox/exec/HashTable.h
index c19d74727a11..9a85d5d090ba 100644
--- a/velox/exec/HashTable.h
+++ b/velox/exec/HashTable.h
@@ -310,7 +310,19 @@ class BaseHashTable {
   virtual void prepareJoinTable(
       std::vector<std::unique_ptr<BaseHashTable>> tables,
       int8_t spillInputStartPartitionBit,
-      folly::Executor* executor = nullptr) = 0;
+      folly::Executor* executor = nullptr,
+      bool dropDuplicates = false) = 0;
+
+  /// The hash table used for the join build in left semi and anti joins does
+  /// not retain duplicate join keys by default. This is achieved by
+  /// constructing the hash table in the addInput phase so that duplicate join
+  /// keys are eliminated as they arrive. When the percentage of duplicate
+  /// data is small, the build adaptively stops building the hash table in the
+  /// addInput phase and instead behaves like other join types, reading all
+  /// the data before building the hash table. This function signals that the
+  /// join hash table will not be built during the addInput phase and that the
+  /// input data will not be deduplicated.
+  virtual void joinTableMayHaveDuplicates() = 0;
 
   /// Returns the memory footprint in bytes for any data structures
   /// owned by 'this'.
@@ -587,6 +599,10 @@ class HashTable : public BaseHashTable {
     return hasDuplicates_.check();
   }
 
+  void joinTableMayHaveDuplicates() override {
+    joinBuildNoDuplicates_ = false;
+  }
+
   HashMode hashMode() const override {
     return hashMode_;
   }
@@ -611,7 +627,8 @@ class HashTable : public BaseHashTable {
   void prepareJoinTable(
       std::vector<std::unique_ptr<BaseHashTable>> tables,
       int8_t spillInputStartPartitionBit,
-      folly::Executor* executor = nullptr) override;
+      folly::Executor* executor = nullptr,
+      bool dropDuplicates = false) override;
 
   void prepareForJoinProbe(
       HashLookup& lookup,
@@ -967,7 +984,7 @@ class HashTable : public BaseHashTable {
   // or distinct mode VectorHashers in a group by hash table. 0 for
   // join build sides.
   int32_t reservePct() const {
-    return isJoinBuild_ ? 0 : 50;
+    return (isJoinBuild_ && !joinBuildNoDuplicates_) ? 0 : 50;
   }
 
   // Returns the byte offset of the bucket for 'hash' starting from 'table_'.
@@ -1037,6 +1054,7 @@ class HashTable : public BaseHashTable {
   int8_t sizeBits_;
 
   bool isJoinBuild_ = false;
+  bool joinBuildNoDuplicates_ = false;
 
   // Set at join build time if the table has duplicates, meaning that
   // the join can be cardinality increasing. Atomic for tsan because
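The reservePct() change above makes a deduplicating join build size its table
like a group-by table: since the table now grows incrementally during addInput,
it reserves 50% headroom instead of being sized once after all input is seen. A
rough arithmetic sketch (the actual sizing logic in HashTable is more
involved):

#include <cstdint>

// With numDistinct = 1'000'000 and reservePct = 50, the table is sized for
// 1'500'000 entries before the next rehash is needed.
int64_t sizedEntriesSketch(int64_t numDistinct, int32_t reservePct) {
  return numDistinct + numDistinct * reservePct / 100;
}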
diff --git a/velox/exec/benchmarks/CMakeLists.txt b/velox/exec/benchmarks/CMakeLists.txt
index 14391abd2e9f..89747d7ec704 100644
--- a/velox/exec/benchmarks/CMakeLists.txt
+++ b/velox/exec/benchmarks/CMakeLists.txt
@@ -65,6 +65,17 @@ target_link_libraries(
   velox_vector_test_lib
   Folly::follybenchmark)
 
+add_executable(velox_hash_join_stream_build_benchmark
+               HashJoinStreamBuildBenchmark.cpp)
+
+target_link_libraries(
+  velox_hash_join_stream_build_benchmark
+  velox_exec
+  velox_exec_test_lib
+  velox_vector_fuzzer
+  velox_vector_test_lib
+  Folly::follybenchmark)
+
 add_executable(velox_hash_join_prepare_join_table_benchmark
                HashJoinPrepareJoinTableBenchmark.cpp)
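The benchmark below sweeps hash mode, duplicate factor, and abandon percentage.
The number of distinct keys follows directly from the duplicate factor (the
same arithmetic as BenchmarkParams):

#include <cmath>
#include <cstdint>

// buildSize = (2L << 20) - 3 = 2'097'149 with dupFactor = 2 gives 1'048'574
// distinct keys, each appearing roughly twice.
int64_t distinctKeys(int64_t buildSize, double dupFactor) {
  return static_cast<int64_t>(std::floor(buildSize / dupFactor));
}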
diff --git a/velox/exec/benchmarks/HashJoinStreamBuildBenchmark.cpp b/velox/exec/benchmarks/HashJoinStreamBuildBenchmark.cpp
new file mode 100644
index 000000000000..eba6eaa063e4
--- /dev/null
+++ b/velox/exec/benchmarks/HashJoinStreamBuildBenchmark.cpp
@@ -0,0 +1,389 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/Benchmark.h>
+#include <folly/init/Init.h>
+#include <algorithm>
+#include <iostream>
+#include <random>
+
+#include "velox/exec/HashTable.h"
+#include "velox/exec/tests/utils/AssertQueryBuilder.h"
+#include "velox/exec/tests/utils/PlanBuilder.h"
+#include "velox/exec/tests/utils/VectorTestUtil.h"
+#include "velox/vector/tests/utils/VectorTestBase.h"
+
+using namespace facebook::velox;
+using namespace facebook::velox::exec;
+using namespace facebook::velox::exec::test;
+using namespace facebook::velox::test;
+
+namespace {
+struct BenchmarkParams {
+  BenchmarkParams() = default;
+
+  // Benchmark params. We need to provide:
+  //  - the expected hash mode,
+  //  - the build row schema,
+  //  - the duplicate factor,
+  //  - the number of build rows,
+  //  - the number of probe rows,
+  //  - the abandon percentage,
+  //  - the number of build vector batches.
+  BenchmarkParams(
+      BaseHashTable::HashMode mode,
+      const TypePtr& buildType,
+      double dupFactor,
+      int64_t buildSize,
+      int64_t probeSize,
+      int32_t abandonPct,
+      int32_t numBuildBatches)
+      : mode{mode},
+        buildType{buildType},
+        hashTableSize{static_cast<int64_t>(std::floor(buildSize / dupFactor))},
+        buildSize{buildSize},
+        probeSize{probeSize},
+        numBuildBatches{numBuildBatches},
+        dupFactor{dupFactor},
+        abandonPct{abandonPct} {
+    VELOX_CHECK_LE(hashTableSize, buildSize);
+    VELOX_CHECK_GE(numBuildBatches, 1);
+
+    if (hashTableSize > BaseHashTable::kArrayHashMaxSize &&
+        mode == BaseHashTable::HashMode::kArray) {
+      VELOX_FAIL("Bad hash mode.");
+    }
+
+    numFields = buildType->size();
+    if (mode == BaseHashTable::HashMode::kNormalizedKey) {
+      extraValue = BaseHashTable::kArrayHashMaxSize + 100;
+    } else if (mode == BaseHashTable::HashMode::kHash) {
+      extraValue = std::numeric_limits<int64_t>::max() - 1;
+    } else {
+      extraValue = 0;
+    }
+
+    title = fmt::format(
+        "dupFactor:{:<2},abandonPct:{},{}",
+        dupFactor,
+        abandonPct,
+        BaseHashTable::modeString(mode));
+  }
+
+  // Expected mode.
+  BaseHashTable::HashMode mode;
+
+  // Type of build & probe row.
+  TypePtr buildType;
+
+  // Distinct rows in the table.
+  int64_t hashTableSize;
+
+  // Number of build rows.
+  int64_t buildSize;
+
+  // Number of probe rows.
+  int64_t probeSize;
+
+  // Number of build RowContainers.
+  int32_t numBuildBatches;
+
+  // Title for reporting.
+  std::string title;
+
+  // The duplicate factor; 2 means every key appears twice.
+  double dupFactor;
+
+  // This parameter controls the hashing mode. It is incorporated into the
+  // keys on the build side. If the expected mode is array, its value is 0. If
+  // the expected mode is normalized key, its value is 'kArrayHashMaxSize' +
+  // 100, which makes the key range exceed 'kArrayHashMaxSize'. If the
+  // expected mode is hash, its value is the maximum int64_t value minus 1,
+  // which makes the key range register as 'kRangeTooLarge'.
+  int64_t extraValue;
+
+  // Number of fields.
+  int32_t numFields;
+
+  int32_t abandonPct;
+
+  std::string toString() const {
+    return fmt::format(
+        "DupFactor:{:<2}, AbandonPct:{}, HashMode:{:<14}",
+        dupFactor,
+        abandonPct,
+        BaseHashTable::modeString(mode));
+  }
+};
+
+struct BenchmarkResult {
+  BenchmarkParams params;
+
+  uint64_t totalClock{0};
+
+  uint64_t hashBuildPeakMemoryBytes{0};
+
+  bool isBuildNoDupHashTableAbandon{false};
+
+  // The mode of the table.
+  BaseHashTable::HashMode hashMode;
+
+  std::string toString() const {
+    return fmt::format(
+        "{}, isAbandon:{:<5}, totalClock:{}ms, peakMemoryBytes:{}",
+        params.toString(),
+        isBuildNoDupHashTableAbandon,
+        totalClock / 1000'000,
+        succinctBytes(hashBuildPeakMemoryBytes));
+  }
+};
+
+class HashJoinStreamBuildBenchmark : public VectorTestBase {
+ public:
+  HashJoinStreamBuildBenchmark() : randomEngine_((std::random_device{}())) {}
+
+  BenchmarkResult run(BenchmarkParams params) {
+    params_ = std::move(params);
+    BenchmarkResult result;
+    result.params = params_;
+    result.hashMode = params_.mode;
+
+    std::vector<RowVectorPtr> buildVectors;
+    makeBuildBatches(buildVectors);
+
+    int64_t sequence = 0;
+    int64_t batchSize = params_.probeSize / 4;
+    std::vector<RowVectorPtr> probeVectors;
+    for (auto i = 0; i < 4; ++i) {
+      auto batch = makeProbeVector(batchSize, params_.hashTableSize, sequence);
+      probeVectors.emplace_back(batch);
+    }
+
+    uint64_t totalClocks{0};
+    {
+      ClockTimer timer(totalClocks);
+      auto plan = makeHashJoinPlan(buildVectors, probeVectors);
+      CursorParameters cursorParams;
+      cursorParams.planNode = std::move(plan);
+      cursorParams.queryCtx = core::QueryCtx::create(
+          executor_.get(),
+          core::QueryConfig{{}},
+          {},
+          cache::AsyncDataCache::getInstance(),
+          rootPool_);
+      cursorParams.queryCtx->testingOverrideConfigUnsafe({
+          {core::QueryConfig::kAbandonBuildNoDupHashMinPct,
+           std::to_string(params_.abandonPct)},
+          {core::QueryConfig::kAbandonBuildNoDupHashMinRows, "1000000"},
+      });
+
+      cursorParams.maxDrivers = 1;
+      auto cursor = TaskCursor::create(cursorParams);
+      auto* task = cursor->task().get();
+      while (cursor->moveNext()) {
+      }
+      waitForTaskCompletion(task);
+      result.isBuildNoDupHashTableAbandon = isBuildNoDupHashTableAbandon(task);
+    }
+    result.totalClock = totalClocks;
+
+    result.hashBuildPeakMemoryBytes = getHashBuildPeakMemory(rootPool_.get());
+    return result;
+  }
+
+ private:
+  std::shared_ptr<const core::PlanNode> makeHashJoinPlan(
+      const std::vector<RowVectorPtr>& buildVectors,
+      const std::vector<RowVectorPtr>& probeVectors) {
+    auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
+    return exec::test::PlanBuilder(planNodeIdGenerator, pool_.get())
+        .values(probeVectors)
+        .project({"c0 AS t0", "c1 AS t1", "c2 AS t2"})
+        .hashJoin(
+            {"t0"},
+            {"u0"},
+            exec::test::PlanBuilder(planNodeIdGenerator)
+                .values(buildVectors)
+                .project({"c0 AS u0"})
+                .planNode(),
+            "",
+            {"t0", "t1", "match"},
+            core::JoinType::kLeftSemiProject)
+        .planNode();
+  }
+
+  // Create the row vector for the build side, where the first column is used
+  // as the join key.
+  // If the expected mode is array, the key is within [0, hashTableSize);
+  // if the expected mode is normalized key, the key is within
+  // [0, hashTableSize) plus extraValue (kArrayHashMaxSize + 100);
+  // if the expected mode is hash, the key is within [0, hashTableSize) plus
+  // extraValue (max int64 - 1).
+  RowVectorPtr makeBuildRows(
+      std::vector<int64_t>& data,
+      int64_t start,
+      int64_t end,
+      bool addExtraValue) {
+    auto subData =
+        std::vector<int64_t>(data.begin() + start, data.begin() + end);
+    if (addExtraValue) {
+      subData[0] = params_.extraValue;
+    }
+
+    std::vector<VectorPtr> children;
+    children.push_back(makeFlatVector<int64_t>(subData));
+    return makeRowVector(children);
+  }
+
+  // Generate the build side data batches.
+  void makeBuildBatches(std::vector<RowVectorPtr>& batches) {
+    int64_t buildKey = 0;
+    std::vector<int64_t> data;
+    for (auto i = 0; i < params_.buildSize; ++i) {
+      data.emplace_back((buildKey++) % params_.hashTableSize);
+    }
+    std::shuffle(data.begin(), data.end(), randomEngine_);
+
+    auto size = params_.buildSize / params_.numBuildBatches;
+    for (auto i = 0; i < params_.numBuildBatches; ++i) {
+      batches.push_back(makeBuildRows(
+          data,
+          i * size,
+          (i + 1) * size + 1,
+          i == params_.numBuildBatches - 1));
+    }
+  }
+
+  // Create the row vector for the probe side, where the first column is used
+  // as the join key, and the remaining columns are dependent fields.
+  // Probe keys are within the range [0, hashTableSize).
+  RowVectorPtr
+  makeProbeVector(int64_t size, int64_t hashTableSize, int64_t& sequence) {
+    std::vector<VectorPtr> children;
+    for (int32_t i = 0; i < params_.numFields; ++i) {
+      children.push_back(makeFlatVector<int64_t>(
+          size,
+          [&](vector_size_t row) { return (sequence + row) % hashTableSize; },
+          nullptr));
+    }
+    sequence += size;
+
+    for (int32_t i = 0; i < 2; ++i) {
+      children.push_back(makeFlatVector<int64_t>(
+          size, [&](vector_size_t row) { return row + size; }, nullptr));
+    }
+    return makeRowVector(children);
+  }
+
+  static int64_t getHashBuildPeakMemory(memory::MemoryPool* rootPool) {
+    int64_t hashBuildPeakBytes = 0;
+    std::vector<memory::MemoryPool*> pools;
+    pools.push_back(rootPool);
+    while (!pools.empty()) {
+      std::vector<memory::MemoryPool*> childPools;
+      for (auto pool : pools) {
+        pool->visitChildren([&](memory::MemoryPool* childPool) -> bool {
+          if (childPool->name().find("HashBuild") != std::string::npos) {
+            hashBuildPeakBytes += childPool->peakBytes();
+          }
+          childPools.push_back(childPool);
+          return true;
+        });
+      }
+      pools.swap(childPools);
+    }
+    if (hashBuildPeakBytes == 0) {
+      VELOX_FAIL("Failed to get HashBuild peak memory");
+    }
+    return hashBuildPeakBytes;
+  }
+
+  static bool isBuildNoDupHashTableAbandon(exec::Task* task) {
+    for (auto& pipelineStat : task->taskStats().pipelineStats) {
+      for (auto& operatorStat : pipelineStat.operatorStats) {
+        if (operatorStat.operatorType == "HashBuild") {
+          return operatorStat.runtimeStats["abandonBuildNoDupHash"].count != 0;
+        }
+      }
+    }
+    return false;
+  }
+
+  std::default_random_engine randomEngine_;
+  BenchmarkParams params_;
+};
+
+} // namespace
+
+int main(int argc, char** argv) {
+  folly::Init init{&argc, &argv};
+  memory::MemoryManagerOptions options;
+  options.useMmapAllocator = true;
+  options.allocatorCapacity = 10UL << 30;
+  options.useMmapArena = true;
+  options.mmapArenaCapacityRatio = 1;
+  memory::MemoryManager::initialize(options);
+
+  auto bm = std::make_unique<HashJoinStreamBuildBenchmark>();
+  std::vector<BenchmarkResult> results;
+
+  auto buildRowSize = (2L << 20) - 3;
+  auto probeRowSize = 100000000L;
+
+  TypePtr buildType{ROW({"k1"}, {BIGINT()})};
+
+  std::vector<BaseHashTable::HashMode> hashModes = {
+      BaseHashTable::HashMode::kArray,
+      BaseHashTable::HashMode::kNormalizedKey,
+      BaseHashTable::HashMode::kHash,
+  };
+  std::vector<double> dupFactorVector = {
+      2,
+      8,
+      32,
+  };
+  std::vector<int32_t> abandonPcts = {
+      90,
+      80,
+      70,
+      50,
+  };
+
+  std::vector<BenchmarkParams> params;
+  for (auto mode : hashModes) {
+    for (auto dupFactor : dupFactorVector) {
+      for (auto pct : abandonPcts) {
+        params.push_back(BenchmarkParams(
+            mode, buildType, dupFactor, buildRowSize, probeRowSize, pct, 512));
+      }
+    }
+  }
+
+  for (auto& param : params) {
+    folly::addBenchmark(__FILE__, param.title, [param, &results, &bm]() {
+      results.emplace_back(bm->run(param));
+      return 1;
+    });
+  }
+
+  folly::runBenchmarks();
+
+  for (auto& result : results) {
+    std::cout << result.toString() << std::endl;
+  }
+  return 0;
+}
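For reference, a sketch of how a query enables the optimization per
QueryCtx (property names from QueryConfig above; the values are illustrative).
This mirrors the overrides used by the benchmark above and the tests below:

#include "velox/core/QueryCtx.h"

using namespace facebook::velox;

void enableDedupBuild(core::QueryCtx& queryCtx) {
  queryCtx.testingOverrideConfigUnsafe({
      {core::QueryConfig::kAbandonBuildNoDupHashMinRows, "100000"},
      {core::QueryConfig::kAbandonBuildNoDupHashMinPct, "80"},
  });
}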
diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp
index 510e063e9191..6bf0b0156b4a 100644
--- a/velox/exec/tests/HashJoinTest.cpp
+++ b/velox/exec/tests/HashJoinTest.cpp
@@ -6499,6 +6499,134 @@ TEST_F(HashJoinTest, reclaimFromJoinBuilderWithMultiDrivers) {
   ASSERT_GT(arbitrator->stats().reclaimedUsedBytes, 0);
 }
 
+TEST_F(HashJoinTest, semiJoinAbandonBuildNoDupHashEarly) {
+  auto probeVectors = makeBatches(3, [&](int32_t /*unused*/) {
+    return makeRowVector({
+        makeFlatVector<int32_t>({1, 2, 2, 3, 3, 3, 4, 5, 5, 6, 7}),
+        makeFlatVector<int64_t>({10, 20, 21, 30, 31, 32, 40, 50, 51, 60, 70}),
+    });
+  });
+
+  auto buildVectors = makeBatches(3, [&](int32_t /*unused*/) {
+    return makeRowVector({
+        makeFlatVector<int32_t>({1, 1, 3, 4, 5, 5, 7, 8}),
+        makeFlatVector<int64_t>({100, 101, 300, 400, 500, 501, 700, 800}),
+    });
+  });
+
+  createDuckDbTable("t", probeVectors);
+  createDuckDbTable("u", buildVectors);
+
+  auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
+  auto plan = PlanBuilder(planNodeIdGenerator)
+                  .values(probeVectors)
+                  .project({"c0 AS t0", "c1 AS t1"})
+                  .hashJoin(
+                      {"t0"},
+                      {"u0"},
+                      PlanBuilder(planNodeIdGenerator)
+                          .values(buildVectors)
+                          .project({"c0 AS u0", "c1 AS u1"})
+                          .planNode(),
+                      "",
+                      {"t0", "t1", "match"},
+                      core::JoinType::kLeftSemiProject)
+                  .planNode();
+
+  HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
+      .config(core::QueryConfig::kAbandonBuildNoDupHashMinRows, "1")
+      .config(core::QueryConfig::kAbandonBuildNoDupHashMinPct, "10")
+      .planNode(std::move(plan))
+      .referenceQuery(
+          "SELECT t.c0, t.c1, EXISTS (SELECT * FROM u WHERE t.c0 = u.c0) FROM t")
+      .run();
+}
+
+TEST_F(HashJoinTest, antiJoinAbandonBuildNoDupHashEarly) {
+  auto probeVectors = makeBatches(64, [&](int32_t /*unused*/) {
+    return makeRowVector(
+        {"t0", "t1"},
+        {
+            makeNullableFlatVector<int32_t>({std::nullopt, 1, 2, 3, 4, 5, 6}),
+            makeFlatVector<int32_t>({0, 1, 2, 3, 4, 5, 6}),
+        });
+  });
+  auto buildVectors = makeBatches(64, [&](int32_t /*unused*/) {
+    return makeRowVector(
+        {"u0", "u1"},
+        {
+            makeNullableFlatVector<int32_t>({std::nullopt, 2, 3, 4, 6, 7, 8}),
+            makeFlatVector<int32_t>({0, 2, 3, 4, 6, 7, 8}),
+        });
+  });
+  HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
+      .config(core::QueryConfig::kAbandonBuildNoDupHashMinRows, "1")
+      .config(core::QueryConfig::kAbandonBuildNoDupHashMinPct, "10")
+      .numDrivers(numDrivers_)
+      .probeKeys({"t0"})
+      .probeVectors(std::vector<RowVectorPtr>(probeVectors))
+      .buildKeys({"u0"})
+      .buildVectors(std::vector<RowVectorPtr>(buildVectors))
+      .joinType(core::JoinType::kAnti)
+      .joinOutputLayout({"t0", "t1"})
+      .referenceQuery(
+          "SELECT t.* FROM t WHERE NOT EXISTS (SELECT * FROM u WHERE u.u0 = t.t0)")
+      .run();
+}
+
+TEST_F(HashJoinTest, semiJoinDeduplicateResetCapacity) {
+  const int32_t vectorSize = 10;
+  const int32_t batches = 210;
+  auto probeVectors = makeBatches(batches, [&](int32_t /*unused*/) {
+    return makeRowVector({
+        // The join key is double -> VectorHasher::typeKindSupportsValueIds
+        // returns false -> the hash mode is kHash.
+        makeFlatVector<double>(
+            vectorSize, [&](vector_size_t /*row*/) { return rand(); }),
+        makeFlatVector<int64_t>(
+            vectorSize, [&](vector_size_t /*row*/) { return rand(); }),
+    });
+  });
+
+  auto buildVectors = makeBatches(batches, [&](int32_t /*batch*/) {
+    return makeRowVector({
+        makeFlatVector<double>(
+            vectorSize, [&](vector_size_t /*row*/) { return rand(); }),
+        makeFlatVector<int64_t>(
+            vectorSize, [&](vector_size_t /*row*/) { return rand(); }),
+    });
+  });
+
+  createDuckDbTable("t", probeVectors);
+  createDuckDbTable("u", buildVectors);
+
+  auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
+  auto plan = PlanBuilder(planNodeIdGenerator)
+                  .values(probeVectors)
+                  .project({"c0 AS t0", "c1 AS t1"})
+                  .hashJoin(
+                      {"t0"},
+                      {"u0"},
+                      PlanBuilder(planNodeIdGenerator)
+                          .values(buildVectors)
+                          .project({"c0 AS u0", "c1 AS u1"})
+                          .planNode(),
+                      "",
+                      {"t0", "t1", "match"},
+                      core::JoinType::kLeftSemiProject)
+                  .planNode();
+
+  HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
+      .config(core::QueryConfig::kAbandonBuildNoDupHashMinRows, "10")
+      .config(core::QueryConfig::kAbandonBuildNoDupHashMinPct, "50")
+      .numDrivers(1)
+      .checkSpillStats(false)
+      .planNode(std::move(plan))
+      .referenceQuery(
+          "SELECT t.c0, t.c1, EXISTS (SELECT * FROM u WHERE t.c0 = u.c0) FROM t")
+      .run();
+}
+
 DEBUG_ONLY_TEST_F(
     HashJoinTest,
     failedToReclaimFromHashJoinBuildersInNonReclaimableSection) {
@@ -6880,7 +7008,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, arbitrationTriggeredByEnsureJoinTableFit) {
 }
 
 DEBUG_ONLY_TEST_F(HashJoinTest, joinBuildSpillError) {
-  const int kMemoryCapacity = 32 << 20;
+  const int kMemoryCapacity = 27 << 20;
   // Set a small memory capacity to trigger spill.
   std::unique_ptr<memory::MemoryManager> memoryManager =
       createMemoryManager(kMemoryCapacity, 0);
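A sketch for checking at runtime whether the build fell back, modeled on the
benchmark's isBuildNoDupHashTableAbandon() helper above:

#include "velox/exec/Task.h"

using namespace facebook::velox;

// Scans the task's operator stats for the 'abandonBuildNoDupHash' counter
// emitted by HashBuild::addInput() when deduplication is given up.
bool dedupBuildWasAbandoned(const exec::TaskStats& stats) {
  for (const auto& pipeline : stats.pipelineStats) {
    for (const auto& op : pipeline.operatorStats) {
      if (op.operatorType == "HashBuild" &&
          op.runtimeStats.count("abandonBuildNoDupHash") > 0) {
        return op.runtimeStats.at("abandonBuildNoDupHash").count > 0;
      }
    }
  }
  return false;
}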