Skip to content

Commit 7657336

Browse files
committed
Add reflink_or_copy_with_progress and use it from both import and export
We have to do some convoluted shit because there is no generic sender/sink.
1 parent 76236c1 commit 7657336

File tree

2 files changed

+56
-21
lines changed

2 files changed

+56
-21
lines changed

src/store/fs.rs

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ use entity_manager::{EntityManagerState, SpawnArg};
9393
use entry_state::{DataLocation, OutboardLocation};
9494
use gc::run_gc;
9595
use import::{ImportEntry, ImportSource};
96-
use irpc::channel::mpsc;
96+
use irpc::{channel::mpsc, RpcMessage};
9797
use meta::list_blobs;
9898
use n0_future::{future::yield_now, io};
9999
use nested_enum_utils::enum_conversions;
@@ -1263,18 +1263,12 @@ async fn export_path_impl(
12631263
}
12641264
MemOrFile::File((source_path, size)) => match mode {
12651265
ExportMode::Copy => {
1266-
tx.try_send(ExportProgressItem::CopyProgress(0))
1267-
.await
1268-
.map_err(|_e| io::Error::other(""))?;
1269-
let written = reflink_copy::reflink_or_copy(&source_path, &target)?;
1270-
if let Some(written) = written {
1271-
if written != size {
1272-
return Err(io::Error::other(format!(
1273-
"wrote {written} bytes, expected {size}",
1274-
))
1275-
.into());
1276-
}
1277-
}
1266+
let res = reflink_or_copy_with_progress(&source_path, &target, size, tx).await?;
1267+
trace!(
1268+
"exported {} to {}, {res:?}",
1269+
source_path.display(),
1270+
target.display()
1271+
);
12781272
}
12791273
ExportMode::TryReference => {
12801274
match std::fs::rename(&source_path, &target) {
@@ -1304,11 +1298,50 @@ async fn export_path_impl(
13041298
Ok(())
13051299
}
13061300

1307-
async fn copy_with_progress(
1301+
trait CopyProgress: RpcMessage {
1302+
fn from_offset(offset: u64) -> Self;
1303+
}
1304+
1305+
impl CopyProgress for ExportProgressItem {
1306+
fn from_offset(offset: u64) -> Self {
1307+
ExportProgressItem::CopyProgress(offset)
1308+
}
1309+
}
1310+
1311+
impl CopyProgress for AddProgressItem {
1312+
fn from_offset(offset: u64) -> Self {
1313+
AddProgressItem::CopyProgress(offset)
1314+
}
1315+
}
1316+
1317+
#[derive(Debug)]
1318+
enum CopyResult {
1319+
Reflinked,
1320+
Copied,
1321+
}
1322+
1323+
async fn reflink_or_copy_with_progress(
1324+
from: impl AsRef<Path>,
1325+
to: impl AsRef<Path>,
1326+
size: u64,
1327+
tx: &mut mpsc::Sender<impl CopyProgress>,
1328+
) -> io::Result<CopyResult> {
1329+
let from = from.as_ref();
1330+
let to = to.as_ref();
1331+
if reflink_copy::reflink(from, to).is_ok() {
1332+
return Ok(CopyResult::Reflinked);
1333+
}
1334+
let source = fs::File::open(from)?;
1335+
let mut target = fs::File::create(to)?;
1336+
copy_with_progress(source, size, &mut target, tx).await?;
1337+
Ok(CopyResult::Copied)
1338+
}
1339+
1340+
async fn copy_with_progress<T: CopyProgress>(
13081341
file: impl ReadAt,
13091342
size: u64,
13101343
target: &mut impl Write,
1311-
tx: &mut mpsc::Sender<ExportProgressItem>,
1344+
tx: &mut mpsc::Sender<T>,
13121345
) -> io::Result<()> {
13131346
let mut offset = 0;
13141347
let mut buf = vec![0u8; 1024 * 1024];
@@ -1317,7 +1350,7 @@ async fn copy_with_progress(
13171350
let buf: &mut [u8] = &mut buf[..remaining];
13181351
file.read_exact_at(offset, buf)?;
13191352
target.write_all(buf)?;
1320-
tx.try_send(ExportProgressItem::CopyProgress(offset))
1353+
tx.try_send(T::from_offset(offset))
13211354
.await
13221355
.map_err(|_e| io::Error::other(""))?;
13231356
yield_now().await;

src/store/fs/import.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use crate::{
4343
},
4444
},
4545
store::{
46+
fs::reflink_or_copy_with_progress,
4647
util::{MemOrFile, DD},
4748
IROH_BLOCK_SIZE,
4849
},
@@ -491,11 +492,12 @@ async fn import_path_impl(
491492
let temp_path = options.path.temp_file_name();
492493
// todo: if reflink works, we don't need progress.
493494
// But if it does not, it might take a while and we won't get progress.
494-
if reflink_copy::reflink_or_copy(&path, &temp_path)?.is_none() {
495-
trace!("reflinked {} to {}", path.display(), temp_path.display());
496-
} else {
497-
trace!("copied {} to {}", path.display(), temp_path.display());
498-
}
495+
let res = reflink_or_copy_with_progress(&path, &temp_path, size, tx).await?;
496+
trace!(
497+
"imported {} to {}, {res:?}",
498+
path.display(),
499+
temp_path.display()
500+
);
499501
// copy from path to temp_path
500502
let file = OpenOptions::new().read(true).open(&temp_path)?;
501503
tx.send(AddProgressItem::CopyDone)

0 commit comments

Comments
 (0)