Skip to content

Commit 83a06fe

Browse files
authored
Update async-compression to use codecs v2 (#410)
* Adopt `{De, En}coderV2` in generic en/decoder impl * Demono `generic::bufread::{De, En}coder` impl
1 parent 2c13c1d commit 83a06fe

File tree

8 files changed

+161
-125
lines changed

8 files changed

+161
-125
lines changed

crates/async-compression/src/futures/bufread/generic/decoder.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
1-
use crate::{codecs::Decode, core::util::PartialBuffer, generic::bufread::impl_decoder};
2-
1+
use crate::{
2+
codecs::DecodeV2,
3+
core::util::{PartialBuffer, WriteBuffer},
4+
generic::bufread::impl_decoder,
5+
};
36
use core::{
47
pin::Pin,
58
task::{Context, Poll},
69
};
7-
use std::io::{IoSlice, Result};
8-
910
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
11+
use std::io::{IoSlice, Result};
1012

1113
impl_decoder!();
1214

13-
impl<R: AsyncBufRead, D: Decode> AsyncRead for Decoder<R, D> {
15+
impl<R: AsyncBufRead, D: DecodeV2> AsyncRead for Decoder<R, D> {
1416
fn poll_read(
1517
self: Pin<&mut Self>,
1618
cx: &mut Context<'_>,
@@ -20,10 +22,10 @@ impl<R: AsyncBufRead, D: Decode> AsyncRead for Decoder<R, D> {
2022
return Poll::Ready(Ok(0));
2123
}
2224

23-
let mut output = PartialBuffer::new(buf);
25+
let mut output = WriteBuffer::new_initialized(buf);
2426
match self.do_poll_read(cx, &mut output)? {
2527
Poll::Pending if output.written().is_empty() => Poll::Pending,
26-
_ => Poll::Ready(Ok(output.written().len())),
28+
_ => Poll::Ready(Ok(output.written_len())),
2729
}
2830
}
2931
}

crates/async-compression/src/futures/bufread/generic/encoder.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
1-
use crate::{codecs::Encode, core::util::PartialBuffer, generic::bufread::impl_encoder};
2-
use core::{
1+
use crate::{
2+
codecs::EncodeV2,
3+
core::util::{PartialBuffer, WriteBuffer},
4+
generic::bufread::impl_encoder,
5+
};
6+
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite, IoSlice};
7+
use std::{
8+
io::Result,
39
pin::Pin,
410
task::{Context, Poll},
511
};
6-
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite, IoSlice};
7-
use std::io::Result;
812

913
impl_encoder!();
1014

11-
impl<R: AsyncBufRead, E: Encode> AsyncRead for Encoder<R, E> {
15+
impl<R: AsyncBufRead, E: EncodeV2> AsyncRead for Encoder<R, E> {
1216
fn poll_read(
1317
self: Pin<&mut Self>,
1418
cx: &mut Context<'_>,
@@ -18,10 +22,10 @@ impl<R: AsyncBufRead, E: Encode> AsyncRead for Encoder<R, E> {
1822
return Poll::Ready(Ok(0));
1923
}
2024

21-
let mut output = PartialBuffer::new(buf);
25+
let mut output = WriteBuffer::new_initialized(buf);
2226
match self.do_poll_read(cx, &mut output)? {
2327
Poll::Pending if output.written().is_empty() => Poll::Pending,
24-
_ => Poll::Ready(Ok(output.written().len())),
28+
_ => Poll::Ready(Ok(output.written_len())),
2529
}
2630
}
2731
}

crates/async-compression/src/generic/bufread/decoder.rs

Lines changed: 41 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1-
use crate::codecs::Decode;
2-
use crate::core::util::PartialBuffer;
1+
use crate::{
2+
codecs::DecodeV2,
3+
core::util::{PartialBuffer, WriteBuffer},
4+
};
35

46
use std::{io::Result, ops::ControlFlow};
57

@@ -31,10 +33,10 @@ impl Decoder {
3133
self.multiple_members = enabled;
3234
}
3335

34-
pub fn do_poll_read<D: Decode>(
36+
pub fn do_poll_read(
3537
&mut self,
36-
output: &mut PartialBuffer<&mut [u8]>,
37-
decoder: &mut D,
38+
output: &mut WriteBuffer<'_>,
39+
decoder: &mut dyn DecodeV2,
3840
input: &mut PartialBuffer<&[u8]>,
3941
mut first: bool,
4042
) -> ControlFlow<Result<()>> {
@@ -95,12 +97,12 @@ impl Decoder {
9597
}
9698
};
9799

98-
if output.unwritten().is_empty() {
100+
if output.has_no_spare_space() {
99101
return ControlFlow::Break(Ok(()));
100102
}
101103
}
102104

103-
if output.unwritten().is_empty() {
105+
if output.has_no_spare_space() {
104106
ControlFlow::Break(Ok(()))
105107
} else {
106108
ControlFlow::Continue(())
@@ -127,7 +129,7 @@ macro_rules! impl_decoder {
127129
}
128130
}
129131

130-
impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
132+
impl<R: AsyncBufRead, D: DecodeV2> Decoder<R, D> {
131133
pub fn new(reader: R, decoder: D) -> Self {
132134
Self {
133135
reader,
@@ -159,40 +161,44 @@ macro_rules! impl_decoder {
159161
}
160162
}
161163

162-
impl<R: AsyncBufRead, D: Decode> Decoder<R, D> {
163-
fn do_poll_read(
164-
self: Pin<&mut Self>,
165-
cx: &mut Context<'_>,
166-
output: &mut PartialBuffer<&mut [u8]>,
167-
) -> Poll<Result<()>> {
168-
let mut this = self.project();
169-
170-
if let ControlFlow::Break(res) = this.inner.do_poll_read(
171-
output,
172-
this.decoder,
173-
&mut PartialBuffer::new(&[][..]),
174-
true,
175-
) {
176-
return Poll::Ready(res);
177-
}
164+
fn do_poll_read(
165+
inner: &mut GenericDecoder,
166+
decoder: &mut dyn DecodeV2,
167+
mut reader: Pin<&mut dyn AsyncBufRead>,
168+
cx: &mut Context<'_>,
169+
output: &mut WriteBuffer<'_>,
170+
) -> Poll<Result<()>> {
171+
if let ControlFlow::Break(res) =
172+
inner.do_poll_read(output, decoder, &mut PartialBuffer::new(&[][..]), true)
173+
{
174+
return Poll::Ready(res);
175+
}
178176

179-
loop {
180-
let mut input =
181-
PartialBuffer::new(ready!(this.reader.as_mut().poll_fill_buf(cx))?);
177+
loop {
178+
let mut input = PartialBuffer::new(ready!(reader.as_mut().poll_fill_buf(cx))?);
182179

183-
let control_flow =
184-
this.inner
185-
.do_poll_read(output, this.decoder, &mut input, false);
180+
let control_flow = inner.do_poll_read(output, decoder, &mut input, false);
186181

187-
let bytes_read = input.written().len();
188-
this.reader.as_mut().consume(bytes_read);
182+
let bytes_read = input.written().len();
183+
reader.as_mut().consume(bytes_read);
189184

190-
if let ControlFlow::Break(res) = control_flow {
191-
break Poll::Ready(res);
192-
}
185+
if let ControlFlow::Break(res) = control_flow {
186+
break Poll::Ready(res);
193187
}
194188
}
195189
}
190+
191+
impl<R: AsyncBufRead, D: DecodeV2> Decoder<R, D> {
192+
fn do_poll_read(
193+
self: Pin<&mut Self>,
194+
cx: &mut Context<'_>,
195+
output: &mut WriteBuffer<'_>,
196+
) -> Poll<Result<()>> {
197+
let this = self.project();
198+
199+
do_poll_read(this.inner, this.decoder, this.reader, cx, output)
200+
}
201+
}
196202
};
197203
}
198204
pub(crate) use impl_decoder;

crates/async-compression/src/generic/bufread/encoder.rs

Lines changed: 47 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use crate::{codecs::Encode, core::util::PartialBuffer};
1+
use crate::{
2+
codecs::EncodeV2,
3+
core::util::{PartialBuffer, WriteBuffer},
4+
};
25
use std::{io::Result, ops::ControlFlow};
36

47
#[derive(Debug)]
@@ -26,8 +29,8 @@ impl Encoder {
2629
/// `input` - should be `None` if `Poll::Pending`.
2730
pub fn do_poll_read(
2831
&mut self,
29-
output: &mut PartialBuffer<&mut [u8]>,
30-
encoder: &mut impl Encode,
32+
output: &mut WriteBuffer<'_>,
33+
encoder: &mut dyn EncodeV2,
3134
mut input: Option<&mut PartialBuffer<&[u8]>>,
3235
) -> ControlFlow<Result<()>> {
3336
loop {
@@ -81,12 +84,12 @@ impl Encoder {
8184
State::Done => return ControlFlow::Break(Ok(())),
8285
};
8386

84-
if output.unwritten().is_empty() {
87+
if output.has_no_spare_space() {
8588
return ControlFlow::Break(Ok(()));
8689
}
8790
}
8891

89-
if output.unwritten().is_empty() {
92+
if output.has_no_spare_space() {
9093
ControlFlow::Break(Ok(()))
9194
} else {
9295
ControlFlow::Continue(())
@@ -113,7 +116,7 @@ macro_rules! impl_encoder {
113116
}
114117
}
115118

116-
impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
119+
impl<R: AsyncBufRead, E: EncodeV2> Encoder<R, E> {
117120
pub fn new(reader: R, encoder: E) -> Self {
118121
Self {
119122
reader,
@@ -149,46 +152,52 @@ macro_rules! impl_encoder {
149152
}
150153
}
151154

152-
impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
153-
fn do_poll_read(
154-
self: Pin<&mut Self>,
155-
cx: &mut Context<'_>,
156-
output: &mut PartialBuffer<&mut [u8]>,
157-
) -> Poll<Result<()>> {
158-
let mut this = self.project();
159-
160-
if let ControlFlow::Break(res) =
161-
this.inner.do_poll_read(output, &mut *this.encoder, None)
162-
{
163-
return Poll::Ready(res);
164-
}
155+
fn do_poll_read(
156+
inner: &mut GenericEncoder,
157+
encoder: &mut dyn EncodeV2,
158+
mut reader: Pin<&mut dyn AsyncBufRead>,
159+
cx: &mut Context<'_>,
160+
output: &mut WriteBuffer<'_>,
161+
) -> Poll<Result<()>> {
162+
if let ControlFlow::Break(res) = inner.do_poll_read(output, encoder, None) {
163+
return Poll::Ready(res);
164+
}
165165

166-
loop {
167-
let mut input = match this.reader.as_mut().poll_fill_buf(cx) {
168-
Poll::Pending => None,
169-
Poll::Ready(res) => Some(PartialBuffer::new(res?)),
170-
};
166+
loop {
167+
let mut input = match reader.as_mut().poll_fill_buf(cx) {
168+
Poll::Pending => None,
169+
Poll::Ready(res) => Some(PartialBuffer::new(res?)),
170+
};
171171

172-
let control_flow =
173-
this.inner
174-
.do_poll_read(output, &mut *this.encoder, input.as_mut());
172+
let control_flow = inner.do_poll_read(output, encoder, input.as_mut());
175173

176-
let is_pending = input.is_none();
177-
if let Some(input) = input {
178-
let len = input.written().len();
179-
this.reader.as_mut().consume(len);
180-
}
174+
let is_pending = input.is_none();
175+
if let Some(input) = input {
176+
let len = input.written().len();
177+
reader.as_mut().consume(len);
178+
}
181179

182-
if let ControlFlow::Break(res) = control_flow {
183-
break Poll::Ready(res);
184-
}
180+
if let ControlFlow::Break(res) = control_flow {
181+
break Poll::Ready(res);
182+
}
185183

186-
if is_pending {
187-
return Poll::Pending;
188-
}
184+
if is_pending {
185+
return Poll::Pending;
189186
}
190187
}
191188
}
189+
190+
impl<R: AsyncBufRead, E: EncodeV2> Encoder<R, E> {
191+
fn do_poll_read(
192+
self: Pin<&mut Self>,
193+
cx: &mut Context<'_>,
194+
output: &mut WriteBuffer<'_>,
195+
) -> Poll<Result<()>> {
196+
let this = self.project();
197+
198+
do_poll_read(this.inner, this.encoder, this.reader, cx, output)
199+
}
200+
}
192201
};
193202
}
194203
pub(crate) use impl_encoder;

0 commit comments

Comments
 (0)