From 14ed012c20c3d50d2b2f3c2074e4fccb6350db67 Mon Sep 17 00:00:00 2001 From: andylokandy Date: Tue, 22 Jul 2025 20:16:14 +0800 Subject: [PATCH 1/4] fix: spawn tasks in Stream::buffered and Stream::buffer_unordered to max concurrency --- .../src/stream/stream/buffer_unordered.rs | 63 +++++++++++------- futures-util/src/stream/stream/buffered.rs | 66 ++++++++++++------- futures-util/src/stream/stream/mod.rs | 10 +-- futures/tests/auto_traits.rs | 39 ++++++----- 4 files changed, 109 insertions(+), 69 deletions(-) diff --git a/futures-util/src/stream/stream/buffer_unordered.rs b/futures-util/src/stream/stream/buffer_unordered.rs index 22a525cde6..2e9707d95d 100644 --- a/futures-util/src/stream/stream/buffer_unordered.rs +++ b/futures-util/src/stream/stream/buffer_unordered.rs @@ -1,4 +1,5 @@ use crate::stream::{Fuse, FuturesUnordered, StreamExt}; +use alloc::vec::Vec; use core::fmt; use core::num::NonZeroUsize; use core::pin::Pin; @@ -13,20 +14,23 @@ pin_project! { /// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered) /// method. #[must_use = "streams do nothing unless polled"] - pub struct BufferUnordered + pub struct BufferUnordered where - St: Stream, + St: Stream, + F: Future, { #[pin] stream: Fuse, in_progress_queue: FuturesUnordered, + ready_queue: Vec, max: Option, } } -impl fmt::Debug for BufferUnordered +impl fmt::Debug for BufferUnordered where - St: Stream + fmt::Debug, + St: Stream + fmt::Debug, + F: Future, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("BufferUnordered") @@ -37,15 +41,16 @@ where } } -impl BufferUnordered +impl BufferUnordered where - St: Stream, - St::Item: Future, + St: Stream, + F: Future, { pub(super) fn new(stream: St, n: Option) -> Self { Self { stream: super::Fuse::new(stream), in_progress_queue: FuturesUnordered::new(), + ready_queue: Vec::new(), max: n.and_then(NonZeroUsize::new), } } @@ -53,10 +58,10 @@ where delegate_access_inner!(stream, St, (.)); } -impl Stream for BufferUnordered +impl Stream for BufferUnordered where - St: Stream, - St::Item: Future, + St: Stream, + F: Future, { type Item = ::Output; @@ -72,14 +77,28 @@ where } } - // Attempt to pull the next value from the in_progress_queue - match this.in_progress_queue.poll_next_unpin(cx) { - x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x, - Poll::Ready(None) => {} + // Try to poll all ready futures in the in_progress_queue. + loop { + match this.in_progress_queue.poll_next_unpin(cx) { + Poll::Ready(Some(output)) => { + this.ready_queue.push(output); + } + Poll::Ready(None) => break, + Poll::Pending => break, + } + } + + // If we have any ready outputs, return the first one. + if let Some(output) = this.ready_queue.pop() { + // If there are still ready outputs, wake the task to poll again. + if !this.ready_queue.is_empty() { + cx.waker().wake_by_ref(); + } + + return Poll::Ready(Some(output)); } - // If more values are still coming from the stream, we're not done yet - if this.stream.is_done() { + if this.stream.is_done() && this.in_progress_queue.is_empty() { Poll::Ready(None) } else { Poll::Pending @@ -98,10 +117,10 @@ where } } -impl FusedStream for BufferUnordered +impl FusedStream for BufferUnordered where - St: Stream, - St::Item: Future, + St: Stream, + F: Future, { fn is_terminated(&self) -> bool { self.in_progress_queue.is_terminated() && self.stream.is_terminated() @@ -110,10 +129,10 @@ where // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] -impl Sink for BufferUnordered +impl Sink for BufferUnordered where - S: Stream + Sink, - S::Item: Future, + S: Stream + Sink, + F: Future, { type Error = S::Error; diff --git a/futures-util/src/stream/stream/buffered.rs b/futures-util/src/stream/stream/buffered.rs index 41e2948639..1ca69f0791 100644 --- a/futures-util/src/stream/stream/buffered.rs +++ b/futures-util/src/stream/stream/buffered.rs @@ -1,9 +1,9 @@ use crate::stream::{Fuse, FusedStream, FuturesOrdered, StreamExt}; +use alloc::collections::VecDeque; use core::fmt; use core::num::NonZeroUsize; use core::pin::Pin; use futures_core::future::Future; -use futures_core::ready; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] @@ -13,22 +13,23 @@ use pin_project_lite::pin_project; pin_project! { /// Stream for the [`buffered`](super::StreamExt::buffered) method. #[must_use = "streams do nothing unless polled"] - pub struct Buffered + pub struct Buffered where - St: Stream, - St::Item: Future, + St: Stream, + F: Future, { #[pin] stream: Fuse, in_progress_queue: FuturesOrdered, + ready_queue: VecDeque, max: Option, } } -impl fmt::Debug for Buffered +impl fmt::Debug for Buffered where - St: Stream + fmt::Debug, - St::Item: Future, + St: Stream + fmt::Debug, + F: Future, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Buffered") @@ -39,15 +40,16 @@ where } } -impl Buffered +impl Buffered where - St: Stream, - St::Item: Future, + St: Stream, + F: Future, { pub(super) fn new(stream: St, n: Option) -> Self { Self { stream: super::Fuse::new(stream), in_progress_queue: FuturesOrdered::new(), + ready_queue: VecDeque::new(), max: n.and_then(NonZeroUsize::new), } } @@ -55,10 +57,10 @@ where delegate_access_inner!(stream, St, (.)); } -impl Stream for Buffered +impl Stream for Buffered where - St: Stream, - St::Item: Future, + St: Stream, + F: Future, { type Item = ::Output; @@ -74,14 +76,28 @@ where } } - // Attempt to pull the next value from the in_progress_queue - let res = this.in_progress_queue.poll_next_unpin(cx); - if let Some(val) = ready!(res) { - return Poll::Ready(Some(val)); + // Try to poll all ready futures in the in_progress_queue. + loop { + match this.in_progress_queue.poll_next_unpin(cx) { + Poll::Ready(Some(output)) => { + this.ready_queue.push_back(output); + } + Poll::Ready(None) => break, + Poll::Pending => break, + } + } + + // If we have any ready outputs, return the first one. + if let Some(output) = this.ready_queue.pop_front() { + // If there are still ready outputs, wake the task to poll again. + if !this.ready_queue.is_empty() { + cx.waker().wake_by_ref(); + } + + return Poll::Ready(Some(output)); } - // If more values are still coming from the stream, we're not done yet - if this.stream.is_done() { + if this.stream.is_done() && this.in_progress_queue.is_empty() { Poll::Ready(None) } else { Poll::Pending @@ -100,10 +116,10 @@ where } } -impl FusedStream for Buffered +impl FusedStream for Buffered where - St: Stream, - St::Item: Future, + St: Stream, + F: Future, { fn is_terminated(&self) -> bool { self.stream.is_done() && self.in_progress_queue.is_terminated() @@ -112,10 +128,10 @@ where // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] -impl Sink for Buffered +impl Sink for Buffered where - S: Stream + Sink, - S::Item: Future, + S: Stream + Sink, + F: Future, { type Error = S::Error; diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index ee30f8da6f..c018e4b24a 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -1487,10 +1487,11 @@ pub trait StreamExt: Stream { /// library is activated, and it is activated by default. #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] - fn buffered(self, n: impl Into>) -> Buffered + fn buffered(self, n: impl Into>) -> Buffered where - Self::Item: Future, Self: Sized, + Self: Stream, + F: Future, { assert_stream::<::Output, _>(Buffered::new(self, n.into())) } @@ -1536,10 +1537,11 @@ pub trait StreamExt: Stream { /// ``` #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] - fn buffer_unordered(self, n: impl Into>) -> BufferUnordered + fn buffer_unordered(self, n: impl Into>) -> BufferUnordered where - Self::Item: Future, Self: Sized, + Self: Stream, + F: Future, { assert_stream::<::Output, _>(BufferUnordered::new(self, n.into())) } diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index 8d15fa28ef..ee481b3bee 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -1112,24 +1112,27 @@ mod stream { assert_not_impl!(AndThen: Unpin); assert_not_impl!(AndThen<(), PhantomPinned, ()>: Unpin); - assert_impl!(BufferUnordered>: Send); - assert_not_impl!(BufferUnordered: Send); - assert_not_impl!(BufferUnordered: Send); - assert_impl!(BufferUnordered>: Sync); - assert_not_impl!(BufferUnordered: Sync); - assert_not_impl!(BufferUnordered: Sync); - assert_impl!(BufferUnordered: Unpin); - assert_not_impl!(BufferUnordered: Unpin); - - assert_impl!(Buffered>>: Send); - assert_not_impl!(Buffered>: Send); - assert_not_impl!(Buffered>: Send); - assert_not_impl!(Buffered>>: Send); - assert_impl!(Buffered>>: Sync); - assert_not_impl!(Buffered>>: Sync); - assert_not_impl!(Buffered>>: Sync); - assert_impl!(Buffered>: Unpin); - assert_not_impl!(Buffered>: Unpin); + assert_impl!(BufferUnordered>, SendFuture<()>>: Send); + assert_not_impl!(BufferUnordered, SendFuture>: Send); + assert_not_impl!(BufferUnordered, LocalFuture>: Send); + assert_not_impl!(BufferUnordered, LocalFuture>: Send); + assert_impl!(BufferUnordered>, SendSyncFuture<()>>: Sync); + assert_not_impl!(BufferUnordered>, SyncFuture<()>>: Sync); + assert_not_impl!(BufferUnordered, LocalFuture>: Sync); + assert_not_impl!(BufferUnordered, LocalFuture>: Sync); + assert_impl!(BufferUnordered, UnpinFuture>: Unpin); + assert_not_impl!(BufferUnordered, PinnedFuture>: Unpin); + + assert_impl!(Buffered>, SendFuture<()>>: Send); + assert_not_impl!(Buffered, SendFuture>: Send); + assert_not_impl!(Buffered, LocalFuture>: Send); + assert_not_impl!(Buffered, LocalFuture>: Send); + assert_impl!(Buffered>, SendSyncFuture<()>>: Sync); + assert_not_impl!(Buffered>, SyncFuture<()>>: Sync); + assert_not_impl!(Buffered, LocalFuture>: Sync); + assert_not_impl!(Buffered, LocalFuture>: Sync); + assert_impl!(Buffered, UnpinFuture>: Unpin); + assert_not_impl!(Buffered, PinnedFuture>: Unpin); assert_impl!(CatchUnwind: Send); assert_not_impl!(CatchUnwind: Send); From 90f5a74c1a23aa4ff8c9f9ebdfbfdb017e0f1129 Mon Sep 17 00:00:00 2001 From: andylokandy Date: Tue, 22 Jul 2025 20:34:18 +0800 Subject: [PATCH 2/4] add comment --- futures-util/src/stream/stream/buffer_unordered.rs | 1 + futures-util/src/stream/stream/buffered.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/futures-util/src/stream/stream/buffer_unordered.rs b/futures-util/src/stream/stream/buffer_unordered.rs index 2e9707d95d..6aac7c655c 100644 --- a/futures-util/src/stream/stream/buffer_unordered.rs +++ b/futures-util/src/stream/stream/buffer_unordered.rs @@ -98,6 +98,7 @@ where return Poll::Ready(Some(output)); } + // If more values are still coming from the stream, we're not done yet. if this.stream.is_done() && this.in_progress_queue.is_empty() { Poll::Ready(None) } else { diff --git a/futures-util/src/stream/stream/buffered.rs b/futures-util/src/stream/stream/buffered.rs index 1ca69f0791..2443f1492a 100644 --- a/futures-util/src/stream/stream/buffered.rs +++ b/futures-util/src/stream/stream/buffered.rs @@ -97,6 +97,7 @@ where return Poll::Ready(Some(output)); } + // If more values are still coming from the stream, we're not done yet. if this.stream.is_done() && this.in_progress_queue.is_empty() { Poll::Ready(None) } else { From e8f3dc0bde5f3261c698b5e5006e69950825e6bf Mon Sep 17 00:00:00 2001 From: andylokandy Date: Thu, 24 Jul 2025 01:21:57 +0800 Subject: [PATCH 3/4] fix --- .../src/stream/stream/buffer_unordered.rs | 15 ++++----------- futures-util/src/stream/stream/buffered.rs | 15 ++++----------- 2 files changed, 8 insertions(+), 22 deletions(-) diff --git a/futures-util/src/stream/stream/buffer_unordered.rs b/futures-util/src/stream/stream/buffer_unordered.rs index 6aac7c655c..6cfa8e2de4 100644 --- a/futures-util/src/stream/stream/buffer_unordered.rs +++ b/futures-util/src/stream/stream/buffer_unordered.rs @@ -88,18 +88,11 @@ where } } - // If we have any ready outputs, return the first one. if let Some(output) = this.ready_queue.pop() { - // If there are still ready outputs, wake the task to poll again. - if !this.ready_queue.is_empty() { - cx.waker().wake_by_ref(); - } - - return Poll::Ready(Some(output)); - } - - // If more values are still coming from the stream, we're not done yet. - if this.stream.is_done() && this.in_progress_queue.is_empty() { + // If we have any ready outputs, return the first one. + Poll::Ready(Some(output)) + } else if this.stream.is_done() && this.in_progress_queue.is_empty() { + // If more values are still coming from the stream, we're not done yet. Poll::Ready(None) } else { Poll::Pending diff --git a/futures-util/src/stream/stream/buffered.rs b/futures-util/src/stream/stream/buffered.rs index 2443f1492a..77c7b312e3 100644 --- a/futures-util/src/stream/stream/buffered.rs +++ b/futures-util/src/stream/stream/buffered.rs @@ -87,18 +87,11 @@ where } } - // If we have any ready outputs, return the first one. if let Some(output) = this.ready_queue.pop_front() { - // If there are still ready outputs, wake the task to poll again. - if !this.ready_queue.is_empty() { - cx.waker().wake_by_ref(); - } - - return Poll::Ready(Some(output)); - } - - // If more values are still coming from the stream, we're not done yet. - if this.stream.is_done() && this.in_progress_queue.is_empty() { + // If we have any ready outputs, return the first one. + Poll::Ready(Some(output)) + } else if this.stream.is_done() && this.in_progress_queue.is_empty() { + // If more values are still coming from the stream, we're not done yet. Poll::Ready(None) } else { Poll::Pending From b98f4aeb28ed2713d60d39c8f3aacc6fff7c1ee5 Mon Sep 17 00:00:00 2001 From: andylokandy Date: Thu, 24 Jul 2025 14:00:18 +0800 Subject: [PATCH 4/4] fix --- futures-util/src/stream/stream/buffer_unordered.rs | 1 - futures-util/src/stream/stream/buffered.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/futures-util/src/stream/stream/buffer_unordered.rs b/futures-util/src/stream/stream/buffer_unordered.rs index 6cfa8e2de4..8a70563213 100644 --- a/futures-util/src/stream/stream/buffer_unordered.rs +++ b/futures-util/src/stream/stream/buffer_unordered.rs @@ -92,7 +92,6 @@ where // If we have any ready outputs, return the first one. Poll::Ready(Some(output)) } else if this.stream.is_done() && this.in_progress_queue.is_empty() { - // If more values are still coming from the stream, we're not done yet. Poll::Ready(None) } else { Poll::Pending diff --git a/futures-util/src/stream/stream/buffered.rs b/futures-util/src/stream/stream/buffered.rs index 77c7b312e3..803c9ceba1 100644 --- a/futures-util/src/stream/stream/buffered.rs +++ b/futures-util/src/stream/stream/buffered.rs @@ -91,7 +91,6 @@ where // If we have any ready outputs, return the first one. Poll::Ready(Some(output)) } else if this.stream.is_done() && this.in_progress_queue.is_empty() { - // If more values are still coming from the stream, we're not done yet. Poll::Ready(None) } else { Poll::Pending