1- use crate :: stream:: { StreamExt , TryStreamExt , Fuse } ;
1+ use crate :: stream:: { StreamExt , Fuse } ;
22use core:: fmt;
33use core:: pin:: Pin ;
44use futures_core:: future:: Future ;
55use futures_core:: ready;
66use futures_core:: stream:: { TryStream , Stream } ;
77use futures_core:: task:: { Context , Poll } ;
88use futures_sink:: Sink ;
9+ use pin_project_lite:: pin_project;
910
10- /// Future for the [`send_all`](super::SinkExt::send_all) method.
11- #[ allow( explicit_outlives_requirements) ] // https://github.com/rust-lang/rust/issues/60993
12- #[ must_use = "futures do nothing unless you `.await` or poll them" ]
13- pub struct SendAll < ' a , Si , St >
14- where
15- Si : ?Sized ,
16- St : ?Sized + TryStream ,
17- {
18- sink : & ' a mut Si ,
19- stream : Fuse < & ' a mut St > ,
20- buffered : Option < St :: Ok > ,
11+ pin_project ! {
12+ /// Future for the [`send_all`](super::SinkExt::send_all) method.
13+ #[ allow( explicit_outlives_requirements) ] // https://github.com/rust-lang/rust/issues/60993
14+ #[ must_use = "futures do nothing unless you `.await` or poll them" ]
15+ pub struct SendAll <' a, Si , St >
16+ where
17+ Si : ?Sized ,
18+ St : TryStream ,
19+ {
20+ sink: & ' a mut Si ,
21+ #[ pin]
22+ stream: Fuse <St >,
23+ buffered: Option <St :: Ok >,
24+ }
2125}
2226
2327impl < Si , St > fmt:: Debug for SendAll < ' _ , Si , St >
2428where
2529 Si : fmt:: Debug + ?Sized ,
26- St : fmt:: Debug + ? Sized + TryStream ,
30+ St : fmt:: Debug + TryStream ,
2731 St :: Ok : fmt:: Debug ,
2832{
2933 fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
@@ -35,21 +39,14 @@ where
3539 }
3640}
3741
38- // Pinning is never projected to any fields
39- impl < Si , St > Unpin for SendAll < ' _ , Si , St >
40- where
41- Si : Unpin + ?Sized ,
42- St : TryStream + Unpin + ?Sized ,
43- { }
44-
4542impl < ' a , Si , St , Ok , Error > SendAll < ' a , Si , St >
4643where
4744 Si : Sink < Ok , Error = Error > + Unpin + ?Sized ,
48- St : TryStream < Ok = Ok , Error = Error > + Stream + Unpin + ? Sized ,
45+ St : TryStream < Ok = Ok , Error = Error > + Stream ,
4946{
5047 pub ( super ) fn new (
5148 sink : & ' a mut Si ,
52- stream : & ' a mut St ,
49+ stream : St ,
5350 ) -> Self {
5451 Self {
5552 sink,
@@ -59,17 +56,18 @@ where
5956 }
6057
6158 fn try_start_send (
62- & mut self ,
59+ self : Pin < & mut Self > ,
6360 cx : & mut Context < ' _ > ,
6461 item : St :: Ok ,
6562 ) -> Poll < Result < ( ) , Si :: Error > > {
66- debug_assert ! ( self . buffered. is_none( ) ) ;
67- match Pin :: new ( & mut self . sink ) . poll_ready ( cx) ? {
63+ let this = self . project ( ) ;
64+ debug_assert ! ( this. buffered. is_none( ) ) ;
65+ match Pin :: new ( & mut * this. sink ) . poll_ready ( cx) ? {
6866 Poll :: Ready ( ( ) ) => {
69- Poll :: Ready ( Pin :: new ( & mut self . sink ) . start_send ( item) )
67+ Poll :: Ready ( Pin :: new ( & mut * this . sink ) . start_send ( item) )
7068 }
7169 Poll :: Pending => {
72- self . buffered = Some ( item) ;
70+ * this . buffered = Some ( item) ;
7371 Poll :: Pending
7472 }
7573 }
@@ -79,32 +77,32 @@ where
7977impl < Si , St , Ok , Error > Future for SendAll < ' _ , Si , St >
8078where
8179 Si : Sink < Ok , Error = Error > + Unpin + ?Sized ,
82- St : Stream < Item = Result < Ok , Error > > + Unpin + ? Sized ,
80+ St : Stream < Item = Result < Ok , Error > > ,
8381{
8482 type Output = Result < ( ) , Error > ;
8583
8684 fn poll (
8785 mut self : Pin < & mut Self > ,
8886 cx : & mut Context < ' _ > ,
8987 ) -> Poll < Self :: Output > {
90- let this = & mut * self ;
9188 // If we've got an item buffered already, we need to write it to the
9289 // sink before we can do anything else
93- if let Some ( item) = this . buffered . take ( ) {
94- ready ! ( this . try_start_send( cx, item) ) ?
90+ if let Some ( item) = self . as_mut ( ) . project ( ) . buffered . take ( ) {
91+ ready ! ( self . as_mut ( ) . try_start_send( cx, item) ) ?
9592 }
9693
9794 loop {
98- match this. stream . try_poll_next_unpin ( cx) ? {
95+ let this = self . as_mut ( ) . project ( ) ;
96+ match this. stream . try_poll_next ( cx) ? {
9997 Poll :: Ready ( Some ( item) ) => {
100- ready ! ( this . try_start_send( cx, item) ) ?
98+ ready ! ( self . as_mut ( ) . try_start_send( cx, item) ) ?
10199 }
102100 Poll :: Ready ( None ) => {
103- ready ! ( Pin :: new( & mut this. sink) . poll_flush( cx) ) ?;
101+ ready ! ( Pin :: new( this. sink) . poll_flush( cx) ) ?;
104102 return Poll :: Ready ( Ok ( ( ) ) )
105103 }
106104 Poll :: Pending => {
107- ready ! ( Pin :: new( & mut this. sink) . poll_flush( cx) ) ?;
105+ ready ! ( Pin :: new( this. sink) . poll_flush( cx) ) ?;
108106 return Poll :: Pending
109107 }
110108 }
0 commit comments