Skip to content

Commit 1fa960f

Browse files
authored
Deduplicate generic::bufread::Decoder impl of tokio/futures-io (#391)
* Deduplicate `generic::bufread::Decoder` impl Ref #384 * Use `ControlFlow` style API design to avoid trait object
1 parent 8aa202e commit 1fa960f

File tree

6 files changed

+212
-284
lines changed

6 files changed

+212
-284
lines changed

crates/async-compression/src/futures/bufread/generic/decoder.rs

Lines changed: 3 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -1,153 +1,14 @@
1-
use crate::codecs::Decode;
2-
use crate::core::util::PartialBuffer;
1+
use crate::{codecs::Decode, core::util::PartialBuffer, generic::bufread::impl_do_poll_read};
32

43
use core::{
54
pin::Pin,
65
task::{Context, Poll},
76
};
8-
use futures_core::ready;
9-
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
10-
use pin_project_lite::pin_project;
117
use std::io::{IoSlice, Result};
128

13-
#[derive(Debug)]
14-
enum State {
15-
Decoding,
16-
Flushing,
17-
Done,
18-
Next,
19-
}
20-
21-
pin_project! {
22-
#[derive(Debug)]
23-
pub struct Decoder<R, D> {
24-
#[pin]
25-
reader: R,
26-
decoder: D,
27-
state: State,
28-
multiple_members: bool,
29-
}
30-
}
31-
32-
impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
33-
pub fn new(reader: R, decoder: D) -> Self {
34-
Self {
35-
reader,
36-
decoder,
37-
state: State::Decoding,
38-
multiple_members: false,
39-
}
40-
}
41-
}
42-
43-
impl<R, D> Decoder<R, D> {
44-
pub fn get_ref(&self) -> &R {
45-
&self.reader
46-
}
47-
48-
pub fn get_mut(&mut self) -> &mut R {
49-
&mut self.reader
50-
}
51-
52-
pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
53-
self.project().reader
54-
}
55-
56-
pub fn into_inner(self) -> R {
57-
self.reader
58-
}
59-
60-
pub fn multiple_members(&mut self, enabled: bool) {
61-
self.multiple_members = enabled;
62-
}
63-
}
64-
65-
impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
66-
fn do_poll_read(
67-
self: Pin<&mut Self>,
68-
cx: &mut Context<'_>,
69-
output: &mut PartialBuffer<&mut [u8]>,
70-
) -> Poll<Result<()>> {
71-
let mut this = self.project();
72-
73-
let mut first = true;
74-
75-
loop {
76-
*this.state = match this.state {
77-
State::Decoding => {
78-
let input = if first {
79-
&[][..]
80-
} else {
81-
ready!(this.reader.as_mut().poll_fill_buf(cx))?
82-
};
83-
84-
if input.is_empty() && !first {
85-
// Avoid attempting to reinitialise the decoder if the
86-
// reader has returned EOF.
87-
*this.multiple_members = false;
88-
89-
State::Flushing
90-
} else {
91-
let mut input = PartialBuffer::new(input);
92-
let res = this.decoder.decode(&mut input, output).or_else(|err| {
93-
// ignore the first error, occurs when input is empty
94-
// but we need to run decode to flush
95-
if first {
96-
Ok(false)
97-
} else {
98-
Err(err)
99-
}
100-
});
101-
102-
if !first {
103-
let len = input.written().len();
104-
this.reader.as_mut().consume(len);
105-
}
106-
107-
first = false;
108-
109-
if res? {
110-
State::Flushing
111-
} else {
112-
State::Decoding
113-
}
114-
}
115-
}
116-
117-
State::Flushing => {
118-
if this.decoder.finish(output)? {
119-
if *this.multiple_members {
120-
this.decoder.reinit()?;
121-
State::Next
122-
} else {
123-
State::Done
124-
}
125-
} else {
126-
State::Flushing
127-
}
128-
}
129-
130-
State::Done => State::Done,
131-
132-
State::Next => {
133-
let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
134-
if input.is_empty() {
135-
State::Done
136-
} else {
137-
State::Decoding
138-
}
139-
}
140-
};
9+
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
14110

142-
if let State::Done = *this.state {
143-
return Poll::Ready(Ok(()));
144-
}
145-
if output.unwritten().is_empty() {
146-
return Poll::Ready(Ok(()));
147-
}
148-
}
149-
}
150-
}
11+
impl_do_poll_read!();
15112

