Skip to content

Commit 6c0835e

Browse files
authored
Deduplicate bufread::Encoder impl (#402)
* Deduplicate `bufread::Encoder` impl * Fix polling BufReader after EOF
1 parent 6fb985b commit 6c0835e

File tree

4 files changed

+198
-255
lines changed

4 files changed

+198
-255
lines changed

crates/async-compression/src/futures/bufread/generic/encoder.rs

Lines changed: 3 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -1,138 +1,12 @@
1+
use crate::{codecs::Encode, core::util::PartialBuffer, generic::bufread::impl_encoder};
12
use core::{
23
pin::Pin,
34
task::{Context, Poll},
45
};
5-
use std::io::Result;
6-
7-
use crate::codecs::Encode;
8-
use crate::core::util::PartialBuffer;
9-
use futures_core::ready;
106
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite, IoSlice};
11-
use pin_project_lite::pin_project;
12-
13-
#[derive(Debug)]
14-
enum State {
15-
Encoding,
16-
Flushing,
17-
Finishing,
18-
Done,
19-
}
20-
21-
pin_project! {
22-
#[derive(Debug)]
23-
pub struct Encoder<R, E> {
24-
#[pin]
25-
reader: R,
26-
encoder: E,
27-
state: State,
28-
}
29-
}
30-
31-
impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
32-
pub fn new(reader: R, encoder: E) -> Self {
33-
Self {
34-
reader,
35-
encoder,
36-
state: State::Encoding,
37-
}
38-
}
39-
40-
pub fn with_capacity(reader: R, encoder: E, _cap: usize) -> Self {
41-
Self::new(reader, encoder)
42-
}
43-
}
44-
45-
impl<R, E> Encoder<R, E> {
46-
pub fn get_ref(&self) -> &R {
47-
&self.reader
48-
}
49-
50-
pub fn get_mut(&mut self) -> &mut R {
51-
&mut self.reader
52-
}
53-
54-
pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
55-
self.project().reader
56-
}
57-
58-
pub(crate) fn get_encoder_ref(&self) -> &E {
59-
&self.encoder
60-
}
61-
62-
pub fn into_inner(self) -> R {
63-
self.reader
64-
}
65-
}
66-
67-
impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
68-
fn do_poll_read(
69-
self: Pin<&mut Self>,
70-
cx: &mut Context<'_>,
71-
output: &mut PartialBuffer<&mut [u8]>,
72-
) -> Poll<Result<()>> {
73-
let mut this = self.project();
74-
let mut read = 0usize;
75-
76-
loop {
77-
*this.state = match this.state {
78-
State::Encoding => {
79-
let res = this.reader.as_mut().poll_fill_buf(cx);
80-
81-
match res {
82-
Poll::Pending => {
83-
if read == 0 {
84-
return Poll::Pending;
85-
} else {
86-
State::Flushing
87-
}
88-
}
89-
Poll::Ready(res) => {
90-
let input = res?;
91-
92-
if input.is_empty() {
93-
State::Finishing
94-
} else {
95-
let mut input = PartialBuffer::new(input);
96-
this.encoder.encode(&mut input, output)?;
97-
let len = input.written().len();
98-
this.reader.as_mut().consume(len);
99-
read += len;
100-
101-
State::Encoding
102-
}
103-
}
104-
}
105-
}
106-
107-
State::Flushing => {
108-
if this.encoder.flush(output)? {
109-
read = 0;
110-
State::Encoding
111-
} else {
112-
State::Flushing
113-
}
114-
}
115-
116-
State::Finishing => {
117-
if this.encoder.finish(output)? {
118-
State::Done
119-
} else {
120-
State::Finishing
121-
}
122-
}
123-
124-
State::Done => State::Done,
125-
};
7+
use std::io::Result;
1268

127-
if let State::Done = *this.state {
128-
return Poll::Ready(Ok(()));
129-
}
130-
if output.unwritten().is_empty() {
131-
return Poll::Ready(Ok(()));
132-
}
133-
}
134-
}
135-
}
9+
impl_encoder!();
13610

