1- use futures :: future:: Future ;
1+ use core :: future:: Future ;
22pub use futures:: stream:: Stream ;
33
4+ use futures_async_stream:: async_stream_block;
5+
46use core:: iter:: IntoIterator ;
57use core:: pin:: Pin ;
68
@@ -97,11 +99,13 @@ where
9799 St : Stream ,
98100 F : FnMut ( St :: Item ) -> U ,
99101{
100- let stream = Box :: pin ( stream) ;
101- unfold ( ( stream, f) , async move |( mut stream, mut f) | {
102- let item = next ( & mut stream) . await ;
103- item. map ( |item| ( f ( item) , ( stream, f) ) )
104- } )
102+ let mut f = f;
103+ async_stream_block ! {
104+ #[ for_await]
105+ for item in stream {
106+ yield f( item)
107+ }
108+ }
105109}
106110
107111/// Filters the values produced by this stream according to the provided
@@ -136,18 +140,15 @@ where
136140 F : FnMut ( & St :: Item ) -> Fut ,
137141 Fut : Future < Output = bool > ,
138142{
139- let stream = Box :: pin ( stream) ;
140- unfold ( ( stream, f) , async move |( mut stream, mut f) | {
141- while let Some ( item) = next ( & mut stream) . await {
142- let matched = f ( & item) . await ;
143- if matched {
144- return Some ( ( item, ( stream, f) ) ) ;
145- } else {
146- continue ;
143+ let mut f = f;
144+ async_stream_block ! {
145+ #[ for_await]
146+ for item in stream {
147+ if f( & item) . await {
148+ yield item
147149 }
148150 }
149- None
150- } )
151+ }
151152}
152153
153154/// Filters the values produced by this stream while simultaneously mapping
@@ -183,17 +184,15 @@ where
183184 F : FnMut ( St :: Item ) -> Fut ,
184185 Fut : Future < Output = Option < U > > ,
185186{
186- let stream = Box :: pin ( stream) ;
187- unfold ( ( stream, f) , async move |( mut stream, mut f) | {
188- while let Some ( item) = next ( & mut stream) . await {
187+ let mut f = f;
188+ async_stream_block ! {
189+ #[ for_await]
190+ for item in stream {
189191 if let Some ( item) = f( item) . await {
190- return Some ( ( item, ( stream, f) ) ) ;
191- } else {
192- continue ;
192+ yield item
193193 }
194194 }
195- None
196- } )
195+ }
197196}
198197
199198/// Converts this stream into a future of `(next_item, tail_of_stream)`.
@@ -367,18 +366,18 @@ pub fn take<St>(stream: St, n: u64) -> impl Stream<Item = St::Item>
367366where
368367 St : Stream ,
369368{
370- let stream = Box :: pin ( stream) ;
371- unfold ( ( stream, n) , async move |( mut stream, n) | {
372- if n == 0 {
373- None
374- } else {
375- if let Some ( item) = next ( & mut stream) . await {
376- Some ( ( item, ( stream, n - 1 ) ) )
369+ let mut n = n;
370+ async_stream_block ! {
371+ #[ for_await]
372+ for item in stream {
373+ if n == 0 {
374+ break ;
377375 } else {
378- None
376+ n = n - 1 ;
377+ yield item
379378 }
380379 }
381- } )
380+ }
382381}
383382
384383/// Create a stream which produces the same item repeatedly.
@@ -430,28 +429,15 @@ where
430429 SubSt : Stream < Item = T > ,
431430 St : Stream < Item = SubSt > ,
432431{
433- let stream = Box :: pin ( stream) ;
434- unfold (
435- ( Some ( stream) , None ) ,
436- async move |( mut state_stream, mut state_substream) | loop {
437- if let Some ( mut substream) = state_substream. take ( ) {
438- if let Some ( item) = next ( & mut substream) . await {
439- return Some ( ( item, ( state_stream, Some ( substream) ) ) ) ;
440- } else {
441- continue ;
442- }
443- }
444- if let Some ( mut stream) = state_stream. take ( ) {
445- if let Some ( substream) = next ( & mut stream) . await {
446- let substream = Box :: pin ( substream) ;
447- state_stream = Some ( stream) ;
448- state_substream = Some ( substream) ;
449- continue ;
450- }
432+ async_stream_block ! {
433+ #[ for_await]
434+ for substream in stream {
435+ #[ for_await]
436+ for item in substream {
437+ yield item
451438 }
452- return None ;
453- } ,
454- )
439+ }
440+ }
455441}
456442
457443/// Computes from this stream's items new items of a different type using
@@ -483,16 +469,14 @@ where
483469 F : FnMut ( St :: Item ) -> Fut ,
484470 Fut : Future < Output = St :: Item > ,
485471{
486- let stream = Box :: pin ( stream ) ;
487- unfold ( ( stream , f ) , async move | ( mut stream , mut f ) | {
488- let item = next ( & mut stream ) . await ;
489- if let Some ( item) = item {
472+ let mut f = f ;
473+ async_stream_block ! {
474+ # [ for_await ]
475+ for item in stream {
490476 let new_item = f( item) . await ;
491- Some ( ( new_item, ( stream, f) ) )
492- } else {
493- None
477+ yield new_item
494478 }
495- } )
479+ }
496480}
497481
498482/// Creates a new stream which skips `n` items of the underlying stream.
@@ -517,22 +501,18 @@ pub fn skip<St>(stream: St, n: u64) -> impl Stream<Item = St::Item>
517501where
518502 St : Stream ,
519503{
520- let stream = Box :: pin ( stream) ;
521- unfold ( ( stream, n) , async move |( mut stream, mut n) | {
522- while n != 0 {
523- if let Some ( _) = next ( & mut stream) . await {
504+ let mut n = n;
505+ async_stream_block ! {
506+ #[ for_await]
507+ for item in stream {
508+ if n == 0 {
509+ yield item
510+ } else {
524511 n = n - 1 ;
525512 continue ;
526- } else {
527- return None ;
528513 }
529514 }
530- if let Some ( item) = next ( & mut stream) . await {
531- Some ( ( item, ( stream, 0 ) ) )
532- } else {
533- None
534- }
535- } )
515+ }
536516}
537517
538518/// An adapter for zipping two streams together.
@@ -561,16 +541,18 @@ where
561541 St1 : Stream ,
562542 St2 : Stream ,
563543{
564- let stream = Box :: pin ( stream) ;
565- let other = Box :: pin ( other) ;
566- unfold ( ( stream, other) , async move |( mut stream, mut other) | {
567- let left = next ( & mut stream) . await ;
568- let right = next ( & mut other) . await ;
569- match ( left, right) {
570- ( Some ( left) , Some ( right) ) => Some ( ( ( left, right) , ( stream, other) ) ) ,
571- _ => None ,
544+ let mut stream = Box :: pin ( stream) ;
545+ let mut other = Box :: pin ( other) ;
546+ async_stream_block ! {
547+ loop {
548+ let left = next( & mut stream) . await ;
549+ let right = next( & mut other) . await ;
550+ match ( left, right) {
551+ ( Some ( left) , Some ( right) ) => yield ( left, right) ,
552+ _ => break ,
553+ }
572554 }
573- } )
555+ }
574556}
575557
576558/// Adapter for chaining two stream.
@@ -600,24 +582,16 @@ pub fn chain<St>(stream: St, other: St) -> impl Stream<Item = St::Item>
600582where
601583 St : Stream ,
602584{
603- let stream = Box :: pin ( stream) ;
604- let other = Box :: pin ( other) ;
605- let start_with_first = true ;
606- unfold (
607- ( stream, other, start_with_first) ,
608- async move |( mut stream, mut other, start_with_first) | {
609- if start_with_first {
610- if let Some ( item) = next ( & mut stream) . await {
611- return Some ( ( item, ( stream, other, start_with_first) ) ) ;
612- }
613- }
614- if let Some ( item) = next ( & mut other) . await {
615- Some ( ( item, ( stream, other, /* start_with_first */ false ) ) )
616- } else {
617- None
618- }
619- } ,
620- )
585+ async_stream_block ! {
586+ #[ for_await]
587+ for item in stream {
588+ yield item
589+ }
590+ #[ for_await]
591+ for item in other {
592+ yield item
593+ }
594+ }
621595}
622596
623597/// Take elements from this stream while the provided asynchronous predicate
@@ -646,18 +620,17 @@ where
646620 F : FnMut ( & St :: Item ) -> Fut ,
647621 Fut : Future < Output = bool > ,
648622{
649- let stream = Box :: pin ( stream) ;
650- unfold ( ( stream, f) , async move |( mut stream, mut f) | {
651- if let Some ( item) = next ( & mut stream) . await {
623+ let mut f = f;
624+ async_stream_block ! {
625+ #[ for_await]
626+ for item in stream {
652627 if f( & item) . await {
653- Some ( ( item , ( stream , f ) ) )
628+ yield item
654629 } else {
655- None
630+ break ;
656631 }
657- } else {
658- None
659632 }
660- } )
633+ }
661634}
662635
663636/// Skip elements on this stream while the provided asynchronous predicate
@@ -687,29 +660,23 @@ where
687660 F : FnMut ( & St :: Item ) -> Fut ,
688661 Fut : Future < Output = bool > ,
689662{
690- let stream = Box :: pin ( stream) ;
691- let should_skip = true ;
692- unfold (
693- ( stream, f, should_skip) ,
694- async move |( mut stream, mut f, should_skip) | {
695- while should_skip {
696- if let Some ( item) = next ( & mut stream) . await {
697- if f ( & item) . await {
698- continue ;
699- } else {
700- return Some ( ( item, ( stream, f, /* should_skip */ false ) ) ) ;
701- }
663+ let mut f = f;
664+ let mut should_skip = true ;
665+ async_stream_block ! {
666+ #[ for_await]
667+ for item in stream {
668+ if should_skip {
669+ if f( & item) . await {
670+ continue ;
702671 } else {
703- return None ;
672+ should_skip = false ;
673+ yield item
704674 }
705- }
706- if let Some ( item) = next ( & mut stream) . await {
707- Some ( ( item, ( stream, f, /* should_skip */ false ) ) )
708675 } else {
709- None
676+ yield item
710677 }
711- } ,
712- )
678+ }
679+ }
713680}
714681
715682/// Execute an accumulating asynchronous computation over a stream,
0 commit comments