Skip to content

Commit 4ebce39

Browse files
authored
Flush compressed data out of encoders more often (#383)
* Flush compressed data out of encoders more often Currently flush() is not getting called so compressed output is not returned to the client the encoder until input is fully read(). In my use case, flushing more often is necessary as otherwise it appears to the client the data gets 'buffered'. There's been a previous PR here and some discussion #155 but appears to be abandonded so if there's interest in this fix, this PR tries this again. The tests are passing with this change. * Update futures bufread futures encoder to add more flushes It's equivalent to the tokio implementation.
1 parent 74f5eb1 commit 4ebce39

File tree

3 files changed

+78
-22
lines changed

3 files changed

+78
-22
lines changed

crates/async-compression/src/futures/bufread/generic/encoder.rs

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use pin_project_lite::pin_project;
1414
enum State {
1515
Encoding,
1616
Flushing,
17+
Finishing,
1718
Done,
1819
}
1920

@@ -70,27 +71,53 @@ impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
7071
output: &mut PartialBuffer<&mut [u8]>,
7172
) -> Poll<Result<()>> {
7273
let mut this = self.project();
74+
let mut read = 0usize;
7375

7476
loop {
7577
*this.state = match this.state {
7678
State::Encoding => {
77-
let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
78-
if input.is_empty() {
79-
State::Flushing
80-
} else {
81-
let mut input = PartialBuffer::new(input);
82-
this.encoder.encode(&mut input, output)?;
83-
let len = input.written().len();
84-
this.reader.as_mut().consume(len);
85-
State::Encoding
79+
let res = this.reader.as_mut().poll_fill_buf(cx);
80+
81+
match res {
82+
Poll::Pending => {
83+
if read == 0 {
84+
return Poll::Pending;
85+
} else {
86+
State::Flushing
87+
}
88+
}
89+
Poll::Ready(res) => {
90+
let input = res?;
91+
92+
if input.is_empty() {
93+
State::Finishing
94+
} else {
95+
let mut input = PartialBuffer::new(input);
96+
this.encoder.encode(&mut input, output)?;
97+
let len = input.written().len();
98+
this.reader.as_mut().consume(len);
99+
read += len;
100+
101+
State::Encoding
102+
}
103+
}
86104
}
87105
}
88106

89107
State::Flushing => {
108+
if this.encoder.flush(output)? {
109+
read = 0;
110+
State::Encoding
111+
} else {
112+
State::Flushing
113+
}
114+
}
115+
116+
State::Finishing => {
90117
if this.encoder.finish(output)? {
91118
State::Done
92119
} else {
93-
State::Flushing
120+
State::Finishing
94121
}
95122
}
96123

crates/async-compression/src/tokio/bufread/generic/encoder.rs

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};
1313
enum State {
1414
Encoding,
1515
Flushing,
16+
Finishing,
1617
Done,
1718
}
1819

@@ -68,27 +69,53 @@ impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
6869
output: &mut PartialBuffer<&mut [u8]>,
6970
) -> Poll<Result<()>> {
7071
let mut this = self.project();
72+
let mut read = 0usize;
7173

7274
loop {
7375
*this.state = match this.state {
7476
State::Encoding => {
75-
let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
76-
if input.is_empty() {
77-
State::Flushing
78-
} else {
79-
let mut input = PartialBuffer::new(input);
80-
this.encoder.encode(&mut input, output)?;
81-
let len = input.written().len();
82-
this.reader.as_mut().consume(len);
83-
State::Encoding
77+
let res = this.reader.as_mut().poll_fill_buf(cx);
78+
79+
match res {
80+
Poll::Pending => {
81+
if read == 0 {
82+
return Poll::Pending;
83+
} else {
84+
State::Flushing
85+
}
86+
}
87+
Poll::Ready(res) => {
88+
let input = res?;
89+
90+
if input.is_empty() {
91+
State::Finishing
92+
} else {
93+
let mut input = PartialBuffer::new(input);
94+
this.encoder.encode(&mut input, output)?;
95+
let len = input.written().len();
96+
this.reader.as_mut().consume(len);
97+
read += len;
98+
99+
State::Encoding
100+
}
101+
}
84102
}
85103
}
86104

87105
State::Flushing => {
106+
if this.encoder.flush(output)? {
107+
read = 0;
108+
State::Encoding
109+
} else {
110+
State::Flushing
111+
}
112+
}
113+
114+
State::Finishing => {
88115
if this.encoder.finish(output)? {
89116
State::Done
90117
} else {
91-
State::Flushing
118+
State::Finishing
92119
}
93120
}
94121

crates/async-compression/tests/utils/impls.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ pub mod futures {
3232
// All current test cases are < 100kB
3333
let mut output = Cursor::new(vec![0; 102_400]);
3434
pin_mut!(read);
35-
let len = block_on(copy_buf(BufReader::with_capacity(2, read), &mut output)).unwrap();
35+
// With more flushing from encoders, 4 appears to be the minimal buffer size that works.
36+
let len = block_on(copy_buf(BufReader::with_capacity(4, read), &mut output)).unwrap();
3637
let mut output = output.into_inner();
3738
output.truncate(len as usize);
3839
output
@@ -104,7 +105,8 @@ pub mod tokio {
104105
pub fn to_vec(read: impl AsyncRead) -> Vec<u8> {
105106
let mut output = Cursor::new(vec![0; 102_400]);
106107
pin_mut!(read);
107-
let len = block_on(copy_buf(BufReader::with_capacity(2, read), &mut output)).unwrap();
108+
// With more flushing from encoders, 4 appears to be the minimal buffer size that works.
109+
let len = block_on(copy_buf(BufReader::with_capacity(4, read), &mut output)).unwrap();
108110
let mut output = output.into_inner();
109111
output.truncate(len as usize);
110112
output

0 commit comments

Comments
 (0)