Skip to content

Commit 1bb2da3

Browse files
authored
Support single thread execution on MemoryPool::reclaim (#362)
* Remove high usage callback * Executor not found when resuming a single-thread-execution task during memory reclaiming procedure * Use memory arbitrator to manage pool shrinking as well as growing * Allow using customized memory arbitrator when creating memory manager instance * Deadlock if throwing exception from MemoryManager::addRootPool(...) * Assertion failed, expected "!spillFinalized_" if reclaiming HashBuild operator during finishing * UT Fix: MemoryManagerTest.Ctor * Assertion failed, expected "spillPartitionEntry.second->numFiles() > 0" after HashBuild::reclaim() was called on a HashBuild that has no inputs
1 parent 977184d commit 1bb2da3

19 files changed

+189
-122
lines changed

velox/common/memory/Memory.cpp

Lines changed: 35 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,7 @@ constexpr folly::StringPiece kDefaultLeafName("__default_leaf__");
2828
MemoryManager::MemoryManager(const Options& options)
2929
: capacity_{options.capacity},
3030
allocator_{options.allocator->shared_from_this()},
31-
arbitrator_(MemoryArbitrator::create(MemoryArbitrator::Config{
32-
.kind = options.arbitratorConfig.kind,
33-
.capacity = capacity_,
34-
.initMemoryPoolCapacity =
35-
options.arbitratorConfig.initMemoryPoolCapacity,
36-
.minMemoryPoolCapacityTransferSize =
37-
options.arbitratorConfig.minMemoryPoolCapacityTransferSize,
38-
.retryArbitrationFailure =
39-
options.arbitratorConfig.retryArbitrationFailure})),
31+
arbitrator_{options.arbitratorFactory()},
4032
alignment_(std::max(MemoryAllocator::kMinAlignment, options.alignment)),
4133
checkUsageLeak_(options.checkUsageLeak),
4234
poolDestructionCb_([&](MemoryPool* pool) { dropPool(pool); }),
@@ -57,6 +49,9 @@ MemoryManager::MemoryManager(const Options& options)
5749
.checkUsageLeak = options.checkUsageLeak})} {
5850
VELOX_CHECK_NOT_NULL(allocator_);
5951
VELOX_USER_CHECK_GE(capacity_, 0);
52+
if (arbitrator_ != nullptr) {
53+
VELOX_CHECK_EQ(arbitrator_->capacity(), capacity_);
54+
}
6055
MemoryAllocator::alignmentCheck(0, alignment_);
6156
defaultRoot_->grow(defaultRoot_->maxCapacity());
6257
const size_t numSharedPools =
@@ -116,19 +111,22 @@ std::shared_ptr<MemoryPool> MemoryManager::addRootPool(
116111
options.trackUsage = true;
117112
options.checkUsageLeak = checkUsageLeak_;
118113

119-
folly::SharedMutex::WriteHolder guard{mutex_};
120-
if (pools_.find(poolName) != pools_.end()) {
121-
VELOX_FAIL("Duplicate root pool name found: {}", poolName);
114+
std::shared_ptr<MemoryPool> pool;
115+
{
116+
folly::SharedMutex::WriteHolder guard{mutex_};
117+
if (pools_.find(poolName) != pools_.end()) {
118+
VELOX_FAIL("Duplicate root pool name found: {}", poolName);
119+
}
120+
pool = std::make_shared<MemoryPoolImpl>(
121+
this,
122+
poolName,
123+
MemoryPool::Kind::kAggregate,
124+
nullptr,
125+
std::move(reclaimer),
126+
poolDestructionCb_,
127+
options);
128+
pools_.emplace(poolName, pool);
122129
}
123-
auto pool = std::make_shared<MemoryPoolImpl>(
124-
this,
125-
poolName,
126-
MemoryPool::Kind::kAggregate,
127-
nullptr,
128-
std::move(reclaimer),
129-
poolDestructionCb_,
130-
options);
131-
pools_.emplace(poolName, pool);
132130
VELOX_CHECK_EQ(pool->capacity(), 0);
133131
if (arbitrator_ != nullptr) {
134132
arbitrator_->reserveMemory(pool.get(), capacity);
@@ -151,6 +149,14 @@ std::shared_ptr<MemoryPool> MemoryManager::addLeafPool(
151149
return defaultRoot_->addLeafChild(poolName, threadSafe, nullptr);
152150
}
153151

152+
uint64_t MemoryManager::shrinkPool(MemoryPool* pool, uint64_t decrementBytes) {
153+
VELOX_CHECK_NOT_NULL(pool);
154+
if (arbitrator_ == nullptr) {
155+
return pool->shrink(decrementBytes);
156+
}
157+
return arbitrator_->releaseMemory(pool, decrementBytes);
158+
}
159+
154160
bool MemoryManager::growPool(MemoryPool* pool, uint64_t incrementBytes) {
155161
VELOX_CHECK_NOT_NULL(pool);
156162
VELOX_CHECK_NE(pool->capacity(), kMaxMemory);
@@ -165,14 +171,16 @@ bool MemoryManager::growPool(MemoryPool* pool, uint64_t incrementBytes) {
165171

166172
void MemoryManager::dropPool(MemoryPool* pool) {
167173
VELOX_CHECK_NOT_NULL(pool);
168-
folly::SharedMutex::WriteHolder guard{mutex_};
169-
auto it = pools_.find(pool->name());
170-
if (it == pools_.end()) {
171-
VELOX_FAIL("The dropped memory pool {} not found", pool->name());
174+
{
175+
folly::SharedMutex::WriteHolder guard{mutex_};
176+
auto it = pools_.find(pool->name());
177+
if (it == pools_.end()) {
178+
VELOX_FAIL("The dropped memory pool {} not found", pool->name());
179+
}
180+
pools_.erase(it);
172181
}
173-
pools_.erase(it);
174182
if (arbitrator_ != nullptr) {
175-
arbitrator_->releaseMemory(pool);
183+
arbitrator_->releaseMemory(pool, 0);
176184
}
177185
}
178186

velox/common/memory/Memory.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,9 @@ class IMemoryManager {
8080
/// Specifies the backing memory allocator.
8181
MemoryAllocator* allocator{MemoryAllocator::getInstance()};
8282

83-
/// Specifies the memory arbitration config.
84-
MemoryArbitrator::Config arbitratorConfig{};
83+
/// Specifies the memory arbitrator
84+
std::function<std::unique_ptr<MemoryArbitrator>()> arbitratorFactory{
85+
[]() { return MemoryArbitrator::create({}); }};
8586
};
8687

8788
virtual ~IMemoryManager() = default;
@@ -112,6 +113,10 @@ class IMemoryManager {
112113
const std::string& name = "",
113114
bool threadSafe = true) = 0;
114115

116+
/// Invoked to shrink a memory pool's free capacity with up to
117+
/// 'decrementBytes'.
118+
virtual uint64_t shrinkPool(MemoryPool* pool, uint64_t decrementBytes) = 0;
119+
115120
/// Invoked to grows a memory pool's free capacity with at least
116121
/// 'incrementBytes'. The function returns true on success, otherwise false.
117122
virtual bool growPool(MemoryPool* pool, uint64_t incrementBytes) = 0;
@@ -190,6 +195,7 @@ class MemoryManager final : public IMemoryManager {
190195
const std::string& name = "",
191196
bool threadSafe = true) final;
192197

198+
uint64_t shrinkPool(MemoryPool* pool, uint64_t decrementBytes) final;
193199
bool growPool(MemoryPool* pool, uint64_t incrementBytes) final;
194200

195201
MemoryPool& deprecatedSharedLeafPool() final;

velox/common/memory/MemoryArbitrator.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ std::unique_ptr<MemoryArbitrator> MemoryArbitrator::create(
5050
}
5151
}
5252

53+
uint64_t MemoryArbitrator::capacity() {
54+
return capacity_;
55+
}
56+
5357
std::unique_ptr<MemoryReclaimer> MemoryReclaimer::create() {
5458
return std::unique_ptr<MemoryReclaimer>(new MemoryReclaimer());
5559
}

velox/common/memory/MemoryArbitrator.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,10 @@ class MemoryArbitrator {
105105
/// the memory arbitration on demand when actual memory allocation happens.
106106
virtual void reserveMemory(MemoryPool* pool, uint64_t bytes) = 0;
107107

108-
/// Invoked by the memory manager to return back all the reserved memory
109-
/// capacity of a destroying memory pool.
110-
virtual void releaseMemory(MemoryPool* pool) = 0;
108+
/// Invoked by the memory manager to return back the specified amount of
109+
/// reserved memory capacity of a destroying memory pool. If 0 is specified,
110+
/// release all reserve memory. Returns the actually released amount of bytes.
111+
virtual uint64_t releaseMemory(MemoryPool* pool, uint64_t bytes) = 0;
111112

112113
/// Invoked by the memory manager to grow a memory pool's capacity.
113114
/// 'pool' is the memory pool to request to grow. 'candidates' is a list
@@ -126,6 +127,7 @@ class MemoryArbitrator {
126127
const std::vector<std::shared_ptr<MemoryPool>>& candidatePools,
127128
uint64_t targetBytes) = 0;
128129

130+
uint64_t capacity();
129131
/// The internal execution stats of the memory arbitrator.
130132
struct Stats {
131133
/// The number of arbitration requests.

velox/common/memory/MemoryPool.cpp

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -598,18 +598,6 @@ int64_t MemoryPoolImpl::capacity() const {
598598
return capacity_;
599599
}
600600

601-
bool MemoryPoolImpl::highUsage() {
602-
if (parent_ != nullptr) {
603-
return parent_->highUsage();
604-
}
605-
606-
if (highUsageCallback_ != nullptr) {
607-
return highUsageCallback_(*this);
608-
}
609-
610-
return false;
611-
}
612-
613601
std::shared_ptr<MemoryPool> MemoryPoolImpl::genChild(
614602
std::shared_ptr<MemoryPool> parent,
615603
const std::string& name,
@@ -762,6 +750,16 @@ bool MemoryPoolImpl::incrementReservationThreadSafe(
762750
capacityToString(manager_->capacity()))));
763751
}
764752

753+
uint64_t MemoryPoolImpl::shrinkManaged(
754+
MemoryPool* requestor,
755+
uint64_t targetBytes) {
756+
if (parent_ != nullptr) {
757+
return parent_->shrinkManaged(requestor, targetBytes);
758+
}
759+
VELOX_CHECK_NULL(parent_);
760+
return manager_->shrinkPool(requestor, targetBytes);
761+
};
762+
765763
bool MemoryPoolImpl::maybeIncrementReservation(uint64_t size) {
766764
std::lock_guard<std::mutex> l(mutex_);
767765
return maybeIncrementReservationLocked(size);

velox/common/memory/MemoryPool.h

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@ constexpr int64_t kMaxMemory = std::numeric_limits<int64_t>::max();
104104
/// be merged into memory pool object later.
105105
class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
106106
public:
107-
using HighUsageCallBack = std::function<bool(MemoryPool& Pool)>;
108107
/// Defines the kinds of a memory pool.
109108
enum class Kind {
110109
/// The leaf memory pool is used for memory allocation. User can allocate
@@ -308,14 +307,6 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
308307
/// 'capacity()' is fixed and set to 'maxCapacity()' on creation.
309308
virtual int64_t capacity() const = 0;
310309

311-
virtual bool highUsage() = 0;
312-
313-
virtual void setHighUsageCallback(HighUsageCallBack func) {
314-
VELOX_CHECK_NULL(
315-
parent_, "Only root memory pool allows to set high-usage callback");
316-
highUsageCallback_ = func;
317-
}
318-
319310
/// Returns the currently used memory in bytes of this memory pool.
320311
virtual int64_t currentBytes() const = 0;
321312

@@ -354,6 +345,12 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
354345
/// without actually freeing the used memory.
355346
virtual uint64_t freeBytes() const = 0;
356347

348+
/// Try shrinking up to the specified amount of free memory via memory
349+
/// manager.
350+
virtual uint64_t shrinkManaged(
351+
MemoryPool* requestor,
352+
uint64_t targetBytes = 0) = 0;
353+
357354
/// Invoked to free up to the specified amount of free memory by reducing
358355
/// this memory pool's capacity without actually freeing any used memory. The
359356
/// function returns the actually freed memory capacity in bytes. If
@@ -522,8 +519,6 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
522519
// visitChildren() cost as we don't have to upgrade the weak pointer and copy
523520
// out the upgraded shared pointers.git
524521
std::unordered_map<std::string, std::weak_ptr<MemoryPool>> children_;
525-
526-
HighUsageCallBack highUsageCallback_{};
527522
};
528523

529524
std::ostream& operator<<(std::ostream& out, MemoryPool::Kind kind);
@@ -571,8 +566,6 @@ class MemoryPoolImpl : public MemoryPool {
571566

572567
int64_t capacity() const override;
573568

574-
bool highUsage() override;
575-
576569
int64_t currentBytes() const override {
577570
std::lock_guard<std::mutex> l(mutex_);
578571
return currentBytesLocked();
@@ -599,6 +592,8 @@ class MemoryPoolImpl : public MemoryPool {
599592

600593
uint64_t freeBytes() const override;
601594

595+
uint64_t shrinkManaged(MemoryPool* requestor, uint64_t targetBytes) override;
596+
602597
uint64_t shrink(uint64_t targetBytes = 0) override;
603598

604599
uint64_t grow(uint64_t bytes) noexcept override;

velox/common/memory/SharedArbitrator.cpp

Lines changed: 39 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,11 @@ void SharedArbitrator::reserveMemory(MemoryPool* pool, uint64_t /*unused*/) {
110110
pool->grow(reserveBytes);
111111
}
112112

113-
void SharedArbitrator::releaseMemory(MemoryPool* pool) {
113+
uint64_t SharedArbitrator::releaseMemory(MemoryPool* pool, uint64_t bytes) {
114114
std::lock_guard<std::mutex> l(mutex_);
115-
const uint64_t freedBytes = pool->shrink(0);
115+
const uint64_t freedBytes = pool->shrink(bytes);
116116
incrementFreeCapacityLocked(freedBytes);
117+
return freedBytes;
117118
}
118119

119120
std::vector<SharedArbitrator::Candidate> SharedArbitrator::getCandidateStats(
@@ -196,52 +197,57 @@ bool SharedArbitrator::arbitrateMemory(
196197

197198
const uint64_t growTarget =
198199
std::max(minMemoryPoolCapacityTransferSize_, targetBytes);
199-
uint64_t freedBytes = decrementFreeCapacity(growTarget);
200-
if (freedBytes >= targetBytes) {
201-
requestor->grow(freedBytes);
200+
uint64_t unusedFreedBytes = decrementFreeCapacity(growTarget);
201+
202+
auto freeGuard = folly::makeGuard([&]() {
203+
// Returns the unused freed memory capacity back to the arbitrator.
204+
if (unusedFreedBytes > 0) {
205+
incrementFreeCapacity(unusedFreedBytes);
206+
}
207+
});
208+
209+
if (unusedFreedBytes >= targetBytes) {
210+
requestor->grow(unusedFreedBytes);
211+
unusedFreedBytes = 0;
202212
return true;
203213
}
204-
VELOX_CHECK_LT(freedBytes, growTarget);
214+
VELOX_CHECK_LT(unusedFreedBytes, growTarget);
205215

206-
freedBytes +=
207-
reclaimFreeMemoryFromCandidates(candidates, growTarget - freedBytes);
208-
if (freedBytes >= targetBytes) {
209-
requestor->grow(freedBytes);
216+
reclaimFreeMemoryFromCandidates(candidates, growTarget - unusedFreedBytes);
217+
unusedFreedBytes += decrementFreeCapacity(growTarget - unusedFreedBytes);
218+
if (unusedFreedBytes >= targetBytes) {
219+
requestor->grow(unusedFreedBytes);
220+
unusedFreedBytes = 0;
210221
return true;
211222
}
212223

213-
VELOX_CHECK_LT(freedBytes, growTarget);
214-
freedBytes += reclaimUsedMemoryFromCandidates(
215-
requestor, candidates, growTarget - freedBytes);
224+
VELOX_CHECK_LT(unusedFreedBytes, growTarget);
225+
reclaimUsedMemoryFromCandidates(
226+
requestor, candidates, growTarget - unusedFreedBytes);
227+
unusedFreedBytes += decrementFreeCapacity(growTarget - unusedFreedBytes);
216228
if (requestor->aborted()) {
217-
// Returns the unused freed memory capacity back to the arbitrator if the
218-
// requestor has been aborted.
219-
incrementFreeCapacity(freedBytes);
220229
++numFailures_;
221230
VELOX_MEM_POOL_ABORTED(requestor);
222231
}
223232

224233
VELOX_CHECK(!requestor->aborted());
225234

226-
auto freeGuard = folly::makeGuard([&]() {
227-
// Returns the unused freed memory capacity back to the arbitrator.
228-
if (freedBytes > 0) {
229-
incrementFreeCapacity(freedBytes);
230-
}
231-
});
232-
233-
if (freedBytes < targetBytes) {
235+
if (unusedFreedBytes < targetBytes) {
234236
VELOX_MEM_LOG(WARNING)
235237
<< "Failed to arbitrate sufficient memory for memory pool "
236238
<< requestor->name() << ", request " << succinctBytes(targetBytes)
237-
<< ", only " << succinctBytes(freedBytes)
239+
<< ", only " << succinctBytes(unusedFreedBytes)
238240
<< " has been freed, Arbitrator state: " << toString();
239241
return false;
240242
}
241243

242-
const uint64_t growBytes = std::min(freedBytes, growTarget);
243-
freedBytes -= growBytes;
244-
requestor->grow(growBytes);
244+
if (unusedFreedBytes > growTarget) {
245+
requestor->grow(growTarget);
246+
unusedFreedBytes -= growTarget;
247+
return true;
248+
}
249+
requestor->grow(unusedFreedBytes);
250+
unusedFreedBytes = 0;
245251
return true;
246252
}
247253

@@ -262,7 +268,9 @@ uint64_t SharedArbitrator::reclaimFreeMemoryFromCandidates(
262268
if (bytesToShrink <= 0) {
263269
break;
264270
}
265-
freedBytes += candidate.pool->shrink(bytesToShrink);
271+
uint64_t shrunk = candidate.pool->shrink(bytesToShrink);
272+
incrementFreeCapacity(shrunk);
273+
freedBytes += shrunk;
266274
if (freedBytes >= targetBytes) {
267275
break;
268276
}
@@ -302,6 +310,7 @@ uint64_t SharedArbitrator::reclaim(
302310
uint64_t freedBytes{0};
303311
try {
304312
freedBytes = pool->shrink(targetBytes);
313+
incrementFreeCapacity(freedBytes);
305314
if (freedBytes < targetBytes) {
306315
pool->reclaim(targetBytes - freedBytes);
307316
}
@@ -311,7 +320,7 @@ uint64_t SharedArbitrator::reclaim(
311320
abort(pool);
312321
// Free up all the free capacity from the aborted pool as the associated
313322
// query has failed at this point.
314-
pool->shrink();
323+
incrementFreeCapacity(pool->shrink());
315324
}
316325
const uint64_t newCapacity = pool->capacity();
317326
VELOX_CHECK_GE(oldCapacity, newCapacity);

velox/common/memory/SharedArbitrator.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class SharedArbitrator : public MemoryArbitrator {
3333

3434
void reserveMemory(MemoryPool* pool, uint64_t /*unused*/) final;
3535

36-
void releaseMemory(MemoryPool* pool) final;
36+
uint64_t releaseMemory(MemoryPool* pool, uint64_t bytes) final;
3737

3838
bool growMemory(
3939
MemoryPool* pool,

0 commit comments

Comments
 (0)