Skip to content

Commit e6b3819

Browse files
baszalmstraclaude
andauthored
fix: UnexpectedEof on truncated input (#412)
* Add generic truncated stream test to all decoders Added a generic truncated stream test to the test_cases! macro that automatically tests all decoders (bzip2, gzip, deflate, zlib, xz, lzma, lz4, zstd, brotli) for proper handling of incomplete streams. The test compresses data, truncates it, then attempts decompression. Decoders should return UnexpectedEof errors for truncated streams instead of silently accepting incomplete data. This test currently fails for bzip2, lz4, and zstd decoders, which will be fixed in subsequent commits. * Fix BzDecoder to propagate UnexpectedEof error on truncated streams Fixes #411 The async BzDecoder was silently accepting truncated bzip2 streams, returning Ok(0) instead of raising an error. This contrasts with the synchronous bzip2::read::BzDecoder which properly returns an UnexpectedEof error. Added state tracking to BzDecoder: - Added stream_ended field to track if Status::StreamEnd was received - Modified decode() to set stream_ended = true on Status::StreamEnd - Updated finish() to check stream_ended and return UnexpectedEof if false This ensures applications cannot accidentally accept corrupted or incomplete compressed data as valid, matching the behavior of the synchronous decoder. The generic truncated test now passes for bzip2. * Fix Lz4Decoder to propagate UnexpectedEof error on truncated streams The LZ4 decoder was silently accepting truncated streams by not validating stream completion in finish(). This issue was discovered by the generic truncated stream test. Added state tracking to Lz4Decoder: - Added stream_ended field to track if remaining == 0 was seen - Modified decode() to set stream_ended = true when stream completes - Updated finish() to check stream_ended and return UnexpectedEof if false This matches the behavior of other decoders (bzip2, gzip, etc.) and ensures applications cannot accidentally accept corrupted or incomplete LZ4 data as valid. The generic truncated test now passes for LZ4. * Fix ZstdDecoder to propagate UnexpectedEof error on truncated streams The Zstd decoder was silently accepting truncated streams by not validating stream completion in finish(). This issue was discovered by the generic truncated stream test. Added state tracking to ZstdDecoder: - Added stream_ended field to track if remaining == 0 was seen - Modified decode() to set stream_ended = true when stream completes - Updated finish() to check stream_ended and return UnexpectedEof if false - Updated all constructors to initialize stream_ended = false This matches the behavior of other decoders (bzip2, gzip, lz4, etc.) and ensures applications cannot accidentally accept corrupted or incomplete zstd data as valid. The generic truncated test now passes for Zstd. --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent 83a06fe commit e6b3819

File tree

4 files changed

+79
-5
lines changed

4 files changed

+79
-5
lines changed

crates/async-compression/tests/utils/test_cases.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,26 @@ macro_rules! io_test_cases {
231231

232232
assert_eq!(output, &[1, 2, 3, 4, 5, 6, 6, 5, 4, 3, 2, 1][..]);
233233
}
234+
235+
#[test]
236+
#[ntest::timeout(1000)]
237+
fn truncated() {
238+
let compressed = sync::compress(&[1, 2, 3, 4, 5, 6]);
239+
240+
// Truncate the compressed data (remove last 20 bytes or half, whichever is less)
241+
let truncate_amount = std::cmp::min(20, compressed.len() / 2);
242+
let truncated = &compressed[..compressed.len() - truncate_amount];
243+
244+
let input = InputStream::new(vec![truncated.to_vec()]);
245+
246+
// Try to decompress - should get an error for incomplete stream
247+
// The error manifests as a panic when read::to_vec calls unwrap()
248+
let result =
249+
std::panic::catch_unwind(|| bufread::decompress(bufread::from(&input)));
250+
251+
// Should fail for truncated stream
252+
assert!(result.is_err(), "Expected error for truncated stream");
253+
}
234254
}
235255
}
236256

crates/compression-codecs/src/bzip2/decoder.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::{fmt, io};
55

66
pub struct BzDecoder {
77
decompress: Decompress,
8+
stream_ended: bool,
89
}
910

1011
impl fmt::Debug for BzDecoder {
@@ -22,6 +23,7 @@ impl Default for BzDecoder {
2223
fn default() -> Self {
2324
Self {
2425
decompress: Decompress::new(false),
26+
stream_ended: false,
2527
}
2628
}
2729
}
@@ -49,13 +51,19 @@ impl BzDecoder {
4951
input.advance((self.decompress.total_in() - prior_in) as usize);
5052
output.advance((self.decompress.total_out() - prior_out) as usize);
5153

54+
// Track when stream has properly ended
55+
if status == Status::StreamEnd {
56+
self.stream_ended = true;
57+
}
58+
5259
Ok(status)
5360
}
5461
}
5562

5663
impl DecodeV2 for BzDecoder {
5764
fn reinit(&mut self) -> io::Result<()> {
5865
self.decompress = Decompress::new(false);
66+
self.stream_ended = false;
5967
Ok(())
6068
}
6169

@@ -101,6 +109,13 @@ impl DecodeV2 for BzDecoder {
101109
}
102110

103111
fn finish(&mut self, _output: &mut WriteBuffer<'_>) -> io::Result<bool> {
104-
Ok(true)
112+
if self.stream_ended {
113+
Ok(true)
114+
} else {
115+
Err(io::Error::new(
116+
io::ErrorKind::UnexpectedEof,
117+
"bzip2 stream did not finish",
118+
))
119+
}
105120
}
106121
}

