From 51b94bd7e400f97de88a774d7df7154c3fc1eac3 Mon Sep 17 00:00:00 2001 From: Daniel Donohue Date: Sun, 2 Nov 2025 02:23:39 -0500 Subject: [PATCH] fix: make sink::With send after inner sink returns ready --- futures-util/src/sink/with.rs | 38 ++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/futures-util/src/sink/with.rs b/futures-util/src/sink/with.rs index 5a2c8a089..114d89fb3 100644 --- a/futures-util/src/sink/with.rs +++ b/futures-util/src/sink/with.rs @@ -17,6 +17,7 @@ pin_project! { f: F, #[pin] state: Option, + item: Option, _phantom: PhantomData Item>, } } @@ -25,9 +26,14 @@ impl fmt::Debug for With where Si: fmt::Debug, Fut: fmt::Debug, + Item: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("With").field("sink", &self.sink).field("state", &self.state).finish() + f.debug_struct("With") + .field("sink", &self.sink) + .field("state", &self.state) + .field("item", &self.item) + .finish() } } @@ -42,13 +48,14 @@ where Fut: Future>, E: From, { - Self { state: None, sink, f, _phantom: PhantomData } + Self { state: None, sink, f, item: None, _phantom: PhantomData } } } impl Clone for With where Si: Clone, + Item: Clone, F: Clone, Fut: Clone, { @@ -57,6 +64,7 @@ where state: self.state.clone(), sink: self.sink.clone(), f: self.f.clone(), + item: self.item.clone(), _phantom: PhantomData, } } @@ -98,13 +106,25 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); - let item = match this.state.as_mut().as_pin_mut() { - None => return Poll::Ready(Ok(())), - Some(fut) => ready!(fut.poll(cx))?, - }; - this.state.set(None); - this.sink.start_send(item)?; - Poll::Ready(Ok(())) + loop { + if this.item.is_some() { + // Check if the underlying sink is prepared for another item. + // If it is, we have to send it without yielding in between. + match this.sink.as_mut().poll_ready(cx)? { + Poll::Ready(()) => this.sink.start_send(this.item.take().unwrap())?, + Poll::Pending => match this.sink.as_mut().poll_flush(cx)? { + Poll::Ready(()) => continue, // check `poll_ready` again + Poll::Pending => return Poll::Pending, + }, + } + } + if let Some(fut) = this.state.as_mut().as_pin_mut() { + let item = ready!(fut.poll(cx))?; + *this.item = Some(item); + this.state.set(None); + } + return Poll::Ready(Ok(())); + } } }