Skip to content

Commit fc02620

Browse files
committed
Add stream::Peekable::{next_if, next_if_eq}
1 parent e8fdc8f commit fc02620

File tree

4 files changed

+248
-10
lines changed

4 files changed

+248
-10
lines changed

futures-util/src/stream/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
1919
mod stream;
2020
pub use self::stream::{
2121
Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
22-
Fuse, Inspect, Map, Next, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt,
22+
Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt,
2323
StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip,
2424
};
2525

futures-util/src/stream/stream/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ pub use self::select_next_some::SelectNextSome;
123123

124124
mod peek;
125125
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
126-
pub use self::peek::{Peek, Peekable};
126+
pub use self::peek::{Peek, Peekable, NextIf, NextIfEq};
127127

128128
mod skip;
129129
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411

futures-util/src/stream/stream/peek.rs

Lines changed: 220 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
use crate::fns::FnOnce1;
12
use crate::stream::{Fuse, StreamExt};
23
use core::fmt;
4+
use core::marker::PhantomData;
35
use core::pin::Pin;
46
use futures_core::future::{FusedFuture, Future};
57
use futures_core::ready;
@@ -44,10 +46,7 @@ impl<St: Stream> Peekable<St> {
4446
///
4547
/// This method polls the underlying stream and return either a reference
4648
/// to the next item if the stream is ready or passes through any errors.
47-
pub fn poll_peek(
48-
self: Pin<&mut Self>,
49-
cx: &mut Context<'_>,
50-
) -> Poll<Option<&St::Item>> {
49+
pub fn poll_peek(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<&St::Item>> {
5150
let mut this = self.project();
5251

5352
Poll::Ready(loop {
@@ -60,6 +59,96 @@ impl<St: Stream> Peekable<St> {
6059
}
6160
})
6261
}
62+
63+
/// Creates a future which will consume and return the next value of this
64+
/// stream if a condition is true.
65+
///
66+
/// If `func` returns `true` for the next value of this stream, consume and
67+
/// return it. Otherwise, return `None`.
68+
///
69+
/// # Examples
70+
///
71+
/// Consume a number if it's equal to 0.
72+
///
73+
/// ```
74+
/// # futures::executor::block_on(async {
75+
/// use futures::stream::{self, StreamExt};
76+
/// use futures::pin_mut;
77+
///
78+
/// let stream = stream::iter(0..5).peekable();
79+
/// pin_mut!(stream);
80+
/// // The first item of the stream is 0; consume it.
81+
/// assert_eq!(stream.as_mut().next_if(|&x| x == 0).await, Some(0));
82+
/// // The next item returned is now 1, so `consume` will return `false`.
83+
/// assert_eq!(stream.as_mut().next_if(|&x| x == 0).await, None);
84+
/// // `next_if` saves the value of the next item if it was not equal to `expected`.
85+
/// assert_eq!(stream.next().await, Some(1));
86+
/// # });
87+
/// ```
88+
///
89+
/// Consume any number less than 10.
90+
///
91+
/// ```
92+
/// # futures::executor::block_on(async {
93+
/// use futures::stream::{self, StreamExt};
94+
/// use futures::pin_mut;
95+
///
96+
/// let stream = stream::iter(1..20).peekable();
97+
/// pin_mut!(stream);
98+
/// // Consume all numbers less than 10
99+
/// while stream.as_mut().next_if(|&x| x < 10).await.is_some() {}
100+
/// // The next value returned will be 10
101+
/// assert_eq!(stream.next().await, Some(10));
102+
/// # });
103+
/// ```
104+
pub fn next_if<F>(self: Pin<&mut Self>, func: F) -> NextIf<'_, St, F>
105+
where
106+
F: FnOnce(&St::Item) -> bool,
107+
{
108+
NextIf {
109+
inner: Some((self, func)),
110+
}
111+
}
112+
113+
/// Creates a future which will consume and return the next item if it is
114+
/// equal to `expected`.
115+
///
116+
/// # Example
117+
///
118+
/// Consume a number if it's equal to 0.
119+
///
120+
/// ```
121+
/// # futures::executor::block_on(async {
122+
/// use futures::stream::{self, StreamExt};
123+
/// use futures::pin_mut;
124+
///
125+
/// let stream = stream::iter(0..5).peekable();
126+
/// pin_mut!(stream);
127+
/// // The first item of the stream is 0; consume it.
128+
/// assert_eq!(stream.as_mut().next_if_eq(&0).await, Some(0));
129+
/// // The next item returned is now 1, so `consume` will return `false`.
130+
/// assert_eq!(stream.as_mut().next_if_eq(&0).await, None);
131+
/// // `next_if_eq` saves the value of the next item if it was not equal to `expected`.
132+
/// assert_eq!(stream.next().await, Some(1));
133+
/// # });
134+
/// ```
135+
pub fn next_if_eq<'a, T>(self: Pin<&'a mut Self>, expected: &'a T) -> NextIfEq<'a, St, T>
136+
where
137+
T: ?Sized,
138+
St::Item: PartialEq<T>,
139+
{
140+
NextIfEq {
141+
inner: NextIf {
142+
inner: Some((
143+
self,
144+
NextIfEqFn {
145+
expected,
146+
_next: PhantomData,
147+
},
148+
)),
149+
},
150+
}
151+
}
63152
}
64153

65154
impl<St: Stream> FusedStream for Peekable<St> {
@@ -103,7 +192,7 @@ where
103192
}
104193

105194
pin_project! {
106-
/// Future for the [`Peekable::peek()`](self::Peekable::peek) function from [`Peekable`]
195+
/// Future for the [`Peekable::peek`](self::Peekable::peek) method.
107196
#[must_use = "futures do nothing unless polled"]
108197
pub struct Peek<'a, St: Stream> {
109198
inner: Option<Pin<&'a mut Peekable<St>>>,
@@ -116,9 +205,7 @@ where
116205
St::Item: fmt::Debug,
117206
{
118207
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119-
f.debug_struct("Peek")
120-
.field("inner", &self.inner)
121-
.finish()
208+
f.debug_struct("Peek").field("inner", &self.inner).finish()
122209
}
123210
}
124211

@@ -133,6 +220,7 @@ where
133220
St: Stream,
134221
{
135222
type Output = Option<&'a St::Item>;
223+
136224
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
137225
let inner = self.project().inner;
138226
if let Some(peekable) = inner {
@@ -144,3 +232,127 @@ where
144232
}
145233
}
146234
}
235+
236+
pin_project! {
237+
/// Future for the [`Peekable::next_if`](self::Peekable::next_if) method.
238+
#[must_use = "futures do nothing unless polled"]
239+
pub struct NextIf<'a, St: Stream, F> {
240+
inner: Option<(Pin<&'a mut Peekable<St>>, F)>,
241+
}
242+
}
243+
244+
impl<St, F> fmt::Debug for NextIf<'_, St, F>
245+
where
246+
St: Stream + fmt::Debug,
247+
St::Item: fmt::Debug,
248+
{
249+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
250+
f.debug_struct("NextIf")
251+
.field("inner", &self.inner.as_ref().map(|(s, _f)| s))
252+
.finish()
253+
}
254+
}
255+
256+
#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
257+
impl<St, F> FusedFuture for NextIf<'_, St, F>
258+
where
259+
St: Stream,
260+
F: for<'a> FnOnce1<&'a St::Item, Output = bool>,
261+
{
262+
fn is_terminated(&self) -> bool {
263+
self.inner.is_none()
264+
}
265+
}
266+
267+
#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
268+
impl<St, F> Future for NextIf<'_, St, F>
269+
where
270+
St: Stream,
271+
F: for<'a> FnOnce1<&'a St::Item, Output = bool>,
272+
{
273+
type Output = Option<St::Item>;
274+
275+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
276+
let inner = self.project().inner;
277+
if let Some((peekable, _)) = inner {
278+
let res = ready!(peekable.as_mut().poll_next(cx));
279+
280+
let (peekable, func) = inner.take().unwrap();
281+
match res {
282+
Some(ref matched) if func.call_once(matched) => Poll::Ready(res),
283+
other => {
284+
let peekable = peekable.project();
285+
// Since we called `self.next()`, we consumed `self.peeked`.
286+
assert!(peekable.peeked.is_none());
287+
*peekable.peeked = other;
288+
Poll::Ready(None)
289+
}
290+
}
291+
} else {
292+
panic!("NextIf polled after completion")
293+
}
294+
}
295+
}
296+
297+
pin_project! {
298+
/// Future for the [`Peekable::next_if_eq`](self::Peekable::next_if_eq) method.
299+
#[must_use = "futures do nothing unless polled"]
300+
pub struct NextIfEq<'a, St: Stream, T: ?Sized> {
301+
#[pin]
302+
inner: NextIf<'a, St, NextIfEqFn<'a, T, St::Item>>,
303+
}
304+
}
305+
306+
impl<St, T> fmt::Debug for NextIfEq<'_, St, T>
307+
where
308+
St: Stream + fmt::Debug,
309+
St::Item: fmt::Debug,
310+
T: ?Sized,
311+
{
312+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
313+
f.debug_struct("NextIfEq")
314+
.field("inner", &self.inner.inner.as_ref().map(|(s, _f)| s))
315+
.finish()
316+
}
317+
}
318+
319+
impl<St, T> FusedFuture for NextIfEq<'_, St, T>
320+
where
321+
St: Stream,
322+
T: ?Sized,
323+
St::Item: PartialEq<T>,
324+
{
325+
fn is_terminated(&self) -> bool {
326+
self.inner.is_terminated()
327+
}
328+
}
329+
330+
impl<St, T> Future for NextIfEq<'_, St, T>
331+
where
332+
St: Stream,
333+
T: ?Sized,
334+
St::Item: PartialEq<T>,
335+
{
336+
type Output = Option<St::Item>;
337+
338+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
339+
self.project().inner.poll(cx)
340+
}
341+
}
342+
343+
struct NextIfEqFn<'a, T: ?Sized, Item> {
344+
expected: &'a T,
345+
_next: PhantomData<Item>,
346+
}
347+
348+
impl<T, Item> FnOnce1<&Item> for NextIfEqFn<'_, T, Item>
349+
where
350+
T: ?Sized,
351+
Item: PartialEq<T>,
352+
{
353+
type Output = bool;
354+
355+
fn call_once(self, next: &Item) -> Self::Output {
356+
next == self.expected
357+
}
358+
}

