@@ -5,6 +5,7 @@ use std::io;
55use std:: pin:: Pin ;
66use std:: task:: { Context , Poll } ;
77use std:: time:: { Duration , Instant } ;
8+ use pin_project:: pin_project;
89
910use futures:: { AsyncRead , Stream } ;
1011
@@ -13,31 +14,26 @@ use super::Delay;
1314/// A future returned by methods in the [`FutureExt`] trait.
1415///
1516/// [`FutureExt.timeout`]: trait.FutureExt.html
17+ #[ pin_project]
1618#[ derive( Debug ) ]
1719pub struct Timeout < F : Future > {
20+ #[ pin]
1821 future : F ,
22+ #[ pin]
1923 delay : Delay ,
2024}
2125
2226impl < F : Future > Future for Timeout < F > {
2327 type Output = Result < F :: Output , io:: Error > ;
2428
25- fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
26- // This pinning projection is safe because:
27- // 1. `Timeout` is only Unpin when `F` is Unpin. (Ok for default auto impl)
28- // 2. `drop` never moves out of `F`. (No manual `Drop` impl and no `#[repr(packed)]`)
29- // 3. `drop` on `F` must be called before overwritten or deallocated. (No manual `Drop` impl)
30- // 4. No other operation provided for moving out `F`. (Ok)
31- let ( future, delay) = unsafe {
32- let Timeout { future, delay } = self . get_unchecked_mut ( ) ;
33- ( Pin :: new_unchecked ( future) , Pin :: new ( delay) )
34- } ;
29+ fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
30+ let this = self . project ( ) ;
3531
36- if let Poll :: Ready ( t) = future. poll ( cx) {
32+ if let Poll :: Ready ( t) = this . future . poll ( cx) {
3733 return Poll :: Ready ( Ok ( t) ) ;
3834 }
3935
40- delay
36+ this . delay
4137 . poll ( cx)
4238 . map ( |_| Err ( io:: Error :: new ( io:: ErrorKind :: TimedOut , "future timed out" ) ) )
4339 }
@@ -139,35 +135,29 @@ impl<T: Future> FutureExt for T {}
139135/// A stream returned by methods in the [`StreamExt`] trait.
140136///
141137/// [`StreamExt`]: trait.StreamExt.html
138+ #[ pin_project]
142139#[ derive( Debug ) ]
143140pub struct TimeoutStream < S : Stream > {
141+ #[ pin]
144142 timeout : Delay ,
145143 dur : Duration ,
144+ #[ pin]
146145 stream : S ,
147146}
148147
149148impl < S : Stream > Stream for TimeoutStream < S > {
150149 type Item = Result < S :: Item , io:: Error > ;
151150
152- fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
153- // This pinning projection is safe.
154- // See detail in `Timeout::poll`.
155- let ( mut timeout, dur, stream) = unsafe {
156- let TimeoutStream {
157- timeout,
158- dur,
159- stream,
160- } = self . get_unchecked_mut ( ) ;
161- ( Pin :: new ( timeout) , Pin :: new ( dur) , Pin :: new_unchecked ( stream) )
162- } ;
151+ fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
152+ let mut this = self . project ( ) ;
163153
164- if let Poll :: Ready ( s) = stream. poll_next ( cx) {
165- timeout. set ( Delay :: new ( * dur) ) ;
154+ if let Poll :: Ready ( s) = this . stream . as_mut ( ) . poll_next ( cx) {
155+ this . timeout . set ( Delay :: new ( * this . dur ) ) ;
166156 return Poll :: Ready ( Ok ( s) . transpose ( ) ) ;
167157 }
168158
169- Pin :: new ( & mut * timeout) . poll ( cx) . map ( |_| {
170- timeout. set ( Delay :: new ( * dur) ) ;
159+ this . timeout . as_mut ( ) . poll ( cx) . map ( |_| {
160+ this . timeout . set ( Delay :: new ( * this . dur ) ) ;
171161 Some ( Err ( io:: Error :: new (
172162 io:: ErrorKind :: TimedOut ,
173163 "future timed out" ,
@@ -223,37 +213,31 @@ impl<S: Stream> StreamExt for S {}
223213/// A stream returned by methods in the [`StreamExt`] trait.
224214///
225215/// [`StreamExt`]: trait.StreamExt.html
216+ #[ pin_project]
226217#[ derive( Debug ) ]
227218pub struct TimeoutAsyncRead < S : AsyncRead > {
219+ #[ pin]
228220 timeout : Delay ,
229221 dur : Duration ,
222+ #[ pin]
230223 stream : S ,
231224}
232225
233226impl < S : AsyncRead > AsyncRead for TimeoutAsyncRead < S > {
234227 fn poll_read (
235- self : Pin < & mut Self > ,
228+ mut self : Pin < & mut Self > ,
236229 cx : & mut Context < ' _ > ,
237230 buf : & mut [ u8 ] ,
238231 ) -> Poll < Result < usize , io:: Error > > {
239- // This pinning projection is safe.
240- // See detail in `Timeout::poll`.
241- let ( mut timeout, dur, stream) = unsafe {
242- let TimeoutAsyncRead {
243- timeout,
244- dur,
245- stream,
246- } = self . get_unchecked_mut ( ) ;
247- ( Pin :: new ( timeout) , Pin :: new ( dur) , Pin :: new_unchecked ( stream) )
248- } ;
232+ let mut this = self . project ( ) ;
249233
250- if let Poll :: Ready ( s) = stream. poll_read ( cx, buf) {
251- timeout. set ( Delay :: new ( * dur) ) ;
234+ if let Poll :: Ready ( s) = this . stream . as_mut ( ) . poll_read ( cx, buf) {
235+ this . timeout . set ( Delay :: new ( * this . dur ) ) ;
252236 return Poll :: Ready ( s) ;
253237 }
254238
255- Pin :: new ( & mut * timeout) . poll ( cx) . map ( |_| {
256- timeout. set ( Delay :: new ( * dur) ) ;
239+ this . timeout . as_mut ( ) . poll ( cx) . map ( |_| {
240+ this . timeout . set ( Delay :: new ( * this . dur ) ) ;
257241 Err ( io:: Error :: new ( io:: ErrorKind :: TimedOut , "future timed out" ) )
258242 } )
259243 }
0 commit comments