11pub use futures:: stream:: Stream ;
22use futures:: future:: Future ;
3+ use futures_async_stream:: async_stream_block;
34
45use core:: pin:: Pin ;
56use core:: iter:: IntoIterator ;
@@ -97,11 +98,13 @@ pub fn map<St, U, F>(stream: St, f: F) -> impl Stream<Item = U>
9798 where St : Stream ,
9899 F : FnMut ( St :: Item ) -> U ,
99100{
100- let stream = Box :: pin ( stream) ;
101- unfold ( ( stream, f) , async move | ( mut stream, mut f) | {
102- let item = next ( & mut stream) . await ;
103- item. map ( |item| ( f ( item) , ( stream, f) ) )
104- } )
101+ let mut f = f;
102+ async_stream_block ! {
103+ #[ for_await]
104+ for item in stream {
105+ yield f( item)
106+ }
107+ }
105108}
106109
107110/// Filters the values produced by this stream according to the provided
@@ -136,18 +139,15 @@ pub fn filter<St, Fut, F>(stream: St, f: F) -> impl Stream<Item = St::Item>
136139 F : FnMut ( & St :: Item ) -> Fut ,
137140 Fut : Future < Output = bool >
138141{
139- let stream = Box :: pin ( stream) ;
140- unfold ( ( stream, f) , async move | ( mut stream, mut f) | {
141- while let Some ( item) = next ( & mut stream) . await {
142- let matched = f ( & item) . await ;
143- if matched {
144- return Some ( ( item, ( stream, f) ) )
145- } else {
146- continue ;
142+ let mut f = f;
143+ async_stream_block ! {
144+ #[ for_await]
145+ for item in stream {
146+ if f( & item) . await {
147+ yield item
147148 }
148- } ;
149- None
150- } )
149+ }
150+ }
151151}
152152
153153/// Filters the values produced by this stream while simultaneously mapping
@@ -183,17 +183,15 @@ pub fn filter_map<St, Fut, F, U>(stream: St, f: F) -> impl Stream<Item = U>
183183 F : FnMut ( St :: Item ) -> Fut ,
184184 Fut : Future < Output = Option < U > >
185185{
186- let stream = Box :: pin ( stream) ;
187- unfold ( ( stream, f) , async move | ( mut stream, mut f) | {
188- while let Some ( item) = next ( & mut stream) . await {
186+ let mut f = f;
187+ async_stream_block ! {
188+ #[ for_await]
189+ for item in stream {
189190 if let Some ( item) = f( item) . await {
190- return Some ( ( item, ( stream, f) ) )
191- } else {
192- continue ;
191+ yield item
193192 }
194- } ;
195- None
196- } )
193+ }
194+ }
197195}
198196
199197/// Converts this stream into a future of `(next_item, tail_of_stream)`.
@@ -370,18 +368,18 @@ pub async fn for_each<St, Fut, F>(stream: St, f: F) -> ()
370368pub fn take < St > ( stream : St , n : u64 ) -> impl Stream < Item = St :: Item >
371369 where St : Stream ,
372370{
373- let stream = Box :: pin ( stream) ;
374- unfold ( ( stream, n) , async move | ( mut stream, n) | {
375- if n == 0 {
376- None
377- } else {
378- if let Some ( item) = next ( & mut stream) . await {
379- Some ( ( item, ( stream, n - 1 ) ) )
371+ let mut n = n;
372+ async_stream_block ! {
373+ #[ for_await]
374+ for item in stream {
375+ if n == 0 {
376+ break ;
380377 } else {
381- None
378+ n = n - 1 ;
379+ yield item
382380 }
383381 }
384- } )
382+ }
385383}
386384
387385/// Create a stream which produces the same item repeatedly.
@@ -435,27 +433,15 @@ pub fn flatten<St, SubSt, T>(stream: St) -> impl Stream<Item = T>
435433 where SubSt : Stream < Item = T > ,
436434 St : Stream < Item = SubSt > ,
437435{
438- let stream = Box :: pin ( stream) ;
439- unfold ( ( Some ( stream) , None ) , async move | ( mut state_stream, mut state_substream) | {
440- loop {
441- if let Some ( mut substream) = state_substream. take ( ) {
442- if let Some ( item) = next ( & mut substream) . await {
443- return Some ( ( item, ( state_stream, Some ( substream) ) ) )
444- } else {
445- continue ;
446- }
447- }
448- if let Some ( mut stream) = state_stream. take ( ) {
449- if let Some ( substream) = next ( & mut stream) . await {
450- let substream = Box :: pin ( substream) ;
451- state_stream = Some ( stream) ;
452- state_substream = Some ( substream) ;
453- continue ;
454- }
436+ async_stream_block ! {
437+ #[ for_await]
438+ for substream in stream {
439+ #[ for_await]
440+ for item in substream {
441+ yield item
455442 }
456- return None ;
457443 }
458- } )
444+ }
459445}
460446
461447/// Computes from this stream's items new items of a different type using
@@ -488,16 +474,14 @@ pub fn then<St, F, Fut>(stream: St, f: F) -> impl Stream<Item = St::Item>
488474 F : FnMut ( St :: Item ) -> Fut ,
489475 Fut : Future < Output = St :: Item >
490476{
491- let stream = Box :: pin ( stream ) ;
492- unfold ( ( stream , f ) , async move | ( mut stream , mut f ) | {
493- let item = next ( & mut stream ) . await ;
494- if let Some ( item) = item {
477+ let mut f = f ;
478+ async_stream_block ! {
479+ # [ for_await ]
480+ for item in stream {
495481 let new_item = f( item) . await ;
496- Some ( ( new_item, ( stream, f) ) )
497- } else {
498- None
482+ yield new_item
499483 }
500- } )
484+ }
501485}
502486
503487/// Creates a new stream which skips `n` items of the underlying stream.
@@ -522,22 +506,18 @@ pub fn then<St, F, Fut>(stream: St, f: F) -> impl Stream<Item = St::Item>
522506pub fn skip < St > ( stream : St , n : u64 ) -> impl Stream < Item = St :: Item >
523507 where St : Stream ,
524508{
525- let stream = Box :: pin ( stream ) ;
526- unfold ( ( stream , n ) , async move | ( mut stream , mut n ) | {
527- while n != 0 {
528- if let Some ( _ ) = next ( & mut stream) . await {
529- n = n - 1 ;
530- continue
509+ let mut n = n ;
510+ async_stream_block ! {
511+ # [ for_await ]
512+ for item in stream {
513+ if n == 0 {
514+ yield item
531515 } else {
532- return None
516+ n = n - 1 ;
517+ continue ;
533518 }
534519 }
535- if let Some ( item) = next ( & mut stream) . await {
536- Some ( ( item, ( stream, 0 ) ) )
537- } else {
538- None
539- }
540- } )
520+ }
541521}
542522
543523/// An adapter for zipping two streams together.
@@ -566,16 +546,18 @@ pub fn zip<St1, St2>(stream: St1, other: St2) -> impl Stream<Item = (St1::Item,
566546 where St1 : Stream ,
567547 St2 : Stream ,
568548{
569- let stream = Box :: pin ( stream) ;
570- let other = Box :: pin ( other) ;
571- unfold ( ( stream, other) , async move | ( mut stream, mut other) | {
572- let left = next ( & mut stream) . await ;
573- let right = next ( & mut other) . await ;
574- match ( left, right) {
575- ( Some ( left) , Some ( right) ) => Some ( ( ( left, right) , ( stream, other) ) ) ,
576- _ => None
549+ let mut stream = Box :: pin ( stream) ;
550+ let mut other = Box :: pin ( other) ;
551+ async_stream_block ! {
552+ loop {
553+ let left = next( & mut stream) . await ;
554+ let right = next( & mut other) . await ;
555+ match ( left, right) {
556+ ( Some ( left) , Some ( right) ) => yield ( left, right) ,
557+ _ => break ,
558+ }
577559 }
578- } )
560+ }
579561}
580562
581563/// Adapter for chaining two stream.
@@ -605,21 +587,16 @@ pub fn zip<St1, St2>(stream: St1, other: St2) -> impl Stream<Item = (St1::Item,
605587pub fn chain < St > ( stream : St , other : St ) -> impl Stream < Item = St :: Item >
606588 where St : Stream ,
607589{
608- let stream = Box :: pin ( stream) ;
609- let other = Box :: pin ( other) ;
610- let start_with_first = true ;
611- unfold ( ( stream, other, start_with_first) , async move | ( mut stream, mut other, start_with_first) | {
612- if start_with_first {
613- if let Some ( item) = next ( & mut stream) . await {
614- return Some ( ( item, ( stream, other, start_with_first) ) )
615- }
590+ async_stream_block ! {
591+ #[ for_await]
592+ for item in stream {
593+ yield item
616594 }
617- if let Some ( item) = next ( & mut other) . await {
618- Some ( ( item, ( stream, other, /* start_with_first */ false ) ) )
619- } else {
620- None
595+ #[ for_await]
596+ for item in other {
597+ yield item
621598 }
622- } )
599+ }
623600}
624601
625602/// Take elements from this stream while the provided asynchronous predicate
@@ -649,18 +626,17 @@ pub fn take_while<St, F, Fut>(stream: St, f: F) -> impl Stream<Item = St::Item>
649626 F : FnMut ( & St :: Item ) -> Fut ,
650627 Fut : Future < Output = bool > ,
651628{
652- let stream = Box :: pin ( stream) ;
653- unfold ( ( stream, f) , async move | ( mut stream, mut f) | {
654- if let Some ( item) = next ( & mut stream) . await {
629+ let mut f = f;
630+ async_stream_block ! {
631+ #[ for_await]
632+ for item in stream {
655633 if f( & item) . await {
656- Some ( ( item , ( stream , f ) ) )
634+ yield item
657635 } else {
658- None
636+ break ;
659637 }
660- } else {
661- None
662638 }
663- } )
639+ }
664640}
665641
666642/// Skip elements on this stream while the provided asynchronous predicate
@@ -690,26 +666,23 @@ pub fn skip_while<St, F, Fut>(stream: St, f: F) -> impl Stream<Item = St::Item>
690666 F : FnMut ( & St :: Item ) -> Fut ,
691667 Fut : Future < Output = bool > ,
692668{
693- let stream = Box :: pin ( stream) ;
694- let should_skip = true ;
695- unfold ( ( stream, f, should_skip) , async move | ( mut stream, mut f, should_skip) | {
696- while should_skip {
697- if let Some ( item) = next ( & mut stream) . await {
669+ let mut f = f;
670+ let mut should_skip = true ;
671+ async_stream_block ! {
672+ #[ for_await]
673+ for item in stream {
674+ if should_skip {
698675 if f( & item) . await {
699676 continue ;
700677 } else {
701- return Some ( ( item, ( stream, f, /* should_skip */ false ) ) )
678+ should_skip = false ;
679+ yield item
702680 }
703681 } else {
704- return None
682+ yield item
705683 }
706684 }
707- if let Some ( item) = next ( & mut stream) . await {
708- Some ( ( item, ( stream, f, /* should_skip */ false ) ) )
709- } else {
710- None
711- }
712- } )
685+ }
713686}
714687
715688/// Execute an accumulating asynchronous computation over a stream,
0 commit comments