From 2e842f192326e518e5d52872aa9088ab3de727b6 Mon Sep 17 00:00:00 2001 From: Daria Sukhonina Date: Thu, 12 Mar 2026 16:44:54 +0300 Subject: [PATCH] Refactor query latch Removed unnecessary mutex for cycle errors --- compiler/rustc_middle/src/query/job.rs | 59 ++++++++++++++------------ compiler/rustc_query_impl/src/job.rs | 16 ++++--- 2 files changed, 41 insertions(+), 34 deletions(-) diff --git a/compiler/rustc_middle/src/query/job.rs b/compiler/rustc_middle/src/query/job.rs index 8c78bf24287e0..8f85632a9428a 100644 --- a/compiler/rustc_middle/src/query/job.rs +++ b/compiler/rustc_middle/src/query/job.rs @@ -52,48 +52,52 @@ impl<'tcx> QueryJob<'tcx> { } #[derive(Debug)] -pub struct QueryWaiter<'tcx> { +pub struct QueryWaiter { pub parent: Option, - pub condvar: Condvar, + pub condvar: Arc, pub span: Span, - pub cycle: Mutex>>, } #[derive(Clone, Debug)] pub struct QueryLatch<'tcx> { /// The `Option` is `Some(..)` when the job is active, and `None` once completed. - pub waiters: Arc>>>>>, + pub inner: Arc>>>, +} + +#[derive(Debug)] +pub struct QueryLatchState<'tcx> { + pub waiters: Vec, + pub cycle: Option>, } impl<'tcx> QueryLatch<'tcx> { fn new() -> Self { - QueryLatch { waiters: Arc::new(Mutex::new(Some(Vec::new()))) } + QueryLatch { + inner: Arc::new(Mutex::new(Some(QueryLatchState { waiters: Vec::new(), cycle: None }))), + } } /// Awaits for the query job to complete. pub fn wait_on( &self, tcx: TyCtxt<'tcx>, - query: Option, + parent: Option, span: Span, ) -> Result<(), Cycle<'tcx>> { - let mut waiters_guard = self.waiters.lock(); - let Some(waiters) = &mut *waiters_guard else { + let mut state_lock = self.inner.lock(); + let Some(state) = &mut *state_lock else { return Ok(()); // already complete }; - let waiter = Arc::new(QueryWaiter { - parent: query, - span, - cycle: Mutex::new(None), - condvar: Condvar::new(), - }); + let condvar = Arc::new(Condvar::new()); + let waiter = QueryWaiter { parent, span, condvar: Arc::clone(&condvar) }; + state.waiters.reserve(state.waiters.len().saturating_sub(tcx.sess.threads())); // We push the waiter on to the `waiters` list. It can be accessed inside // the `wait` call below, by 1) the `set` method or 2) by deadlock detection. // Both of these will remove it from the `waiters` list before resuming // this thread. - waiters.push(Arc::clone(&waiter)); + state.waiters.push(waiter); // Awaits the caller on this latch by blocking the current thread. // If this detects a deadlock and the deadlock handler wants to resume this thread @@ -101,16 +105,15 @@ impl<'tcx> QueryLatch<'tcx> { // getting the self.info lock. rustc_thread_pool::mark_blocked(); tcx.jobserver_proxy.release_thread(); - waiter.condvar.wait(&mut waiters_guard); + condvar.wait(&mut state_lock); + let cycle = state_lock + .as_mut() + .map(|s| s.cycle.take().expect("resumed waiter for unfinished query without a cycle")); // Release the lock before we potentially block in `acquire_thread` - drop(waiters_guard); + drop(state_lock); tcx.jobserver_proxy.acquire_thread(); - // FIXME: Get rid of this lock. We have ownership of the QueryWaiter - // although another thread may still have a Arc reference so we cannot - // use Arc::get_mut - let mut cycle = waiter.cycle.lock(); - match cycle.take() { + match cycle { None => Ok(()), Some(cycle) => Err(cycle), } @@ -118,8 +121,8 @@ impl<'tcx> QueryLatch<'tcx> { /// Sets the latch and resumes all waiters on it fn set(&self) { - let mut waiters_guard = self.waiters.lock(); - let waiters = waiters_guard.take().unwrap(); // mark the latch as complete + let mut state_lock = self.inner.lock(); + let waiters = state_lock.take().unwrap().waiters; // mark the latch as complete let registry = rustc_thread_pool::Registry::current(); for waiter in waiters { rustc_thread_pool::mark_unblocked(®istry); @@ -129,10 +132,10 @@ impl<'tcx> QueryLatch<'tcx> { /// Removes a single waiter from the list of waiters. /// This is used to break query cycles. - pub fn extract_waiter(&self, waiter: usize) -> Arc> { - let mut waiters_guard = self.waiters.lock(); - let waiters = waiters_guard.as_mut().expect("non-empty waiters vec"); + pub fn extract_waiter(&self, waiter: usize) -> QueryWaiter { + let mut state_lock = self.inner.lock(); + let state = state_lock.as_mut().expect("non-empty waiters vec"); // Remove the waiter from the list of waiters - waiters.remove(waiter) + state.waiters.remove(waiter) } } diff --git a/compiler/rustc_query_impl/src/job.rs b/compiler/rustc_query_impl/src/job.rs index bf0493b29fd1e..77553c8aad4d2 100644 --- a/compiler/rustc_query_impl/src/job.rs +++ b/compiler/rustc_query_impl/src/job.rs @@ -1,6 +1,5 @@ use std::io::Write; use std::ops::ControlFlow; -use std::sync::Arc; use std::{iter, mem}; use rustc_data_structures::fx::{FxHashMap, FxHashSet}; @@ -140,7 +139,7 @@ fn abstracted_waiters_of(job_map: &QueryJobMap<'_>, query: QueryJobId) -> Vec(job_map: &QueryJobMap<'tcx>, stack: Vec<(Span, QueryJobId fn find_and_process_cycle<'tcx>( job_map: &QueryJobMap<'tcx>, query: QueryJobId, -) -> Option>> { +) -> Option { let mut visited = FxHashSet::default(); let mut stack = Vec::new(); if let ControlFlow::Break(resumable) = @@ -321,11 +320,16 @@ fn find_and_process_cycle<'tcx>( // edge which is resumable / waited using a query latch let (waitee_query, waiter_idx) = resumable.unwrap(); - // Extract the waiter we want to resume - let waiter = job_map.latch_of(waitee_query).unwrap().extract_waiter(waiter_idx); + let latch = job_map.latch_of(waitee_query).unwrap(); + let mut latch_state_lock = latch.inner.lock(); + let latch_state = latch_state_lock.as_mut().expect("non-empty waiters vec"); + + // Remove the waiter from the list of waiters we want to resume + let waiter = latch_state.waiters.remove(waiter_idx); // Set the cycle error so it will be picked up when resumed - *waiter.cycle.lock() = Some(error); + let old = latch_state.cycle.replace(error); + assert!(old.is_none(), "expected query cycle to break on a single waiter"); // Put the waiter on the list of things to resume Some(waiter)