Skip to content

Commit e894423

Browse files
authored
feature: Add uninitialized output buffer support (#414)
* Add uninitialized buffer API to `WriteBuffer` * Demonomorphize `WriteBuffer::copy_unwritten_from` - reduces compile time - having a single version makes it easier for the compiler to optimize * Use bzip2 0.6.1 uninitialized API * Use deflate64 0.1.10 uninitialized API * Use flate2 1.1.4 uninitialized API * Support uninitialized buffer for lz4 codecs * Return early in `Lz4Encoder::write` if output has no spare space Avoid allocating a new buffer when the output has no spare space; it makes no sense to allocate an internal buffer and try to compress when the output has no room * Fix `Lz4Encoder`: Use `Vec::spare_capacity_mut()` This is the safe usage; the previous usage is OK in practice, but is actually considered an out-of-bounds access by miri and any sanitizer. * Refactor xz2 codecs: Extract new fn `process_stream` Shared between encoder and decoder * Use liblzma 0.4.5 uninitialized API * Use zstd uninitialized API * Refactor: Extract `tokio::poll_read` Extract duplicate code, easier to modify * Support uninitialized buffer for `tokio::bufread`
1 parent cf54f96 commit e894423

File tree

18 files changed

+374
-186
lines changed

18 files changed

+374
-186
lines changed

crates/async-compression/src/tokio/bufread/generic/decoder.rs

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,11 @@ use crate::{
44
generic::bufread::impl_decoder,
55
};
66

7-
use core::{
7+
use std::{
8+
io::{IoSlice, Result},
89
pin::Pin,
910
task::{Context, Poll},
1011
};
11-
use std::io::{IoSlice, Result};
12-
1312
use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};
1413

1514
impl_decoder!();
@@ -20,19 +19,7 @@ impl<R: AsyncBufRead, D: DecodeV2> AsyncRead for Decoder<R, D> {
2019
cx: &mut Context<'_>,
2120
buf: &mut ReadBuf<'_>,
2221
) -> Poll<Result<()>> {
23-
if buf.remaining() == 0 {
24-
return Poll::Ready(Ok(()));
25-
}
26-
27-
let mut output = WriteBuffer::new_initialized(buf.initialize_unfilled());
28-
match self.do_poll_read(cx, &mut output)? {
29-
Poll::Pending if output.written().is_empty() => Poll::Pending,
30-
_ => {
31-
let len = output.written_len();
32-
buf.advance(len);
33-
Poll::Ready(Ok(()))
34-
}
35-
}
22+
super::poll_read(buf, |output| self.do_poll_read(cx, output))
3623
}
3724
}
3825

crates/async-compression/src/tokio/bufread/generic/encoder.rs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,7 @@ impl<R: AsyncBufRead, E: EncodeV2> AsyncRead for Encoder<R, E> {
1818
cx: &mut Context<'_>,
1919
buf: &mut ReadBuf<'_>,
2020
) -> Poll<Result<()>> {
21-
if buf.remaining() == 0 {
22-
return Poll::Ready(Ok(()));
23-
}
24-
25-
let mut output = WriteBuffer::new_initialized(buf.initialize_unfilled());
26-
match self.do_poll_read(cx, &mut output)? {
27-
Poll::Pending if output.written().is_empty() => Poll::Pending,
28-
_ => {
29-
let len = output.written_len();
30-
buf.advance(len);
31-
Poll::Ready(Ok(()))
32-
}
33-
}
21+
super::poll_read(buf, |output| self.do_poll_read(cx, output))
3422
}
3523
}
3624

crates/async-compression/src/tokio/bufread/generic/mod.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,36 @@ mod decoder;
22
mod encoder;
33

