Skip to content

Commit b7277c9

Browse files
committed
fallback to blocking when waiting for block or message writes
1 parent 61013ca commit b7277c9

File tree

2 files changed

+148
-26
lines changed

2 files changed

+148
-26
lines changed

crossbeam-channel/src/flavors/array.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ impl<T> Channel<T> {
210210

211211
// The head was advanced but the stamp hasn't been updated yet,
212212
// meaning a receive is in-progress. Spin for a bit waiting for
213-
// the receive to complete before falling back to parking.
213+
// the receive to complete before falling back to blocking.
214214
if backoff.is_completed() {
215215
return Status::InProgress;
216216
}
@@ -308,7 +308,7 @@ impl<T> Channel<T> {
308308

309309
// The tail was advanced but the stamp hasn't been updated yet,
310310
// meaning a send is in-progress. Spin for a bit waiting for
311-
// the send to complete before falling back to parking.
311+
// the send to complete before falling back to blocking.
312312
if backoff.is_completed() {
313313
return Status::InProgress;
314314
}

crossbeam-channel/src/flavors/list.rs

Lines changed: 146 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::boxed::Box;
44
use std::cell::UnsafeCell;
55
use std::marker::PhantomData;
66
use std::mem::MaybeUninit;
7-
use std::ptr;
7+
use std::ptr::{self, NonNull};
88
use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
99
use std::time::Instant;
1010

@@ -56,10 +56,48 @@ impl<T> Slot<T> {
5656
};
5757

5858
/// Waits until a message is written into the slot.
59-
fn wait_write(&self) {
60-
let backoff = Backoff::new();
61-
while self.state.load(Ordering::Acquire) & WRITE == 0 {
62-
backoff.snooze();
59+
fn wait_write(&self, receivers: &SyncWaker, token: &mut Token) {
60+
let mut state = receivers.start();
61+
62+
loop {
63+
// Try reading the message several times.
64+
let backoff = Backoff::new();
65+
loop {
66+
if self.state.load(Ordering::Acquire) & WRITE != 0 {
67+
return;
68+
}
69+
70+
if backoff.is_completed() {
71+
break;
72+
} else {
73+
backoff.snooze();
74+
}
75+
}
76+
77+
// Prepare for blocking until a sender wakes us up.
78+
Context::with(|cx| {
79+
let oper = Operation::hook(token);
80+
// Register to be notified after any message is sent.
81+
receivers.watch(oper, cx, &state);
82+
83+
// Was the emssage just sent?
84+
if self.state.load(Ordering::Acquire) & WRITE != 0 {
85+
let _ = cx.try_select(Selected::Aborted);
86+
}
87+
88+
// Block the current thread.
89+
let sel = cx.wait_until(None);
90+
91+
match sel {
92+
Selected::Waiting => unreachable!(),
93+
Selected::Aborted | Selected::Disconnected => {
94+
receivers.unwatch(oper);
95+
}
96+
Selected::Operation(_) => {}
97+
}
98+
99+
state.unpark();
100+
});
63101
}
64102
}
65103
}
@@ -85,14 +123,47 @@ impl<T> Block<T> {
85123
}
86124

87125
/// Waits until the next pointer is set.
88-
fn wait_next(&self) -> *mut Self {
89-
let backoff = Backoff::new();
126+
fn wait_next(&self, receivers: &SyncWaker, token: &mut Token) -> *mut Self {
127+
let mut state = receivers.start();
90128
loop {
91-
let next = self.next.load(Ordering::Acquire);
92-
if !next.is_null() {
93-
return next;
129+
// Try reading the message several times.
130+
let backoff = Backoff::new();
131+
loop {
132+
if let Some(next) = NonNull::new(self.next.load(Ordering::Acquire)) {
133+
return next.as_ptr();
134+
}
135+
136+
if backoff.is_completed() {
137+
break;
138+
} else {
139+
backoff.snooze();
140+
}
94141
}
95-
backoff.snooze();
142+
143+
// Prepare for blocking until a sender wakes us up.
144+
Context::with(|cx| {
145+
let oper = Operation::hook(token);
146+
// Register to be notified after any message is sent.
147+
receivers.watch(oper, cx, &state);
148+
149+
// Was the next pointer just written?
150+
if !self.next.load(Ordering::Acquire).is_null() {
151+
let _ = cx.try_select(Selected::Aborted);
152+
}
153+
154+
// Block the current thread.
155+
let sel = cx.wait_until(None);
156+
157+
match sel {
158+
Selected::Waiting => unreachable!(),
159+
Selected::Aborted | Selected::Disconnected => {
160+
receivers.unwatch(oper);
161+
}
162+
Selected::Operation(_) => {}
163+
}
164+
165+
state.unpark();
166+
});
96167
}
97168
}
98169

@@ -208,7 +279,7 @@ impl<T> Channel<T> {
208279
}
209280

210281
/// Attempts to reserve a slot for sending a message.
211-
fn start_send(&self, token: &mut Token) -> bool {
282+
fn start_send(&self, token: &mut Token) -> Status {
212283
let backoff = Backoff::new();
213284
let mut tail = self.tail.index.load(Ordering::Acquire);
214285
let mut block = self.tail.block.load(Ordering::Acquire);
@@ -218,14 +289,19 @@ impl<T> Channel<T> {
218289
// Check if the channel is disconnected.
219290
if tail & MARK_BIT != 0 {
220291
token.list.block = ptr::null();
221-
return true;
292+
return Status::Ready;
222293
}
223294

224295
// Calculate the offset of the index into the block.
225296
let offset = (tail >> SHIFT) % LAP;
226297

227298
// If we reached the end of the block, wait until the next one is installed.
299+
// If we've been waiting for too long, fallback to blocking.
228300
if offset == BLOCK_CAP {
301+
if backoff.is_completed() {
302+
return Status::InProgress;
303+
}
304+
229305
backoff.snooze();
230306
tail = self.tail.index.load(Ordering::Acquire);
231307
block = self.tail.block.load(Ordering::Acquire);
@@ -279,7 +355,7 @@ impl<T> Channel<T> {
279355

280356
token.list.block = block as *const u8;
281357
token.list.offset = offset;
282-
return true;
358+
return Status::Ready;
283359
},
284360
Err(t) => {
285361
tail = t;
@@ -320,7 +396,7 @@ impl<T> Channel<T> {
320396
let offset = (head >> SHIFT) % LAP;
321397

322398
// We reached the end of the block but the block is not installed yet, meaning
323-
// the last send on previous block is still in progress. The send is likely to
399+
// the last send on the previous block is still in progress. The send is likely to
324400
// be soon so we spin here before falling back to blocking.
325401
if offset == BLOCK_CAP {
326402
if backoff.is_completed() {
@@ -382,7 +458,7 @@ impl<T> Channel<T> {
382458
Ok(_) => unsafe {
383459
// If we've reached the end of the block, move to the next one.
384460
if offset + 1 == BLOCK_CAP {
385-
let next = (*block).wait_next();
461+
let next = (*block).wait_next(&self.receivers, token);
386462
let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
387463
if !(*next).next.load(Ordering::Relaxed).is_null() {
388464
next_index |= MARK_BIT;
@@ -416,7 +492,7 @@ impl<T> Channel<T> {
416492
let block = token.list.block as *mut Block<T>;
417493
let offset = token.list.offset;
418494
let slot = unsafe { (*block).slots.get_unchecked(offset) };
419-
slot.wait_write();
495+
slot.wait_write(&self.receivers, token);
420496
let msg = unsafe { slot.msg.get().read().assume_init() };
421497

422498
// Destroy the block if we've reached the end, or if another thread wanted to destroy but
@@ -447,10 +523,55 @@ impl<T> Channel<T> {
447523
_deadline: Option<Instant>,
448524
) -> Result<(), SendTimeoutError<T>> {
449525
let token = &mut Token::default();
450-
assert!(self.start_send(token));
451-
unsafe {
452-
self.write(token, msg)
453-
.map_err(SendTimeoutError::Disconnected)
526+
527+
// It's possible that we can't proceed because of the sender that
528+
// is supposed to install the next block lagging, so we might have to
529+
// block for a message to be sent.
530+
let mut state = self.receivers.start();
531+
let mut succeeded = false;
532+
loop {
533+
// Try sending a message several times.
534+
let backoff = Backoff::new();
535+
loop {
536+
if succeeded || self.start_send(token) == Status::Ready {
537+
return unsafe {
538+
self.write(token, msg)
539+
.map_err(SendTimeoutError::Disconnected)
540+
};
541+
}
542+
543+
if backoff.is_completed() {
544+
break;
545+
} else {
546+
backoff.snooze();
547+
}
548+
}
549+
550+
// Prepare for blocking until a sender wakes us up.
551+
Context::with(|cx| {
552+
let oper = Operation::hook(token);
553+
// Register to be notified after any message is sent.
554+
self.receivers.watch(oper, cx, &state);
555+
556+
// Has the channel become ready just now?
557+
if self.start_send(token) == Status::Ready {
558+
succeeded = true;
559+
let _ = cx.try_select(Selected::Aborted);
560+
}
561+
562+
// Block the current thread.
563+
let sel = cx.wait_until(None);
564+
565+
match sel {
566+
Selected::Waiting => unreachable!(),
567+
Selected::Aborted | Selected::Disconnected => {
568+
self.receivers.unwatch(oper);
569+
}
570+
Selected::Operation(_) => {}
571+
}
572+
573+
state.unpark();
574+
});
454575
}
455576
}
456577

@@ -610,6 +731,7 @@ impl<T> Channel<T> {
610731
///
611732
/// This method should only be called when all receivers are dropped.
612733
fn discard_all_messages(&self) {
734+
let token = &mut Token::default();
613735
let backoff = Backoff::new();
614736
let mut tail = self.tail.index.load(Ordering::Acquire);
615737
loop {
@@ -651,10 +773,10 @@ impl<T> Channel<T> {
651773
if offset < BLOCK_CAP {
652774
// Drop the message in the slot.
653775
let slot = (*block).slots.get_unchecked(offset);
654-
slot.wait_write();
776+
slot.wait_write(&self.receivers, token);
655777
(*slot.msg.get()).assume_init_drop();
656778
} else {
657-
(*block).wait_next();
779+
(*block).wait_next(&self.receivers, token);
658780
// Deallocate the block and move to the next one.
659781
let next = (*block).next.load(Ordering::Acquire);
660782
drop(Box::from_raw(block));
@@ -796,7 +918,7 @@ impl<'a, T> SelectHandle for Sender<'a, T> {
796918
}
797919

798920
fn try_select(&self, token: &mut Token) -> bool {
799-
self.0.start_send(token)
921+
self.0.start_send(token) == Status::Ready
800922
}
801923

802924
fn deadline(&self) -> Option<Instant> {

0 commit comments

Comments
 (0)