Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 28 additions & 15 deletions clang/data/wspq.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
// Notes
// - Tasks are u64 values; key is an 8-bit hint.
// - Not a general multi-producer queue: each worker pushes to its own bank.
// - wsq buffers are fixed size; push spins if a bucket is temporarily full.
// - wsq buffers are fixed size and allocated lazily per bucket on first push.
// - Push spins if a bucket is temporarily full.

#include <stdatomic.h>
#include <stdbool.h>
Expand Down Expand Up @@ -81,25 +82,28 @@ static inline u8 wspq_key_bucket(u32 key) {
return (u8)bucket;
}

// Initialize all per-worker bucket queues.
// Ensure one owner-local bucket queue is allocated.
static inline bool wspq_bucket_ensure(Wspq *ws, u32 tid, u32 b) {
WsDeque *q = &ws->bank[tid].q[b];
if (q->buf != NULL) {
return true;
}
return wsq_init(q, WSPQ_CAP_POW2);
}

// Initialize per-worker bucket metadata; queues are allocated lazily.
static inline bool wspq_init(Wspq *ws, u32 nthreads) {
ws->n = nthreads;

for (u32 t = 0; t < nthreads; ++t) {
atomic_store_explicit(&ws->bank[t].nonempty.v, 0ull, memory_order_relaxed);
for (u32 b = 0; b < WSPQ_BRACKETS; ++b) {
if (!wsq_init(&ws->bank[t].q[b], WSPQ_CAP_POW2)) {
for (u32 t2 = 0; t2 <= t; ++t2) {
u32 bmax = WSPQ_BRACKETS;
if (t2 == t) {
bmax = b;
}
for (u32 b2 = 0; b2 < bmax; ++b2) {
wsq_free(&ws->bank[t2].q[b2]);
}
}
return false;
}
WsDeque *q = &ws->bank[t].q[b];
q->buf = NULL;
q->cap = 0;
q->mask = 0;
atomic_store_explicit(&q->top.v, 0, memory_order_relaxed);
atomic_store_explicit(&q->bot.v, 0, memory_order_relaxed);
}
}
return true;
Expand All @@ -118,6 +122,9 @@ static inline void wspq_free(Wspq *ws) {
static inline bool wspq_bucket_full_all(Wspq *ws, u8 b) {
for (u32 t = 0; t < ws->n; ++t) {
WsDeque *q = &ws->bank[t].q[b];
if (q->buf == NULL) {
return false;
}
size_t bot = atomic_load_explicit(&q->bot.v, memory_order_relaxed);
size_t top = atomic_load_explicit(&q->top.v, memory_order_relaxed);
if (bot - top < q->cap) {
Expand All @@ -133,6 +140,12 @@ static inline void wspq_push(Wspq *ws, u32 tid, u8 key, u64 task) {
}
u8 bucket = wspq_key_bucket(key);
WsDeque *q = &ws->bank[tid].q[bucket];
if (__builtin_expect(q->buf == NULL, 0)) {
if (!wspq_bucket_ensure(ws, tid, bucket)) {
fprintf(stderr, "error: wspq queue allocation failed (bucket %u)\n", bucket);
exit(1);
}
}
u32 spins = 1;
while (!wsq_push(q, task)) {
if ((spins % WSPQ_DEADLOCK_CHECK_PERIOD) == 0) {
Expand Down Expand Up @@ -233,4 +246,4 @@ static inline bool wspq_can_steal(Wspq *ws, u32 me) {
}
}
return false;
}
}
24 changes: 20 additions & 4 deletions clang/eval/normalize.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,21 @@ typedef struct {

static inline void eval_normalize_go(EvalNormalizeCtx *ctx, EvalNormalizeWorker *worker, u32 loc);

static inline bool eval_normalize_queue_ensure(EvalNormalizeWorker *worker) {
WsDeque *q = &worker->dq;
if (q->buf != NULL) {
return true;
}
return wsq_init(q, EVAL_NORMALIZE_WS_CAP_POW2);
}

static inline void eval_normalize_enqueue(EvalNormalizeCtx *ctx, EvalNormalizeWorker *worker, u32 loc) {
if (__builtin_expect(worker->dq.buf == NULL, 0)) {
if (!eval_normalize_queue_ensure(worker)) {
fprintf(stderr, "eval_normalize: queue allocation failed\n");
exit(1);
}
}
if (!wsq_push(&worker->dq, (u64)loc)) {
eval_normalize_go(ctx, worker, loc);
}
Expand Down Expand Up @@ -141,10 +155,12 @@ fn Term eval_normalize(Term term) {
ctx.n = n;
atomic_store_explicit(&ctx.pending.v, n, memory_order_relaxed);
for (u32 i = 0; i < n; i++) {
if (!wsq_init(&ctx.W[i].dq, EVAL_NORMALIZE_WS_CAP_POW2)) {
fprintf(stderr, "eval_normalize: queue allocation failed\n");
exit(1);
}
WsDeque *q = &ctx.W[i].dq;
q->buf = NULL;
q->cap = 0;
q->mask = 0;
atomic_store_explicit(&q->top.v, 0, memory_order_relaxed);
atomic_store_explicit(&q->bot.v, 0, memory_order_relaxed);
}
uset_init(&ctx.seen);

Expand Down