Skip to content

Commit e981dfb

Browse files
committed
introduce RwLock to control access to local archive index files
1 parent a9b9304 commit e981dfb

File tree

3 files changed

+78
-56
lines changed

3 files changed

+78
-56
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ sysinfo = { version = "0.37.2", default-features = false, features = ["system"]
6464
derive_builder = "0.20.2"
6565

6666
# Async
67-
tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros", "process"] }
67+
tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros", "process", "sync"] }
6868
tokio-util = { version = "0.7.15", default-features = false, features = ["io"] }
6969
tracing-futures= { version = "0.2.5", features = ["std-future", "futures-03"] }
7070
futures-util = "0.3.5"

src/config.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{cdn::CdnKind, storage::StorageKind};
22
use anyhow::{Context, Result, anyhow, bail};
3-
use std::{env::VarError, error::Error, path::PathBuf, str::FromStr, time::Duration};
3+
use std::{env::VarError, error::Error, io, path, path::PathBuf, str::FromStr, time::Duration};
44
use tracing::trace;
55
use url::Url;
66

@@ -209,10 +209,10 @@ impl Config {
209209
.cdn_max_queued_age(Duration::from_secs(env("DOCSRS_CDN_MAX_QUEUED_AGE", 3600)?))
210210
.cloudfront_distribution_id_web(maybe_env("CLOUDFRONT_DISTRIBUTION_ID_WEB")?)
211211
.cloudfront_distribution_id_static(maybe_env("CLOUDFRONT_DISTRIBUTION_ID_STATIC")?)
212-
.local_archive_cache_path(env(
212+
.local_archive_cache_path(ensure_absolute_path(env(
213213
"DOCSRS_ARCHIVE_INDEX_CACHE_PATH",
214214
prefix.join("archive_cache"),
215-
)?)
215+
)?)?)
216216
.compiler_metrics_collection_path(maybe_env("DOCSRS_COMPILER_METRICS_PATH")?)
217217
.temp_dir(temp_dir)
218218
.rustwide_workspace(env(
@@ -235,6 +235,14 @@ impl Config {
235235
}
236236
}
237237

238+
fn ensure_absolute_path(path: PathBuf) -> io::Result<PathBuf> {
239+
if path.is_absolute() {
240+
Ok(path)
241+
} else {
242+
Ok(path::absolute(&path)?)
243+
}
244+
}
245+
238246
fn env<T>(var: &str, default: T) -> Result<T>
239247
where
240248
T: FromStr,

src/storage/mod.rs

Lines changed: 66 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::{
2222
};
2323
use anyhow::{anyhow, bail};
2424
use chrono::{DateTime, Utc};
25+
use dashmap::DashMap;
2526
use fn_error_context::context;
2627
use futures_util::{TryStreamExt as _, stream::BoxStream};
2728
use mime::Mime;
@@ -30,24 +31,27 @@ use std::{
3031
fmt,
3132
fs::{self, File},
3233
io::{self, BufReader},
34+
iter,
3335
num::ParseIntError,
3436
ops::RangeInclusive,
3537
path::{Path, PathBuf},
3638
sync::{
3739
Arc,
3840
atomic::{AtomicU64, Ordering},
3941
},
42+
str::FromStr,
4043
};
41-
use std::{iter, str::FromStr};
4244
use tokio::{
4345
io::{AsyncRead, AsyncWriteExt},
4446
runtime,
47+
sync::RwLock,
4548
};
4649
use tracing::{error, info, info_span, instrument, trace, warn};
4750
use tracing_futures::Instrument as _;
4851
use walkdir::WalkDir;
4952

5053
const ZSTD_EOF_BYTES: [u8; 3] = [0x01, 0x00, 0x00];
54+
static ARCHIVE_INDEX_FILE_EXTENSION: &str = "index";
5155

5256
type FileRange = RangeInclusive<u64>;
5357

@@ -186,6 +190,8 @@ enum StorageBackend {
186190
pub struct AsyncStorage {
187191
backend: StorageBackend,
188192
config: Arc<Config>,
193+
/// Locks to synchronize access to the locally cached archive index files.
194+
locks: DashMap<PathBuf, Arc<RwLock<()>>>,
189195
}
190196

191197
impl AsyncStorage {
@@ -204,6 +210,7 @@ impl AsyncStorage {
204210
}
205211
},
206212
config,
213+
locks: DashMap::new(),
207214
})
208215
}
209216

@@ -318,12 +325,10 @@ impl AsyncStorage {
318325
path: &str,
319326
) -> Result<bool> {
320327
match self
321-
.download_archive_index(archive_path, latest_build_id)
328+
.find_in_archive_index(archive_path, latest_build_id, path)
322329
.await
323330
{
324-
Ok(index_filename) => Ok(archive_index::find_in_file(index_filename, path)
325-
.await?
326-
.is_some()),
331+
Ok(file_info) => Ok(file_info.is_some()),
327332
Err(err) => {
328333
if err.downcast_ref::<PathNotFoundError>().is_some() {
329334
Ok(false)
@@ -384,41 +389,67 @@ impl AsyncStorage {
384389
Ok(blob.decompress())
385390
}
386391

392+
fn local_index_cache_lock(&self, local_index_path: impl AsRef<Path>) -> Arc<RwLock<()>> {
393+
let local_index_path = local_index_path.as_ref().to_path_buf();
394+
395+
self.locks
396+
.entry(local_index_path)
397+
.or_insert_with(|| Arc::new(RwLock::new(())))
398+
.downgrade()
399+
.clone()
400+
}
401+
387402
#[instrument]
388-
pub(super) async fn download_archive_index(
403+
async fn find_in_archive_index(
389404
&self,
390405
archive_path: &str,
391406
latest_build_id: Option<BuildId>,
392-
) -> Result<PathBuf> {
393-
// remote/folder/and/x.zip.index
394-
let remote_index_path = format!("{archive_path}.index");
407+
path_in_archive: &str,
408+
) -> Result<Option<archive_index::FileInfo>> {
409+
// we know that config.local_archive_cache_path is an absolute path, not relative.
410+
// So it will be usable as key in the DashMap.
395411
let local_index_path = self.config.local_archive_cache_path.join(format!(
396-
"{archive_path}.{}.index",
412+
"{archive_path}.{}.{ARCHIVE_INDEX_FILE_EXTENSION}",
397413
latest_build_id.map(|id| id.0).unwrap_or(0)
398414
));
399415

400-
if !local_index_path.exists() {
401-
let index_content = self.get(&remote_index_path, usize::MAX).await?.content;
416+
let rwlock = self.local_index_cache_lock(&local_index_path);
402417

403-
tokio::fs::create_dir_all(
404-
local_index_path
405-
.parent()
406-
.ok_or_else(|| anyhow!("index path without parent"))?,
407-
)
408-
.await?;
418+
// directly acquire the read-lock, so the syscall (`path.exists()`) below is already
419+
// protected.
420+
let mut _read_guard = rwlock.read().await;
421+
422+
if !tokio::fs::try_exists(&local_index_path).await? {
423+
// upgrade the lock to a write-lock for downloading & storing the index.
424+
drop(_read_guard);
425+
let _write_guard = rwlock.write().await;
409426

410-
// when we don't have a locally cached index and many parallel request
411-
// we might download the same archive index multiple times here.
412-
// So we're storing the content into a temporary file before renaming it
413-
// into the final location.
414-
let temp_path = tempfile::NamedTempFile::new_in(&self.config.local_archive_cache_path)?
415-
.into_temp_path();
416-
let mut file = tokio::fs::File::create(&temp_path).await?;
417-
file.write_all(&index_content).await?;
418-
tokio::fs::rename(temp_path, &local_index_path).await?;
427+
// check existance again in case of Race Condition (TOCTOU)
428+
if !tokio::fs::try_exists(&local_index_path).await? {
429+
// remote/folder/and/x.zip.index
430+
let remote_index_path = format!("{archive_path}.{ARCHIVE_INDEX_FILE_EXTENSION}");
431+
432+
tokio::fs::create_dir_all(
433+
local_index_path
434+
.parent()
435+
.ok_or_else(|| anyhow!("index path without parent"))?,
436+
)
437+
.await?;
438+
439+
{
440+
let mut file = tokio::fs::File::create(&local_index_path).await?;
441+
let mut stream = self.get_stream(&remote_index_path).await?.content;
442+
443+
tokio::io::copy(&mut stream, &mut file).await?;
444+
445+
file.flush().await?;
446+
}
447+
}
448+
449+
_read_guard = _write_guard.downgrade();
419450
}
420451

421-
Ok(local_index_path)
452+
archive_index::find_in_file(local_index_path, path_in_archive).await
422453
}
423454

424455
#[instrument]
@@ -429,11 +460,8 @@ impl AsyncStorage {
429460
path: &str,
430461
max_size: usize,
431462
) -> Result<Blob> {
432-
let index_filename = self
433-
.download_archive_index(archive_path, latest_build_id)
434-
.await?;
435-
436-
let info = archive_index::find_in_file(index_filename, path)
463+
let info = self
464+
.find_in_archive_index(archive_path, latest_build_id, path)
437465
.await?
438466
.ok_or(PathNotFoundError)?;
439467

@@ -463,11 +491,8 @@ impl AsyncStorage {
463491
latest_build_id: Option<BuildId>,
464492
path: &str,
465493
) -> Result<StreamingBlob> {
466-
let index_filename = self
467-
.download_archive_index(archive_path, latest_build_id)
468-
.await?;
469-
470-
let info = archive_index::find_in_file(index_filename, path)
494+
let info = self
495+
.find_in_archive_index(archive_path, latest_build_id, path)
471496
.await?
472497
.ok_or(PathNotFoundError)?;
473498

@@ -540,7 +565,7 @@ impl AsyncStorage {
540565
.await?;
541566

542567
let alg = CompressionAlgorithm::default();
543-
let remote_index_path = format!("{}.index", &archive_path);
568+
let remote_index_path = format!("{}.{ARCHIVE_INDEX_FILE_EXTENSION}", &archive_path);
544569
let compressed_index_content = {
545570
let _span = info_span!("create_archive_index", %remote_index_path).entered();
546571

@@ -994,17 +1019,6 @@ impl Storage {
9941019
.block_on(self.inner.get_range(path, max_size, range, compression))
9951020
}
9961021

997-
pub(super) fn download_index(
998-
&self,
999-
archive_path: &str,
1000-
latest_build_id: Option<BuildId>,
1001-
) -> Result<PathBuf> {
1002-
self.runtime.block_on(
1003-
self.inner
1004-
.download_archive_index(archive_path, latest_build_id),
1005-
)
1006-
}
1007-
10081022
pub(crate) fn get_from_archive(
10091023
&self,
10101024
archive_path: &str,
@@ -1801,12 +1815,12 @@ mod backend_tests {
18011815
.inner
18021816
.config
18031817
.local_archive_cache_path
1804-
.join("folder/test.zip.0.index");
1818+
.join(format!("folder/test.zip.0.{ARCHIVE_INDEX_FILE_EXTENSION}"));
18051819

18061820
let (stored_files, compression_alg) =
18071821
storage.store_all_in_archive("folder/test.zip", dir.path())?;
18081822

1809-
assert!(storage.exists("folder/test.zip.index")?);
1823+
assert!(storage.exists(&format!("folder/test.zip.{ARCHIVE_INDEX_FILE_EXTENSION}"))?);
18101824

18111825
assert_eq!(compression_alg, CompressionAlgorithm::Bzip2);
18121826
assert_eq!(stored_files.len(), files.len());

0 commit comments

Comments
 (0)