diff --git a/clang/data/wspq.c b/clang/data/wspq.c index c68b2654..8432ab1b 100644 --- a/clang/data/wspq.c +++ b/clang/data/wspq.c @@ -14,7 +14,8 @@ // Notes // - Tasks are u64 values; key is an 8-bit hint. // - Not a general multi-producer queue: each worker pushes to its own bank. -// - wsq buffers are fixed size; push spins if a bucket is temporarily full. +// - wsq buffers are fixed size and allocated lazily per bucket on first push. +// - Push spins if a bucket is temporarily full. #include #include @@ -81,25 +82,28 @@ static inline u8 wspq_key_bucket(u32 key) { return (u8)bucket; } -// Initialize all per-worker bucket queues. +// Ensure one owner-local bucket queue is allocated. +static inline bool wspq_bucket_ensure(Wspq *ws, u32 tid, u32 b) { + WsDeque *q = &ws->bank[tid].q[b]; + if (q->buf != NULL) { + return true; + } + return wsq_init(q, WSPQ_CAP_POW2); +} + +// Initialize per-worker bucket metadata; queues are allocated lazily. static inline bool wspq_init(Wspq *ws, u32 nthreads) { ws->n = nthreads; for (u32 t = 0; t < nthreads; ++t) { atomic_store_explicit(&ws->bank[t].nonempty.v, 0ull, memory_order_relaxed); for (u32 b = 0; b < WSPQ_BRACKETS; ++b) { - if (!wsq_init(&ws->bank[t].q[b], WSPQ_CAP_POW2)) { - for (u32 t2 = 0; t2 <= t; ++t2) { - u32 bmax = WSPQ_BRACKETS; - if (t2 == t) { - bmax = b; - } - for (u32 b2 = 0; b2 < bmax; ++b2) { - wsq_free(&ws->bank[t2].q[b2]); - } - } - return false; - } + WsDeque *q = &ws->bank[t].q[b]; + q->buf = NULL; + q->cap = 0; + q->mask = 0; + atomic_store_explicit(&q->top.v, 0, memory_order_relaxed); + atomic_store_explicit(&q->bot.v, 0, memory_order_relaxed); } } return true; @@ -118,6 +122,9 @@ static inline void wspq_free(Wspq *ws) { static inline bool wspq_bucket_full_all(Wspq *ws, u8 b) { for (u32 t = 0; t < ws->n; ++t) { WsDeque *q = &ws->bank[t].q[b]; + if (q->buf == NULL) { + return false; + } size_t bot = atomic_load_explicit(&q->bot.v, memory_order_relaxed); size_t top = atomic_load_explicit(&q->top.v, memory_order_relaxed); if (bot - top < q->cap) { @@ -133,6 +140,12 @@ static inline void wspq_push(Wspq *ws, u32 tid, u8 key, u64 task) { } u8 bucket = wspq_key_bucket(key); WsDeque *q = &ws->bank[tid].q[bucket]; + if (__builtin_expect(q->buf == NULL, 0)) { + if (!wspq_bucket_ensure(ws, tid, bucket)) { + fprintf(stderr, "error: wspq queue allocation failed (bucket %u)\n", bucket); + exit(1); + } + } u32 spins = 1; while (!wsq_push(q, task)) { if ((spins % WSPQ_DEADLOCK_CHECK_PERIOD) == 0) { @@ -233,4 +246,4 @@ static inline bool wspq_can_steal(Wspq *ws, u32 me) { } } return false; -} \ No newline at end of file +} diff --git a/clang/eval/normalize.c b/clang/eval/normalize.c index 1c694ecf..e92dbd14 100644 --- a/clang/eval/normalize.c +++ b/clang/eval/normalize.c @@ -27,7 +27,21 @@ typedef struct { static inline void eval_normalize_go(EvalNormalizeCtx *ctx, EvalNormalizeWorker *worker, u32 loc); +static inline bool eval_normalize_queue_ensure(EvalNormalizeWorker *worker) { + WsDeque *q = &worker->dq; + if (q->buf != NULL) { + return true; + } + return wsq_init(q, EVAL_NORMALIZE_WS_CAP_POW2); +} + static inline void eval_normalize_enqueue(EvalNormalizeCtx *ctx, EvalNormalizeWorker *worker, u32 loc) { + if (__builtin_expect(worker->dq.buf == NULL, 0)) { + if (!eval_normalize_queue_ensure(worker)) { + fprintf(stderr, "eval_normalize: queue allocation failed\n"); + exit(1); + } + } if (!wsq_push(&worker->dq, (u64)loc)) { eval_normalize_go(ctx, worker, loc); } @@ -141,10 +155,12 @@ fn Term eval_normalize(Term term) { ctx.n = n; atomic_store_explicit(&ctx.pending.v, n, memory_order_relaxed); for (u32 i = 0; i < n; i++) { - if (!wsq_init(&ctx.W[i].dq, EVAL_NORMALIZE_WS_CAP_POW2)) { - fprintf(stderr, "eval_normalize: queue allocation failed\n"); - exit(1); - } + WsDeque *q = &ctx.W[i].dq; + q->buf = NULL; + q->cap = 0; + q->mask = 0; + atomic_store_explicit(&q->top.v, 0, memory_order_relaxed); + atomic_store_explicit(&q->bot.v, 0, memory_order_relaxed); } uset_init(&ctx.seen);