15213
impl<R: AsyncBufRead, D: Decode> AsyncRead for Decoder<R, D> {
15314
fn poll_read(
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
use crate::codecs::Decode;
2+
use crate::core::util::PartialBuffer;
3+
4+
use std::{io::Result, ops::ControlFlow};
5+
6+
#[derive(Debug)]
7+
enum State {
8+
Decoding,
9+
Flushing,
10+
Done,
11+
Next,
12+
}
13+
14+
#[derive(Debug)]
15+
pub struct Decoder {
16+
state: State,
17+
multiple_members: bool,
18+
}
19+
20+
impl Default for Decoder {
21+
fn default() -> Self {
22+
Self {
23+
state: State::Decoding,
24+
multiple_members: false,
25+
}
26+
}
27+
}
28+
29+
impl Decoder {
30+
pub fn multiple_members(&mut self, enabled: bool) {
31+
self.multiple_members = enabled;
32+
}
33+
34+
pub fn do_poll_read<D: Decode>(
35+
&mut self,
36+
output: &mut PartialBuffer<&mut [u8]>,
37+
decoder: &mut D,
38+
input: &mut PartialBuffer<&[u8]>,
39+
mut first: bool,
40+
) -> ControlFlow<Result<()>> {
41+
loop {
42+
self.state = match self.state {
43+
State::Decoding => {
44+
if input.unwritten().is_empty() && !first {
45+
// Avoid attempting to reinitialise the decoder if the
46+
// reader has returned EOF.
47+
self.multiple_members = false;
48+
49+
State::Flushing
50+
} else {
51+
match decoder.decode(input, output) {
52+
Ok(true) => State::Flushing,
53+
// ignore the first error, occurs when input is empty
54+
// but we need to run decode to flush
55+
Err(err) if !first => return ControlFlow::Break(Err(err)),
56+
// poll for more data for the next decode
57+
_ => break,
58+
}
59+
}
60+
}
61+
62+
State::Flushing => {
63+
match decoder.finish(output) {
64+
Ok(true) => {
65+
if self.multiple_members {
66+
if let Err(err) = decoder.reinit() {
67+
return ControlFlow::Break(Err(err));
68+
}
69+
70+
// The decode stage might consume all the input,
71+
// the next stage might need to poll again if it's empty.
72+
first = true;
73+
State::Next
74+
} else {
75+
State::Done
76+
}
77+
}
78+
Ok(false) => State::Flushing,
79+
Err(err) => return ControlFlow::Break(Err(err)),
80+
}
81+
}
82+
83+
State::Done => return ControlFlow::Break(Ok(())),
84+
85+
State::Next => {
86+
if input.unwritten().is_empty() {
87+
if first {
88+
// poll for more data to check if there's another stream
89+
break;
90+
}
91+
State::Done
92+
} else {
93+
State::Decoding
94+
}
95+
}
96+
};
97+
98+
if output.unwritten().is_empty() {
99+
return ControlFlow::Break(Ok(()));
100+
}
101+
}
102+
103+
if output.unwritten().is_empty() {
104+
ControlFlow::Break(Ok(()))
105+
} else {
106+
ControlFlow::Continue(())
107+
}
108+
}
109+
}
110+
111+
macro_rules! impl_do_poll_read {
112+
() => {
113+
use crate::generic::bufread::Decoder as GenericDecoder;
114+
115+
use std::ops::ControlFlow;
116+
117+
use futures_core::ready;
118+
use pin_project_lite::pin_project;
119+
120+
pin_project! {
121+
#[derive(Debug)]
122+
pub struct Decoder<R, D> {
123+
#[pin]
124+
reader: R,
125+
decoder: D,
126+
inner: GenericDecoder,
127+
}
128+
}
129+
130+
impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
131+
pub fn new(reader: R, decoder: D) -> Self {
132+
Self {
133+
reader,
134+
decoder,
135+
inner: GenericDecoder::default(),
136+
}
137+
}
138+
}
139+
140+
impl<R, D> Decoder<R, D> {
141+
pub fn get_ref(&self) -> &R {
142+
&self.reader
143+
}
144+
145+
pub fn get_mut(&mut self) -> &mut R {
146+
&mut self.reader
147+
}
148+
149+
pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
150+
self.project().reader
151+
}
152+
153+
pub fn into_inner(self) -> R {
154+
self.reader
155+
}
156+
157+
pub fn multiple_members(&mut self, enabled: bool) {
158+
self.inner.multiple_members(enabled);
159+
}
160+
}
161+
162+
impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
163+
fn do_poll_read(
164+
self: Pin<&mut Self>,
165+
cx: &mut Context<'_>,
166+
output: &mut PartialBuffer<&mut [u8]>,
167+
) -> Poll<Result<()>> {
168+
let mut this = self.project();
169+
170+
if let ControlFlow::Break(res) = this.inner.do_poll_read(
171+
output,
172+
this.decoder,
173+
&mut PartialBuffer::new(&[][..]),
174+
true,
175+
) {
176+
return Poll::Ready(res);
177+
}
178+
179+
loop {
180+
let mut input =
181+
PartialBuffer::new(ready!(this.reader.as_mut().poll_fill_buf(cx))?);
182+
183+
let control_flow =
184+
this.inner
185+
.do_poll_read(output, this.decoder, &mut input, false);
186+
187+
let bytes_read = input.written().len();
188+
this.reader.as_mut().consume(bytes_read);
189+
190+
if let ControlFlow::Break(res) = control_flow {
191+
break Poll::Ready(res);
192+
}
193+
}
194+
}
195+
}
196+
};
197+
}
198+
pub(crate) use impl_do_poll_read;
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
mod decoder;
2+
3+
pub(crate) use decoder::*;
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub(crate) mod bufread;

crates/async-compression/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@
149149
#[macro_use]
150150
mod macros;
151151

152+
/// Generic, async runtime agonistc implementation of en/decoders
153+
mod generic;
154+
152155
#[cfg(feature = "futures-io")]
153156
pub mod futures;
154157
#[cfg(feature = "tokio")]

0 commit comments

Comments
 (0)