@@ -163,10 +163,11 @@ void SharedArbitrator::reserveMemory(MemoryPool* pool, uint64_t /*unused*/) {
163163 pool->grow (reserveBytes);
164164}
165165
166- void SharedArbitrator::releaseMemory (MemoryPool* pool) {
166+ uint64_t SharedArbitrator::releaseMemory (MemoryPool* pool, uint64_t bytes ) {
167167 std::lock_guard<std::mutex> l (mutex_);
168- const uint64_t freedBytes = pool->shrink (0 );
168+ const uint64_t freedBytes = pool->shrink (bytes );
169169 incrementFreeCapacityLocked (freedBytes);
170+ return freedBytes;
170171}
171172
172173std::vector<SharedArbitrator::Candidate> SharedArbitrator::getCandidateStats (
@@ -246,10 +247,7 @@ bool SharedArbitrator::ensureCapacity(
246247 if (checkCapacityGrowth (*requestor, targetBytes)) {
247248 return true ;
248249 }
249- const uint64_t reclaimedBytes = reclaim (requestor, targetBytes);
250- // NOTE: return the reclaimed bytes back to the arbitrator and let the memory
251- // arbitration process to grow the requestor's memory capacity accordingly.
252- incrementFreeCapacity (reclaimedBytes);
250+ reclaim (requestor, targetBytes);
253251 // Check if the requestor has been aborted in reclaim operation above.
254252 if (requestor->aborted ()) {
255253 ++numFailures_;
@@ -294,51 +292,57 @@ bool SharedArbitrator::arbitrateMemory(
294292 const uint64_t growTarget = std::min (
295293 maxGrowBytes (*requestor),
296294 std::max (memoryPoolTransferCapacity_, targetBytes));
297- uint64_t freedBytes = decrementFreeCapacity (growTarget);
298- if (freedBytes >= targetBytes) {
299- requestor->grow (freedBytes);
300- return true ;
301- }
302- VELOX_CHECK_LT (freedBytes, growTarget);
295+ uint64_t unusedFreedBytes = decrementFreeCapacity (growTarget);
303296
304297 auto freeGuard = folly::makeGuard ([&]() {
305298 // Returns the unused freed memory capacity back to the arbitrator.
306- if (freedBytes > 0 ) {
307- incrementFreeCapacity (freedBytes );
299+ if (unusedFreedBytes > 0 ) {
300+ incrementFreeCapacity (unusedFreedBytes );
308301 }
309302 });
310303
311- freedBytes +=
312- reclaimFreeMemoryFromCandidates (candidates, growTarget - freedBytes);
313- if (freedBytes >= targetBytes) {
314- const uint64_t bytesToGrow = std::min (growTarget, freedBytes);
315- requestor->grow (bytesToGrow);
316- freedBytes -= bytesToGrow;
304+ if (unusedFreedBytes >= targetBytes) {
305+ requestor->grow (unusedFreedBytes);
306+ unusedFreedBytes = 0 ;
307+ return true ;
308+ }
309+ VELOX_CHECK_LT (unusedFreedBytes, growTarget);
310+
311+ reclaimFreeMemoryFromCandidates (candidates, growTarget - unusedFreedBytes);
312+ unusedFreedBytes += decrementFreeCapacity (growTarget - unusedFreedBytes);
313+ if (unusedFreedBytes >= targetBytes) {
314+ requestor->grow (unusedFreedBytes);
315+ unusedFreedBytes = 0 ;
317316 return true ;
318317 }
319318
320- VELOX_CHECK_LT (freedBytes, growTarget);
321- freedBytes += reclaimUsedMemoryFromCandidates (
322- requestor, candidates, growTarget - freedBytes);
319+ VELOX_CHECK_LT (unusedFreedBytes, growTarget);
320+ reclaimUsedMemoryFromCandidates (
321+ requestor, candidates, growTarget - unusedFreedBytes);
322+ unusedFreedBytes += decrementFreeCapacity (growTarget - unusedFreedBytes);
323323 if (requestor->aborted ()) {
324324 ++numFailures_;
325325 VELOX_MEM_POOL_ABORTED (" The requestor pool has been aborted." );
326326 }
327327
328328 VELOX_CHECK (!requestor->aborted ());
329329
330- if (freedBytes < targetBytes) {
330+ if (unusedFreedBytes < targetBytes) {
331331 VELOX_MEM_LOG (WARNING)
332332 << " Failed to arbitrate sufficient memory for memory pool "
333333 << requestor->name () << " , request " << succinctBytes (targetBytes)
334- << " , only " << succinctBytes (freedBytes )
334+ << " , only " << succinctBytes (unusedFreedBytes )
335335 << " has been freed, Arbitrator state: " << toString ();
336336 return false ;
337337 }
338338
339- const uint64_t bytesToGrow = std::min (freedBytes, growTarget);
340- requestor->grow (bytesToGrow);
341- freedBytes -= bytesToGrow;
339+ if (unusedFreedBytes > growTarget) {
340+ requestor->grow (growTarget);
341+ unusedFreedBytes -= growTarget;
342+ return true ;
343+ }
344+ requestor->grow (unusedFreedBytes);
345+ unusedFreedBytes = 0 ;
342346 return true ;
343347}
344348
@@ -359,7 +363,9 @@ uint64_t SharedArbitrator::reclaimFreeMemoryFromCandidates(
359363 if (bytesToShrink <= 0 ) {
360364 break ;
361365 }
362- freedBytes += candidate.pool ->shrink (bytesToShrink);
366+ uint64_t shrunk = candidate.pool ->shrink (bytesToShrink);
367+ incrementFreeCapacity (shrunk);
368+ freedBytes += shrunk;
363369 if (freedBytes >= targetBytes) {
364370 break ;
365371 }
@@ -399,6 +405,7 @@ uint64_t SharedArbitrator::reclaim(
399405 uint64_t freedBytes{0 };
400406 try {
401407 freedBytes = pool->shrink (targetBytes);
408+ incrementFreeCapacity (freedBytes);
402409 if (freedBytes < targetBytes) {
403410 pool->reclaim (targetBytes - freedBytes);
404411 }
@@ -408,7 +415,7 @@ uint64_t SharedArbitrator::reclaim(
408415 abort (pool, std::current_exception ());
409416 // Free up all the free capacity from the aborted pool as the associated
410417 // query has failed at this point.
411- pool->shrink ();
418+ incrementFreeCapacity ( pool->shrink () );
412419 }
413420 const uint64_t newCapacity = pool->capacity ();
414421 VELOX_CHECK_GE (oldCapacity, newCapacity);
0 commit comments