Skip to content

Commit 1268385

Browse files
Address Copilot review: leak fix + group drain + doc cleanup
* TaskQueue/MPSCQueue::link_next_block: hold the freshly allocated Block in a std::unique_ptr so a lost CAS race no longer leaks the block. * Group: drain the wait_buckets and call unregister_external_waiter for every parked entry in ~Group so the Pool::external_waiters counter is balanced on Scheduler teardown (per declaration order Pools outlive Groups, so the raw Pool* pointers in WaitEntry remain valid). * Group.hpp: drop the orphan try_submit doc block that was sitting above try_acquire_running_lock and add the rule-of-five deletes that come with declaring an explicit destructor. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 3d2ebfd commit 1268385

4 files changed

Lines changed: 52 additions & 12 deletions

File tree

src/threading/scheduler/Group.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,27 @@ namespace threading {
151151
Group::Group(std::shared_ptr<const util::GroupDescriptor> descriptor)
152152
: descriptor(std::move(descriptor)), tokens(this->descriptor->concurrency) {}
153153

154+
Group::~Group() {
155+
// Drain any waiters still parked in the fast-path buckets so the external_waiters
156+
// counter on each Pool is balanced back to zero. If we let the wait_buckets just go
157+
// out of scope, the WaitEntry destructors would silently drop the tasks but never
158+
// call unregister_external_waiter, and the matching Pool worker would loop forever
159+
// in get_task() waiting for waiters that no longer exist.
160+
//
161+
// Per the Scheduler field declaration order (`pools` declared before `groups`),
162+
// Groups are destroyed before Pools, so every WaitEntry::pool pointer is still
163+
// valid here.
164+
WaitEntry entry;
165+
for (auto& bucket : wait_buckets) {
166+
while (bucket.try_dequeue(entry)) {
167+
if (entry.pool != nullptr) {
168+
entry.pool->unregister_external_waiter();
169+
}
170+
entry = WaitEntry{};
171+
}
172+
}
173+
}
174+
154175
std::unique_ptr<Lock> Group::try_acquire_running_lock() {
155176
if (slow_pending.load(std::memory_order_acquire) > 0) {
156177
return nullptr;

src/threading/scheduler/Group.hpp

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -179,17 +179,18 @@ namespace threading {
179179
explicit Group(std::shared_ptr<const util::GroupDescriptor> descriptor);
180180

181181
/**
182-
* Try to submit a task through the lock-free fast path.
183-
*
184-
* If a group token is available the task is submitted to the pool immediately.
185-
* Otherwise the task is queued until a token is released.
186-
*
187-
* @param task the reaction task to submit
188-
* @param pool the pool to submit to when runnable
189-
* @param clear_idle if true, clear idle state on submission
190-
*
191-
* @return true if the task was submitted immediately
182+
* Destroy the Group object. Drains any parked waiters in the fast-path buckets so the
183+
* `external_waiters` counter on every Pool referenced by a queued WaitEntry is balanced
184+
* back to zero; otherwise a Pool worker could spin forever in `get_task()` waiting for
185+
* waiters that will never be drained.
192186
*/
187+
~Group();
188+
189+
Group(const Group&) = delete;
190+
Group(Group&&) = delete;
191+
Group& operator=(const Group&) = delete;
192+
Group& operator=(Group&&) = delete;
193+
193194
/**
194195
* Try to acquire a token for inline execution without submitting to a pool.
195196
*

src/threading/scheduler/queue/MPSCQueue.hpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <array>
2727
#include <atomic>
2828
#include <cstddef>
29+
#include <memory>
2930
#include <new>
3031
#include <thread>
3132
#include <type_traits>
@@ -102,10 +103,17 @@ namespace threading {
102103
}
103104

104105
bool link_next_block(Block* block) {
106+
// Hold the new block in a unique_ptr so that if the CAS fails (another producer
107+
// linked the next block first) we don't leak the freshly allocated Block.
108+
// Function arguments are unconditionally evaluated in C++, so the previous form
109+
// `compare_exchange_strong(expected, allocate_block(), ...)` leaked one Block per
110+
// contended overflow.
105111
Block* expected = nullptr;
112+
std::unique_ptr<Block> candidate(allocate_block());
106113
if (block->next.compare_exchange_strong(expected,
107-
allocate_block(),
114+
candidate.get(),
108115
std::memory_order_acq_rel)) {
116+
candidate.release();
109117
return true;
110118
}
111119
return expected != nullptr;

src/threading/scheduler/queue/TaskQueue.hpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <array>
2727
#include <atomic>
2828
#include <cstddef>
29+
#include <memory>
2930
#include <new>
3031
#include <thread>
3132
#include <type_traits>
@@ -99,8 +100,17 @@ namespace threading {
99100
}
100101

101102
bool link_next_block(Block* block) {
103+
// Hold the new block in a unique_ptr so that if the CAS fails (another producer
104+
// linked the next block first) we don't leak the freshly allocated Block.
105+
// Function arguments are unconditionally evaluated in C++, so the previous form
106+
// `compare_exchange_strong(expected, allocate_block(), ...)` leaked one Block per
107+
// contended overflow.
102108
Block* expected = nullptr;
103-
if (block->next.compare_exchange_strong(expected, allocate_block(), std::memory_order_acq_rel)) {
109+
std::unique_ptr<Block> candidate(allocate_block());
110+
if (block->next.compare_exchange_strong(expected,
111+
candidate.get(),
112+
std::memory_order_acq_rel)) {
113+
candidate.release();
104114
return true;
105115
}
106116
return expected != nullptr;

0 commit comments

Comments
 (0)