futures/tests/stream_peekable.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,29 @@ fn peekable() {
1111
assert_eq!(peekable.collect::<Vec<u8>>().await, vec![1, 2, 3]);
1212
});
1313
}
14+
15+
#[test]
16+
fn peekable_next_if_eq() {
17+
block_on(async {
18+
// first, try on references
19+
let s = stream::iter(vec!["Heart", "of", "Gold"]).peekable();
20+
pin_mut!(s);
21+
// try before `peek()`
22+
assert_eq!(s.as_mut().next_if_eq(&"trillian").await, None);
23+
assert_eq!(s.as_mut().next_if_eq(&"Heart").await, Some("Heart"));
24+
// try after peek()
25+
assert_eq!(s.as_mut().peek().await, Some(&"of"));
26+
assert_eq!(s.as_mut().next_if_eq(&"of").await, Some("of"));
27+
assert_eq!(s.as_mut().next_if_eq(&"zaphod").await, None);
28+
// make sure `next()` still behaves
29+
assert_eq!(s.next().await, Some("Gold"));
30+
31+
// make sure comparison works for owned values
32+
let s = stream::iter(vec![String::from("Ludicrous"), "speed".into()]).peekable();
33+
pin_mut!(s);
34+
// make sure basic functionality works
35+
assert_eq!(s.as_mut().next_if_eq("Ludicrous").await, Some("Ludicrous".into()));
36+
assert_eq!(s.as_mut().next_if_eq("speed").await, Some("speed".into()));
37+
assert_eq!(s.as_mut().next_if_eq("").await, None);
38+
});
39+
}

0 commit comments

Comments
 (0)