@@ -476,21 +476,99 @@ impl<T> Channel<T> {
476476 Some ( self . cap ( ) )
477477 }
478478
479- /// Disconnects the channel and wakes up all blocked senders and receivers.
479+ /// Disconnects senders and wakes up all blocked senders and receivers.
480480 ///
481481 /// Returns `true` if this call disconnected the channel.
482- pub ( crate ) fn disconnect ( & self ) -> bool {
482+ pub ( crate ) fn disconnect_senders ( & self ) -> bool {
483483 let tail = self . tail . fetch_or ( self . mark_bit , Ordering :: SeqCst ) ;
484484
485485 if tail & self . mark_bit == 0 {
486- self . senders . disconnect ( ) ;
487486 self . receivers . disconnect ( ) ;
488487 true
489488 } else {
490489 false
491490 }
492491 }
493492
493+ /// Disconnects receivers and wakes up all blocked senders.
494+ ///
495+ /// Returns `true` if this call disconnected the channel.
496+ ///
497+ /// # Safety
498+ /// May only be called once upon dropping the last receiver. The
499+ /// destruction of all other receivers must have been observed with acquire
500+ /// ordering or stronger.
501+ pub ( crate ) unsafe fn disconnect_receivers ( & self ) -> bool {
502+ let tail = self . tail . fetch_or ( self . mark_bit , Ordering :: SeqCst ) ;
503+ let disconnected = if tail & self . mark_bit == 0 {
504+ self . senders . disconnect ( ) ;
505+ true
506+ } else {
507+ false
508+ } ;
509+
510+ self . discard_all_messages ( tail) ;
511+ disconnected
512+ }
513+
514+ /// Discards all messages.
515+ ///
516+ /// `tail` should be the current (and therefore last) value of `tail`.
517+ ///
518+ /// # Panicking
519+ /// If a destructor panics, the remaining messages are leaked, matching the
520+ /// behaviour of the unbounded channel.
521+ ///
522+ /// # Safety
523+ /// This method must only be called when dropping the last receiver. The
524+ /// destruction of all other receivers must have been observed with acquire
525+ /// ordering or stronger.
526+ unsafe fn discard_all_messages ( & self , tail : usize ) {
527+ debug_assert ! ( self . is_disconnected( ) ) ;
528+
529+ // Only receivers modify `head`, so since we are the last one,
530+ // this value will not change and will not be observed (since
531+ // no new messages can be sent after disconnection).
532+ let mut head = self . head . load ( Ordering :: Relaxed ) ;
533+ let tail = tail & !self . mark_bit ;
534+
535+ let backoff = Backoff :: new ( ) ;
536+ loop {
537+ // Deconstruct the head.
538+ let index = head & ( self . mark_bit - 1 ) ;
539+ let lap = head & !( self . one_lap - 1 ) ;
540+
541+ // Inspect the corresponding slot.
542+ debug_assert ! ( index < self . buffer. len( ) ) ;
543+ let slot = unsafe { self . buffer . get_unchecked ( index) } ;
544+ let stamp = slot. stamp . load ( Ordering :: Acquire ) ;
545+
546+ // If the stamp is ahead of the head by 1, we may drop the message.
547+ if head + 1 == stamp {
548+ head = if index + 1 < self . cap ( ) {
549+ // Same lap, incremented index.
550+ // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
551+ head + 1
552+ } else {
553+ // One lap forward, index wraps around to zero.
554+ // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
555+ lap. wrapping_add ( self . one_lap )
556+ } ;
557+
558+ unsafe {
559+ ( * slot. msg . get ( ) ) . assume_init_drop ( ) ;
560+ }
561+ // If the tail equals the head, that means the channel is empty.
562+ } else if tail == head {
563+ return ;
564+ // Otherwise, a sender is about to write into the slot, so we need
565+ // to wait for it to update the stamp.
566+ } else {
567+ backoff. spin ( ) ;
568+ }
569+ }
570+ }
571+
494572 /// Returns `true` if the channel is disconnected.
495573 pub ( crate ) fn is_disconnected ( & self ) -> bool {
496574 self . tail . load ( Ordering :: SeqCst ) & self . mark_bit != 0
@@ -521,45 +599,6 @@ impl<T> Channel<T> {
521599 }
522600}
523601
524- impl < T > Drop for Channel < T > {
525- fn drop ( & mut self ) {
526- if mem:: needs_drop :: < T > ( ) {
527- // Get the index of the head.
528- let head = * self . head . get_mut ( ) ;
529- let tail = * self . tail . get_mut ( ) ;
530-
531- let hix = head & ( self . mark_bit - 1 ) ;
532- let tix = tail & ( self . mark_bit - 1 ) ;
533-
534- let len = if hix < tix {
535- tix - hix
536- } else if hix > tix {
537- self . cap ( ) - hix + tix
538- } else if ( tail & !self . mark_bit ) == head {
539- 0
540- } else {
541- self . cap ( )
542- } ;
543-
544- // Loop over all slots that hold a message and drop them.
545- for i in 0 ..len {
546- // Compute the index of the next slot holding a message.
547- let index = if hix + i < self . cap ( ) {
548- hix + i
549- } else {
550- hix + i - self . cap ( )
551- } ;
552-
553- unsafe {
554- debug_assert ! ( index < self . buffer. len( ) ) ;
555- let slot = self . buffer . get_unchecked_mut ( index) ;
556- ( * slot. msg . get ( ) ) . assume_init_drop ( ) ;
557- }
558- }
559- }
560- }
561- }
562-
563602/// Receiver handle to a channel.
564603pub ( crate ) struct Receiver < ' a , T > ( & ' a Channel < T > ) ;
565604
0 commit comments