|
21 | 21 | //! # }) } |
22 | 22 | //! ``` |
23 | 23 |
|
24 | | -use std::cmp::Ordering; |
25 | | -use std::pin::Pin; |
| 24 | +mod all; |
| 25 | +mod any; |
| 26 | +mod min_by; |
| 27 | +mod next; |
| 28 | +mod take; |
26 | 29 |
|
27 | | -use cfg_if::cfg_if; |
| 30 | +pub use take::Take; |
| 31 | + |
| 32 | +use all::AllFuture; |
| 33 | +use any::AnyFuture; |
| 34 | +use min_by::MinByFuture; |
| 35 | +use next::NextFuture; |
28 | 36 |
|
29 | | -use super::min_by::MinBy; |
30 | | -use crate::future::Future; |
31 | | -use crate::task::{Context, Poll}; |
| 37 | +use std::cmp::Ordering; |
32 | 38 | use std::marker::PhantomData; |
33 | 39 |
|
| 40 | +use cfg_if::cfg_if; |
| 41 | + |
34 | 42 | cfg_if! { |
35 | 43 | if #[cfg(feature = "docs")] { |
36 | 44 | #[doc(hidden)] |
@@ -145,12 +153,12 @@ pub trait Stream { |
145 | 153 | /// # |
146 | 154 | /// # }) } |
147 | 155 | /// ``` |
148 | | - fn min_by<F>(self, compare: F) -> MinBy<Self, F> |
| 156 | + fn min_by<F>(self, compare: F) -> MinByFuture<Self, F> |
149 | 157 | where |
150 | 158 | Self: Sized + Unpin, |
151 | 159 | F: FnMut(&Self::Item, &Self::Item) -> Ordering, |
152 | 160 | { |
153 | | - MinBy::new(self, compare) |
| 161 | + MinByFuture::new(self, compare) |
154 | 162 | } |
155 | 163 |
|
156 | 164 | /// Tests if every element of the stream matches a predicate. |
@@ -278,142 +286,3 @@ impl<T: futures_core::stream::Stream + Unpin + ?Sized> Stream for T { |
278 | 286 | NextFuture { stream: self } |
279 | 287 | } |
280 | 288 | } |
281 | | - |
282 | | -#[doc(hidden)] |
283 | | -#[allow(missing_debug_implementations)] |
284 | | -pub struct NextFuture<'a, T: Unpin + ?Sized> { |
285 | | - stream: &'a mut T, |
286 | | -} |
287 | | - |
288 | | -impl<T: futures_core::stream::Stream + Unpin + ?Sized> Future for NextFuture<'_, T> { |
289 | | - type Output = Option<T::Item>; |
290 | | - |
291 | | - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
292 | | - Pin::new(&mut *self.stream).poll_next(cx) |
293 | | - } |
294 | | -} |
295 | | - |
296 | | -/// A stream that yields the first `n` items of another stream. |
297 | | -#[derive(Clone, Debug)] |
298 | | -pub struct Take<S> { |
299 | | - stream: S, |
300 | | - remaining: usize, |
301 | | -} |
302 | | - |
303 | | -impl<S: Unpin> Unpin for Take<S> {} |
304 | | - |
305 | | -impl<S: futures_core::stream::Stream> Take<S> { |
306 | | - pin_utils::unsafe_pinned!(stream: S); |
307 | | - pin_utils::unsafe_unpinned!(remaining: usize); |
308 | | -} |
309 | | - |
310 | | -impl<S: futures_core::stream::Stream> futures_core::stream::Stream for Take<S> { |
311 | | - type Item = S::Item; |
312 | | - |
313 | | - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> { |
314 | | - if self.remaining == 0 { |
315 | | - Poll::Ready(None) |
316 | | - } else { |
317 | | - let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); |
318 | | - match next { |
319 | | - Some(_) => *self.as_mut().remaining() -= 1, |
320 | | - None => *self.as_mut().remaining() = 0, |
321 | | - } |
322 | | - Poll::Ready(next) |
323 | | - } |
324 | | - } |
325 | | -} |
326 | | - |
327 | | -#[derive(Debug)] |
328 | | -pub struct AllFuture<'a, S, F, T> |
329 | | -where |
330 | | - F: FnMut(T) -> bool, |
331 | | -{ |
332 | | - stream: &'a mut S, |
333 | | - f: F, |
334 | | - result: bool, |
335 | | - __item: PhantomData<T>, |
336 | | -} |
337 | | - |
338 | | -impl<'a, S, F, T> AllFuture<'a, S, F, T> |
339 | | -where |
340 | | - F: FnMut(T) -> bool, |
341 | | -{ |
342 | | - pin_utils::unsafe_pinned!(stream: &'a mut S); |
343 | | - pin_utils::unsafe_unpinned!(result: bool); |
344 | | - pin_utils::unsafe_unpinned!(f: F); |
345 | | -} |
346 | | - |
347 | | -impl<S, F> Future for AllFuture<'_, S, F, S::Item> |
348 | | -where |
349 | | - S: futures_core::stream::Stream + Unpin + Sized, |
350 | | - F: FnMut(S::Item) -> bool, |
351 | | -{ |
352 | | - type Output = bool; |
353 | | - |
354 | | - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
355 | | - use futures_core::stream::Stream; |
356 | | - let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); |
357 | | - match next { |
358 | | - Some(v) => { |
359 | | - let result = (self.as_mut().f())(v); |
360 | | - *self.as_mut().result() = result; |
361 | | - if result { |
362 | | - // don't forget to wake this task again to pull the next item from stream |
363 | | - cx.waker().wake_by_ref(); |
364 | | - Poll::Pending |
365 | | - } else { |
366 | | - Poll::Ready(false) |
367 | | - } |
368 | | - } |
369 | | - None => Poll::Ready(self.result), |
370 | | - } |
371 | | - } |
372 | | -} |
373 | | - |
374 | | -#[derive(Debug)] |
375 | | -pub struct AnyFuture<'a, S, F, T> |
376 | | -where |
377 | | - F: FnMut(T) -> bool, |
378 | | -{ |
379 | | - stream: &'a mut S, |
380 | | - f: F, |
381 | | - result: bool, |
382 | | - __item: PhantomData<T>, |
383 | | -} |
384 | | - |
385 | | -impl<'a, S, F, T> AnyFuture<'a, S, F, T> |
386 | | -where |
387 | | - F: FnMut(T) -> bool, |
388 | | -{ |
389 | | - pin_utils::unsafe_pinned!(stream: &'a mut S); |
390 | | - pin_utils::unsafe_unpinned!(result: bool); |
391 | | - pin_utils::unsafe_unpinned!(f: F); |
392 | | -} |
393 | | - |
394 | | -impl<S, F> Future for AnyFuture<'_, S, F, S::Item> |
395 | | -where |
396 | | - S: futures_core::stream::Stream + Unpin + Sized, |
397 | | - F: FnMut(S::Item) -> bool, |
398 | | -{ |
399 | | - type Output = bool; |
400 | | - |
401 | | - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
402 | | - use futures_core::stream::Stream; |
403 | | - let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); |
404 | | - match next { |
405 | | - Some(v) => { |
406 | | - let result = (self.as_mut().f())(v); |
407 | | - *self.as_mut().result() = result; |
408 | | - if result { |
409 | | - Poll::Ready(true) |
410 | | - } else { |
411 | | - // don't forget to wake this task again to pull the next item from stream |
412 | | - cx.waker().wake_by_ref(); |
413 | | - Poll::Pending |
414 | | - } |
415 | | - } |
416 | | - None => Poll::Ready(self.result), |
417 | | - } |
418 | | - } |
419 | | -} |
0 commit comments