32 changes: 21 additions & 11 deletions src/storage/compression.rs
@@ -90,28 +90,38 @@ pub fn compress(content: impl Read, algorithm: CompressionAlgorithm) -> Result<V
     }
 }

-/// Wrap an AsyncWrite sink for compression using the specified algorithm.
-///
-/// Will return an AsyncWrite you can just write data to, we will compress
-/// the data, and then write the compressed data into the provided output sink.
-pub fn wrap_writer_for_compression<'a>(
-    output_sink: impl AsyncWrite + Unpin + Send + 'a,
+/// Async compression: reads from an AsyncRead, writes compressed data to an AsyncWrite.
+pub async fn compress_async<'a, R, W>(
+    mut reader: R,
+    writer: W,
     algorithm: CompressionAlgorithm,
-) -> Box<dyn AsyncWrite + Unpin + 'a> {
+) -> io::Result<()>
+where
+    R: AsyncRead + Unpin + Send + 'a,
+    W: AsyncWrite + Unpin + Send + 'a,
+{
     use async_compression::tokio::write;
-    use tokio::io;
+    use tokio::io::{self, AsyncWriteExt as _};

     match algorithm {
         CompressionAlgorithm::Zstd => {
-            Box::new(io::BufWriter::new(write::ZstdEncoder::new(output_sink)))
+            let mut enc = write::ZstdEncoder::new(writer);
+            io::copy(&mut reader, &mut enc).await?;
+            enc.shutdown().await?;
         }
         CompressionAlgorithm::Bzip2 => {
-            Box::new(io::BufWriter::new(write::BzEncoder::new(output_sink)))
+            let mut enc = write::BzEncoder::new(writer);
+            io::copy(&mut reader, &mut enc).await?;
+            enc.shutdown().await?;
         }
         CompressionAlgorithm::Gzip => {
-            Box::new(io::BufWriter::new(write::GzipEncoder::new(output_sink)))
+            let mut enc = write::GzipEncoder::new(writer);
+            io::copy(&mut reader, &mut enc).await?;
+            enc.shutdown().await?;
         }
     }
+
+    Ok(())
 }

 /// Wrap an AsyncRead for decompression.
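
As a side note, here is a minimal sketch (not part of this diff) of how a caller might drive the new compress_async, assuming the CompressionAlgorithm enum from this module and a tokio runtime; the function name compress_file_to_vec is hypothetical:

    use tokio::fs::File;
    use tokio::io::BufReader;

    // Hypothetical caller: compress a file on disk into an in-memory buffer.
    // `&mut Vec<u8>` works as the writer because tokio implements AsyncWrite for Vec<u8>.
    async fn compress_file_to_vec(path: &str) -> std::io::Result<Vec<u8>> {
        let mut reader = BufReader::new(File::open(path).await?);
        let mut buf: Vec<u8> = Vec::new();
        compress_async(&mut reader, &mut buf, CompressionAlgorithm::Zstd).await?;
        Ok(buf)
    }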
62 changes: 59 additions & 3 deletions src/storage/mod.rs
@@ -5,7 +5,7 @@ mod s3;

 pub use self::compression::{CompressionAlgorithm, CompressionAlgorithms, compress, decompress};
 use self::{
-    compression::{wrap_reader_for_decompression, wrap_writer_for_compression},
+    compression::{compress_async, wrap_reader_for_decompression},
     database::DatabaseBackend,
     s3::S3Backend,
 };
@@ -543,9 +543,10 @@ impl AsyncStorage {
             .await?;

         let mut buf: Vec<u8> = Vec::new();
-        tokio::io::copy(
+        compress_async(
             &mut tokio::io::BufReader::new(tokio::fs::File::open(&local_index_path).await?),
-            &mut wrap_writer_for_compression(&mut buf, alg),
+            &mut buf,
+            alg,
         )
         .await?;
         buf
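
Worth noting why each branch of compress_async ends with shutdown: async_compression's write-side encoders buffer data internally and only emit the stream trailer when shut down, which the old writer-wrapping API left to the caller. A minimal sketch of that pitfall (assumes the async_compression and tokio crates as used above; compress_bytes is a hypothetical name, not part of this PR):

    use async_compression::tokio::write::ZstdEncoder;
    use tokio::io::AsyncWriteExt as _;

    // Hypothetical example: without the shutdown().await call below, the
    // zstd frame footer is never flushed and the output would be truncated.
    async fn compress_bytes(input: &[u8]) -> std::io::Result<Vec<u8>> {
        let mut enc = ZstdEncoder::new(Vec::new());
        enc.write_all(input).await?;
        enc.shutdown().await?; // flushes internal buffers and finishes the frame
        Ok(enc.into_inner())
    }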
@@ -1014,6 +1015,61 @@ mod test {
     use std::env;
     use test_case::test_case;

+    #[test_case(CompressionAlgorithm::Zstd)]
+    #[test_case(CompressionAlgorithm::Bzip2)]
+    #[test_case(CompressionAlgorithm::Gzip)]
+    #[tokio::test]
+    async fn test_async_compression(alg: CompressionAlgorithm) -> Result<()> {
+        const CONTENT: &[u8] = b"Hello, world! Hello, world! Hello, world! Hello, world!";
+
+        let compressed_index_content = {
+            let mut buf: Vec<u8> = Vec::new();
+            compress_async(&mut io::Cursor::new(CONTENT.to_vec()), &mut buf, alg).await?;
+            buf
+        };
+
+        {
+            // try low-level async decompression
+            let mut decompressed_buf: Vec<u8> = Vec::new();
+            let mut reader = wrap_reader_for_decompression(
+                io::Cursor::new(compressed_index_content.clone()),
+                alg,
+            );
+
+            tokio::io::copy(&mut reader, &mut io::Cursor::new(&mut decompressed_buf)).await?;
+
+            assert_eq!(decompressed_buf, CONTENT);
+        }
+
+        {
+            // try sync decompression
+            let decompressed_buf: Vec<u8> = decompress(
+                io::Cursor::new(compressed_index_content.clone()),
+                alg,
+                usize::MAX,
+            )?;
+
+            assert_eq!(decompressed_buf, CONTENT);
+        }
+
+        // try decompression via the storage API
+        let stream = StreamingBlob {
+            path: "some_path.db".into(),
+            mime: mime::APPLICATION_OCTET_STREAM,
+            date_updated: Utc::now(),
+            compression: Some(alg),
+            content_length: compressed_index_content.len(),
+            content: Box::new(io::Cursor::new(compressed_index_content)),
+        };
+
+        let blob = stream.materialize(usize::MAX).await?;
+
+        assert_eq!(blob.compression, None);
+        assert_eq!(blob.content, CONTENT);
+
+        Ok(())
+    }
+
     #[test_case("latest", RustdocJsonFormatVersion::Latest)]
     #[test_case("42", RustdocJsonFormatVersion::Version(42))]
     fn test_json_format_version(input: &str, expected: RustdocJsonFormatVersion) {