11use futures:: stream:: Stream ;
22use futures:: future:: Future ;
3+ use futures:: async_stream_block;
34
45use core:: pin:: Pin ;
56use core:: iter:: IntoIterator ;
@@ -30,48 +31,45 @@ pub fn map<St, U, F>(stream: St, f: F) -> impl Stream<Item = U>
3031 where St : Stream ,
3132 F : FnMut ( St :: Item ) -> U ,
3233{
33- let stream = Box :: pin ( stream) ;
34- unfold ( ( stream, f) , async move | ( mut stream, mut f) | {
35- let item = next ( & mut stream) . await ;
36- item. map ( |item| ( f ( item) , ( stream, f) ) )
37- } )
34+ let mut f = f;
35+ async_stream_block ! {
36+ #[ for_await]
37+ for item in stream {
38+ yield f( item)
39+ }
40+ }
3841}
3942
4043pub fn filter < St , Fut , F > ( stream : St , f : F ) -> impl Stream < Item = St :: Item >
4144 where St : Stream ,
4245 F : FnMut ( & St :: Item ) -> Fut ,
4346 Fut : Future < Output = bool >
4447{
45- let stream = Box :: pin ( stream) ;
46- unfold ( ( stream, f) , async move | ( mut stream, mut f) | {
47- while let Some ( item) = next ( & mut stream) . await {
48- let matched = f ( & item) . await ;
49- if matched {
50- return Some ( ( item, ( stream, f) ) )
51- } else {
52- continue ;
48+ let mut f = f;
49+ async_stream_block ! {
50+ #[ for_await]
51+ for item in stream {
52+ if f( & item) . await {
53+ yield item
5354 }
54- } ;
55- None
56- } )
55+ }
56+ }
5757}
5858
5959pub fn filter_map < St , Fut , F , U > ( stream : St , f : F ) -> impl Stream < Item = U >
6060 where St : Stream ,
6161 F : FnMut ( St :: Item ) -> Fut ,
6262 Fut : Future < Output = Option < U > >
6363{
64- let stream = Box :: pin ( stream) ;
65- unfold ( ( stream, f) , async move | ( mut stream, mut f) | {
66- while let Some ( item) = next ( & mut stream) . await {
64+ let mut f = f;
65+ async_stream_block ! {
66+ #[ for_await]
67+ for item in stream {
6768 if let Some ( item) = f( item) . await {
68- return Some ( ( item, ( stream, f) ) )
69- } else {
70- continue ;
69+ yield item
7170 }
72- } ;
73- None
74- } )
71+ }
72+ }
7573}
7674
7775pub async fn into_future < St > ( stream : St ) -> ( Option < St :: Item > , impl Stream < Item = St :: Item > )
@@ -121,18 +119,18 @@ pub async fn for_each<St, Fut, F>(stream: St, f: F) -> ()
121119pub fn take < St > ( stream : St , n : u64 ) -> impl Stream < Item = St :: Item >
122120 where St : Stream ,
123121{
124- let stream = Box :: pin ( stream) ;
125- unfold ( ( stream, n) , async move | ( mut stream, n) | {
126- if n == 0 {
127- None
128- } else {
129- if let Some ( item) = next ( & mut stream) . await {
130- Some ( ( item, ( stream, n - 1 ) ) )
122+ let mut n = n;
123+ async_stream_block ! {
124+ #[ for_await]
125+ for item in stream {
126+ if n == 0 {
127+ break ;
131128 } else {
132- None
129+ n = n - 1 ;
130+ yield item
133131 }
134132 }
135- } )
133+ }
136134}
137135
138136pub fn repeat < T > ( item : T ) -> impl Stream < Item = T >
@@ -148,65 +146,47 @@ pub fn flatten<St, SubSt, T>(stream: St) -> impl Stream<Item = T>
148146 where SubSt : Stream < Item = T > ,
149147 St : Stream < Item = SubSt > ,
150148{
151- let stream = Box :: pin ( stream) ;
152- unfold ( ( Some ( stream) , None ) , async move | ( mut state_stream, mut state_substream) | {
153- loop {
154- if let Some ( mut substream) = state_substream. take ( ) {
155- if let Some ( item) = next ( & mut substream) . await {
156- return Some ( ( item, ( state_stream, Some ( substream) ) ) )
157- } else {
158- continue ;
159- }
149+ async_stream_block ! {
150+ #[ for_await]
151+ for substream in stream {
152+ #[ for_await]
153+ for item in substream {
154+ yield item
160155 }
161- if let Some ( mut stream) = state_stream. take ( ) {
162- if let Some ( substream) = next ( & mut stream) . await {
163- let substream = Box :: pin ( substream) ;
164- state_stream = Some ( stream) ;
165- state_substream = Some ( substream) ;
166- continue ;
167- }
168- }
169- return None ;
170156 }
171- } )
157+ }
172158}
173159
174160pub fn then < St , F , Fut > ( stream : St , f : F ) -> impl Stream < Item = St :: Item >
175161 where St : Stream ,
176162 F : FnMut ( St :: Item ) -> Fut ,
177163 Fut : Future < Output = St :: Item >
178164{
179- let stream = Box :: pin ( stream ) ;
180- unfold ( ( stream , f ) , async move | ( mut stream , mut f ) | {
181- let item = next ( & mut stream ) . await ;
182- if let Some ( item) = item {
165+ let mut f = f ;
166+ async_stream_block ! {
167+ # [ for_await ]
168+ for item in stream {
183169 let new_item = f( item) . await ;
184- Some ( ( new_item, ( stream, f) ) )
185- } else {
186- None
170+ yield new_item
187171 }
188- } )
172+ }
189173}
190174
191175pub fn skip < St > ( stream : St , n : u64 ) -> impl Stream < Item = St :: Item >
192176 where St : Stream ,
193177{
194- let stream = Box :: pin ( stream ) ;
195- unfold ( ( stream , n ) , async move | ( mut stream , mut n ) | {
196- while n != 0 {
197- if let Some ( _ ) = next ( & mut stream) . await {
198- n = n - 1 ;
199- continue
178+ let mut n = n ;
179+ async_stream_block ! {
180+ # [ for_await ]
181+ for item in stream {
182+ if n == 0 {
183+ yield item
200184 } else {
201- return None
185+ n = n - 1 ;
186+ continue ;
202187 }
203188 }
204- if let Some ( item) = next ( & mut stream) . await {
205- Some ( ( item, ( stream, 0 ) ) )
206- } else {
207- None
208- }
209- } )
189+ }
210190}
211191
212192pub fn zip < St1 , St2 > ( stream : St1 , other : St2 ) -> impl Stream < Item = ( St1 :: Item , St2 :: Item ) >
@@ -228,67 +208,58 @@ pub fn zip<St1, St2>(stream: St1, other: St2) -> impl Stream<Item = (St1::Item,
228208pub fn chain < St > ( stream : St , other : St ) -> impl Stream < Item = St :: Item >
229209 where St : Stream ,
230210{
231- let stream = Box :: pin ( stream) ;
232- let other = Box :: pin ( other) ;
233- let start_with_first = true ;
234- unfold ( ( stream, other, start_with_first) , async move | ( mut stream, mut other, start_with_first) | {
235- if start_with_first {
236- if let Some ( item) = next ( & mut stream) . await {
237- return Some ( ( item, ( stream, other, start_with_first) ) )
238- }
211+ async_stream_block ! {
212+ #[ for_await]
213+ for item in stream {
214+ yield item
239215 }
240- if let Some ( item) = next ( & mut other) . await {
241- Some ( ( item, ( stream, other, /* start_with_first */ false ) ) )
242- } else {
243- None
216+ #[ for_await]
217+ for item in other {
218+ yield item
244219 }
245- } )
220+ }
246221}
247222
248223pub fn take_while < St , F , Fut > ( stream : St , f : F ) -> impl Stream < Item = St :: Item >
249224 where St : Stream ,
250225 F : FnMut ( & St :: Item ) -> Fut ,
251226 Fut : Future < Output = bool > ,
252227{
253- let stream = Box :: pin ( stream) ;
254- unfold ( ( stream, f) , async move | ( mut stream, mut f) | {
255- if let Some ( item) = next ( & mut stream) . await {
228+ let mut f = f;
229+ async_stream_block ! {
230+ #[ for_await]
231+ for item in stream {
256232 if f( & item) . await {
257- Some ( ( item , ( stream , f ) ) )
233+ yield item
258234 } else {
259- None
235+ break ;
260236 }
261- } else {
262- None
263237 }
264- } )
238+ }
265239}
266240
267241pub fn skip_while < St , F , Fut > ( stream : St , f : F ) -> impl Stream < Item = St :: Item >
268242 where St : Stream ,
269243 F : FnMut ( & St :: Item ) -> Fut ,
270244 Fut : Future < Output = bool > ,
271245{
272- let stream = Box :: pin ( stream) ;
273- let should_skip = true ;
274- unfold ( ( stream, f, should_skip) , async move | ( mut stream, mut f, should_skip) | {
275- while should_skip {
276- if let Some ( item) = next ( & mut stream) . await {
246+ let mut f = f;
247+ let mut should_skip = true ;
248+ async_stream_block ! {
249+ #[ for_await]
250+ for item in stream {
251+ if should_skip {
277252 if f( & item) . await {
278253 continue ;
279254 } else {
280- return Some ( ( item, ( stream, f, /* should_skip */ false ) ) )
255+ should_skip = false ;
256+ yield item
281257 }
282258 } else {
283- return None
259+ yield item
284260 }
285261 }
286- if let Some ( item) = next ( & mut stream) . await {
287- Some ( ( item, ( stream, f, /* should_skip */ false ) ) )
288- } else {
289- None
290- }
291- } )
262+ }
292263}
293264
294265pub async fn fold < St , T , F , Fut > ( stream : St , init : T , f : F ) -> T
0 commit comments