@@ -31,7 +31,7 @@ pub fn map<St, U, F>(stream: St, f: F) -> impl Stream<Item = U>
3131 F : FnMut ( St :: Item ) -> U ,
3232{
3333 let stream = Box :: pin ( stream) ;
34- futures :: stream :: unfold ( ( stream, f) , async move | ( mut stream, mut f) | {
34+ unfold ( ( stream, f) , async move | ( mut stream, mut f) | {
3535 let item = next ( & mut stream) . await ;
3636 item. map ( |item| ( f ( item) , ( stream, f) ) )
3737 } )
@@ -43,7 +43,7 @@ pub fn filter<St, Fut, F>(stream: St, f: F) -> impl Stream<Item = St::Item>
4343 Fut : Future < Output = bool >
4444{
4545 let stream = Box :: pin ( stream) ;
46- futures :: stream :: unfold ( ( stream, f) , async move | ( mut stream, mut f) | {
46+ unfold ( ( stream, f) , async move | ( mut stream, mut f) | {
4747 while let Some ( item) = next ( & mut stream) . await {
4848 let matched = f ( & item) . await ;
4949 if matched {
@@ -62,7 +62,7 @@ pub fn filter_map<St, Fut, F, U>(stream: St, f: F) -> impl Stream<Item = U>
6262 Fut : Future < Output = Option < U > >
6363{
6464 let stream = Box :: pin ( stream) ;
65- futures :: stream :: unfold ( ( stream, f) , async move | ( mut stream, mut f) | {
65+ unfold ( ( stream, f) , async move | ( mut stream, mut f) | {
6666 while let Some ( item) = next ( & mut stream) . await {
6767 if let Some ( item) = f ( item) . await {
6868 return Some ( ( item, ( stream, f) ) )
@@ -122,7 +122,7 @@ pub fn take<St>(stream: St, n: u64) -> impl Stream<Item = St::Item>
122122 where St : Stream ,
123123{
124124 let stream = Box :: pin ( stream) ;
125- futures :: stream :: unfold ( ( stream, n) , async move | ( mut stream, n) | {
125+ unfold ( ( stream, n) , async move | ( mut stream, n) | {
126126 if n == 0 {
127127 None
128128 } else {
@@ -149,7 +149,7 @@ pub fn flatten<St, SubSt, T>(stream: St) -> impl Stream<Item = T>
149149 St : Stream < Item = SubSt > ,
150150{
151151 let stream = Box :: pin ( stream) ;
152- futures :: stream :: unfold ( ( Some ( stream) , None ) , async move | ( mut state_stream, mut state_substream) | {
152+ unfold ( ( Some ( stream) , None ) , async move | ( mut state_stream, mut state_substream) | {
153153 loop {
154154 if let Some ( mut substream) = state_substream. take ( ) {
155155 if let Some ( item) = next ( & mut substream) . await {
@@ -177,7 +177,7 @@ pub fn then<St, F, Fut>(stream: St, f: F) -> impl Stream<Item = St::Item>
177177 Fut : Future < Output = St :: Item >
178178{
179179 let stream = Box :: pin ( stream) ;
180- futures :: stream :: unfold ( ( stream, f) , async move | ( mut stream, mut f) | {
180+ unfold ( ( stream, f) , async move | ( mut stream, mut f) | {
181181 let item = next ( & mut stream) . await ;
182182 if let Some ( item) = item {
183183 let new_item = f ( item) . await ;
@@ -192,7 +192,7 @@ pub fn skip<St>(stream: St, n: u64) -> impl Stream<Item = St::Item>
192192 where St : Stream ,
193193{
194194 let stream = Box :: pin ( stream) ;
195- futures :: stream :: unfold ( ( stream, n) , async move | ( mut stream, mut n) | {
195+ unfold ( ( stream, n) , async move | ( mut stream, mut n) | {
196196 while n != 0 {
197197 if let Some ( _) = next ( & mut stream) . await {
198198 n = n - 1 ;
@@ -215,7 +215,7 @@ pub fn zip<St1, St2>(stream: St1, other: St2) -> impl Stream<Item = (St1::Item,
215215{
216216 let stream = Box :: pin ( stream) ;
217217 let other = Box :: pin ( other) ;
218- futures :: stream :: unfold ( ( stream, other) , async move | ( mut stream, mut other) | {
218+ unfold ( ( stream, other) , async move | ( mut stream, mut other) | {
219219 let left = next ( & mut stream) . await ;
220220 let right = next ( & mut other) . await ;
221221 match ( left, right) {
@@ -231,7 +231,7 @@ pub fn chain<St>(stream: St, other: St) -> impl Stream<Item = St::Item>
231231 let stream = Box :: pin ( stream) ;
232232 let other = Box :: pin ( other) ;
233233 let start_with_first = true ;
234- futures :: stream :: unfold ( ( stream, other, start_with_first) , async move | ( mut stream, mut other, start_with_first) | {
234+ unfold ( ( stream, other, start_with_first) , async move | ( mut stream, mut other, start_with_first) | {
235235 if start_with_first {
236236 if let Some ( item) = next ( & mut stream) . await {
237237 return Some ( ( item, ( stream, other, start_with_first) ) )
@@ -251,7 +251,7 @@ pub fn take_while<St, F, Fut>(stream: St, f: F) -> impl Stream<Item = St::Item>
251251 Fut : Future < Output = bool > ,
252252{
253253 let stream = Box :: pin ( stream) ;
254- futures :: stream :: unfold ( ( stream, f) , async move | ( mut stream, mut f) | {
254+ unfold ( ( stream, f) , async move | ( mut stream, mut f) | {
255255 if let Some ( item) = next ( & mut stream) . await {
256256 if f ( & item) . await {
257257 Some ( ( item, ( stream, f) ) )
@@ -271,7 +271,7 @@ pub fn skip_while<St, F, Fut>(stream: St, f: F) -> impl Stream<Item = St::Item>
271271{
272272 let stream = Box :: pin ( stream) ;
273273 let should_skip = true ;
274- futures :: stream :: unfold ( ( stream, f, should_skip) , async move | ( mut stream, mut f, should_skip) | {
274+ unfold ( ( stream, f, should_skip) , async move | ( mut stream, mut f, should_skip) | {
275275 while should_skip {
276276 if let Some ( item) = next ( & mut stream) . await {
277277 if f ( & item) . await {
@@ -305,6 +305,36 @@ pub async fn fold<St, T, F, Fut>(stream: St, init: T, f: F) -> T
305305 acc
306306}
307307
308+ pub fn unfold < T , F , Fut , It > ( init : T , mut f : F ) -> impl Stream < Item = It >
309+ where F : FnMut ( T ) -> Fut ,
310+ Fut : Future < Output = Option < ( It , T ) > > ,
311+ {
312+ use core:: task:: Poll ;
313+ enum State < T , Fut > {
314+ Paused ( T ) ,
315+ Running ( Pin < Box < Fut > > ) ,
316+ }
317+ let mut state = Some ( State :: Paused ( init) ) ;
318+ futures:: stream:: poll_fn ( move |waker| -> Poll < Option < It > > {
319+ let mut future = match state. take ( ) {
320+ Some ( State :: Running ( fut) ) => fut,
321+ Some ( State :: Paused ( st) ) => Box :: pin ( f ( st) ) ,
322+ None => panic ! ( "this stream must not be polled any more" ) ,
323+ } ;
324+ match future. as_mut ( ) . poll ( waker) {
325+ Poll :: Pending => {
326+ state = Some ( State :: Running ( future) ) ;
327+ Poll :: Pending
328+ } ,
329+ Poll :: Ready ( None ) => Poll :: Ready ( None ) ,
330+ Poll :: Ready ( Some ( ( item, new_state) ) ) => {
331+ state = Some ( State :: Paused ( new_state) ) ;
332+ Poll :: Ready ( Some ( item) )
333+ } ,
334+ }
335+ } )
336+ }
337+
308338#[ cfg( test) ]
309339mod tests {
310340 use futures:: executor;
@@ -487,4 +517,18 @@ mod tests {
487517
488518 assert_eq ! ( 15 , executor:: block_on( sum) ) ;
489519 }
520+
521+ #[ test]
522+ fn test_unfold ( ) {
523+ let stream = unfold ( 0 , |state| {
524+ if state <= 2 {
525+ let next_state = state + 1 ;
526+ let yielded = state * 2 ;
527+ ready ( Some ( ( yielded, next_state) ) )
528+ } else {
529+ ready ( None )
530+ }
531+ } ) ;
532+ assert_eq ! ( vec![ 0 , 2 , 4 ] , executor:: block_on( collect:: <_, Vec <_>>( stream) ) ) ;
533+ }
490534}
0 commit comments