1- use crate :: stream:: TryStreamExt ;
21use core:: pin:: Pin ;
32use futures_core:: ready;
43use futures_core:: stream:: TryStream ;
54use futures_core:: task:: { Context , Poll } ;
65use futures_io:: { AsyncBufRead , AsyncRead , AsyncWrite } ;
6+ use pin_project_lite:: pin_project;
77use std:: cmp;
88use std:: io:: { Error , Result } ;
99
10- /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
11- #[ derive( Debug ) ]
12- #[ must_use = "readers do nothing unless polled" ]
13- #[ cfg_attr( docsrs, doc( cfg( feature = "io" ) ) ) ]
14- pub struct IntoAsyncRead < St >
15- where
16- St : TryStream < Error = Error > + Unpin ,
17- St :: Ok : AsRef < [ u8 ] > ,
18- {
19- stream : St ,
20- state : ReadState < St :: Ok > ,
21- }
22-
23- impl < St > Unpin for IntoAsyncRead < St >
24- where
25- St : TryStream < Error = Error > + Unpin ,
26- St :: Ok : AsRef < [ u8 ] > ,
27- {
10+ pin_project ! {
11+ /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
12+ #[ derive( Debug ) ]
13+ #[ must_use = "readers do nothing unless polled" ]
14+ #[ cfg_attr( docsrs, doc( cfg( feature = "io" ) ) ) ]
15+ pub struct IntoAsyncRead <St >
16+ where
17+ St : TryStream <Error = Error >,
18+ St :: Ok : AsRef <[ u8 ] >,
19+ {
20+ #[ pin]
21+ stream: St ,
22+ state: ReadState <St :: Ok >,
23+ }
2824}
2925
3026#[ derive( Debug ) ]
@@ -36,7 +32,7 @@ enum ReadState<T: AsRef<[u8]>> {
3632
3733impl < St > IntoAsyncRead < St >
3834where
39- St : TryStream < Error = Error > + Unpin ,
35+ St : TryStream < Error = Error > ,
4036 St :: Ok : AsRef < [ u8 ] > ,
4137{
4238 pub ( super ) fn new ( stream : St ) -> Self {
@@ -46,16 +42,18 @@ where
4642
4743impl < St > AsyncRead for IntoAsyncRead < St >
4844where
49- St : TryStream < Error = Error > + Unpin ,
45+ St : TryStream < Error = Error > ,
5046 St :: Ok : AsRef < [ u8 ] > ,
5147{
5248 fn poll_read (
53- mut self : Pin < & mut Self > ,
49+ self : Pin < & mut Self > ,
5450 cx : & mut Context < ' _ > ,
5551 buf : & mut [ u8 ] ,
5652 ) -> Poll < Result < usize > > {
53+ let mut this = self . project ( ) ;
54+
5755 loop {
58- match & mut self . state {
56+ match this . state {
5957 ReadState :: Ready { chunk, chunk_start } => {
6058 let chunk = chunk. as_ref ( ) ;
6159 let len = cmp:: min ( buf. len ( ) , chunk. len ( ) - * chunk_start) ;
@@ -64,23 +62,23 @@ where
6462 * chunk_start += len;
6563
6664 if chunk. len ( ) == * chunk_start {
67- self . state = ReadState :: PendingChunk ;
65+ * this . state = ReadState :: PendingChunk ;
6866 }
6967
7068 return Poll :: Ready ( Ok ( len) ) ;
7169 }
72- ReadState :: PendingChunk => match ready ! ( self . stream. try_poll_next_unpin ( cx) ) {
70+ ReadState :: PendingChunk => match ready ! ( this . stream. as_mut ( ) . try_poll_next ( cx) ) {
7371 Some ( Ok ( chunk) ) => {
7472 if !chunk. as_ref ( ) . is_empty ( ) {
75- self . state = ReadState :: Ready { chunk, chunk_start : 0 } ;
73+ * this . state = ReadState :: Ready { chunk, chunk_start : 0 } ;
7674 }
7775 }
7876 Some ( Err ( err) ) => {
79- self . state = ReadState :: Eof ;
77+ * this . state = ReadState :: Eof ;
8078 return Poll :: Ready ( Err ( err) ) ;
8179 }
8280 None => {
83- self . state = ReadState :: Eof ;
81+ * this . state = ReadState :: Eof ;
8482 return Poll :: Ready ( Ok ( 0 ) ) ;
8583 }
8684 } ,
@@ -94,51 +92,52 @@ where
9492
9593impl < St > AsyncWrite for IntoAsyncRead < St >
9694where
97- St : TryStream < Error = Error > + AsyncWrite + Unpin ,
95+ St : TryStream < Error = Error > + AsyncWrite ,
9896 St :: Ok : AsRef < [ u8 ] > ,
9997{
100- fn poll_write (
101- mut self : Pin < & mut Self > ,
102- cx : & mut Context < ' _ > ,
103- buf : & [ u8 ] ,
104- ) -> Poll < Result < usize > > {
105- Pin :: new ( & mut self . stream ) . poll_write ( cx, buf)
98+ fn poll_write ( self : Pin < & mut Self > , cx : & mut Context < ' _ > , buf : & [ u8 ] ) -> Poll < Result < usize > > {
99+ let this = self . project ( ) ;
100+ this. stream . poll_write ( cx, buf)
106101 }
107102
108- fn poll_flush ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) > > {
109- Pin :: new ( & mut self . stream ) . poll_flush ( cx)
103+ fn poll_flush ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) > > {
104+ let this = self . project ( ) ;
105+ this. stream . poll_flush ( cx)
110106 }
111107
112- fn poll_close ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) > > {
113- Pin :: new ( & mut self . stream ) . poll_close ( cx)
108+ fn poll_close ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) > > {
109+ let this = self . project ( ) ;
110+ this. stream . poll_close ( cx)
114111 }
115112}
116113
117114impl < St > AsyncBufRead for IntoAsyncRead < St >
118115where
119- St : TryStream < Error = Error > + Unpin ,
116+ St : TryStream < Error = Error > ,
120117 St :: Ok : AsRef < [ u8 ] > ,
121118{
122- fn poll_fill_buf ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < & [ u8 ] > > {
123- while let ReadState :: PendingChunk = self . state {
124- match ready ! ( self . stream. try_poll_next_unpin( cx) ) {
119+ fn poll_fill_buf ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < & [ u8 ] > > {
120+ let mut this = self . project ( ) ;
121+
122+ while let ReadState :: PendingChunk = this. state {
123+ match ready ! ( this. stream. as_mut( ) . try_poll_next( cx) ) {
125124 Some ( Ok ( chunk) ) => {
126125 if !chunk. as_ref ( ) . is_empty ( ) {
127- self . state = ReadState :: Ready { chunk, chunk_start : 0 } ;
126+ * this . state = ReadState :: Ready { chunk, chunk_start : 0 } ;
128127 }
129128 }
130129 Some ( Err ( err) ) => {
131- self . state = ReadState :: Eof ;
130+ * this . state = ReadState :: Eof ;
132131 return Poll :: Ready ( Err ( err) ) ;
133132 }
134133 None => {
135- self . state = ReadState :: Eof ;
134+ * this . state = ReadState :: Eof ;
136135 return Poll :: Ready ( Ok ( & [ ] ) ) ;
137136 }
138137 }
139138 }
140139
141- if let ReadState :: Ready { ref chunk, chunk_start } = self . into_ref ( ) . get_ref ( ) . state {
140+ if let & mut ReadState :: Ready { ref chunk, chunk_start } = this . state {
142141 let chunk = chunk. as_ref ( ) ;
143142 return Poll :: Ready ( Ok ( & chunk[ chunk_start..] ) ) ;
144143 }
@@ -147,16 +146,18 @@ where
147146 Poll :: Ready ( Ok ( & [ ] ) )
148147 }
149148
150- fn consume ( mut self : Pin < & mut Self > , amount : usize ) {
149+ fn consume ( self : Pin < & mut Self > , amount : usize ) {
150+ let this = self . project ( ) ;
151+
151152 // https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295
152153 if amount == 0 {
153154 return ;
154155 }
155- if let ReadState :: Ready { chunk, chunk_start } = & mut self . state {
156+ if let ReadState :: Ready { chunk, chunk_start } = this . state {
156157 * chunk_start += amount;
157158 debug_assert ! ( * chunk_start <= chunk. as_ref( ) . len( ) ) ;
158159 if * chunk_start >= chunk. as_ref ( ) . len ( ) {
159- self . state = ReadState :: PendingChunk ;
160+ * this . state = ReadState :: PendingChunk ;
160161 }
161162 } else {
162163 debug_assert ! ( false , "Attempted to consume from IntoAsyncRead without chunk" ) ;
0 commit comments