-
Notifications
You must be signed in to change notification settings - Fork 517
Description
With unbounded channels, `try_recv` can block indefinitely if the sending thread is preempted at a particular point in the middle of sending a message. The same behavior is exhibited by `std::sync::mpsc`; in the issue there (rust-lang/rust#112723), I was asked to verify whether the issue also exists in crossbeam, and it does. I have updated the backtraces and code below for crossbeam.
Running the following code (using the crossbeam feature):
use std::{
sync::Arc,
time::{Duration, Instant},
};
use thread_priority::ThreadPriority;
#[cfg(not(feature = "crossbeam"))]
use std::sync::mpsc::channel;
#[cfg(not(feature = "crossbeam"))]
use std::sync::mpsc::TryRecvError;
#[cfg(feature = "crossbeam")]
use crossbeam::channel::unbounded as channel;
#[cfg(feature = "crossbeam")]
use crossbeam::channel::TryRecvError;
/// Reproducer for `try_recv` stalling on an unbounded channel.
///
/// Spawns two threads pinned to the same core (`PINNED_CORE`):
/// - "sending" runs at minimum priority and calls `send(42)` in an endless loop;
/// - "receiving" runs at maximum priority and polls with `try_recv`, sleeping
///   200 ms whenever the channel is empty.
///
/// Any single `try_recv` call that takes longer than one second is reported.
/// The main thread prints the running count of received messages every 500 ms
/// and never returns.
fn main() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    const PINNED_CORE: usize = 2;

    let banner = if cfg!(feature = "crossbeam") {
        "Using crossbeam::unbounded channels"
    } else {
        "Using std::sync::mpsc channels"
    };
    println!("{}", banner);

    let (tx, rx) = channel::<usize>();

    // Low-priority producer: shares a core with the consumer so that it can be
    // preempted while a message is only partially written.
    std::thread::Builder::new()
        .name(String::from("sending"))
        .spawn(move || {
            thread_priority::set_current_thread_priority(ThreadPriority::Min).unwrap();
            core_affinity::set_for_current(core_affinity::CoreId { id: PINNED_CORE });
            loop {
                tx.send(42).unwrap();
            }
        })
        .unwrap();

    let received_total = Arc::new(AtomicUsize::new(0));
    let receiver_count = Arc::clone(&received_total);

    // High-priority consumer: measures how long each `try_recv` call takes.
    std::thread::Builder::new()
        .name(String::from("receiving"))
        .spawn(move || {
            thread_priority::set_current_thread_priority(ThreadPriority::Max).unwrap();
            core_affinity::set_for_current(core_affinity::CoreId { id: PINNED_CORE });
            loop {
                let started_at = Instant::now();
                let outcome = rx.try_recv();
                let took = started_at.elapsed();
                if took > Duration::from_secs(1) {
                    println!("try_recv blocked for {:.2} seconds", took.as_secs_f32());
                }
                match outcome {
                    Ok(_) => {
                        receiver_count.fetch_add(1, Ordering::SeqCst);
                    }
                    Err(TryRecvError::Empty) => std::thread::sleep(Duration::from_millis(200)),
                    // The sender loops forever, so the channel never disconnects.
                    Err(TryRecvError::Disconnected) => unreachable!(),
                }
            }
        })
        .unwrap();

    // Progress report: a count that stops advancing reveals a stalled `try_recv`.
    loop {
        std::thread::sleep(Duration::from_millis(500));
        let so_far = received_total.load(Ordering::SeqCst);
        println!("Receiving thread has received {}", so_far);
    }
}
Based on the documentation of try_recv:
Attempts to receive a message from the channel without blocking.
This method will either receive a message from the channel immediately or return an error if the channel is empty.
I would not expect `try_recv` to block, but I see output like the following:
...
Receiving thread has received 130090289
Receiving thread has received 130090289
Receiving thread has received 130090289
try_recv blocked for 30.02 seconds
Receiving thread has received 131925728
Receiving thread has received 134485936
Receiving thread has received 137052417
...
During one of these stalls, I get the following backtraces:
Sending thread:
#0 core::slice::index::{impl#2}::get_unchecked<crossbeam_channel::flavors::list::Slot<usize>> (self=0, slice=...)
at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/core/src/slice/index.rs:230
#1 0x0000555555575c08 in core::slice::{impl#0}::get_unchecked<crossbeam_channel::flavors::list::Slot<usize>, usize> (self=..., index=2)
at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/core/src/slice/mod.rs:405
#2 0x0000555555575a22 in crossbeam_channel::flavors::list::Channel<usize>::write<usize> (self=0x5555555ddc00, token=0x7ffff7c8c970, msg=42)
at /home/ben/.cargo/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-channel-0.5.8/src/flavors/list.rs:290
#3 0x00005555555758b8 in crossbeam_channel::flavors::list::Channel<usize>::send<usize> (self=0x5555555ddc00, msg=42, _deadline=...)
at /home/ben/.cargo/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-channel-0.5.8/src/flavors/list.rs:426
#4 0x00005555555614dd in crossbeam_channel::channel::Sender<usize>::send<usize> (self=0x7ffff7c8cb10, msg=42)
at /home/ben/.cargo/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-channel-0.5.8/src/channel.rs:438
#5 0x00005555555633fa in mpsc_deadlock_reproducer::main::{closure#0} () at src/main.rs:36
#6 0x000055555556fed3 in std::sys_common::backtrace::__rust_begin_short_backtrace<mpsc_deadlock_reproducer::main::{closure_env#0}, ()> (f=...)
at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/sys_common/backtrace.rs:134
#7 0x000055555556c6c3 in std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure#0}<mpsc_deadlock_reproducer::main::{closure_env#0}, ()> ()
at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/thread/mod.rs:526
#8 0x0000555555561a43 in core::panic::unwind_safe::{impl#23}::call_once<(), std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#0}, ()>> (self=...) at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/core/src/panic/unwind_safe.rs:271
#9 0x0000555555563a56 in std::panicking::try::do_call<core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#0}, ()>>, ()> (data=0x7ffff7c8cc90)
at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/panicking.rs:485
#10 0x0000555555563c7b in __rust_try ()
#11 0x000055555556388e in std::panicking::try<(), core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#0}, ()>>> (f=...) at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/panicking.rs:449
#12 0x000055555556ff13 in std::panic::catch_unwind<core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#0}, ()>>, ()> (f=...) at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/panic.rs:140
#13 0x000055555556c0fc in std::thread::{impl#0}::spawn_unchecked_::{closure#1}<mpsc_deadlock_reproducer::main::{closure_env#0}, ()> ()
at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/thread/mod.rs:525
#14 0x000055555556cbbe in core::ops::function::FnOnce::call_once<std::thread::{impl#0}::spawn_unchecked_::{closure_env#1}<mpsc_deadlock_reproducer::main::{closure_env#0}, ()>, ()> () at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/core/src/ops/function.rs:250
#15 0x000055555559e8c5 in alloc::boxed::{impl#45}::call_once<(), dyn core::ops::function::FnOnce<(), Output=()>, alloc::alloc::Global> ()
at library/alloc/src/boxed.rs:1973
#16 alloc::boxed::{impl#45}::call_once<(), alloc::boxed::Box<dyn core::ops::function::FnOnce<(), Output=()>, alloc::alloc::Global>, alloc::alloc::Global> ()
at library/alloc/src/boxed.rs:1973
#17 std::sys::unix::thread::{impl#2}::new::thread_start () at library/std/src/sys/unix/thread.rs:108
#18 0x00007ffff7d1844b in ?? () from /usr/lib/libc.so.6
#19 0x00007ffff7d9be40 in ?? () from /usr/lib/libc.so.6
Receiving thread:
#0 0x00007ffff7d804fb in sched_yield () from /usr/lib/libc.so.6
#1 0x0000555555571c7f in crossbeam_utils::backoff::Backoff::snooze (self=0x7ffff7a8b728)
at /home/ben/.cargo/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-utils-0.8.16/src/backoff.rs:227
#2 0x0000555555573d31 in crossbeam_channel::flavors::list::Slot<usize>::wait_write<usize> (self=0x7ffff016fec0)
at /home/ben/.cargo/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-channel-0.5.8/src/flavors/list.rs:61
#3 0x000055555557568c in crossbeam_channel::flavors::list::Channel<usize>::read<usize> (self=0x5555555ddc00, token=0x7ffff7a8b828)
at /home/ben/.cargo/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-channel-0.5.8/src/flavors/list.rs:395
#4 0x0000555555575b85 in crossbeam_channel::flavors::list::Channel<usize>::try_recv<usize> (self=0x5555555ddc00)
at /home/ben/.cargo/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-channel-0.5.8/src/flavors/list.rs:436
#5 0x0000555555561679 in crossbeam_channel::channel::Receiver<usize>::try_recv<usize> (self=0x7ffff7a8bba0)
at /home/ben/.cargo/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-channel-0.5.8/src/channel.rs:762
#6 0x000055555556350c in mpsc_deadlock_reproducer::main::{closure#1} () at src/main.rs:53
#7 0x000055555556feb9 in std::sys_common::backtrace::__rust_begin_short_backtrace<mpsc_deadlock_reproducer::main::{closure_env#1}, ()> (f=...)
at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/sys_common/backtrace.rs:134
#8 0x000055555556c6f5 in std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure#0}<mpsc_deadlock_reproducer::main::{closure_env#1}, ()> ()
at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/thread/mod.rs:526
#9 0x0000555555561a28 in core::panic::unwind_safe::{impl#23}::call_once<(), std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#1}, ()>> (self=...) at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/core/src/panic/unwind_safe.rs:271
#10 0x0000555555563a9e in std::panicking::try::do_call<core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#1}, ()>>, ()> (data=0x7ffff7a8bc70)
at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/panicking.rs:485
#11 0x0000555555563c7b in __rust_try ()
#12 0x000055555556392e in std::panicking::try<(), core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#1}, ()>>> (f=...) at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/panicking.rs:449
#13 0x000055555556fee6 in std::panic::catch_unwind<core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#1}, ()>>, ()> (f=...) at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/panic.rs:140
#14 0x000055555556c483 in std::thread::{impl#0}::spawn_unchecked_::{closure#1}<mpsc_deadlock_reproducer::main::{closure_env#1}, ()> ()
at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/thread/mod.rs:525
#15 0x000055555556cb7e in core::ops::function::FnOnce::call_once<std::thread::{impl#0}::spawn_unchecked_::{closure_env#1}<mpsc_deadlock_reproducer::main::{closure_env#1}, ()>, ()> () at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/core/src/ops/function.rs:250
#16 0x000055555559e8c5 in alloc::boxed::{impl#45}::call_once<(), dyn core::ops::function::FnOnce<(), Output=()>, alloc::alloc::Global> ()
at library/alloc/src/boxed.rs:1973
#17 alloc::boxed::{impl#45}::call_once<(), alloc::boxed::Box<dyn core::ops::function::FnOnce<(), Output=()>, alloc::alloc::Global>, alloc::alloc::Global> ()
at library/alloc/src/boxed.rs:1973
#18 std::sys::unix::thread::{impl#2}::new::thread_start () at library/std/src/sys/unix/thread.rs:108
#19 0x00007ffff7d1844b in ?? () from /usr/lib/libc.so.6
#20 0x00007ffff7d9be40 in ?? () from /usr/lib/libc.so.6
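`try_recv` calls `read`, which calls `Slot::wait_write`. That method spins (via `Backoff::snooze` / `sched_yield`) until the sender finishes writing the slot, so `try_recv` effectively waits on the sender. For a method documented as non-blocking, this seems fundamentally wrong.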
The reproducer code above was run on Linux. Full crate code is available at https://github.com/benhansen-io/mpsc_deadlock_reproducer
The issue was originally discovered on a real-time OS where the receiver has a higher priority than the sender (which makes sense for that application). On Linux, the sending thread eventually gets a time slice, so the stall is not permanent. On the real-time OS, however, the higher-priority receiver keeps spinning inside `try_recv` on the same core, so the lower-priority sender is never scheduled to finish its write and the blocking lasts forever.