From 03744083452385541375c4829cfb5f22433522d0 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 20 Aug 2025 16:40:25 +0200 Subject: [PATCH 1/3] Use reflink_or_copy for export as well --- src/store/fs.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/store/fs.rs b/src/store/fs.rs index 9e11e098f..6a9d2d8c6 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -1263,9 +1263,18 @@ async fn export_path_impl( } MemOrFile::File((source_path, size)) => match mode { ExportMode::Copy => { - let source = fs::File::open(&source_path)?; - let mut target = fs::File::create(&target)?; - copy_with_progress(&source, size, &mut target, tx).await? + tx.try_send(ExportProgressItem::CopyProgress(0)) + .await + .map_err(|_e| io::Error::other(""))?; + let written = reflink_copy::reflink_or_copy(&source_path, &target)?; + if let Some(written) = written { + if written != size { + return Err(io::Error::other(format!( + "wrote {written} bytes, expected {size}", + )) + .into()); + } + } } ExportMode::TryReference => { match std::fs::rename(&source_path, &target) { From 76236c11aaa41992ea929cdd2605b7b763d72a43 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 21 Aug 2025 10:34:01 +0200 Subject: [PATCH 2/3] cargo deny --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b966614a4..3358efe33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3596,9 +3596,9 @@ dependencies = [ [[package]] name = "slab" -version = "0.4.10" +version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04dc19736151f35336d325007ac991178d504a119863a2fcb3758cdb5e52c50d" +checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" [[package]] name = "smallvec" From 7657336333ac9c5a3016427140bac40fa7f05992 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 21 Aug 2025 11:14:14 +0200 Subject: [PATCH 3/3] Add reflink_or_copy_with_progress and use it from both import and export We have to do some convoluted shit because there is no generic sender/sink. --- src/store/fs.rs | 65 +++++++++++++++++++++++++++++++----------- src/store/fs/import.rs | 12 ++++---- 2 files changed, 56 insertions(+), 21 deletions(-) diff --git a/src/store/fs.rs b/src/store/fs.rs index 6a9d2d8c6..a9b04ac02 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -93,7 +93,7 @@ use entity_manager::{EntityManagerState, SpawnArg}; use entry_state::{DataLocation, OutboardLocation}; use gc::run_gc; use import::{ImportEntry, ImportSource}; -use irpc::channel::mpsc; +use irpc::{channel::mpsc, RpcMessage}; use meta::list_blobs; use n0_future::{future::yield_now, io}; use nested_enum_utils::enum_conversions; @@ -1263,18 +1263,12 @@ async fn export_path_impl( } MemOrFile::File((source_path, size)) => match mode { ExportMode::Copy => { - tx.try_send(ExportProgressItem::CopyProgress(0)) - .await - .map_err(|_e| io::Error::other(""))?; - let written = reflink_copy::reflink_or_copy(&source_path, &target)?; - if let Some(written) = written { - if written != size { - return Err(io::Error::other(format!( - "wrote {written} bytes, expected {size}", - )) - .into()); - } - } + let res = reflink_or_copy_with_progress(&source_path, &target, size, tx).await?; + trace!( + "exported {} to {}, {res:?}", + source_path.display(), + target.display() + ); } ExportMode::TryReference => { match std::fs::rename(&source_path, &target) { @@ -1304,11 +1298,50 @@ async fn export_path_impl( Ok(()) } -async fn copy_with_progress( +trait CopyProgress: RpcMessage { + fn from_offset(offset: u64) -> Self; +} + +impl CopyProgress for ExportProgressItem { + fn from_offset(offset: u64) -> Self { + ExportProgressItem::CopyProgress(offset) + } +} + +impl CopyProgress for AddProgressItem { + fn from_offset(offset: u64) -> Self { + AddProgressItem::CopyProgress(offset) + } +} + +#[derive(Debug)] +enum CopyResult { + Reflinked, + Copied, +} + +async fn reflink_or_copy_with_progress( + from: impl AsRef, + to: impl AsRef, + size: u64, + tx: &mut mpsc::Sender, +) -> io::Result { + let from = from.as_ref(); + let to = to.as_ref(); + if reflink_copy::reflink(from, to).is_ok() { + return Ok(CopyResult::Reflinked); + } + let source = fs::File::open(from)?; + let mut target = fs::File::create(to)?; + copy_with_progress(source, size, &mut target, tx).await?; + Ok(CopyResult::Copied) +} + +async fn copy_with_progress( file: impl ReadAt, size: u64, target: &mut impl Write, - tx: &mut mpsc::Sender, + tx: &mut mpsc::Sender, ) -> io::Result<()> { let mut offset = 0; let mut buf = vec![0u8; 1024 * 1024]; @@ -1317,7 +1350,7 @@ async fn copy_with_progress( let buf: &mut [u8] = &mut buf[..remaining]; file.read_exact_at(offset, buf)?; target.write_all(buf)?; - tx.try_send(ExportProgressItem::CopyProgress(offset)) + tx.try_send(T::from_offset(offset)) .await .map_err(|_e| io::Error::other(""))?; yield_now().await; diff --git a/src/store/fs/import.rs b/src/store/fs/import.rs index 5c64535ea..f5c8fc1aa 100644 --- a/src/store/fs/import.rs +++ b/src/store/fs/import.rs @@ -43,6 +43,7 @@ use crate::{ }, }, store::{ + fs::reflink_or_copy_with_progress, util::{MemOrFile, DD}, IROH_BLOCK_SIZE, }, @@ -491,11 +492,12 @@ async fn import_path_impl( let temp_path = options.path.temp_file_name(); // todo: if reflink works, we don't need progress. // But if it does not, it might take a while and we won't get progress. - if reflink_copy::reflink_or_copy(&path, &temp_path)?.is_none() { - trace!("reflinked {} to {}", path.display(), temp_path.display()); - } else { - trace!("copied {} to {}", path.display(), temp_path.display()); - } + let res = reflink_or_copy_with_progress(&path, &temp_path, size, tx).await?; + trace!( + "imported {} to {}, {res:?}", + path.display(), + temp_path.display() + ); // copy from path to temp_path let file = OpenOptions::new().read(true).open(&temp_path)?; tx.send(AddProgressItem::CopyDone)