@@ -94,11 +94,8 @@ use crate::mpsc::queue::Queue;
9494
9595mod queue;
9696
97- /// The transmission end of a bounded mpsc channel.
98- ///
99- /// This value is created by the [`channel`](channel) function.
10097#[ derive( Debug ) ]
101- pub struct Sender < T > {
98+ struct SenderInner < T > {
10299 // Channel state shared between the sender and receiver.
103100 inner : Arc < Inner < T > > ,
104101
@@ -112,14 +109,20 @@ pub struct Sender<T> {
112109 maybe_parked : bool ,
113110}
114111
115- // We never project Pin<&mut Sender> to `Pin<&mut T>`
116- impl < T > Unpin for Sender < T > { }
112+ // We never project Pin<&mut SenderInner> to `Pin<&mut T>`
113+ impl < T > Unpin for SenderInner < T > { }
114+
115+ /// The transmission end of a bounded mpsc channel.
116+ ///
117+ /// This value is created by the [`channel`](channel) function.
118+ #[ derive( Debug ) ]
119+ pub struct Sender < T > ( Option < SenderInner < T > > ) ;
117120
118121/// The transmission end of an unbounded mpsc channel.
119122///
120123/// This value is created by the [`unbounded`](unbounded) function.
121124#[ derive( Debug ) ]
122- pub struct UnboundedSender < T > ( Sender < T > ) ;
125+ pub struct UnboundedSender < T > ( Option < SenderInner < T > > ) ;
123126
124127trait AssertKinds : Send + Sync + Clone { }
125128impl AssertKinds for UnboundedSender < u32 > { }
@@ -357,7 +360,8 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
357360 // Check that the requested buffer size does not exceed the maximum buffer
358361 // size permitted by the system.
359362 assert ! ( buffer < MAX_BUFFER , "requested buffer size too large" ) ;
360- channel2 ( Some ( buffer) )
363+ let ( tx, rx) = channel2 ( Some ( buffer) ) ;
364+ ( Sender ( Some ( tx) ) , rx)
361365}
362366
363367/// Creates an unbounded mpsc channel for communicating between asynchronous
@@ -372,10 +376,10 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
372376/// process to run out of memory. In this case, the process will be aborted.
373377pub fn unbounded < T > ( ) -> ( UnboundedSender < T > , UnboundedReceiver < T > ) {
374378 let ( tx, rx) = channel2 ( None ) ;
375- ( UnboundedSender ( tx ) , UnboundedReceiver ( rx) )
379+ ( UnboundedSender ( Some ( tx ) ) , UnboundedReceiver ( rx) )
376380}
377381
378- fn channel2 < T > ( buffer : Option < usize > ) -> ( Sender < T > , Receiver < T > ) {
382+ fn channel2 < T > ( buffer : Option < usize > ) -> ( SenderInner < T > , Receiver < T > ) {
379383 let inner = Arc :: new ( Inner {
380384 buffer,
381385 state : AtomicUsize :: new ( INIT_STATE ) ,
@@ -385,7 +389,7 @@ fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
385389 recv_task : AtomicWaker :: new ( ) ,
386390 } ) ;
387391
388- let tx = Sender {
392+ let tx = SenderInner {
389393 inner : inner. clone ( ) ,
390394 sender_task : Arc :: new ( Mutex :: new ( SenderTask :: new ( ) ) ) ,
391395 maybe_parked : false ,
@@ -404,10 +408,10 @@ fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) {
404408 *
405409 */
406410
407- impl < T > Sender < T > {
411+ impl < T > SenderInner < T > {
408412 /// Attempts to send a message on this `Sender`, returning the message
409413 /// if there was an error.
410- pub fn try_send ( & mut self , msg : T ) -> Result < ( ) , TrySendError < T > > {
414+ fn try_send ( & mut self , msg : T ) -> Result < ( ) , TrySendError < T > > {
411415 // If the sender is currently blocked, reject the message
412416 if !self . poll_unparked ( None ) . is_ready ( ) {
413417 return Err ( TrySendError {
@@ -422,16 +426,6 @@ impl<T> Sender<T> {
422426 self . do_send_b ( msg)
423427 }
424428
425- /// Send a message on the channel.
426- ///
427- /// This function should only be called after
428- /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
429- /// ready to receive a message.
430- pub fn start_send ( & mut self , msg : T ) -> Result < ( ) , SendError > {
431- self . try_send ( msg)
432- . map_err ( |e| e. err )
433- }
434-
435429 // Do the send without failing.
436430 // Can be called only by bounded sender.
437431 fn do_send_b ( & mut self , msg : T )
@@ -484,7 +478,7 @@ impl<T> Sender<T> {
484478 Poll :: Ready ( Ok ( ( ) ) )
485479 } else {
486480 Poll :: Ready ( Err ( SendError {
487- kind : SendErrorKind :: Full ,
481+ kind : SendErrorKind :: Disconnected ,
488482 } ) )
489483 }
490484 }
@@ -559,7 +553,7 @@ impl<T> Sender<T> {
559553 /// capacity, in which case the current task is queued to be notified once
560554 /// capacity is available;
561555 /// - `Err(SendError)` if the receiver has been dropped.
562- pub fn poll_ready (
556+ fn poll_ready (
563557 & mut self ,
564558 waker : & Waker
565559 ) -> Poll < Result < ( ) , SendError > > {
@@ -574,12 +568,12 @@ impl<T> Sender<T> {
574568 }
575569
576570 /// Returns whether this channel is closed without needing a context.
577- pub fn is_closed ( & self ) -> bool {
571+ fn is_closed ( & self ) -> bool {
578572 !decode_state ( self . inner . state . load ( SeqCst ) ) . is_open
579573 }
580574
581575 /// Closes this channel from the sender side, preventing any new messages.
582- pub fn close_channel ( & mut self ) {
576+ fn close_channel ( & self ) {
583577 // There's no need to park this sender, its dropping,
584578 // and we don't want to check for capacity, so skip
585579 // that stuff from `do_send`.
@@ -615,43 +609,116 @@ impl<T> Sender<T> {
615609 }
616610}
617611
612+ impl < T > Sender < T > {
613+ /// Attempts to send a message on this `Sender`, returning the message
614+ /// if there was an error.
615+ pub fn try_send ( & mut self , msg : T ) -> Result < ( ) , TrySendError < T > > {
616+ if let Some ( inner) = & mut self . 0 {
617+ inner. try_send ( msg)
618+ } else {
619+ Err ( TrySendError {
620+ err : SendError {
621+ kind : SendErrorKind :: Disconnected ,
622+ } ,
623+ val : msg,
624+ } )
625+ }
626+ }
627+
628+ /// Send a message on the channel.
629+ ///
630+ /// This function should only be called after
631+ /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
632+ /// ready to receive a message.
633+ pub fn start_send ( & mut self , msg : T ) -> Result < ( ) , SendError > {
634+ self . try_send ( msg)
635+ . map_err ( |e| e. err )
636+ }
637+
638+ /// Polls the channel to determine if there is guaranteed capacity to send
639+ /// at least one item without waiting.
640+ ///
641+ /// # Return value
642+ ///
643+ /// This method returns:
644+ ///
645+ /// - `Ok(Async::Ready(_))` if there is sufficient capacity;
646+ /// - `Ok(Async::Pending)` if the channel may not have
647+ /// capacity, in which case the current task is queued to be notified once
648+ /// capacity is available;
649+ /// - `Err(SendError)` if the receiver has been dropped.
650+ pub fn poll_ready (
651+ & mut self ,
652+ lw : & LocalWaker
653+ ) -> Poll < Result < ( ) , SendError > > {
654+ let inner = self . 0 . as_mut ( ) . ok_or ( SendError {
655+ kind : SendErrorKind :: Disconnected ,
656+ } ) ?;
657+ inner. poll_ready ( lw)
658+ }
659+
660+ /// Returns whether this channel is closed without needing a context.
661+ pub fn is_closed ( & self ) -> bool {
662+ self . 0 . as_ref ( ) . map ( SenderInner :: is_closed) . unwrap_or ( true )
663+ }
664+
665+ /// Closes this channel from the sender side, preventing any new messages.
666+ pub fn close_channel ( & mut self ) {
667+ if let Some ( inner) = & mut self . 0 {
668+ inner. close_channel ( ) ;
669+ }
670+ }
671+
672+ /// Disconnects this sender from the channel, closing it if there are no more senders left.
673+ pub fn disconnect ( & mut self ) {
674+ self . 0 = None ;
675+ }
676+ }
677+
618678impl < T > UnboundedSender < T > {
619679 /// Check if the channel is ready to receive a message.
620680 pub fn poll_ready (
621681 & self ,
622682 _: & Waker ,
623683 ) -> Poll < Result < ( ) , SendError > > {
624- self . 0 . poll_ready_nb ( )
684+ let inner = self . 0 . as_ref ( ) . ok_or ( SendError {
685+ kind : SendErrorKind :: Disconnected ,
686+ } ) ?;
687+ inner. poll_ready_nb ( )
625688 }
626689
627690 /// Returns whether this channel is closed without needing a context.
628691 pub fn is_closed ( & self ) -> bool {
629- self . 0 . is_closed ( )
692+ self . 0 . as_ref ( ) . map ( SenderInner :: is_closed) . unwrap_or ( true )
630693 }
631694
632695 /// Closes this channel from the sender side, preventing any new messages.
633696 pub fn close_channel ( & self ) {
634- self . 0 . inner . set_closed ( ) ;
635- self . 0 . inner . recv_task . wake ( ) ;
697+ if let Some ( inner) = & self . 0 {
698+ inner. close_channel ( ) ;
699+ }
700+ }
701+
702+ /// Disconnects this sender from the channel, closing it if there are no more senders left.
703+ pub fn disconnect ( & mut self ) {
704+ self . 0 = None ;
636705 }
637706
638707 // Do the send without parking current task.
639708 fn do_send_nb ( & self , msg : T ) -> Result < ( ) , TrySendError < T > > {
640- match self . 0 . inc_num_messages ( ) {
641- Some ( _num_messages) => { }
642- None => {
643- return Err ( TrySendError {
644- err : SendError {
645- kind : SendErrorKind :: Disconnected ,
646- } ,
647- val : msg,
648- } ) ;
649- } ,
650- } ;
651-
652- self . 0 . queue_push_and_signal ( msg) ;
709+ if let Some ( inner) = & self . 0 {
710+ if inner. inc_num_messages ( ) . is_some ( ) {
711+ inner. queue_push_and_signal ( msg) ;
712+ return Ok ( ( ) ) ;
713+ }
714+ }
653715
654- Ok ( ( ) )
716+ Err ( TrySendError {
717+ err : SendError {
718+ kind : SendErrorKind :: Disconnected ,
719+ } ,
720+ val : msg,
721+ } )
655722 }
656723
657724 /// Send a message on the channel.
@@ -673,15 +740,20 @@ impl<T> UnboundedSender<T> {
673740 }
674741}
675742
743+ impl < T > Clone for Sender < T > {
744+ fn clone ( & self ) -> Sender < T > {
745+ Sender ( self . 0 . clone ( ) )
746+ }
747+ }
748+
676749impl < T > Clone for UnboundedSender < T > {
677750 fn clone ( & self ) -> UnboundedSender < T > {
678751 UnboundedSender ( self . 0 . clone ( ) )
679752 }
680753}
681754
682-
683- impl < T > Clone for Sender < T > {
684- fn clone ( & self ) -> Sender < T > {
755+ impl < T > Clone for SenderInner < T > {
756+ fn clone ( & self ) -> SenderInner < T > {
685757 // Since this atomic op isn't actually guarding any memory and we don't
686758 // care about any orderings besides the ordering on the single atomic
687759 // variable, a relaxed ordering is acceptable.
@@ -701,7 +773,7 @@ impl<T> Clone for Sender<T> {
701773 // The ABA problem doesn't matter here. We only care that the
702774 // number of senders never exceeds the maximum.
703775 if actual == curr {
704- return Sender {
776+ return SenderInner {
705777 inner : self . inner . clone ( ) ,
706778 sender_task : Arc :: new ( Mutex :: new ( SenderTask :: new ( ) ) ) ,
707779 maybe_parked : false ,
@@ -713,7 +785,7 @@ impl<T> Clone for Sender<T> {
713785 }
714786}
715787
716- impl < T > Drop for Sender < T > {
788+ impl < T > Drop for SenderInner < T > {
717789 fn drop ( & mut self ) {
718790 // Ordering between variables don't matter here
719791 let prev = self . inner . num_senders . fetch_sub ( 1 , SeqCst ) ;
0 commit comments