Skip to content

Commit d1602c0

Browse files
committed
use blocking state in select macro
1 parent 86debba commit d1602c0

File tree

12 files changed

+281
-64
lines changed

12 files changed

+281
-64
lines changed

crossbeam-channel/src/channel.rs

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use crate::err::{
1414
};
1515
use crate::flavors;
1616
use crate::select::{Operation, SelectHandle, Token};
17+
use crate::waker::BlockingState;
1718

1819
/// Creates a multi-producer multi-consumer channel of unbounded capacity.
1920
///
@@ -1358,6 +1359,14 @@ impl<T> fmt::Debug for IntoIter<T> {
13581359
}
13591360

13601361
impl<T> SelectHandle for Sender<T> {
1362+
fn start(&self) -> Option<BlockingState<'_>> {
1363+
match &self.flavor {
1364+
SenderFlavor::Array(chan) => chan.sender().start_ref(),
1365+
SenderFlavor::List(chan) => chan.sender().start_ref(),
1366+
SenderFlavor::Zero(chan) => chan.start(),
1367+
}
1368+
}
1369+
13611370
fn try_select(&self, token: &mut Token) -> bool {
13621371
match &self.flavor {
13631372
SenderFlavor::Array(chan) => chan.sender().try_select(token),
@@ -1370,11 +1379,11 @@ impl<T> SelectHandle for Sender<T> {
13701379
None
13711380
}
13721381

1373-
fn register(&self, oper: Operation, cx: &Context) -> bool {
1382+
fn register(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool {
13741383
match &self.flavor {
1375-
SenderFlavor::Array(chan) => chan.sender().register(oper, cx),
1376-
SenderFlavor::List(chan) => chan.sender().register(oper, cx),
1377-
SenderFlavor::Zero(chan) => chan.sender().register(oper, cx),
1384+
SenderFlavor::Array(chan) => chan.sender().register(oper, cx, state),
1385+
SenderFlavor::List(chan) => chan.sender().register(oper, cx, state),
1386+
SenderFlavor::Zero(chan) => chan.sender().register(oper, cx, state),
13781387
}
13791388
}
13801389

@@ -1420,6 +1429,17 @@ impl<T> SelectHandle for Sender<T> {
14201429
}
14211430

14221431
impl<T> SelectHandle for Receiver<T> {
1432+
fn start(&self) -> Option<BlockingState<'_>> {
1433+
match &self.flavor {
1434+
ReceiverFlavor::Array(chan) => chan.receiver().start_ref(),
1435+
ReceiverFlavor::List(chan) => chan.receiver().start_ref(),
1436+
ReceiverFlavor::Zero(chan) => chan.start(),
1437+
ReceiverFlavor::At(chan) => chan.start(),
1438+
ReceiverFlavor::Tick(chan) => chan.start(),
1439+
ReceiverFlavor::Never(chan) => chan.start(),
1440+
}
1441+
}
1442+
14231443
fn try_select(&self, token: &mut Token) -> bool {
14241444
match &self.flavor {
14251445
ReceiverFlavor::Array(chan) => chan.receiver().try_select(token),
@@ -1442,14 +1462,14 @@ impl<T> SelectHandle for Receiver<T> {
14421462
}
14431463
}
14441464

1445-
fn register(&self, oper: Operation, cx: &Context) -> bool {
1465+
fn register(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool {
14461466
match &self.flavor {
1447-
ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx),
1448-
ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx),
1449-
ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx),
1450-
ReceiverFlavor::At(chan) => chan.register(oper, cx),
1451-
ReceiverFlavor::Tick(chan) => chan.register(oper, cx),
1452-
ReceiverFlavor::Never(chan) => chan.register(oper, cx),
1467+
ReceiverFlavor::Array(chan) => chan.receiver().register(oper, cx, state),
1468+
ReceiverFlavor::List(chan) => chan.receiver().register(oper, cx, state),
1469+
ReceiverFlavor::Zero(chan) => chan.receiver().register(oper, cx, state),
1470+
ReceiverFlavor::At(chan) => chan.register(oper, cx, state),
1471+
ReceiverFlavor::Tick(chan) => chan.register(oper, cx, state),
1472+
ReceiverFlavor::Never(chan) => chan.register(oper, cx, state),
14531473
}
14541474
}
14551475

crossbeam-channel/src/flavors/array.rs

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crossbeam_utils::{Backoff, CachePadded};
2020
use crate::context::Context;
2121
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
2222
use crate::select::{Operation, SelectHandle, Selected, Token};
23-
use crate::waker::SyncWaker;
23+
use crate::waker::{BlockingState, SyncWaker};
2424

2525
/// A slot in a channel.
2626
struct Slot<T> {
@@ -401,7 +401,7 @@ impl<T> Channel<T> {
401401
Context::with(|cx| {
402402
// Prepare for blocking until a receiver wakes us up.
403403
let oper = Operation::hook(token);
404-
self.senders.register2(oper, cx, &state);
404+
self.senders.register(oper, cx, &state);
405405

406406
// Has the channel become ready just now?
407407
if !self.is_full() || self.is_disconnected() {
@@ -478,7 +478,7 @@ impl<T> Channel<T> {
478478
Context::with(|cx| {
479479
// Prepare for blocking until a sender wakes us up.
480480
let oper = Operation::hook(token);
481-
self.receivers.register2(oper, cx, &mut state);
481+
self.receivers.register(oper, cx, &state);
482482

483483
// Has the channel become ready just now?
484484
if !self.is_empty() || self.is_disconnected() {
@@ -629,7 +629,18 @@ pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
629629
/// Sender handle to a channel.
630630
pub(crate) struct Sender<'a, T>(&'a Channel<T>);
631631

632+
impl<'a, T> Receiver<'a, T> {
633+
/// Same as `SelectHandle::start`, but with a more specific lifetime.
634+
pub(crate) fn start_ref(&self) -> Option<BlockingState<'a>> {
635+
Some(self.0.receivers.start())
636+
}
637+
}
638+
632639
impl<T> SelectHandle for Receiver<'_, T> {
640+
fn start(&self) -> Option<BlockingState<'_>> {
641+
self.start_ref()
642+
}
643+
633644
fn try_select(&self, token: &mut Token) -> bool {
634645
self.0.start_recv(token) == Status::Ready
635646
}
@@ -638,8 +649,9 @@ impl<T> SelectHandle for Receiver<'_, T> {
638649
None
639650
}
640651

641-
fn register(&self, oper: Operation, cx: &Context) -> bool {
642-
self.0.receivers.register(oper, cx);
652+
fn register(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool {
653+
let state = state.expect("Receiver::start returns blocking state");
654+
self.0.receivers.register(oper, cx, state);
643655
self.is_ready()
644656
}
645657

@@ -665,7 +677,18 @@ impl<T> SelectHandle for Receiver<'_, T> {
665677
}
666678
}
667679

680+
impl<'a, T> Sender<'a, T> {
681+
/// Same as `SelectHandle::start`, but with a more specific lifetime.
682+
pub(crate) fn start_ref(&self) -> Option<BlockingState<'a>> {
683+
Some(self.0.senders.start())
684+
}
685+
}
686+
668687
impl<T> SelectHandle for Sender<'_, T> {
688+
fn start(&self) -> Option<BlockingState<'_>> {
689+
self.start_ref()
690+
}
691+
669692
fn try_select(&self, token: &mut Token) -> bool {
670693
self.0.start_send(token) == Status::Ready
671694
}
@@ -674,8 +697,9 @@ impl<T> SelectHandle for Sender<'_, T> {
674697
None
675698
}
676699

677-
fn register(&self, oper: Operation, cx: &Context) -> bool {
678-
self.0.senders.register(oper, cx);
700+
fn register(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool {
701+
let state = state.expect("Sender::start returns blocking state");
702+
self.0.senders.register(oper, cx, state);
679703
self.is_ready()
680704
}
681705

crossbeam-channel/src/flavors/at.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::context::Context;
1010
use crate::err::{RecvTimeoutError, TryRecvError};
1111
use crate::select::{Operation, SelectHandle, Token};
1212
use crate::utils;
13+
use crate::waker::BlockingState;
1314

1415
/// Result of a receive operation.
1516
pub(crate) type AtToken = Option<Instant>;
@@ -140,6 +141,10 @@ impl Channel {
140141
}
141142

142143
impl SelectHandle for Channel {
144+
fn start(&self) -> Option<BlockingState<'_>> {
145+
None
146+
}
147+
143148
#[inline]
144149
fn try_select(&self, token: &mut Token) -> bool {
145150
match self.try_recv() {
@@ -166,7 +171,12 @@ impl SelectHandle for Channel {
166171
}
167172

168173
#[inline]
169-
fn register(&self, _oper: Operation, _cx: &Context) -> bool {
174+
fn register(
175+
&self,
176+
_oper: Operation,
177+
_cx: &Context,
178+
_state: Option<&BlockingState<'_>>,
179+
) -> bool {
170180
self.is_ready()
171181
}
172182

crossbeam-channel/src/flavors/list.rs

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crossbeam_utils::{Backoff, CachePadded};
1313
use crate::context::Context;
1414
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
1515
use crate::select::{Operation, SelectHandle, Selected, Token};
16-
use crate::waker::SyncWaker;
16+
use crate::waker::{BlockingState, SyncWaker};
1717

1818
// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
1919
// following changes by @kleimkuhler:
@@ -510,7 +510,7 @@ impl<T> Channel<T> {
510510
// Prepare for blocking until a sender wakes us up.
511511
Context::with(|cx| {
512512
let oper = Operation::hook(token);
513-
self.receivers.register2(oper, cx, &state);
513+
self.receivers.register(oper, cx, &state);
514514

515515
// Has the channel become ready just now?
516516
if !self.is_empty() || self.is_disconnected() {
@@ -734,7 +734,18 @@ pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
734734
/// Sender handle to a channel.
735735
pub(crate) struct Sender<'a, T>(&'a Channel<T>);
736736

737+
impl<'a, T> Receiver<'a, T> {
738+
/// Same as `SelectHandle::start`, but with a more specific lifetime.
739+
pub(crate) fn start_ref(&self) -> Option<BlockingState<'a>> {
740+
Some(self.0.receivers.start())
741+
}
742+
}
743+
737744
impl<T> SelectHandle for Receiver<'_, T> {
745+
fn start(&self) -> Option<BlockingState<'_>> {
746+
self.start_ref()
747+
}
748+
738749
fn try_select(&self, token: &mut Token) -> bool {
739750
self.0.start_recv(token) == Status::Ready
740751
}
@@ -743,8 +754,9 @@ impl<T> SelectHandle for Receiver<'_, T> {
743754
None
744755
}
745756

746-
fn register(&self, oper: Operation, cx: &Context) -> bool {
747-
self.0.receivers.register(oper, cx);
757+
fn register(&self, oper: Operation, cx: &Context, state: Option<&BlockingState<'_>>) -> bool {
758+
let state = state.expect("Receiver::start returns blocking stat");
759+
self.0.receivers.register(oper, cx, state);
748760
self.is_ready()
749761
}
750762

@@ -770,7 +782,18 @@ impl<T> SelectHandle for Receiver<'_, T> {
770782
}
771783
}
772784

773-
impl<T> SelectHandle for Sender<'_, T> {
785+
impl<'a, T> Sender<'a, T> {
786+
/// Same as `SelectHandle::start`, but with a more specific lifetime.
787+
pub(crate) fn start_ref(&self) -> Option<BlockingState<'a>> {
788+
None
789+
}
790+
}
791+
792+
impl<'a, T> SelectHandle for Sender<'a, T> {
793+
fn start(&self) -> Option<BlockingState<'a>> {
794+
None
795+
}
796+
774797
fn try_select(&self, token: &mut Token) -> bool {
775798
self.0.start_send(token)
776799
}
@@ -779,7 +802,12 @@ impl<T> SelectHandle for Sender<'_, T> {
779802
None
780803
}
781804

782-
fn register(&self, _oper: Operation, _cx: &Context) -> bool {
805+
fn register(
806+
&self,
807+
_oper: Operation,
808+
_cx: &Context,
809+
_state: Option<&BlockingState<'_>>,
810+
) -> bool {
783811
self.is_ready()
784812
}
785813

crossbeam-channel/src/flavors/never.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::context::Context;
99
use crate::err::{RecvTimeoutError, TryRecvError};
1010
use crate::select::{Operation, SelectHandle, Token};
1111
use crate::utils;
12+
use crate::waker::BlockingState;
1213

1314
/// This flavor doesn't need a token.
1415
pub(crate) type NeverToken = ();
@@ -72,6 +73,10 @@ impl<T> Channel<T> {
7273
}
7374

7475
impl<T> SelectHandle for Channel<T> {
76+
fn start(&self) -> Option<BlockingState<'_>> {
77+
None
78+
}
79+
7580
#[inline]
7681
fn try_select(&self, _token: &mut Token) -> bool {
7782
false
@@ -83,7 +88,12 @@ impl<T> SelectHandle for Channel<T> {
8388
}
8489

8590
#[inline]
86-
fn register(&self, _oper: Operation, _cx: &Context) -> bool {
91+
fn register(
92+
&self,
93+
_oper: Operation,
94+
_cx: &Context,
95+
_state: Option<&BlockingState<'_>>,
96+
) -> bool {
8797
self.is_ready()
8898
}
8999

crossbeam-channel/src/flavors/tick.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crossbeam_utils::atomic::AtomicCell;
1010
use crate::context::Context;
1111
use crate::err::{RecvTimeoutError, TryRecvError};
1212
use crate::select::{Operation, SelectHandle, Token};
13+
use crate::waker::BlockingState;
1314

1415
/// Result of a receive operation.
1516
pub(crate) type TickToken = Option<Instant>;
@@ -115,6 +116,10 @@ impl Channel {
115116
}
116117

117118
impl SelectHandle for Channel {
119+
fn start(&self) -> Option<BlockingState<'_>> {
120+
None
121+
}
122+
118123
#[inline]
119124
fn try_select(&self, token: &mut Token) -> bool {
120125
match self.try_recv() {
@@ -136,7 +141,12 @@ impl SelectHandle for Channel {
136141
}
137142

138143
#[inline]
139-
fn register(&self, _oper: Operation, _cx: &Context) -> bool {
144+
fn register(
145+
&self,
146+
_oper: Operation,
147+
_cx: &Context,
148+
_state: Option<&BlockingState<'_>>,
149+
) -> bool {
140150
self.is_ready()
141151
}
142152

0 commit comments

Comments
 (0)