crossbeam::channel::Receiver::try_recv can block forever if sending thread is blocked #997

@benhansen-io

With unbounded channels, try_recv can block indefinitely if the sending thread is stopped at a particular point inside send. The std mpsc channels show the same behavior; in the issue filed there (rust-lang/rust#112723) I was asked to verify that the problem also exists in crossbeam, and it does. The code and backtraces below have been updated for crossbeam.

Running the following code (using the crossbeam feature):

use std::{
    sync::Arc,
    time::{Duration, Instant},
};

use thread_priority::ThreadPriority;

#[cfg(not(feature = "crossbeam"))]
use std::sync::mpsc::channel;
#[cfg(not(feature = "crossbeam"))]
use std::sync::mpsc::TryRecvError;

#[cfg(feature = "crossbeam")]
use crossbeam::channel::unbounded as channel;
#[cfg(feature = "crossbeam")]
use crossbeam::channel::TryRecvError;

fn main() {
    const PINNED_CORE: usize = 2;

    if cfg!(feature = "crossbeam") {
        println!("Using crossbeam::unbounded channels");
    } else {
        println!("Using std::sync::mpsc channels");
    }

    let (sender, receiver) = channel::<usize>();

    std::thread::Builder::new()
        .name("sending".to_owned())
        .spawn(move || {
            thread_priority::set_current_thread_priority(ThreadPriority::Min).unwrap();
            core_affinity::set_for_current(core_affinity::CoreId { id: PINNED_CORE });

            loop {
                sender.send(42).unwrap();
            }
        })
        .unwrap();

    let num_received = Arc::new(std::sync::atomic::AtomicUsize::new(0));

    std::thread::Builder::new()
        .name("receiving".to_owned())
        .spawn({
            let num_received = num_received.clone();
            move || {
                thread_priority::set_current_thread_priority(ThreadPriority::Max).unwrap();
                core_affinity::set_for_current(core_affinity::CoreId { id: PINNED_CORE });

                loop {
                    let start = Instant::now();
                    let try_receive_result = receiver.try_recv();
                    let elapsed = start.elapsed();
                    if elapsed > Duration::from_secs(1) {
                        println!("try_recv blocked for {:.2} seconds", elapsed.as_secs_f32());
                    }
                    match try_receive_result {
                        Ok(_) => {
                            num_received.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                        }
                        Err(TryRecvError::Empty) => {
                            std::thread::sleep(Duration::from_millis(200));
                        }
                        Err(TryRecvError::Disconnected) => unreachable!(),
                    }
                }
            }
        })
        .unwrap();

    loop {
        std::thread::sleep(Duration::from_millis(500));
        println!(
            "Receiving thread has received {}",
            num_received.load(std::sync::atomic::Ordering::SeqCst)
        )
    }
}

Based on the documentation of try_recv:

Attempts to receive a message from the channel without blocking.

This method will either receive a message from the channel immediately or return an error if the channel is empty.
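
For reference, the documented contract is easy to observe when no sender is in the middle of a send. The sketch below uses std::sync::mpsc, which shows the same issue, so that it needs no extra crates. `documented_contract` is a hypothetical helper name used only for illustration, not part of either library.

```rust
use std::sync::mpsc::{channel, TryRecvError};

/// Hypothetical helper: calls try_recv once on an empty channel and once
/// after a completed send, with no concurrent sender involved.
fn documented_contract() -> (Result<usize, TryRecvError>, Result<usize, TryRecvError>) {
    let (sender, receiver) = channel::<usize>();

    // Nothing has been sent yet and the sender is still alive,
    // so this reports an empty channel instead of waiting.
    let when_empty = receiver.try_recv();

    // The send has fully completed before the next try_recv starts.
    sender.send(42).unwrap();
    let when_ready = receiver.try_recv();

    (when_empty, when_ready)
}

fn main() {
    let (when_empty, when_ready) = documented_contract();
    println!("{:?} {:?}", when_empty, when_ready);
}
```

Both calls return immediately here because the sender is never caught partway through send. The reproducer differs in exactly that respect.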

I would not expect try_recv to block but I see output like the following:

...
Receiving thread has received 130090289
Receiving thread has received 130090289
Receiving thread has received 130090289
try_recv blocked for 30.02 seconds
Receiving thread has received 131925728
Receiving thread has received 134485936
Receiving thread has received 137052417
...

While try_recv is blocked, I get the following backtraces:

Sending thread:

#0  core::slice::index::{impl#2}::get_unchecked<crossbeam_channel::flavors::list::Slot<usize>> (self=0, slice=...)
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/core/src/slice/index.rs:230
#1  0x0000555555575c08 in core::slice::{impl#0}::get_unchecked<crossbeam_channel::flavors::list::Slot<usize>, usize> (self=..., index=2)
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/core/src/slice/mod.rs:405
#2  0x0000555555575a22 in crossbeam_channel::flavors::list::Channel<usize>::write<usize> (self=0x5555555ddc00, token=0x7ffff7c8c970, msg=42)
    at /home/ben/.cargo/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-channel-0.5.8/src/flavors/list.rs:290
#3  0x00005555555758b8 in crossbeam_channel::flavors::list::Channel<usize>::send<usize> (self=0x5555555ddc00, msg=42, _deadline=...)
    at /home/ben/.cargo/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-channel-0.5.8/src/flavors/list.rs:426
#4  0x00005555555614dd in crossbeam_channel::channel::Sender<usize>::send<usize> (self=0x7ffff7c8cb10, msg=42)
    at /home/ben/.cargo/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-channel-0.5.8/src/channel.rs:438
#5  0x00005555555633fa in mpsc_deadlock_reproducer::main::{closure#0} () at src/main.rs:36
#6  0x000055555556fed3 in std::sys_common::backtrace::__rust_begin_short_backtrace<mpsc_deadlock_reproducer::main::{closure_env#0}, ()> (f=...)
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/sys_common/backtrace.rs:134
#7  0x000055555556c6c3 in std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure#0}<mpsc_deadlock_reproducer::main::{closure_env#0}, ()> ()
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/thread/mod.rs:526
#8  0x0000555555561a43 in core::panic::unwind_safe::{impl#23}::call_once<(), std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#0}, ()>> (self=...) at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/core/src/panic/unwind_safe.rs:271
#9  0x0000555555563a56 in std::panicking::try::do_call<core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#0}, ()>>, ()> (data=0x7ffff7c8cc90)
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/panicking.rs:485
#10 0x0000555555563c7b in __rust_try ()
#11 0x000055555556388e in std::panicking::try<(), core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#0}, ()>>> (f=...) at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/panicking.rs:449
#12 0x000055555556ff13 in std::panic::catch_unwind<core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#0}, ()>>, ()> (f=...) at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/panic.rs:140
#13 0x000055555556c0fc in std::thread::{impl#0}::spawn_unchecked_::{closure#1}<mpsc_deadlock_reproducer::main::{closure_env#0}, ()> ()
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/thread/mod.rs:525
#14 0x000055555556cbbe in core::ops::function::FnOnce::call_once<std::thread::{impl#0}::spawn_unchecked_::{closure_env#1}<mpsc_deadlock_reproducer::main::{closure_env#0}, ()>, ()> () at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/core/src/ops/function.rs:250
#15 0x000055555559e8c5 in alloc::boxed::{impl#45}::call_once<(), dyn core::ops::function::FnOnce<(), Output=()>, alloc::alloc::Global> ()
    at library/alloc/src/boxed.rs:1973
#16 alloc::boxed::{impl#45}::call_once<(), alloc::boxed::Box<dyn core::ops::function::FnOnce<(), Output=()>, alloc::alloc::Global>, alloc::alloc::Global> ()
    at library/alloc/src/boxed.rs:1973
#17 std::sys::unix::thread::{impl#2}::new::thread_start () at library/std/src/sys/unix/thread.rs:108
#18 0x00007ffff7d1844b in ?? () from /usr/lib/libc.so.6
#19 0x00007ffff7d9be40 in ?? () from /usr/lib/libc.so.6

Receiving thread:

#0  0x00007ffff7d804fb in sched_yield () from /usr/lib/libc.so.6
#1  0x0000555555571c7f in crossbeam_utils::backoff::Backoff::snooze (self=0x7ffff7a8b728)
    at /home/ben/.cargo/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-utils-0.8.16/src/backoff.rs:227
#2  0x0000555555573d31 in crossbeam_channel::flavors::list::Slot<usize>::wait_write<usize> (self=0x7ffff016fec0)
    at /home/ben/.cargo/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-channel-0.5.8/src/flavors/list.rs:61
#3  0x000055555557568c in crossbeam_channel::flavors::list::Channel<usize>::read<usize> (self=0x5555555ddc00, token=0x7ffff7a8b828)
    at /home/ben/.cargo/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-channel-0.5.8/src/flavors/list.rs:395
#4  0x0000555555575b85 in crossbeam_channel::flavors::list::Channel<usize>::try_recv<usize> (self=0x5555555ddc00)
    at /home/ben/.cargo/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-channel-0.5.8/src/flavors/list.rs:436
#5  0x0000555555561679 in crossbeam_channel::channel::Receiver<usize>::try_recv<usize> (self=0x7ffff7a8bba0)
    at /home/ben/.cargo/registry/src/index.crates.io-6f17d22bba15001f/crossbeam-channel-0.5.8/src/channel.rs:762
#6  0x000055555556350c in mpsc_deadlock_reproducer::main::{closure#1} () at src/main.rs:53
#7  0x000055555556feb9 in std::sys_common::backtrace::__rust_begin_short_backtrace<mpsc_deadlock_reproducer::main::{closure_env#1}, ()> (f=...)
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/sys_common/backtrace.rs:134
#8  0x000055555556c6f5 in std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure#0}<mpsc_deadlock_reproducer::main::{closure_env#1}, ()> ()
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/thread/mod.rs:526
#9  0x0000555555561a28 in core::panic::unwind_safe::{impl#23}::call_once<(), std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#1}, ()>> (self=...) at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/core/src/panic/unwind_safe.rs:271
#10 0x0000555555563a9e in std::panicking::try::do_call<core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#1}, ()>>, ()> (data=0x7ffff7a8bc70)
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/panicking.rs:485
#11 0x0000555555563c7b in __rust_try ()
#12 0x000055555556392e in std::panicking::try<(), core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#1}, ()>>> (f=...) at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/panicking.rs:449
#13 0x000055555556fee6 in std::panic::catch_unwind<core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl#0}::spawn_unchecked_::{closure#1}::{closure_env#0}<mpsc_deadlock_reproducer::main::{closure_env#1}, ()>>, ()> (f=...) at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/panic.rs:140
#14 0x000055555556c483 in std::thread::{impl#0}::spawn_unchecked_::{closure#1}<mpsc_deadlock_reproducer::main::{closure_env#1}, ()> ()
    at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/std/src/thread/mod.rs:525
#15 0x000055555556cb7e in core::ops::function::FnOnce::call_once<std::thread::{impl#0}::spawn_unchecked_::{closure_env#1}<mpsc_deadlock_reproducer::main::{closure_env#1}, ()>, ()> () at /rustc/90c541806f23a127002de5b4038be731ba1458ca/library/core/src/ops/function.rs:250
#16 0x000055555559e8c5 in alloc::boxed::{impl#45}::call_once<(), dyn core::ops::function::FnOnce<(), Output=()>, alloc::alloc::Global> ()
    at library/alloc/src/boxed.rs:1973
#17 alloc::boxed::{impl#45}::call_once<(), alloc::boxed::Box<dyn core::ops::function::FnOnce<(), Output=()>, alloc::alloc::Global>, alloc::alloc::Global> ()
    at library/alloc/src/boxed.rs:1973
#18 std::sys::unix::thread::{impl#2}::new::thread_start () at library/std/src/sys/unix/thread.rs:108
#19 0x00007ffff7d1844b in ?? () from /usr/lib/libc.so.6
#20 0x00007ffff7d9be40 in ?? () from /usr/lib/libc.so.6

try_recv calls read, which calls wait_write, so try_recv ends up waiting on the sender. That seems fundamentally wrong for a method documented as non-blocking.
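
To make the mechanism concrete, here is a toy single-slot model of what the backtraces suggest: the sender claims a slot first and marks it as written later, and a receiver that sees the claimed slot spins until the write is marked complete. This is a sketch under that assumption, not crossbeam's actual code. `ToySlot`, `send_with_stall` and `measure_stalled_try_recv` are hypothetical names, and the sleep stands in for the sender being preempted between the two steps.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

/// Toy model only: one slot that is published in two separate steps.
struct ToySlot {
    reserved: AtomicBool, // step 1: the sender has claimed the slot
    written: AtomicBool,  // step 2: the sender has stored the value
    value: AtomicUsize,
}

impl ToySlot {
    fn new() -> Self {
        ToySlot {
            reserved: AtomicBool::new(false),
            written: AtomicBool::new(false),
            value: AtomicUsize::new(0),
        }
    }

    /// Sender side. The sleep simulates the sender losing the CPU
    /// after claiming the slot but before finishing the write.
    fn send_with_stall(&self, msg: usize, stall: Duration) {
        self.reserved.store(true, Ordering::Release);
        thread::sleep(stall);
        self.value.store(msg, Ordering::Relaxed);
        self.written.store(true, Ordering::Release);
    }

    /// Receiver side. Returns at once only if nothing has been claimed;
    /// otherwise it spins until the sender marks the write as complete.
    fn try_recv(&self) -> Option<usize> {
        if !self.reserved.load(Ordering::Acquire) {
            return None;
        }
        while !self.written.load(Ordering::Acquire) {
            thread::yield_now();
        }
        Some(self.value.load(Ordering::Relaxed))
    }
}

/// Runs try_recv against a sender stalled mid-send and reports
/// the received value together with how long try_recv took.
fn measure_stalled_try_recv(stall: Duration) -> (Option<usize>, Duration) {
    let slot = Arc::new(ToySlot::new());
    let sender = {
        let slot = Arc::clone(&slot);
        thread::spawn(move || slot.send_with_stall(42, stall))
    };

    // Wait until the sender has claimed the slot, so that try_recv
    // is guaranteed to hit the window between the two steps.
    while !slot.reserved.load(Ordering::Acquire) {
        thread::yield_now();
    }

    let start = Instant::now();
    let got = slot.try_recv();
    let elapsed = start.elapsed();
    sender.join().unwrap();
    (got, elapsed)
}

fn main() {
    let (got, elapsed) = measure_stalled_try_recv(Duration::from_millis(200));
    println!("try_recv returned {:?} after {:?}", got, elapsed);
}
```

In this model the time spent in try_recv is bounded only by how long the sender stays stalled. If a higher-priority receiver spinning on the same core never lets the sender run, that time is unbounded.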

The reproducer code above was run on Linux. Full crate code is available at https://github.com/benhansen-io/mpsc_deadlock_reproducer

The issue was originally discovered on a real-time OS where the receiver has a higher priority than the sender (which makes sense for that application). On Linux the sending thread eventually gets a time slice, so try_recv does unblock after a while, but on the real-time OS the sender never runs and try_recv blocks forever.
