Skip to content

Commit 6e8570f

Browse files
liujiayi771 and Lakehouse Engine Bot
authored and committed
Stream input row to hash table when addInput for left semi and anti join
Address comments disable by default Alchemy-item: (ID = 532) feat: Build hash table while adding input rows for left semi and anti join commit 1/1 - 73ee99f
1 parent 98e5bae commit 6e8570f

File tree

12 files changed

+705
-30
lines changed

12 files changed

+705
-30
lines changed

velox/common/memory/tests/SharedArbitratorTest.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -760,7 +760,7 @@ DEBUG_ONLY_TEST_P(
760760
folly::EventCount taskPauseWait;
761761
auto taskPauseWaitKey = taskPauseWait.prepareWait();
762762

763-
const auto fakeAllocationSize = kMemoryCapacity - (32L << 20);
763+
const auto fakeAllocationSize = kMemoryCapacity - (2L << 20);
764764

765765
std::atomic<bool> injectAllocationOnce{true};
766766
fakeOperatorFactory_->setAllocationCallback([&](Operator* op) {

velox/core/PlanNode.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3059,6 +3059,15 @@ class AbstractJoinNode : public PlanNode {
30593059
return isInnerJoin() || isLeftJoin() || isAntiJoin();
30603060
}
30613061

3062+
// Indicates if this joinNode can drop duplicate rows with same join key.
3063+
// For left semi and anti join, it is not necessary to store duplicate rows.
3064+
bool canDropDuplicates() const {
3065+
// Left semi and anti join with no extra filter only needs to know whether
3066+
// there is a match. Hence, no need to store entries with duplicate keys.
3067+
return !filter() &&
3068+
(isLeftSemiFilterJoin() || isLeftSemiProjectJoin() || isAntiJoin());
3069+
}
3070+
30623071
const std::vector<FieldAccessTypedExprPtr>& leftKeys() const {
30633072
return leftKeys_;
30643073
}

velox/core/QueryConfig.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,12 @@ class QueryConfig {
204204
static constexpr const char* kAbandonPartialTopNRowNumberMinPct =
205205
"abandon_partial_topn_row_number_min_pct";
206206

207+
static constexpr const char* kAbandonBuildNoDupHashMinRows =
208+
"abandon_build_no_dup_hash_min_rows";
209+
210+
static constexpr const char* kAbandonBuildNoDupHashMinPct =
211+
"abandon_build_no_dup_hash_min_pct";
212+
207213
static constexpr const char* kMaxElementsSizeInRepeatAndSequence =
208214
"max_elements_size_in_repeat_and_sequence";
209215

@@ -819,6 +825,14 @@ class QueryConfig {
819825
return get<int32_t>(kAbandonPartialTopNRowNumberMinPct, 80);
820826
}
821827

828+
/// Number of input rows HashBuild must receive before it starts checking
/// whether to abandon building a duplicate-free hash table (left semi /
/// anti join optimization).
int32_t abandonBuildNoDupHashMinRows() const {
  constexpr int32_t kDefaultMinRows = 100'000;
  return get<int32_t>(kAbandonBuildNoDupHashMinRows, kDefaultMinRows);
}

/// Distinct-key percentage at or above which HashBuild abandons building a
/// duplicate-free hash table. Zero disables the optimization entirely.
int32_t abandonBuildNoDupHashMinPct() const {
  constexpr int32_t kDefaultMinPct = 0;
  return get<int32_t>(kAbandonBuildNoDupHashMinPct, kDefaultMinPct);
}
835+
822836
int32_t maxElementsSizeInRepeatAndSequence() const {
823837
return get<int32_t>(kMaxElementsSizeInRepeatAndSequence, 10'000);
824838
}

velox/docs/configs.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,16 @@ Generic Configuration
4343
- integer
4444
- 80
4545
- Abandons partial TopNRowNumber if number of output rows equals or exceeds this percentage of the number of input rows.
46+
* - abandon_build_no_dup_hash_min_rows
47+
- integer
48+
- 100,000
49+
- Number of input rows to receive before starting to check whether to abandon building a HashTable without
50+
duplicates in HashBuild for left semi/anti join.
51+
* - abandon_build_no_dup_hash_min_pct
52+
- integer
53+
- 0
54+
- Abandons building a HashTable without duplicates in HashBuild for left semi/anti join if the percentage of
55+
distinct keys in the HashTable exceeds this threshold. Zero means 'disable this optimization'.
4656
* - session_timezone
4757
- string
4858
-

velox/exec/HashBuild.cpp

Lines changed: 65 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,12 @@ HashBuild::HashBuild(
6666
joinBridge_(operatorCtx_->task()->getHashJoinBridgeLocked(
6767
operatorCtx_->driverCtx()->splitGroupId,
6868
planNodeId())),
69-
keyChannelMap_(joinNode_->rightKeys().size()) {
69+
dropDuplicates_(joinNode_->canDropDuplicates()),
70+
keyChannelMap_(joinNode_->rightKeys().size()),
71+
abandonBuildNoDupHashMinRows_(
72+
driverCtx->queryConfig().abandonBuildNoDupHashMinRows()),
73+
abandonBuildNoDupHashMinPct_(
74+
driverCtx->queryConfig().abandonBuildNoDupHashMinPct()) {
7075
VELOX_CHECK(pool()->trackUsage());
7176
VELOX_CHECK_NOT_NULL(joinBridge_);
7277

@@ -86,19 +91,22 @@ HashBuild::HashBuild(
8691

8792
// Identify the non-key build side columns and make a decoder for each.
8893
const int32_t numDependents = inputType->size() - numKeys;
89-
if (numDependents > 0) {
90-
// Number of join keys (numKeys) may be less then number of input columns
91-
// (inputType->size()). In this case numDependents is negative and cannot be
92-
// used to call 'reserve'. This happens when we join different probe side
93-
// keys with the same build side key: SELECT * FROM t LEFT JOIN u ON t.k1 =
94-
// u.k AND t.k2 = u.k.
95-
dependentChannels_.reserve(numDependents);
96-
decoders_.reserve(numDependents);
97-
}
98-
for (auto i = 0; i < inputType->size(); ++i) {
99-
if (keyChannelMap_.find(i) == keyChannelMap_.end()) {
100-
dependentChannels_.emplace_back(i);
101-
decoders_.emplace_back(std::make_unique<DecodedVector>());
94+
if (!dropDuplicates_) {
95+
if (numDependents > 0) {
96+
// Number of join keys (numKeys) may be less then number of input columns
97+
// (inputType->size()). In this case numDependents is negative and cannot
98+
// be used to call 'reserve'. This happens when we join different probe
99+
// side keys with the same build side key: SELECT * FROM t LEFT JOIN u ON
100+
// t.k1 = u.k AND t.k2 = u.k.
101+
dependentChannels_.reserve(numDependents);
102+
decoders_.reserve(numDependents);
103+
}
104+
105+
for (auto i = 0; i < inputType->size(); ++i) {
106+
if (keyChannelMap_.find(i) == keyChannelMap_.end()) {
107+
dependentChannels_.emplace_back(i);
108+
decoders_.emplace_back(std::make_unique<DecodedVector>());
109+
}
102110
}
103111
}
104112

@@ -146,11 +154,6 @@ void HashBuild::setupTable() {
146154
.minTableRowsForParallelJoinBuild(),
147155
pool());
148156
} else {
149-
// (Left) semi and anti join with no extra filter only needs to know whether
150-
// there is a match. Hence, no need to store entries with duplicate keys.
151-
const bool dropDuplicates = !joinNode_->filter() &&
152-
(joinNode_->isLeftSemiFilterJoin() ||
153-
joinNode_->isLeftSemiProjectJoin() || isAntiJoin(joinType_));
154157
// Right semi join needs to tag build rows that were probed.
155158
const bool needProbedFlag = joinNode_->isRightSemiFilterJoin();
156159
if (isLeftNullAwareJoinWithFilter(joinNode_)) {
@@ -159,7 +162,7 @@ void HashBuild::setupTable() {
159162
table_ = HashTable<false>::createForJoin(
160163
std::move(keyHashers),
161164
dependentTypes,
162-
!dropDuplicates, // allowDuplicates
165+
!dropDuplicates_, // allowDuplicates
163166
needProbedFlag, // hasProbedFlag
164167
operatorCtx_->driverCtx()
165168
->queryConfig()
@@ -170,15 +173,22 @@ void HashBuild::setupTable() {
170173
table_ = HashTable<true>::createForJoin(
171174
std::move(keyHashers),
172175
dependentTypes,
173-
!dropDuplicates, // allowDuplicates
176+
!dropDuplicates_, // allowDuplicates
174177
needProbedFlag, // hasProbedFlag
175178
operatorCtx_->driverCtx()
176179
->queryConfig()
177180
.minTableRowsForParallelJoinBuild(),
178181
pool());
179182
}
180183
}
184+
lookup_ = std::make_unique<HashLookup>(table_->hashers(), pool());
181185
analyzeKeys_ = table_->hashMode() != BaseHashTable::HashMode::kHash;
186+
if (abandonBuildNoDupHashMinPct_ == 0) {
187+
// Building a HashTable without duplicates is disabled if
188+
// abandonBuildNoDupHashMinPct_ is 0.
189+
abandonBuildNoDupHash_ = true;
190+
table_->joinTableMayHaveDuplicates();
191+
}
182192
}
183193

184194
void HashBuild::setupSpiller(SpillPartition* spillPartition) {
@@ -377,6 +387,31 @@ void HashBuild::addInput(RowVectorPtr input) {
377387
return;
378388
}
379389

390+
if (dropDuplicates_ && !abandonBuildNoDupHash_) {
391+
const bool abandonEarly = abandonBuildNoDupHashEarly(table_->numDistinct());
392+
numHashInputRows_ += activeRows_.countSelected();
393+
if (abandonEarly) {
394+
// The hash table is no longer directly constructed in addInput. The data
395+
// that was previously inserted into the hash table is already in the
396+
// RowContainer.
397+
addRuntimeStat("abandonBuildNoDupHash", RuntimeCounter(1));
398+
abandonBuildNoDupHash_ = true;
399+
table_->joinTableMayHaveDuplicates();
400+
} else {
401+
table_->prepareForGroupProbe(
402+
*lookup_,
403+
input,
404+
activeRows_,
405+
BaseHashTable::kNoSpillInputStartPartitionBit);
406+
if (lookup_->rows.empty()) {
407+
return;
408+
}
409+
table_->groupProbe(
410+
*lookup_, BaseHashTable::kNoSpillInputStartPartitionBit);
411+
return;
412+
}
413+
}
414+
380415
if (analyzeKeys_ && hashes_.size() < activeRows_.end()) {
381416
hashes_.resize(activeRows_.end());
382417
}
@@ -756,7 +791,8 @@ bool HashBuild::finishHashBuild() {
756791
isInputFromSpill() ? spillConfig()->startPartitionBit
757792
: BaseHashTable::kNoSpillInputStartPartitionBit,
758793
allowParallelJoinBuild ? operatorCtx_->task()->queryCtx()->executor()
759-
: nullptr);
794+
: nullptr,
795+
dropDuplicates_);
760796
}
761797
stats_.wlock()->addRuntimeStat(
762798
BaseHashTable::kBuildWallNanos,
@@ -879,6 +915,7 @@ void HashBuild::setupSpillInput(HashJoinBridge::SpillInput spillInput) {
879915
setupTable();
880916
setupSpiller(spillInput.spillPartition.get());
881917
stateCleared_ = false;
918+
numHashInputRows_ = 0;
882919

883920
// Start to process spill input.
884921
processSpillInput();
@@ -1240,4 +1277,10 @@ void HashBuildSpiller::extractSpill(
12401277
rows.data(), rows.size(), false, false, result->childAt(types.size()));
12411278
}
12421279
}
1280+
1281+
// Returns true if building a no-duplicates hash table during addInput should
// be abandoned: more than 'abandonBuildNoDupHashMinRows_' input rows have
// been seen and at least 'abandonBuildNoDupHashMinPct_' percent of them have
// distinct join keys, i.e. there are too few duplicates for the dedup work
// to pay off.
bool HashBuild::abandonBuildNoDupHashEarly(int64_t numDistinct) const {
  VELOX_CHECK(dropDuplicates_);
  if (numHashInputRows_ <= abandonBuildNoDupHashMinRows_) {
    return false;
  }
  // Compare via cross-multiplication to avoid integer-division truncation.
  // The previous form 'numDistinct / numHashInputRows_ >=
  // abandonBuildNoDupHashMinPct_ / 100' truncated both sides toward zero,
  // so any pct setting in (0, 100) behaved like 0% and the optimization was
  // abandoned as soon as the row-count gate passed. numDistinct * 100 fits
  // comfortably in int64_t for any realistic row count.
  return numDistinct * 100 >= numHashInputRows_ * abandonBuildNoDupHashMinPct_;
}
12431286
} // namespace facebook::velox::exec

velox/exec/HashBuild.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,11 @@ class HashBuild final : public Operator {
204204
// not.
205205
bool nonReclaimableState() const;
206206

207+
// True if we have enough rows and not enough duplicate join keys, i.e. more
208+
// than 'abandonBuildNoDupHashMinRows_' rows and more than
209+
// 'abandonBuildNoDupHashMinPct_' % of rows are unique.
210+
bool abandonBuildNoDupHashEarly(int64_t numDistinct) const;
211+
207212
const std::shared_ptr<const core::HashJoinNode> joinNode_;
208213

209214
const core::JoinType joinType_;
@@ -242,6 +247,9 @@ class HashBuild final : public Operator {
242247
// Container for the rows being accumulated.
243248
std::unique_ptr<BaseHashTable> table_;
244249

250+
// Used for building hash table while adding input rows.
251+
std::unique_ptr<HashLookup> lookup_;
252+
245253
// Key channels in 'input_'
246254
std::vector<column_index_t> keyChannels_;
247255

@@ -269,6 +277,14 @@ class HashBuild final : public Operator {
269277
// at least one entry with null join keys.
270278
bool joinHasNullKeys_{false};
271279

280+
// Indicates whether to drop duplicate rows. Rows containing duplicate keys
281+
// can be removed for left semi and anti join.
282+
const bool dropDuplicates_;
283+
284+
// Whether to abandon building a HashTable without duplicates in HashBuild
285+
// addInput phase for left semi/anti join.
286+
bool abandonBuildNoDupHash_{false};
287+
272288
// The type used to spill hash table which might attach a boolean column to
273289
// record the probed flag if 'needProbedFlagSpill_' is true.
274290
RowTypePtr spillType_;
@@ -310,6 +326,19 @@ class HashBuild final : public Operator {
310326

311327
// Maps key channel in 'input_' to channel in key.
312328
folly::F14FastMap<column_index_t, column_index_t> keyChannelMap_;
329+
330+
// Count the number of hash table input rows for building no duplicates
331+
// hash table. It will not be updated after abandonBuildNoDupHash_ is true.
332+
int64_t numHashInputRows_ = 0;
333+
334+
// Minimum number of rows to see before deciding whether to give up building
335+
// a no-duplicates hash table.
336+
const int32_t abandonBuildNoDupHashMinRows_;
337+
338+
// Minimum unique-row percentage at which to give up building a no-duplicates
339+
// hash table. If more than this percentage of rows are unique, building the
340+
// hash table in the addInput phase is not worthwhile.
341+
const int32_t abandonBuildNoDupHashMinPct_;
313342
};
314343

315344
inline std::ostream& operator<<(std::ostream& os, HashBuild::State state) {

velox/exec/HashJoinBridge.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,18 @@ RowTypePtr hashJoinTableType(
4242
types.emplace_back(inputType->childAt(channel));
4343
}
4444

45+
if (joinNode->canDropDuplicates()) {
46+
// For left semi and anti join with no extra filter, hash table does not
47+
// store dependent columns.
48+
return ROW(std::move(names), std::move(types));
49+
}
50+
4551
for (auto i = 0; i < inputType->size(); ++i) {
4652
if (keyChannelSet.find(i) == keyChannelSet.end()) {
4753
names.emplace_back(inputType->nameOf(i));
4854
types.emplace_back(inputType->childAt(i));
4955
}
5056
}
51-
5257
return ROW(std::move(names), std::move(types));
5358
}
5459

velox/exec/HashTable.cpp

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ HashTable<ignoreNullKeys>::HashTable(
5757
pool_(pool),
5858
minTableSizeForParallelJoinBuild_(minTableSizeForParallelJoinBuild),
5959
isJoinBuild_(isJoinBuild),
60+
joinBuildNoDuplicates_(!allowDuplicates),
6061
buildPartitionBounds_(raw_vector<PartitionBoundIndexType>(pool)) {
6162
std::vector<TypePtr> keys;
6263
for (auto& hasher : hashers_) {
@@ -1494,7 +1495,9 @@ void HashTable<ignoreNullKeys>::decideHashMode(
14941495
return;
14951496
}
14961497
disableRangeArrayHash_ |= disableRangeArrayHash;
1497-
if (numDistinct_ && !isJoinBuild_) {
1498+
if (numDistinct_ && (!isJoinBuild_ || joinBuildNoDuplicates_)) {
1499+
// If the join type is left semi and anti, allowDuplicates_ will be false,
1500+
// and join build is building hash table while adding input rows.
14981501
if (!analyze()) {
14991502
setHashMode(HashMode::kHash, numNew, spillInputStartPartitionBit);
15001503
return;
@@ -1716,8 +1719,20 @@ template <bool ignoreNullKeys>
17161719
void HashTable<ignoreNullKeys>::prepareJoinTable(
17171720
std::vector<std::unique_ptr<BaseHashTable>> tables,
17181721
int8_t spillInputStartPartitionBit,
1719-
folly::Executor* executor) {
1722+
folly::Executor* executor,
1723+
bool dropDuplicates) {
17201724
buildExecutor_ = executor;
1725+
if (dropDuplicates) {
1726+
if (table_ != nullptr) {
1727+
// Reset table_ and capacity_ to trigger rehash.
1728+
rows_->pool()->freeContiguous(tableAllocation_);
1729+
table_ = nullptr;
1730+
capacity_ = 0;
1731+
}
1732+
// Call analyze to insert all unique values in row container to the
1733+
// table hashers' uniqueValues_;
1734+
analyze();
1735+
}
17211736
otherTables_.reserve(tables.size());
17221737
for (auto& table : tables) {
17231738
otherTables_.emplace_back(
@@ -1748,6 +1763,11 @@ void HashTable<ignoreNullKeys>::prepareJoinTable(
17481763
}
17491764
if (useValueIds) {
17501765
for (auto& other : otherTables_) {
1766+
if (dropDuplicates) {
1767+
// Before merging with the current hashers, all values in the row
1768+
// containers of other table need to be inserted into uniqueValues_.
1769+
other->analyze();
1770+
}
17511771
for (auto i = 0; i < hashers_.size(); ++i) {
17521772
hashers_[i]->merge(*other->hashers_[i]);
17531773
if (!hashers_[i]->mayUseValueIds()) {

0 commit comments

Comments
 (0)