Skip to content

Commit ac76d10

Browse files
Nemo157cramertj
authored andcommitted
Add AsyncBufRead implementation for IntoAsyncRead
1 parent 50f3f71 commit ac76d10

File tree

1 file changed

+79
-1
lines changed

1 file changed

+79
-1
lines changed

futures-util/src/try_stream/into_async_read.rs

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use core::pin::Pin;
22
use futures_core::stream::TryStream;
33
use futures_core::task::{Context, Poll};
4-
use futures_io::AsyncRead;
4+
use futures_io::{AsyncRead, AsyncBufRead};
55
use std::cmp;
66
use std::io::{Error, Result};
77

@@ -98,6 +98,59 @@ where
9898
}
9999
}
100100

101+
impl<St> AsyncBufRead for IntoAsyncRead<St>
102+
where
103+
St: TryStream<Error = Error> + Unpin,
104+
St::Ok: AsRef<[u8]>,
105+
{
106+
fn poll_fill_buf<'a>(
107+
mut self: Pin<&'a mut Self>,
108+
cx: &mut Context<'_>,
109+
) -> Poll<Result<&'a [u8]>> {
110+
if let ReadState::PendingChunk = self.state {
111+
match ready!(Pin::new(&mut self.stream).try_poll_next(cx)) {
112+
Some(Ok(chunk)) => {
113+
self.state = ReadState::Ready {
114+
chunk,
115+
chunk_start: 0,
116+
};
117+
}
118+
Some(Err(err)) => {
119+
self.state = ReadState::Eof;
120+
return Poll::Ready(Err(err));
121+
}
122+
None => {
123+
self.state = ReadState::Eof;
124+
return Poll::Ready(Ok(&[]));
125+
}
126+
}
127+
}
128+
129+
if let ReadState::Ready { ref chunk, chunk_start } = self.into_ref().get_ref().state {
130+
let chunk = chunk.as_ref();
131+
return Poll::Ready(Ok(&chunk[chunk_start..]));
132+
}
133+
134+
// To get to this point we must be in ReadState::Eof
135+
Poll::Ready(Ok(&[]))
136+
}
137+
138+
fn consume(
139+
mut self: Pin<&mut Self>,
140+
amount: usize,
141+
) {
142+
if let ReadState::Ready { chunk, chunk_start } = &mut self.state {
143+
*chunk_start += amount;
144+
debug_assert!(*chunk_start <= chunk.as_ref().len());
145+
if *chunk_start >= chunk.as_ref().len() {
146+
self.state = ReadState::PendingChunk;
147+
}
148+
} else {
149+
debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk");
150+
}
151+
}
152+
}
153+
101154
#[cfg(test)]
102155
mod tests {
103156
use super::*;
@@ -148,4 +201,29 @@ mod tests {
148201

149202
assert_read!(reader, &mut buf, 0);
150203
}
204+
205+
#[test]
206+
fn test_into_async_bufread() -> std::io::Result<()> {
207+
let stream = stream::iter(1..=2).map(|_| Ok(vec![1, 2, 3, 4, 5]));
208+
let mut reader = stream.into_async_read();
209+
210+
let mut cx = noop_context();
211+
let mut reader = Pin::new(&mut reader);
212+
213+
assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Ready(&[1, 2, 3, 4, 5][..]));
214+
reader.as_mut().consume(3);
215+
216+
assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Ready(&[4, 5][..]));
217+
reader.as_mut().consume(2);
218+
219+
assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Ready(&[1, 2, 3, 4, 5][..]));
220+
reader.as_mut().consume(2);
221+
222+
assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Ready(&[3, 4, 5][..]));
223+
reader.as_mut().consume(3);
224+
225+
assert_eq!(reader.as_mut().poll_fill_buf(&mut cx)?, Poll::Ready(&[][..]));
226+
227+
Ok(())
228+
}
151229
}

0 commit comments

Comments
 (0)