File tree Expand file tree Collapse file tree 3 files changed +81
-6
lines changed Expand file tree Collapse file tree 3 files changed +81
-6
lines changed Original file line number Diff line number Diff line change @@ -50,14 +50,15 @@ where
5050 let mut this = self . project ( ) ;
5151 loop {
5252 if let Some ( inner) = this. inner_stream . as_mut ( ) . as_pin_mut ( ) {
53- if let item @ Some ( _) = futures_core:: ready!( inner. poll_next( cx) ) {
54- return Poll :: Ready ( item) ;
53+ match futures_core:: ready!( inner. poll_next( cx) ) {
54+ item @ Some ( _) => return Poll :: Ready ( item) ,
55+ None => this. inner_stream . set ( None ) ,
5556 }
5657 }
5758
5859 match futures_core:: ready!( this. stream. as_mut( ) . poll_next( cx) ) {
60+ inner @ Some ( _) => this. inner_stream . set ( inner. map ( IntoStream :: into_stream) ) ,
5961 None => return Poll :: Ready ( None ) ,
60- Some ( inner) => this. inner_stream . set ( Some ( inner. into_stream ( ) ) ) ,
6162 }
6263 }
6364 }
Original file line number Diff line number Diff line change @@ -52,14 +52,15 @@ where
5252 let mut this = self . project ( ) ;
5353 loop {
5454 if let Some ( inner) = this. inner_stream . as_mut ( ) . as_pin_mut ( ) {
55- if let item @ Some ( _) = futures_core:: ready!( inner. poll_next( cx) ) {
56- return Poll :: Ready ( item) ;
55+ match futures_core:: ready!( inner. poll_next( cx) ) {
56+ item @ Some ( _) => return Poll :: Ready ( item) ,
57+ None => this. inner_stream . set ( None ) ,
5758 }
5859 }
5960
6061 match futures_core:: ready!( this. stream. as_mut( ) . poll_next( cx) ) {
62+ inner @ Some ( _) => this. inner_stream . set ( inner. map ( IntoStream :: into_stream) ) ,
6163 None => return Poll :: Ready ( None ) ,
62- Some ( inner) => this. inner_stream . set ( Some ( inner. into_stream ( ) ) ) ,
6364 }
6465 }
6566 }
Original file line number Diff line number Diff line change 1+ use std:: convert:: identity;
2+ use std:: marker:: Unpin ;
13use std:: pin:: Pin ;
24use std:: task:: { Context , Poll } ;
35
@@ -108,3 +110,74 @@ fn merge_works_with_unfused_streams() {
108110 assert_eq ! ( xs, vec![ 92 , 92 ] ) ;
109111 } ) ;
110112}
113+
114+ struct S < T > ( T ) ;
115+
116+ impl < T : Stream + Unpin > Stream for S < T > {
117+ type Item = T :: Item ;
118+
119+ fn poll_next ( mut self : Pin < & mut Self > , ctx : & mut Context ) -> Poll < Option < Self :: Item > > {
120+ unsafe { Pin :: new_unchecked ( & mut self . 0 ) } . poll_next ( ctx)
121+ }
122+ }
123+
124+ struct StrictOnce {
125+ polled : bool ,
126+ }
127+
128+ impl Stream for StrictOnce {
129+ type Item = ( ) ;
130+
131+ fn poll_next ( mut self : Pin < & mut Self > , _: & mut Context ) -> Poll < Option < Self :: Item > > {
132+ assert ! ( !self . polled, "Polled after completion!" ) ;
133+ self . polled = true ;
134+ Poll :: Ready ( None )
135+ }
136+ }
137+
138+ struct Interchanger {
139+ polled : bool ,
140+ }
141+
142+ impl Stream for Interchanger {
143+ type Item = S < Box < dyn Stream < Item = ( ) > + Unpin > > ;
144+
145+ fn poll_next ( mut self : Pin < & mut Self > , ctx : & mut Context ) -> Poll < Option < Self :: Item > > {
146+ if self . polled {
147+ self . polled = false ;
148+ ctx. waker ( ) . wake_by_ref ( ) ;
149+ Poll :: Pending
150+ } else {
151+ self . polled = true ;
152+ Poll :: Ready ( Some ( S ( Box :: new ( StrictOnce { polled : false } ) ) ) )
153+ }
154+ }
155+ }
156+
157+ #[ test]
158+ fn flat_map_doesnt_poll_completed_inner_stream ( ) {
159+ task:: block_on ( async {
160+ assert_eq ! (
161+ Interchanger { polled: false }
162+ . take( 2 )
163+ . flat_map( identity)
164+ . count( )
165+ . await ,
166+ 0
167+ ) ;
168+ } ) ;
169+ }
170+
171+ #[ test]
172+ fn flatten_doesnt_poll_completed_inner_stream ( ) {
173+ task:: block_on ( async {
174+ assert_eq ! (
175+ Interchanger { polled: false }
176+ . take( 2 )
177+ . flatten( )
178+ . count( )
179+ . await ,
180+ 0
181+ ) ;
182+ } ) ;
183+ }
You can’t perform that action at this time.
0 commit comments