diff --git a/src/storage/compression.rs b/src/storage/compression.rs
index 87e267058..8070d7bcd 100644
--- a/src/storage/compression.rs
+++ b/src/storage/compression.rs
@@ -7,7 +7,7 @@ use std::{
     io::{self, Read},
 };
 use strum::{Display, EnumIter, EnumString, FromRepr};
-use tokio::io::{AsyncRead, AsyncWrite};
+use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite};
 
 pub type CompressionAlgorithms = HashSet<CompressionAlgorithm>;
 
@@ -129,19 +129,19 @@ where
 /// You provide an AsyncRead that gives us the compressed data. With the
 /// wrapper we return you can then read decompressed data from the wrapper.
 pub fn wrap_reader_for_decompression<'a>(
-    input: impl AsyncRead + Unpin + Send + 'a,
+    input: impl AsyncBufRead + Unpin + Send + 'a,
     algorithm: CompressionAlgorithm,
-) -> Box<dyn AsyncRead + Unpin + Send + 'a> {
+) -> Box<dyn AsyncBufRead + Unpin + Send + 'a> {
     use async_compression::tokio::bufread;
     use tokio::io;
 
     match algorithm {
         CompressionAlgorithm::Zstd => {
-            Box::new(bufread::ZstdDecoder::new(io::BufReader::new(input)))
+            Box::new(io::BufReader::new(bufread::ZstdDecoder::new(input)))
         }
-        CompressionAlgorithm::Bzip2 => Box::new(bufread::BzDecoder::new(io::BufReader::new(input))),
+        CompressionAlgorithm::Bzip2 => Box::new(io::BufReader::new(bufread::BzDecoder::new(input))),
         CompressionAlgorithm::Gzip => {
-            Box::new(bufread::GzipDecoder::new(io::BufReader::new(input)))
+            Box::new(io::BufReader::new(bufread::GzipDecoder::new(input)))
         }
     }
 }
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index c3347124a..8b2400c1d 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -41,7 +41,11 @@ use std::{
         atomic::{AtomicU64, Ordering},
     },
 };
-use tokio::{io::AsyncRead, runtime, sync::RwLock};
+use tokio::{
+    io::{AsyncBufRead, AsyncBufReadExt},
+    runtime,
+    sync::RwLock,
+};
 use tracing::{error, info, info_span, instrument, trace, warn};
 use tracing_futures::Instrument as _;
 use walkdir::WalkDir;
@@ -75,7 +79,7 @@ pub(crate) struct StreamingBlob {
     pub(crate) date_updated: DateTime<Utc>,
     pub(crate) compression: Option<CompressionAlgorithm>,
     pub(crate) content_length: usize,
-    pub(crate) content: Box<dyn AsyncRead + Unpin + Send>,
+    pub(crate) content: Box<dyn AsyncBufRead + Unpin + Send>,
 }
 
 impl std::fmt::Debug for StreamingBlob {
@@ -92,14 +96,33 @@ impl std::fmt::Debug for StreamingBlob {
 impl StreamingBlob {
     /// wrap the content stream in a streaming decompressor according to the
     /// algorithm found in `compression` attribute.
-    pub(crate) fn decompress(mut self) -> Self {
+    pub(crate) async fn decompress(mut self) -> Result<Self> {
         let Some(alg) = self.compression else {
-            return self;
+            return Ok(self);
         };
 
         self.content = wrap_reader_for_decompression(self.content, alg);
+
+        // We fill the buffer here to force the decompressor to start decompressing.
+        // We want corrupted data to fail right here in this method, so we can act on
+        // the error directly, instead of users only hitting it later while they
+        // stream the data.
+        // This won't _consume_ the bytes. The user of this StreamingBlob will still be
+        // able to stream the whole content.
+        //
+        // This doesn't work 100% of the time. We might get other I/O errors here, or
+        // the decompressor might stumble over corrupted data later during streaming.
+        //
+        // But the most common error is that the format "magic bytes" at the beginning
+        // of the stream are missing, and that case is caught here.
+        let decompressed_buf = self.content.fill_buf().await?;
+        debug_assert!(
+            !decompressed_buf.is_empty(),
+            "we assume that if we have > 0 decompressed bytes, the start of the decompression works."
+        );
+
         self.compression = None;
-        self
+        Ok(self)
     }
 
     /// consume the inner stream and materialize the full blob into memory.
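The eager fill_buf call above relies on a property of buffered readers that is worth spelling out: filling the buffer surfaces decoder errors early but does not consume anything, so a later reader still sees the whole stream. A minimal standalone sketch of that behaviour with plain tokio (hypothetical data, not part of this change):

use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut reader = BufReader::new(&b"hello"[..]);

    // Peek at the buffered bytes; this is the point where a decompressor wrapped
    // underneath would report corrupt input, but nothing is marked as consumed.
    let peeked = reader.fill_buf().await?.to_vec();
    assert_eq!(peeked, b"hello");

    // A normal read afterwards still returns every byte.
    let mut all = Vec::new();
    reader.read_to_end(&mut all).await?;
    assert_eq!(all, b"hello");
    Ok(())
}
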
@@ -347,7 +370,7 @@ impl AsyncStorage {
             StorageBackend::Database(db) => db.get_stream(path, None).await,
             StorageBackend::S3(s3) => s3.get_stream(path, None).await,
         }?;
-        Ok(blob.decompress())
+        Ok(blob.decompress().await?)
     }
 
     /// get, decompress and materialize part of an object from store
@@ -373,15 +396,15 @@ impl AsyncStorage {
         range: FileRange,
         compression: Option<CompressionAlgorithm>,
     ) -> Result<StreamingBlob> {
-        let mut blob = match &self.backend {
+        let mut raw_stream = match &self.backend {
             StorageBackend::Database(db) => db.get_stream(path, Some(range)).await,
             StorageBackend::S3(s3) => s3.get_stream(path, Some(range)).await,
         }?;
         // `compression` represents the compression of the file-stream inside the archive.
         // We don't compress the whole archive, so the encoding of the archive's blob is irrelevant
         // here.
-        blob.compression = compression;
-        Ok(blob.decompress())
+        raw_stream.compression = compression;
+        Ok(raw_stream.decompress().await?)
     }
 
     fn local_index_cache_lock(&self, local_index_path: impl AsRef<Path>) -> Arc<RwLock<()>> {
@@ -394,6 +417,32 @@ impl AsyncStorage {
             .clone()
     }
 
+    async fn purge_archive_index_cache(
+        &self,
+        archive_path: &str,
+        latest_build_id: Option<BuildId>,
+    ) -> Result<()> {
+        // We know that config.local_archive_cache_path is an absolute path, not a relative one,
+        // so it is usable as a key in the DashMap.
+        let local_index_path = self.config.local_archive_cache_path.join(format!(
+            "{archive_path}.{}.{ARCHIVE_INDEX_FILE_EXTENSION}",
+            latest_build_id.map(|id| id.0).unwrap_or(0)
+        ));
+
+        let rwlock = self.local_index_cache_lock(&local_index_path);
+
+        let _write_guard = rwlock.write().await;
+
+        if tokio::fs::try_exists(&local_index_path).await? {
+            tokio::fs::remove_file(&local_index_path).await?;
+        }
+
+        Ok(())
+    }
+
+    /// Find the file info needed to fetch a certain path inside a remote archive.
+    /// Will try to use a local cache of the index file, and otherwise download it
+    /// from storage.
     #[instrument]
     async fn find_in_archive_index(
         &self,
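The purge relies on per-path locking so that deleting a cached index cannot race other users of the same file. A simplified, self-contained sketch of that idea (hypothetical types; docs.rs itself keys a DashMap by the absolute cache path, as the comment above notes):

use std::{collections::HashMap, path::{Path, PathBuf}, sync::Arc};
use tokio::sync::{Mutex, RwLock};

#[derive(Default)]
struct IndexCacheLocks {
    // One RwLock per cached index path; a mutexed HashMap is enough to show the idea.
    locks: Mutex<HashMap<PathBuf, Arc<RwLock<()>>>>,
}

impl IndexCacheLocks {
    async fn lock_for(&self, path: &Path) -> Arc<RwLock<()>> {
        self.locks
            .lock()
            .await
            .entry(path.to_path_buf())
            .or_default()
            .clone()
    }
}

async fn purge_cached_index(locks: &IndexCacheLocks, cached_index: &Path) -> std::io::Result<()> {
    let lock = locks.lock_for(cached_index).await;
    // Exclusive access: nobody else is using this cached index right now.
    let _write_guard = lock.write().await;
    if tokio::fs::try_exists(cached_index).await? {
        tokio::fs::remove_file(cached_index).await?;
    }
    Ok(())
}
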
@@ -469,55 +518,70 @@ impl AsyncStorage {
         path: &str,
         max_size: usize,
     ) -> Result<Blob> {
-        let info = self
-            .find_in_archive_index(archive_path, latest_build_id, path)
+        self.stream_from_archive(archive_path, latest_build_id, path)
             .await?
-            .ok_or(PathNotFoundError)?;
-
-        let blob = self
-            .get_range(
-                archive_path,
-                max_size,
-                info.range(),
-                Some(info.compression()),
-            )
-            .await?;
-        assert_eq!(blob.compression, None);
-
-        Ok(Blob {
-            path: format!("{archive_path}/{path}"),
-            mime: detect_mime(path),
-            date_updated: blob.date_updated,
-            content: blob.content,
-            compression: None,
-        })
+            .materialize(max_size)
+            .await
     }
 
-    #[instrument]
+    #[instrument(skip(self))]
     pub(crate) async fn stream_from_archive(
         &self,
         archive_path: &str,
         latest_build_id: Option<BuildId>,
         path: &str,
     ) -> Result<StreamingBlob> {
-        let info = self
-            .find_in_archive_index(archive_path, latest_build_id, path)
-            .await?
-            .ok_or(PathNotFoundError)?;
+        for attempt in 0..2 {
+            let info = self
+                .find_in_archive_index(archive_path, latest_build_id, path)
+                .await?
+                .ok_or(PathNotFoundError)?;
 
-        let blob = self
-            .get_range_stream(archive_path, info.range(), Some(info.compression()))
-            .await?;
-        assert_eq!(blob.compression, None);
+            match self
+                .get_range_stream(archive_path, info.range(), Some(info.compression()))
+                .await
+            {
+                Ok(stream) => {
+                    debug_assert_eq!(stream.compression, None);
+                    return Ok(StreamingBlob {
+                        path: format!("{archive_path}/{path}"),
+                        mime: detect_mime(path),
+                        date_updated: stream.date_updated,
+                        content: stream.content,
+                        content_length: stream.content_length,
+                        compression: None,
+                    });
+                }
+                Err(err) if attempt == 0 => {
+                    // We have some existing race conditions where the local cache of the index
+                    // file is outdated.
+                    // These mostly appear as "invalid bzip2 header" errors from the decompression
+                    // of the downloaded data, because we're fetching the wrong range from the
+                    // archive.
+                    // While we're working on fixing the root causes, we want a fallback here so
+                    // the user impact is smaller.
+                    // In this case, we purge the locally cached index file and retry once.
+                    // We're not checking for the _type_ of error here, which could be improved
+                    // in the future, but also doesn't hurt much.
+                    //
+                    // NOTE: this only works because when creating the stream in `get_stream`, we're
+                    //       already starting to decompress the first couple of bytes by filling
+                    //       the BufReader buffer.
+                    //       The reader of the `StreamingBlob` will still see the full stream.
                    warn!(
+                        ?err,
+                        "error fetching range from archive, purging local index cache and retrying once"
+                    );
+                    self.purge_archive_index_cache(archive_path, latest_build_id)
+                        .await?;
+
+                    continue;
+                }
+                Err(err) => return Err(err),
+            }
+        }
 
-        Ok(StreamingBlob {
-            path: format!("{archive_path}/{path}"),
-            mime: detect_mime(path),
-            date_updated: blob.date_updated,
-            content: blob.content,
-            content_length: blob.content_length,
-            compression: None,
-        })
+        unreachable!("stream_from_archive retry loop exited unexpectedly");
     }
 
     #[instrument(skip(self))]
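Since get_from_archive is now just stream_from_archive plus materialize, a caller that wants true streaming can work directly with the StreamingBlob. A hedged sketch (hypothetical function and archive/file names, assuming the crate's anyhow-based Result): because the content is now AsyncBufRead, it can be forwarded with tokio::io::copy_buf without buffering the whole file, and a stale local index has already been handled inside stream_from_archive by the purge-and-retry above.

use tokio::io::{copy_buf, AsyncWrite};

async fn write_archived_file_to(
    storage: &AsyncStorage,
    dst: &mut (impl AsyncWrite + Unpin),
) -> anyhow::Result<()> {
    let mut blob = storage
        .stream_from_archive("rustdoc/foo/1.0.0.zip", None, "foo/index.html")
        .await?;
    // `blob.content` is a Box<dyn AsyncBufRead + Unpin + Send>, so it can be
    // streamed without materializing the whole file in memory.
    copy_buf(&mut blob.content, dst).await?;
    Ok(())
}
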
@@ -1213,7 +1277,7 @@ mod test {
         // with decompression, does nothing
         {
             let stream = streaming_blob(CONTENT, None);
-            let blob = stream.decompress().materialize(usize::MAX).await?;
+            let blob = stream.decompress().await?.materialize(usize::MAX).await?;
             assert_eq!(blob.content, CONTENT);
             assert!(blob.compression.is_none());
         }
@@ -1221,6 +1285,42 @@ mod test {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_streaming_broken_zstd_blob() -> Result<()> {
+        const NOT_ZSTD: &[u8] = b"Hello, world!";
+        let alg = CompressionAlgorithm::Zstd;
+
+        // without decompression
+        // Doesn't fail because we don't call `.decompress`
+        {
+            let stream = streaming_blob(NOT_ZSTD, Some(alg));
+            let blob = stream.materialize(usize::MAX).await?;
+            assert_eq!(blob.content, NOT_ZSTD);
+            assert_eq!(blob.compression, Some(alg));
+        }
+
+        // with decompression
+        // should fail in the `.decompress` call,
+        // not later when materializing / streaming.
+        {
+            let err = streaming_blob(NOT_ZSTD, Some(alg))
+                .decompress()
+                .await
+                .unwrap_err();
+
+            assert_eq!(err.kind(), io::ErrorKind::Other);
+
+            assert_eq!(
+                err.to_string(),
+                "Unknown frame descriptor",
+                "unexpected error: {}",
+                err
+            );
+        }
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_streaming_blob_zstd() -> Result<()> {
         const CONTENT: &[u8] = b"Hello, world!";
@@ -1244,8 +1344,11 @@ mod test {
 
         // with decompression
         {
-            let stream = streaming_blob(compressed_content.clone(), Some(alg)).decompress();
-            let blob = stream.materialize(usize::MAX).await?;
+            let blob = streaming_blob(compressed_content.clone(), Some(alg))
+                .decompress()
+                .await?
+                .materialize(usize::MAX)
+                .await?;
             assert_eq!(blob.content, CONTENT);
             assert!(blob.compression.is_none());
         }
@@ -1300,6 +1403,7 @@ mod test {
             content: Box::new(io::Cursor::new(compressed_index_content)),
         }
         .decompress()
+        .await?
         .materialize(usize::MAX)
         .await?;
 
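The error asserted above comes straight from the zstd decoder, and the same eager failure can be reproduced in isolation. A small sketch (assuming the async-compression crate with its tokio and zstd features enabled, and arbitrary non-zstd input) that mirrors the wrapping order used in wrap_reader_for_decompression:

use async_compression::tokio::bufread::ZstdDecoder;
use tokio::io::{AsyncBufReadExt, BufReader};

#[tokio::main]
async fn main() {
    let not_zstd: &[u8] = b"Hello, world!";

    // Same wrapping order as wrap_reader_for_decompression: decoder inside, BufReader outside.
    let mut reader = BufReader::new(ZstdDecoder::new(BufReader::new(not_zstd)));

    // The first fill_buf already fails because the zstd magic bytes are missing.
    let err = reader.fill_buf().await.unwrap_err();
    println!("decompression failed eagerly: {err}");
}
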
@@ -1537,6 +1641,140 @@ mod test {
 
         Ok(())
     }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_outdated_local_archive_index_gets_redownloaded() -> Result<()> {
+        use tokio::fs;
+
+        let env = TestEnvironment::with_config(
+            TestEnvironment::base_config()
+                .storage_backend(StorageKind::S3)
+                .build()?,
+        )
+        .await?;
+
+        let storage = env.async_storage();
+
+        // virtual latest build id, used for local caching of the index files
+        const LATEST_BUILD_ID: Option<BuildId> = Some(BuildId(42));
+        let cache_root = env.config().local_archive_cache_path.clone();
+
+        let cache_filename = |archive_name: &str| {
+            cache_root.join(format!(
+                "{}.{}.{}",
+                archive_name,
+                LATEST_BUILD_ID.unwrap(),
+                ARCHIVE_INDEX_FILE_EXTENSION
+            ))
+        };
+
+        /// create dummy archives; each file contains its own name as content
+        async fn create_archive(
+            storage: &AsyncStorage,
+            archive_name: &str,
+            filenames: &[&str],
+        ) -> Result<()> {
+            let dir = tempfile::Builder::new()
+                .prefix("docs.rs-upload-archive-test")
+                .tempdir()?;
+            for &file in filenames.iter() {
+                let path = dir.path().join(file);
+                fs::write(path, file).await?;
+            }
+            storage
+                .store_all_in_archive(archive_name, dir.path())
+                .await?;
+
+            Ok(())
+        }
+
+        // create two archives whose indexes contain the same filename
+        create_archive(
+            storage,
+            "test1.zip",
+            &["file1.txt", "file2.txt", "important.txt"],
+        )
+        .await?;
+
+        create_archive(
+            storage,
+            "test2.zip",
+            &["important.txt", "another_file_1.txt", "another_file_2.txt"],
+        )
+        .await?;
+
+        for archive_name in &["test1.zip", "test2.zip"] {
+            assert!(storage.exists(archive_name).await?);
+
+            assert!(
+                storage
+                    .exists(&format!("{}.{ARCHIVE_INDEX_FILE_EXTENSION}", archive_name))
+                    .await?
+            );
+            // local index cache doesn't exist yet
+            let local_index_file = cache_filename(archive_name);
+            assert!(!fs::try_exists(&local_index_file).await?);
+
+            // this will then create the cache
+            assert!(
+                storage
+                    .exists_in_archive(archive_name, LATEST_BUILD_ID, "important.txt")
+                    .await?
+            );
+            assert!(fs::try_exists(&local_index_file).await?);
+
+            // fetching the content out of the archive also works
+            assert_eq!(
+                storage
+                    .get_from_archive(archive_name, LATEST_BUILD_ID, "important.txt", usize::MAX)
+                    .await?
+                    .content,
+                b"important.txt"
+            );
+        }
+
+        // validate that the positions for the same filename really differ
+        // between the two archives.
+        let pos_in_test1_zip = storage
+            .find_in_archive_index("test1.zip", LATEST_BUILD_ID, "important.txt")
+            .await?
+            .unwrap();
+        let pos_in_test2_zip = storage
+            .find_in_archive_index("test2.zip", LATEST_BUILD_ID, "important.txt")
+            .await?
+            .unwrap();
+
+        assert_ne!(pos_in_test1_zip.range(), pos_in_test2_zip.range());
+
+        // Now swap the two local index files.
+        // This simulates having an outdated byte-range cached for a file.
+
+        let local_index_file_1 = cache_filename("test1.zip");
+        let local_index_file_2 = cache_filename("test2.zip");
+
+        {
+            let temp_path = cache_root.join("temp_index_swap.tmp");
+            fs::rename(&local_index_file_1, &temp_path).await?;
+            fs::rename(&local_index_file_2, &local_index_file_1).await?;
+            fs::rename(&temp_path, &local_index_file_2).await?;
+        }
+
+        // Now try to fetch the files inside the archives again; the outdated local
+        // index files should be removed and re-downloaded, and everything should work.
+        // Without our fallback / delete mechanism, this would fail.
+
+        for archive_name in &["test1.zip", "test2.zip"] {
+            assert_eq!(
+                storage
+                    .get_from_archive(archive_name, LATEST_BUILD_ID, "important.txt", usize::MAX)
+                    .await?
+                    .content,
+                b"important.txt"
+            );
+        }
+
+        Ok(())
+    }
 }
 
 /// Backend tests are a set of tests executed on all the supported storage backends. They ensure