13711
impl<R: AsyncBufRead, E: Encode> AsyncRead for Encoder<R, E> {
13812
fn poll_read(
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
use crate::{codecs::Encode, core::util::PartialBuffer};
2+
use std::{io::Result, ops::ControlFlow};
3+
4+
#[derive(Debug)]
5+
enum State {
6+
Encoding(usize),
7+
Flushing,
8+
Finishing,
9+
Done,
10+
}
11+
12+
#[derive(Debug)]
13+
pub struct Encoder {
14+
state: State,
15+
}
16+
17+
impl Default for Encoder {
18+
fn default() -> Self {
19+
Self {
20+
state: State::Encoding(0),
21+
}
22+
}
23+
}
24+
25+
impl Encoder {
26+
/// `input` - should be `None` if `Poll::Pending`.
27+
pub fn do_poll_read(
28+
&mut self,
29+
output: &mut PartialBuffer<&mut [u8]>,
30+
encoder: &mut impl Encode,
31+
mut input: Option<&mut PartialBuffer<&[u8]>>,
32+
) -> ControlFlow<Result<()>> {
33+
loop {
34+
self.state = match self.state {
35+
State::Encoding(mut read) => match input.as_mut() {
36+
None => {
37+
if read == 0 {
38+
// Poll for more data
39+
// TODO (nobodyxu): Return Ok if `!output.written().is_empty()`
40+
break;
41+
} else {
42+
State::Flushing
43+
}
44+
}
45+
Some(input) => {
46+
if input.unwritten().is_empty() {
47+
State::Finishing
48+
} else {
49+
if let Err(err) = encoder.encode(input, output) {
50+
return ControlFlow::Break(Err(err));
51+
}
52+
53+
read += input.written().len();
54+
55+
// Poll for more data
56+
break;
57+
}
58+
}
59+
},
60+
61+
State::Flushing => match encoder.flush(output) {
62+
Ok(true) => {
63+
self.state = State::Encoding(0);
64+
65+
// Poll for more data
66+
break;
67+
}
68+
Ok(false) => State::Flushing,
69+
Err(err) => return ControlFlow::Break(Err(err)),
70+
},
71+
72+
State::Finishing => match encoder.finish(output) {
73+
Ok(true) => State::Done,
74+
Ok(false) => State::Finishing,
75+
Err(err) => return ControlFlow::Break(Err(err)),
76+
},
77+
78+
State::Done => return ControlFlow::Break(Ok(())),
79+
};
80+
81+
if output.unwritten().is_empty() {
82+
return ControlFlow::Break(Ok(()));
83+
}
84+
}
85+
86+
if output.unwritten().is_empty() {
87+
ControlFlow::Break(Ok(()))
88+
} else {
89+
ControlFlow::Continue(())
90+
}
91+
}
92+
}
93+
94+
macro_rules! impl_encoder {
95+
() => {
96+
use crate::generic::bufread::Encoder as GenericEncoder;
97+
98+
use std::ops::ControlFlow;
99+
100+
use futures_core::ready;
101+
use pin_project_lite::pin_project;
102+
103+
pin_project! {
104+
#[derive(Debug)]
105+
pub struct Encoder<R, E> {
106+
#[pin]
107+
reader: R,
108+
encoder: E,
109+
inner: GenericEncoder,
110+
}
111+
}
112+
113+
impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
114+
pub fn new(reader: R, encoder: E) -> Self {
115+
Self {
116+
reader,
117+
encoder,
118+
inner: Default::default(),
119+
}
120+
}
121+
122+
pub fn with_capacity(reader: R, encoder: E, _cap: usize) -> Self {
123+
Self::new(reader, encoder)
124+
}
125+
}
126+
127+
impl<R, E> Encoder<R, E> {
128+
pub fn get_ref(&self) -> &R {
129+
&self.reader
130+
}
131+
132+
pub fn get_mut(&mut self) -> &mut R {
133+
&mut self.reader
134+
}
135+
136+
pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
137+
self.project().reader
138+
}
139+
140+
pub(crate) fn get_encoder_ref(&self) -> &E {
141+
&self.encoder
142+
}
143+
144+
pub fn into_inner(self) -> R {
145+
self.reader
146+
}
147+
}
148+
149+
impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
150+
fn do_poll_read(
151+
self: Pin<&mut Self>,
152+
cx: &mut Context<'_>,
153+
output: &mut PartialBuffer<&mut [u8]>,
154+
) -> Poll<Result<()>> {
155+
let mut this = self.project();
156+
157+
if let ControlFlow::Break(res) =
158+
this.inner.do_poll_read(output, &mut *this.encoder, None)
159+
{
160+
return Poll::Ready(res);
161+
}
162+
163+
loop {
164+
let mut input = match this.reader.as_mut().poll_fill_buf(cx) {
165+
Poll::Pending => None,
166+
Poll::Ready(res) => Some(PartialBuffer::new(res?)),
167+
};
168+
169+
let control_flow =
170+
this.inner
171+
.do_poll_read(output, &mut *this.encoder, input.as_mut());
172+
173+
let is_pending = input.is_none();
174+
if let Some(input) = input {
175+
let len = input.written().len();
176+
this.reader.as_mut().consume(len);
177+
}
178+
179+
if let ControlFlow::Break(res) = control_flow {
180+
break Poll::Ready(res);
181+
}
182+
183+
if is_pending {
184+
return Poll::Pending;
185+
}
186+
}
187+
}
188+
}
189+
};
190+
}
191+
pub(crate) use impl_encoder;
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
mod decoder;
2+
mod encoder;
23

34
pub(crate) use decoder::*;
5+
pub(crate) use encoder::*;

0 commit comments

Comments
 (0)