11use futures:: stream:: Stream ;
22use futures:: future:: Future ;
3+ use futures:: async_stream_block;
34
45use core:: pin:: Pin ;
56use core:: iter:: IntoIterator ;
@@ -30,48 +31,45 @@ pub fn map<St, U, F>(stream: St, f: F) -> impl Stream<Item = U>
3031 where St : Stream ,
3132 F : FnMut ( St :: Item ) -> U ,
3233{
33- let stream = Box :: pin ( stream) ;
34- unfold ( ( stream, f) , async move | ( mut stream, mut f) | {
35- let item = next ( & mut stream) . await ;
36- item. map ( |item| ( f ( item) , ( stream, f) ) )
37- } )
34+ let mut f = f;
35+ async_stream_block ! {
36+ #[ for_await]
37+ for item in stream {
38+ yield f( item)
39+ }
40+ }
3841}
3942
4043pub fn filter < St , Fut , F > ( stream : St , f : F ) -> impl Stream < Item = St :: Item >
4144 where St : Stream ,
4245 F : FnMut ( & St :: Item ) -> Fut ,
4346 Fut : Future < Output = bool >
4447{
45- let stream = Box :: pin ( stream) ;
46- unfold ( ( stream, f) , async move | ( mut stream, mut f) | {
47- while let Some ( item) = next ( & mut stream) . await {
48- let matched = f ( & item) . await ;
49- if matched {
50- return Some ( ( item, ( stream, f) ) )
51- } else {
52- continue ;
48+ let mut f = f;
49+ async_stream_block ! {
50+ #[ for_await]
51+ for item in stream {
52+ if f( & item) . await {
53+ yield item
5354 }
54- } ;
55- None
56- } )
55+ }
56+ }
5757}
5858
5959pub fn filter_map < St , Fut , F , U > ( stream : St , f : F ) -> impl Stream < Item = U >
6060 where St : Stream ,
6161 F : FnMut ( St :: Item ) -> Fut ,
6262 Fut : Future < Output = Option < U > >
6363{
64- let stream = Box :: pin ( stream) ;
65- unfold ( ( stream, f) , async move | ( mut stream, mut f) | {
66- while let Some ( item) = next ( & mut stream) . await {
64+ let mut f = f;
65+ async_stream_block ! {
66+ #[ for_await]
67+ for item in stream {
6768 if let Some ( item) = f( item) . await {
68- return Some ( ( item, ( stream, f) ) )
69- } else {
70- continue ;
69+ yield item
7170 }
72- } ;
73- None
74- } )
71+ }
72+ }
7573}
7674
7775pub async fn into_future < St > ( stream : St ) -> ( Option < St :: Item > , impl Stream < Item = St :: Item > )
@@ -121,18 +119,18 @@ pub async fn for_each<St, Fut, F>(stream: St, f: F) -> ()
121119pub fn take < St > ( stream : St , n : u64 ) -> impl Stream < Item = St :: Item >
122120 where St : Stream ,
123121{
124- let stream = Box :: pin ( stream) ;
125- unfold ( ( stream, n) , async move | ( mut stream, n) | {
126- if n == 0 {
127- None
128- } else {
129- if let Some ( item) = next ( & mut stream) . await {
130- Some ( ( item, ( stream, n - 1 ) ) )
122+ let mut n = n;
123+ async_stream_block ! {
124+ #[ for_await]
125+ for item in stream {
126+ if n == 0 {
127+ break ;
131128 } else {
132- None
129+ n = n - 1 ;
130+ yield item
133131 }
134132 }
135- } )
133+ }
136134}
137135
138136pub fn repeat < T > ( item : T ) -> impl Stream < Item = T >
@@ -148,147 +146,122 @@ pub fn flatten<St, SubSt, T>(stream: St) -> impl Stream<Item = T>
148146 where SubSt : Stream < Item = T > ,
149147 St : Stream < Item = SubSt > ,
150148{
151- let stream = Box :: pin ( stream) ;
152- unfold ( ( Some ( stream) , None ) , async move | ( mut state_stream, mut state_substream) | {
153- loop {
154- if let Some ( mut substream) = state_substream. take ( ) {
155- if let Some ( item) = next ( & mut substream) . await {
156- return Some ( ( item, ( state_stream, Some ( substream) ) ) )
157- } else {
158- continue ;
159- }
160- }
161- if let Some ( mut stream) = state_stream. take ( ) {
162- if let Some ( substream) = next ( & mut stream) . await {
163- let substream = Box :: pin ( substream) ;
164- state_stream = Some ( stream) ;
165- state_substream = Some ( substream) ;
166- continue ;
167- }
149+ async_stream_block ! {
150+ #[ for_await]
151+ for substream in stream {
152+ #[ for_await]
153+ for item in substream {
154+ yield item
168155 }
169- return None ;
170156 }
171- } )
157+ }
172158}
173159
174160pub fn then < St , F , Fut > ( stream : St , f : F ) -> impl Stream < Item = St :: Item >
175161 where St : Stream ,
176162 F : FnMut ( St :: Item ) -> Fut ,
177163 Fut : Future < Output = St :: Item >
178164{
179- let stream = Box :: pin ( stream ) ;
180- unfold ( ( stream , f ) , async move | ( mut stream , mut f ) | {
181- let item = next ( & mut stream ) . await ;
182- if let Some ( item) = item {
165+ let mut f = f ;
166+ async_stream_block ! {
167+ # [ for_await ]
168+ for item in stream {
183169 let new_item = f( item) . await ;
184- Some ( ( new_item, ( stream, f) ) )
185- } else {
186- None
170+ yield new_item
187171 }
188- } )
172+ }
189173}
190174
191175pub fn skip < St > ( stream : St , n : u64 ) -> impl Stream < Item = St :: Item >
192176 where St : Stream ,
193177{
194- let stream = Box :: pin ( stream ) ;
195- unfold ( ( stream , n ) , async move | ( mut stream , mut n ) | {
196- while n != 0 {
197- if let Some ( _ ) = next ( & mut stream) . await {
198- n = n - 1 ;
199- continue
178+ let mut n = n ;
179+ async_stream_block ! {
180+ # [ for_await ]
181+ for item in stream {
182+ if n == 0 {
183+ yield item
200184 } else {
201- return None
185+ n = n - 1 ;
186+ continue ;
202187 }
203188 }
204- if let Some ( item) = next ( & mut stream) . await {
205- Some ( ( item, ( stream, 0 ) ) )
206- } else {
207- None
208- }
209- } )
189+ }
210190}
211191
212192pub fn zip < St1 , St2 > ( stream : St1 , other : St2 ) -> impl Stream < Item = ( St1 :: Item , St2 :: Item ) >
213193 where St1 : Stream ,
214194 St2 : Stream ,
215195{
216- let stream = Box :: pin ( stream) ;
217- let other = Box :: pin ( other) ;
218- unfold ( ( stream, other) , async move | ( mut stream, mut other) | {
219- let left = next ( & mut stream) . await ;
220- let right = next ( & mut other) . await ;
221- match ( left, right) {
222- ( Some ( left) , Some ( right) ) => Some ( ( ( left, right) , ( stream, other) ) ) ,
223- _ => None
196+ let mut stream = Box :: pin ( stream) ;
197+ let mut other = Box :: pin ( other) ;
198+ async_stream_block ! {
199+ loop {
200+ let left = next( & mut stream) . await ;
201+ let right = next( & mut other) . await ;
202+ match ( left, right) {
203+ ( Some ( left) , Some ( right) ) => yield ( left, right) ,
204+ _ => break ,
205+ }
224206 }
225- } )
207+ }
226208}
227209
228210pub fn chain < St > ( stream : St , other : St ) -> impl Stream < Item = St :: Item >
229211 where St : Stream ,
230212{
231- let stream = Box :: pin ( stream) ;
232- let other = Box :: pin ( other) ;
233- let start_with_first = true ;
234- unfold ( ( stream, other, start_with_first) , async move | ( mut stream, mut other, start_with_first) | {
235- if start_with_first {
236- if let Some ( item) = next ( & mut stream) . await {
237- return Some ( ( item, ( stream, other, start_with_first) ) )
238- }
213+ async_stream_block ! {
214+ #[ for_await]
215+ for item in stream {
216+ yield item
239217 }
240- if let Some ( item) = next ( & mut other) . await {
241- Some ( ( item, ( stream, other, /* start_with_first */ false ) ) )
242- } else {
243- None
218+ #[ for_await]
219+ for item in other {
220+ yield item
244221 }
245- } )
222+ }
246223}
247224
248225pub fn take_while < St , F , Fut > ( stream : St , f : F ) -> impl Stream < Item = St :: Item >
249226 where St : Stream ,
250227 F : FnMut ( & St :: Item ) -> Fut ,
251228 Fut : Future < Output = bool > ,
252229{
253- let stream = Box :: pin ( stream) ;
254- unfold ( ( stream, f) , async move | ( mut stream, mut f) | {
255- if let Some ( item) = next ( & mut stream) . await {
230+ let mut f = f;
231+ async_stream_block ! {
232+ #[ for_await]
233+ for item in stream {
256234 if f( & item) . await {
257- Some ( ( item , ( stream , f ) ) )
235+ yield item
258236 } else {
259- None
237+ break ;
260238 }
261- } else {
262- None
263239 }
264- } )
240+ }
265241}
266242
267243pub fn skip_while < St , F , Fut > ( stream : St , f : F ) -> impl Stream < Item = St :: Item >
268244 where St : Stream ,
269245 F : FnMut ( & St :: Item ) -> Fut ,
270246 Fut : Future < Output = bool > ,
271247{
272- let stream = Box :: pin ( stream) ;
273- let should_skip = true ;
274- unfold ( ( stream, f, should_skip) , async move | ( mut stream, mut f, should_skip) | {
275- while should_skip {
276- if let Some ( item) = next ( & mut stream) . await {
248+ let mut f = f;
249+ let mut should_skip = true ;
250+ async_stream_block ! {
251+ #[ for_await]
252+ for item in stream {
253+ if should_skip {
277254 if f( & item) . await {
278255 continue ;
279256 } else {
280- return Some ( ( item, ( stream, f, /* should_skip */ false ) ) )
257+ should_skip = false ;
258+ yield item
281259 }
282260 } else {
283- return None
261+ yield item
284262 }
285263 }
286- if let Some ( item) = next ( & mut stream) . await {
287- Some ( ( item, ( stream, f, /* should_skip */ false ) ) )
288- } else {
289- None
290- }
291- } )
264+ }
292265}
293266
294267pub async fn fold < St , T , F , Fut > ( stream : St , init : T , f : F ) -> T
0 commit comments