Skip to content

Commit 2694a63

Browse files
committed
Buffered, BufferUnordered: Assert maximum size of buffer is non-zero
1 parent 2fa7bab commit 2694a63

File tree

4 files changed

+27
-1
lines changed

4 files changed

+27
-1
lines changed

futures-util/src/stream/stream/buffer_unordered.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ where
4646
St: Stream,
4747
St::Item: Future,
4848
{
49+
assert!(n > 0);
4950
Self {
5051
stream: super::Fuse::new(stream),
5152
in_progress_queue: FuturesUnordered::new(),

futures-util/src/stream/stream/buffered.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ where
4444
St::Item: Future,
4545
{
4646
pub(super) fn new(stream: St, n: usize) -> Self {
47+
assert!(n > 0);
4748
Self {
4849
stream: super::Fuse::new(stream),
4950
in_progress_queue: FuturesOrdered::new(),

futures-util/src/stream/stream/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1142,6 +1142,10 @@ pub trait StreamExt: Stream {
11421142
///
11431143
/// This method is only available when the `std` or `alloc` feature of this
11441144
/// library is activated, and it is activated by default.
1145+
///
1146+
/// # Panics
1147+
///
1148+
/// This method will panic if `n` is zero.
11451149
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
11461150
#[cfg(feature = "alloc")]
11471151
fn buffered(self, n: usize) -> Buffered<Self>
@@ -1165,6 +1169,10 @@ pub trait StreamExt: Stream {
11651169
/// This method is only available when the `std` or `alloc` feature of this
11661170
/// library is activated, and it is activated by default.
11671171
///
1172+
/// # Panics
1173+
///
1174+
/// This method will panic if `n` is zero.
1175+
///
11681176
/// # Examples
11691177
///
11701178
/// ```

futures/tests/stream.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use futures::channel::mpsc;
1+
use futures::{channel::mpsc, future::BoxFuture};
22
use futures::executor::block_on;
33
use futures::future::{self, Future};
44
use futures::sink::SinkExt;
@@ -118,6 +118,22 @@ fn take_until() {
118118
});
119119
}
120120

121+
#[test]
122+
#[should_panic]
123+
fn buffered_panic_on_cap_zero() {
124+
let (_, rx1) = mpsc::channel::<BoxFuture<()>>(1);
125+
126+
let _ = rx1.buffered(0);
127+
}
128+
129+
#[test]
130+
#[should_panic]
131+
fn buffer_unordered_panic_on_cap_zero() {
132+
let (_, rx1) = mpsc::channel::<BoxFuture<()>>(1);
133+
134+
let _ = rx1.buffer_unordered(0);
135+
}
136+
121137
#[test]
122138
#[should_panic]
123139
fn chunks_panic_on_cap_zero() {

0 commit comments

Comments
 (0)