crates/compression-codecs/src/lz4/decoder.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ struct DecoderContext {
1717
#[derive(Debug)]
1818
pub struct Lz4Decoder {
1919
ctx: Unshared<DecoderContext>,
20+
stream_ended: bool,
2021
}
2122

2223
impl DecoderContext {
@@ -37,6 +38,7 @@ impl Default for Lz4Decoder {
3738
fn default() -> Self {
3839
Self {
3940
ctx: Unshared::new(DecoderContext::new().unwrap()),
41+
stream_ended: false,
4042
}
4143
}
4244
}
@@ -50,6 +52,7 @@ impl Lz4Decoder {
5052
impl DecodeV2 for Lz4Decoder {
5153
fn reinit(&mut self) -> Result<()> {
5254
unsafe { LZ4F_resetDecompressionContext(self.ctx.get_mut().ctx) };
55+
self.stream_ended = false;
5356
Ok(())
5457
}
5558

@@ -74,7 +77,12 @@ impl DecodeV2 for Lz4Decoder {
7477
};
7578
input.advance(input_size);
7679
output.advance(output_size);
77-
Ok(remaining == 0)
80+
81+
let finished = remaining == 0;
82+
if finished {
83+
self.stream_ended = true;
84+
}
85+
Ok(finished)
7886
}
7987

8088
fn flush(&mut self, output: &mut WriteBuffer<'_>) -> Result<bool> {
@@ -92,6 +100,15 @@ impl DecodeV2 for Lz4Decoder {
92100
}
93101

94102
fn finish(&mut self, output: &mut WriteBuffer<'_>) -> Result<bool> {
95-
self.flush(output)
103+
self.flush(output)?;
104+
105+
if self.stream_ended {
106+
Ok(true)
107+
} else {
108+
Err(std::io::Error::new(
109+
std::io::ErrorKind::UnexpectedEof,
110+
"lz4 stream did not finish",
111+
))
112+
}
96113
}
97114
}

crates/compression-codecs/src/zstd/decoder.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ use zstd_safe::get_error_name;
1313
#[derive(Debug)]
1414
pub struct ZstdDecoder {
1515
decoder: Unshared<Decoder<'static>>,
16+
stream_ended: bool,
1617
}
1718

1819
impl Default for ZstdDecoder {
1920
fn default() -> Self {
2021
Self {
2122
decoder: Unshared::new(Decoder::new().unwrap()),
23+
stream_ended: false,
2224
}
2325
}
2426
}
@@ -35,13 +37,15 @@ impl ZstdDecoder {
3537
}
3638
Self {
3739
decoder: Unshared::new(decoder),
40+
stream_ended: false,
3841
}
3942
}
4043

4144
pub fn new_with_dict(dictionary: &[u8]) -> io::Result<Self> {
4245
let decoder = Decoder::with_dictionary(dictionary)?;
4346
Ok(Self {
4447
decoder: Unshared::new(decoder),
48+
stream_ended: false,
4549
})
4650
}
4751

@@ -64,6 +68,7 @@ impl ZstdDecoder {
6468
impl DecodeV2 for ZstdDecoder {
6569
fn reinit(&mut self) -> Result<()> {
6670
self.decoder.get_mut().reinit()?;
71+
self.stream_ended = false;
6772
Ok(())
6873
}
6974

@@ -80,15 +85,32 @@ impl DecodeV2 for ZstdDecoder {
8085
.run_on_buffers(input.unwritten(), output.unwritten_initialized_mut())?;
8186
input.advance(status.bytes_read);
8287
output.advance(status.bytes_written);
83-
Ok(status.remaining == 0)
88+
89+
let finished = status.remaining == 0;
90+
if finished {
91+
self.stream_ended = true;
92+
}
93+
Ok(finished)
8494
}
8595

8696
fn flush(&mut self, output: &mut WriteBuffer<'_>) -> Result<bool> {
97+
// Note: stream_ended is not updated here because zstd's flush only flushes
98+
// buffered output and doesn't indicate stream completion. Stream completion
99+
// is detected in decode() when status.remaining == 0.
87100
self.call_fn_on_out_buffer(output, |decoder, out_buf| decoder.flush(out_buf))
88101
}
89102

90103
fn finish(&mut self, output: &mut WriteBuffer<'_>) -> Result<bool> {
91-
self.call_fn_on_out_buffer(output, |decoder, out_buf| decoder.finish(out_buf, true))
104+
self.call_fn_on_out_buffer(output, |decoder, out_buf| decoder.finish(out_buf, true))?;
105+
106+
if self.stream_ended {
107+
Ok(true)
108+
} else {
109+
Err(io::Error::new(
110+
io::ErrorKind::UnexpectedEof,
111+
"zstd stream did not finish",
112+
))
113+
}
92114
}
93115
}
94116

0 commit comments

Comments
 (0)