44
pub use self::{decoder::Decoder, encoder::Encoder};
5+
6+
use crate::core::util::WriteBuffer;
7+
use std::{io::Result, task::Poll};
8+
use tokio::io::ReadBuf;
9+
10+
fn poll_read(
11+
buf: &mut ReadBuf<'_>,
12+
do_poll_read: impl FnOnce(&mut WriteBuffer<'_>) -> Poll<Result<()>>,
13+
) -> Poll<Result<()>> {
14+
if buf.remaining() == 0 {
15+
return Poll::Ready(Ok(()));
16+
}
17+
18+
let initialized = buf.initialized().len() - buf.filled().len();
19+
// Safety: `WriteBuffer` has the same safety invariant as `ReadBuf`
20+
let mut output = WriteBuffer::new_uninitialized(unsafe { buf.unfilled_mut() });
21+
// Safety: `ReadBuf` ensures that it is initialized
22+
unsafe { output.assume_init(initialized) };
23+
24+
let res = do_poll_read(&mut output);
25+
26+
let initialized = output.initialized_len();
27+
let written = output.written_len();
28+
29+
// Safety: We trust our implementation to have properly initialized it
30+
unsafe { buf.assume_init(initialized) };
31+
buf.advance(written);
32+
33+
match res? {
34+
Poll::Pending if written == 0 => Poll::Pending,
35+
_ => Poll::Ready(Ok(())),
36+
}
37+
}

crates/compression-codecs/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ deflate64 = ["dep:deflate64"]
4747
compression-core.workspace = true
4848
# features
4949
brotli = { version = "8", optional = true }
50-
bzip2 = { version = "0.6", optional = true }
51-
deflate64 = { version = "0.1.5", optional = true }
52-
flate2 = { version = "1.0.13", optional = true }
50+
bzip2 = { version = "0.6.1", optional = true }
51+
deflate64 = { version = "0.1.10", optional = true }
52+
flate2 = { version = "1.1.4", optional = true }
5353
libzstd = { package = "zstd", version = "0.13.1", optional = true, default-features = false }
5454
lz4 = { version = "1.28.1", optional = true }
55-
liblzma = { version = "0.4.4", optional = true }
55+
liblzma = { version = "0.4.5", optional = true }
5656
memchr = { version = "2", optional = true }
5757
zstd-safe = { version = "7", optional = true, default-features = false }
5858

crates/compression-codecs/src/bzip2/decoder.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,15 @@ impl BzDecoder {
4343

4444
let result = self
4545
.decompress
46-
.decompress(input.unwritten(), output.initialize_unwritten())
46+
// Safety: We **trust** bzip2 to only write initialized data to it
47+
.decompress_uninit(input.unwritten(), unsafe { output.unwritten_mut() })
4748
.map_err(io::Error::other);
4849

4950
input.advance((self.decompress.total_in() - prior_in) as usize);
50-
output.advance((self.decompress.total_out() - prior_out) as usize);
51+
// Safety: We **trust** bzip2 to write bytes properly
52+
unsafe {
53+
output.assume_init_and_advance((self.decompress.total_out() - prior_out) as usize)
54+
};
5155

5256
// Track when stream has properly ended
5357
if matches!(result, Ok(Status::StreamEnd)) {

crates/compression-codecs/src/bzip2/encoder.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,13 @@ impl BzEncoder {
5757

5858
let result = self
5959
.compress
60-
.compress(input.unwritten(), output.initialize_unwritten(), action)
60+
// Safety: We **trust** bzip2 to only write initialized bytes into it
61+
.compress_uninit(input.unwritten(), unsafe { output.unwritten_mut() }, action)
6162
.map_err(io::Error::other);
6263

6364
input.advance((self.compress.total_in() - prior_in) as usize);
64-
output.advance((self.compress.total_out() - prior_out) as usize);
65+
// Safety: We **trust** bzip2 to properly write bytes into it
66+
unsafe { output.assume_init_and_advance((self.compress.total_out() - prior_out) as usize) };
6567

6668
result
6769
}

crates/compression-codecs/src/deflate64/decoder.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@ impl Deflate64Decoder {
2828
) -> Result<bool> {
2929
let result = self
3030
.inflater
31-
.inflate(input.unwritten(), output.initialize_unwritten());
31+
// Safety: We **trust** deflate64 to not write uninitialized bytes
32+
.inflate_uninit(input.unwritten(), unsafe { output.unwritten_mut() });
3233

3334
input.advance(result.bytes_consumed);
34-
output.advance(result.bytes_written);
35+
// Safety: We **trust** deflate64 to properly write bytes into buffer
36+
unsafe { output.assume_init_and_advance(result.bytes_written) };
3537

3638
if result.data_error {
3739
Err(Error::new(ErrorKind::InvalidData, "invalid data"))

crates/compression-codecs/src/flate/decoder.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,16 @@ impl FlateDecoder {
2626
let prior_in = self.decompress.total_in();
2727
let prior_out = self.decompress.total_out();
2828

29-
let result =
30-
self.decompress
31-
.decompress(input.unwritten(), output.initialize_unwritten(), flush);
29+
let result = self
30+
.decompress
31+
// Safety: We **trust** flate2 to not write uninitialized bytes into buffer
32+
.decompress_uninit(input.unwritten(), unsafe { output.unwritten_mut() }, flush);
3233

3334
input.advance((self.decompress.total_in() - prior_in) as usize);
34-
output.advance((self.decompress.total_out() - prior_out) as usize);
35+
// Safety: We **trust** flate2 to write bytes into buffer properly
36+
unsafe {
37+
output.assume_init_and_advance((self.decompress.total_out() - prior_out) as usize)
38+
};
3539

3640
Ok(result?)
3741
}

crates/compression-codecs/src/flate/encoder.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,14 @@ impl FlateEncoder {
3131
let prior_in = self.compress.total_in();
3232
let prior_out = self.compress.total_out();
3333

34-
let result =
35-
self.compress
36-
.compress(input.unwritten(), output.initialize_unwritten(), flush);
34+
let result = self
35+
.compress
36+
// Safety: We **trust** flate2 to not write uninitialized bytes into buffer
37+
.compress_uninit(input.unwritten(), unsafe { output.unwritten_mut() }, flush);
3738

3839
input.advance((self.compress.total_in() - prior_in) as usize);
39-
output.advance((self.compress.total_out() - prior_out) as usize);
40+
// Safety: We **trust** flate2 to write bytes properly into buffer
41+
unsafe { output.assume_init_and_advance((self.compress.total_out() - prior_out) as usize) };
4042

4143
Ok(result?)
4244
}

crates/compression-codecs/src/lz4/decoder.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,22 +61,28 @@ impl DecodeV2 for Lz4Decoder {
6161
input: &mut PartialBuffer<&[u8]>,
6262
output: &mut WriteBuffer<'_>,
6363
) -> Result<bool> {
64-
let out_buf = output.initialize_unwritten();
65-
66-
let mut output_size = out_buf.len();
6764
let mut input_size = input.unwritten().len();
65+
66+
// Safety: We **trust** lz4 bytes to properly function as expected,
67+
// only write decompressed, initialized data into the buffer properly.
6868
let result = unsafe {
69-
check_error(LZ4F_decompress(
69+
let out_buf = output.unwritten_mut();
70+
71+
let mut output_size = out_buf.len();
72+
73+
let result = check_error(LZ4F_decompress(
7074
self.ctx.get_mut().ctx,
71-
out_buf.as_mut_ptr(),
75+
out_buf.as_mut_ptr() as *mut _,
7276
&mut output_size,
7377
input.unwritten().as_ptr(),
7478
&mut input_size,
7579
core::ptr::null(),
76-
))
80+
));
81+
output.assume_init_and_advance(output_size);
82+
83+
result
7784
};
7885
input.advance(input_size);
79-
output.advance(output_size);
8086

8187
let finished = result? == 0;
8288
if finished {

0 commit comments

Comments
 (0)