1- use core :: future:: Future ;
1+ use futures_core :: future:: Future ;
22pub use futures_core:: stream:: Stream ;
3+ use futures_async_stream:: async_stream_block;
34
45use core:: iter:: IntoIterator ;
56use core:: pin:: Pin ;
@@ -97,11 +98,13 @@ where
9798 St : Stream ,
9899 F : FnMut ( St :: Item ) -> U ,
99100{
100- let stream = Box :: pin ( stream) ;
101- unfold ( ( stream, f) , async move |( mut stream, mut f) | {
102- let item = next ( & mut stream) . await ;
103- item. map ( |item| ( f ( item) , ( stream, f) ) )
104- } )
101+ let mut f = f;
102+ async_stream_block ! {
103+ #[ for_await]
104+ for item in stream {
105+ yield f( item)
106+ }
107+ }
105108}
106109
107110/// Filters the values produced by this stream according to the provided
@@ -136,18 +139,15 @@ where
136139 F : FnMut ( & St :: Item ) -> Fut ,
137140 Fut : Future < Output = bool > ,
138141{
139- let stream = Box :: pin ( stream) ;
140- unfold ( ( stream, f) , async move |( mut stream, mut f) | {
141- while let Some ( item) = next ( & mut stream) . await {
142- let matched = f ( & item) . await ;
143- if matched {
144- return Some ( ( item, ( stream, f) ) ) ;
145- } else {
146- continue ;
142+ let mut f = f;
143+ async_stream_block ! {
144+ #[ for_await]
145+ for item in stream {
146+ if f( & item) . await {
147+ yield item
147148 }
148149 }
149- None
150- } )
150+ }
151151}
152152
153153/// Filters the values produced by this stream while simultaneously mapping
@@ -183,17 +183,15 @@ where
183183 F : FnMut ( St :: Item ) -> Fut ,
184184 Fut : Future < Output = Option < U > > ,
185185{
186- let stream = Box :: pin ( stream) ;
187- unfold ( ( stream, f) , async move |( mut stream, mut f) | {
188- while let Some ( item) = next ( & mut stream) . await {
186+ let mut f = f;
187+ async_stream_block ! {
188+ #[ for_await]
189+ for item in stream {
189190 if let Some ( item) = f( item) . await {
190- return Some ( ( item, ( stream, f) ) ) ;
191- } else {
192- continue ;
191+ yield item
193192 }
194193 }
195- None
196- } )
194+ }
197195}
198196
199197/// Converts this stream into a future of `(next_item, tail_of_stream)`.
@@ -366,18 +364,18 @@ pub fn take<St>(stream: St, n: u64) -> impl Stream<Item = St::Item>
366364where
367365 St : Stream ,
368366{
369- let stream = Box :: pin ( stream) ;
370- unfold ( ( stream, n) , async move |( mut stream, n) | {
371- if n == 0 {
372- None
373- } else {
374- if let Some ( item) = next ( & mut stream) . await {
375- Some ( ( item, ( stream, n - 1 ) ) )
367+ let mut n = n;
368+ async_stream_block ! {
369+ #[ for_await]
370+ for item in stream {
371+ if n == 0 {
372+ break ;
376373 } else {
377- None
374+ n = n - 1 ;
375+ yield item
378376 }
379377 }
380- } )
378+ }
381379}
382380
383381/// Create a stream which produces the same item repeatedly.
@@ -428,28 +426,15 @@ where
428426 SubSt : Stream < Item = T > ,
429427 St : Stream < Item = SubSt > ,
430428{
431- let stream = Box :: pin ( stream) ;
432- unfold (
433- ( Some ( stream) , None ) ,
434- async move |( mut state_stream, mut state_substream) | loop {
435- if let Some ( mut substream) = state_substream. take ( ) {
436- if let Some ( item) = next ( & mut substream) . await {
437- return Some ( ( item, ( state_stream, Some ( substream) ) ) ) ;
438- } else {
439- continue ;
440- }
441- }
442- if let Some ( mut stream) = state_stream. take ( ) {
443- if let Some ( substream) = next ( & mut stream) . await {
444- let substream = Box :: pin ( substream) ;
445- state_stream = Some ( stream) ;
446- state_substream = Some ( substream) ;
447- continue ;
448- }
429+ async_stream_block ! {
430+ #[ for_await]
431+ for substream in stream {
432+ #[ for_await]
433+ for item in substream {
434+ yield item
449435 }
450- return None ;
451- } ,
452- )
436+ }
437+ }
453438}
454439
455440/// Computes from this stream's items new items of a different type using
@@ -481,16 +466,14 @@ where
481466 F : FnMut ( St :: Item ) -> Fut ,
482467 Fut : Future < Output = St :: Item > ,
483468{
484- let stream = Box :: pin ( stream ) ;
485- unfold ( ( stream , f ) , async move | ( mut stream , mut f ) | {
486- let item = next ( & mut stream ) . await ;
487- if let Some ( item) = item {
469+ let mut f = f ;
470+ async_stream_block ! {
471+ # [ for_await ]
472+ for item in stream {
488473 let new_item = f( item) . await ;
489- Some ( ( new_item, ( stream, f) ) )
490- } else {
491- None
474+ yield new_item
492475 }
493- } )
476+ }
494477}
495478
496479/// Creates a new stream which skips `n` items of the underlying stream.
@@ -515,22 +498,18 @@ pub fn skip<St>(stream: St, n: u64) -> impl Stream<Item = St::Item>
515498where
516499 St : Stream ,
517500{
518- let stream = Box :: pin ( stream) ;
519- unfold ( ( stream, n) , async move |( mut stream, mut n) | {
520- while n != 0 {
521- if let Some ( _) = next ( & mut stream) . await {
501+ let mut n = n;
502+ async_stream_block ! {
503+ #[ for_await]
504+ for item in stream {
505+ if n == 0 {
506+ yield item
507+ } else {
522508 n = n - 1 ;
523509 continue ;
524- } else {
525- return None ;
526510 }
527511 }
528- if let Some ( item) = next ( & mut stream) . await {
529- Some ( ( item, ( stream, 0 ) ) )
530- } else {
531- None
532- }
533- } )
512+ }
534513}
535514
536515/// An adapter for zipping two streams together.
@@ -559,16 +538,18 @@ where
559538 St1 : Stream ,
560539 St2 : Stream ,
561540{
562- let stream = Box :: pin ( stream) ;
563- let other = Box :: pin ( other) ;
564- unfold ( ( stream, other) , async move |( mut stream, mut other) | {
565- let left = next ( & mut stream) . await ;
566- let right = next ( & mut other) . await ;
567- match ( left, right) {
568- ( Some ( left) , Some ( right) ) => Some ( ( ( left, right) , ( stream, other) ) ) ,
569- _ => None ,
541+ let mut stream = Box :: pin ( stream) ;
542+ let mut other = Box :: pin ( other) ;
543+ async_stream_block ! {
544+ loop {
545+ let left = next( & mut stream) . await ;
546+ let right = next( & mut other) . await ;
547+ match ( left, right) {
548+ ( Some ( left) , Some ( right) ) => yield ( left, right) ,
549+ _ => break ,
550+ }
570551 }
571- } )
552+ }
572553}
573554
574555/// Adapter for chaining two stream.
@@ -598,24 +579,16 @@ pub fn chain<St>(stream: St, other: St) -> impl Stream<Item = St::Item>
598579where
599580 St : Stream ,
600581{
601- let stream = Box :: pin ( stream) ;
602- let other = Box :: pin ( other) ;
603- let start_with_first = true ;
604- unfold (
605- ( stream, other, start_with_first) ,
606- async move |( mut stream, mut other, start_with_first) | {
607- if start_with_first {
608- if let Some ( item) = next ( & mut stream) . await {
609- return Some ( ( item, ( stream, other, start_with_first) ) ) ;
610- }
611- }
612- if let Some ( item) = next ( & mut other) . await {
613- Some ( ( item, ( stream, other, /* start_with_first */ false ) ) )
614- } else {
615- None
616- }
617- } ,
618- )
582+ async_stream_block ! {
583+ #[ for_await]
584+ for item in stream {
585+ yield item
586+ }
587+ #[ for_await]
588+ for item in other {
589+ yield item
590+ }
591+ }
619592}
620593
621594/// Take elements from this stream while the provided asynchronous predicate
@@ -644,18 +617,17 @@ where
644617 F : FnMut ( & St :: Item ) -> Fut ,
645618 Fut : Future < Output = bool > ,
646619{
647- let stream = Box :: pin ( stream) ;
648- unfold ( ( stream, f) , async move |( mut stream, mut f) | {
649- if let Some ( item) = next ( & mut stream) . await {
620+ let mut f = f;
621+ async_stream_block ! {
622+ #[ for_await]
623+ for item in stream {
650624 if f( & item) . await {
651- Some ( ( item , ( stream , f ) ) )
625+ yield item
652626 } else {
653- None
627+ break ;
654628 }
655- } else {
656- None
657629 }
658- } )
630+ }
659631}
660632
661633/// Skip elements on this stream while the provided asynchronous predicate
@@ -685,29 +657,23 @@ where
685657 F : FnMut ( & St :: Item ) -> Fut ,
686658 Fut : Future < Output = bool > ,
687659{
688- let stream = Box :: pin ( stream) ;
689- let should_skip = true ;
690- unfold (
691- ( stream, f, should_skip) ,
692- async move |( mut stream, mut f, should_skip) | {
693- while should_skip {
694- if let Some ( item) = next ( & mut stream) . await {
695- if f ( & item) . await {
696- continue ;
697- } else {
698- return Some ( ( item, ( stream, f, /* should_skip */ false ) ) ) ;
699- }
660+ let mut f = f;
661+ let mut should_skip = true ;
662+ async_stream_block ! {
663+ #[ for_await]
664+ for item in stream {
665+ if should_skip {
666+ if f( & item) . await {
667+ continue ;
700668 } else {
701- return None ;
669+ should_skip = false ;
670+ yield item
702671 }
703- }
704- if let Some ( item) = next ( & mut stream) . await {
705- Some ( ( item, ( stream, f, /* should_skip */ false ) ) )
706672 } else {
707- None
673+ yield item
708674 }
709- } ,
710- )
675+ }
676+ }
711677}
712678
713679/// Execute an accumulating asynchronous computation over a stream,
0 commit comments