11use std:: marker:: PhantomData ;
22use std:: pin:: Pin ;
3+ use std:: mem;
34
45use crate :: future:: Future ;
56use crate :: stream:: Stream ;
6- use crate :: task:: { Context , Poll } ;
7+ use crate :: task:: { Context , Poll , ready } ;
78
8- /// A stream that yields elements by calling an async closure with the previous value as an
9- /// argument
10- ///
11- /// This stream is constructed by [`successor`] function
12- ///
13- /// [`successor`]: fn.successor.html
14- #[ derive( Debug ) ]
15- pub struct Successors < F , Fut , T >
16- where
17- Fut : Future < Output = Option < T > > ,
18- {
19- successor : F ,
20- future : Option < Fut > ,
21- next : Option < T > ,
22- _marker : PhantomData < Fut > ,
9+
10+
11+ pin_project_lite:: pin_project! {
12+ /// A stream that yields elements by calling an async closure with the previous value as an
13+ /// argument
14+ ///
15+ /// This stream is constructed by [`successor`] function
16+ ///
17+ /// [`successor`]: fn.successor.html
18+ #[ derive( Debug ) ]
19+ pub struct Successors <F , Fut , T >
20+ where
21+ Fut : Future <Output = Option <T >>,
22+ {
23+ successor: F ,
24+ #[ pin]
25+ future: Option <Fut >,
26+ slot: Option <T >,
27+ _marker: PhantomData <Fut >,
28+ }
2329}
2430
2531/// Creates a new stream where to produce each new element a closure is called with the previous
4046/// });
4147///
4248/// pin_utils::pin_mut!(s);
49+ /// assert_eq!(s.next().await, Some(22));
4350/// assert_eq!(s.next().await, Some(23));
4451/// assert_eq!(s.next().await, Some(24));
4552/// assert_eq!(s.next().await, Some(25));
@@ -58,31 +65,20 @@ where
5865/// # }) }
5966///
6067/// ```
61- pub fn successors < F , Fut , T > ( start : Option < T > , func : F ) -> Successors < F , Fut , T >
68+ pub fn successors < F , Fut , T > ( first : Option < T > , succ : F ) -> Successors < F , Fut , T >
6269where
6370 F : FnMut ( T ) -> Fut ,
6471 Fut : Future < Output = Option < T > > ,
6572 T : Copy ,
6673{
6774 Successors {
68- successor : func ,
75+ successor : succ ,
6976 future : None ,
70- next : start ,
77+ slot : first ,
7178 _marker : PhantomData ,
7279 }
7380}
7481
75- impl < F , Fut , T > Successors < F , Fut , T >
76- where
77- F : FnMut ( T ) -> Fut ,
78- Fut : Future < Output = Option < T > > ,
79- T : Copy ,
80- {
81- pin_utils:: unsafe_unpinned!( successor: F ) ;
82- pin_utils:: unsafe_unpinned!( next: Option <T >) ;
83- pin_utils:: unsafe_pinned!( future: Option <Fut >) ;
84- }
85-
8682impl < F , Fut , T > Stream for Successors < F , Fut , T >
8783where
8884 Fut : Future < Output = Option < T > > ,
@@ -91,23 +87,23 @@ where
9187{
9288 type Item = T ;
9389
94- fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
95- if self . next . is_none ( ) {
90+ fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
91+ let mut this = self . project ( ) ;
92+
93+ if this. slot . is_none ( ) {
9694 return Poll :: Ready ( None ) ;
9795 }
9896
99- match & self . future {
100- None => {
101- let x = self . next . unwrap ( ) ;
102- let fut = ( self . as_mut ( ) . successor ( ) ) ( x) ;
103- self . as_mut ( ) . future ( ) . set ( Some ( fut) ) ;
104- }
105- _ => { }
97+ if this. future . is_none ( ) {
98+ let x = this. slot . unwrap ( ) ;
99+ let fut = ( this. successor ) ( x) ;
100+ this. future . set ( Some ( fut) ) ;
106101 }
107102
108- let next = futures_core:: ready!( self . as_mut( ) . future( ) . as_pin_mut( ) . unwrap( ) . poll( cx) ) ;
109- * self . as_mut ( ) . next ( ) = next;
110- self . as_mut ( ) . future ( ) . set ( None ) ;
103+ let mut next = ready ! ( this. future. as_mut( ) . as_pin_mut( ) . unwrap( ) . poll( cx) ) ;
104+
105+ this. future . set ( None ) ;
106+ mem:: swap ( this. slot , & mut next) ;
111107 Poll :: Ready ( next)
112108 }
113109}
0 commit comments