Skip to content

Commit 126d834

Browse files
committed
Merge branch 'main' into entity-manager
2 parents 24c7f2c + 6fc7630 commit 126d834

File tree

8 files changed

+194
-33
lines changed

8 files changed

+194
-33
lines changed

src/api.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
//! with a remote store via rpc calls.
55
use std::{io, net::SocketAddr, ops::Deref, sync::Arc};
66

7+
use bao_tree::io::EncodeError;
78
use iroh::Endpoint;
89
use irpc::rpc::{listen, Handler};
910
use n0_snafu::SpanTrace;
@@ -211,6 +212,15 @@ impl std::error::Error for Error {
211212
}
212213
}
213214

215+
impl From<EncodeError> for Error {
216+
fn from(value: EncodeError) -> Self {
217+
match value {
218+
EncodeError::Io(cause) => Self::Io(cause),
219+
_ => Self::other(value),
220+
}
221+
}
222+
}
223+
214224
pub type Result<T> = std::result::Result<T, Error>;
215225

216226
/// The main entry point for the store API.

src/api/blobs.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,13 +102,21 @@ impl Blobs {
102102
})
103103
}
104104

105-
pub async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> {
105+
/// Delete a blob.
106+
///
107+
/// This function is not public, because it does not work as expected when called manually,
108+
/// because blobs are protected from deletion. This is only called from the gc task, which
109+
/// clears the protections before.
110+
///
111+
/// Users should rely only on garbage collection for blob deletion.
112+
pub(crate) async fn delete_with_opts(&self, options: DeleteOptions) -> RequestResult<()> {
106113
trace!("{options:?}");
107114
self.client.rpc(options).await??;
108115
Ok(())
109116
}
110117

111-
pub async fn delete(
118+
/// See [`Self::delete_with_opts`].
119+
pub(crate) async fn delete(
112120
&self,
113121
hashes: impl IntoIterator<Item = impl Into<Hash>>,
114122
) -> RequestResult<()> {
@@ -962,7 +970,6 @@ impl ExportBaoProgress {
962970
let mut data = Vec::new();
963971
let mut stream = self.into_byte_stream();
964972
while let Some(item) = stream.next().await {
965-
println!("item: {item:?}");
966973
data.extend_from_slice(&item?);
967974
}
968975
Ok(data)
@@ -1088,7 +1095,7 @@ impl ExportBaoProgress {
10881095
}
10891096
EncodedItem::Leaf(leaf) => Some(Ok(leaf.data)),
10901097
EncodedItem::Done => None,
1091-
EncodedItem::Error(cause) => Some(Err(super::Error::other(cause))),
1098+
EncodedItem::Error(cause) => Some(Err(cause.into())),
10921099
})
10931100
}
10941101

src/api/remote.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use nested_enum_utils::common_fields;
1010
use ref_cast::RefCast;
1111
use snafu::{Backtrace, IntoError, Snafu};
1212

13-
use super::blobs::Bitfield;
13+
use super::blobs::{Bitfield, ExportBaoOptions};
1414
use crate::{
1515
api::{blobs::WriteProgress, ApiClient},
1616
get::{fsm::DecodeError, BadRequestSnafu, GetError, GetResult, LocalFailureSnafu, Stats},
@@ -159,7 +159,7 @@ impl PushProgress {
159159

160160
async fn just_result<S, R>(stream: S) -> Option<R>
161161
where
162-
S: Stream,
162+
S: Stream<Item: std::fmt::Debug>,
163163
R: TryFrom<S::Item>,
164164
{
165165
tokio::pin!(stream);
@@ -417,12 +417,17 @@ impl Remote {
417417
let root = request.hash;
418418
let bitfield = self.store().observe(root).await?;
419419
let children = if !request.ranges.is_blob() {
420-
let bao = self.store().export_bao(root, bitfield.ranges.clone());
420+
let opts = ExportBaoOptions {
421+
hash: root,
422+
ranges: bitfield.ranges.clone(),
423+
};
424+
let bao = self.store().export_bao_with_opts(opts, 32);
421425
let mut by_index = BTreeMap::new();
422426
let mut stream = bao.hashes_with_index();
423427
while let Some(item) = stream.next().await {
424-
let (index, hash) = item?;
425-
by_index.insert(index, hash);
428+
if let Ok((index, hash)) = item {
429+
by_index.insert(index, hash);
430+
}
426431
}
427432
let mut bitfields = BTreeMap::new();
428433
let mut hash_seq = BTreeMap::new();

src/store/fs.rs

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,34 @@ impl HashContext {
305305
Ok(())
306306
}
307307

308+
pub async fn get_maybe_create(&self, hash: Hash, create: bool) -> api::Result<BaoFileHandle> {
309+
if create {
310+
self.get_or_create(hash).await
311+
} else {
312+
self.get(hash).await
313+
}
314+
}
315+
316+
pub async fn get(&self, hash: Hash) -> api::Result<BaoFileHandle> {
317+
if hash == Hash::EMPTY {
318+
return Ok(self.ctx.empty.clone());
319+
}
320+
let res = self
321+
.slot
322+
.get_or_create(|| async {
323+
let res = self.db().get(hash).await.map_err(io::Error::other)?;
324+
let res = match res {
325+
Some(state) => open_bao_file(&hash, state, &self.ctx).await,
326+
None => Err(io::Error::new(io::ErrorKind::NotFound, "hash not found")),
327+
};
328+
Ok((res?, ()))
329+
})
330+
.await
331+
.map_err(api::Error::from);
332+
let (res, _) = res?;
333+
Ok(res)
334+
}
335+
308336
pub async fn get_or_create(&self, hash: Hash) -> api::Result<BaoFileHandle> {
309337
if hash == Hash::EMPTY {
310338
return Ok(self.ctx.empty.clone());
@@ -939,7 +967,7 @@ async fn observe(cmd: ObserveMsg, ctx: HashContext) {
939967

940968
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
941969
async fn export_ranges(mut cmd: ExportRangesMsg, ctx: HashContext) {
942-
match ctx.get_or_create(cmd.hash).await {
970+
match ctx.get(cmd.hash).await {
943971
Ok(handle) => {
944972
if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, handle).await {
945973
cmd.tx
@@ -1000,7 +1028,7 @@ async fn export_ranges_impl(
10001028

10011029
#[instrument(skip_all, fields(hash = %cmd.hash_short()))]
10021030
async fn export_bao(mut cmd: ExportBaoMsg, ctx: HashContext) {
1003-
match ctx.get_or_create(cmd.hash).await {
1031+
match ctx.get_maybe_create(cmd.hash, false).await {
10041032
Ok(handle) => {
10051033
if let Err(cause) = export_bao_impl(cmd.inner, &mut cmd.tx, handle).await {
10061034
cmd.tx
@@ -1010,9 +1038,9 @@ async fn export_bao(mut cmd: ExportBaoMsg, ctx: HashContext) {
10101038
}
10111039
}
10121040
Err(cause) => {
1013-
let cause = anyhow::anyhow!("failed to open file: {cause}");
1041+
let crate::api::Error::Io(cause) = cause;
10141042
cmd.tx
1015-
.send(bao_tree::io::EncodeError::Io(io::Error::other(cause)).into())
1043+
.send(bao_tree::io::EncodeError::Io(cause).into())
10161044
.await
10171045
.ok();
10181046
}
@@ -1024,7 +1052,7 @@ async fn export_bao_impl(
10241052
tx: &mut mpsc::Sender<EncodedItem>,
10251053
handle: BaoFileHandle,
10261054
) -> anyhow::Result<()> {
1027-
let ExportBaoRequest { ranges, hash } = cmd;
1055+
let ExportBaoRequest { ranges, hash, .. } = cmd;
10281056
debug_assert!(handle.hash() == hash, "hash mismatch");
10291057
let outboard = handle.outboard()?;
10301058
let size = outboard.tree.size();

src/store/fs/gc.rs

Lines changed: 86 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,14 +189,18 @@ pub async fn run_gc(store: Store, config: GcConfig) {
189189

190190
#[cfg(test)]
191191
mod tests {
192-
use std::path::Path;
192+
use std::{
193+
io::{self},
194+
path::Path,
195+
};
193196

194-
use bao_tree::ChunkNum;
197+
use bao_tree::{io::EncodeError, ChunkNum};
198+
use range_collections::RangeSet2;
195199
use testresult::TestResult;
196200

197201
use super::*;
198202
use crate::{
199-
api::{blobs::AddBytesOptions, Store},
203+
api::{blobs::AddBytesOptions, ExportBaoError, RequestError, Store},
200204
hashseq::HashSeq,
201205
store::fs::{options::PathOptions, tests::create_n0_bao},
202206
BlobFormat,
@@ -326,4 +330,83 @@ mod tests {
326330
gc_smoke(&store).await?;
327331
Ok(())
328332
}
333+
334+
#[tokio::test]
335+
async fn gc_check_deletion_fs() -> TestResult {
336+
tracing_subscriber::fmt::try_init().ok();
337+
let testdir = tempfile::tempdir()?;
338+
let db_path = testdir.path().join("db");
339+
let store = crate::store::fs::FsStore::load(&db_path).await?;
340+
gc_check_deletion(&store).await
341+
}
342+
343+
#[tokio::test]
344+
async fn gc_check_deletion_mem() -> TestResult {
345+
tracing_subscriber::fmt::try_init().ok();
346+
let store = crate::store::mem::MemStore::default();
347+
gc_check_deletion(&store).await
348+
}
349+
350+
async fn gc_check_deletion(store: &Store) -> TestResult {
351+
let temp_tag = store.add_bytes(b"foo".to_vec()).temp_tag().await?;
352+
let hash = *temp_tag.hash();
353+
assert_eq!(store.get_bytes(hash).await?.as_ref(), b"foo");
354+
drop(temp_tag);
355+
let mut live = HashSet::new();
356+
gc_run_once(store, &mut live).await?;
357+
358+
// check that `get_bytes` returns an error.
359+
let res = store.get_bytes(hash).await;
360+
assert!(res.is_err());
361+
assert!(matches!(
362+
res,
363+
Err(ExportBaoError::ExportBaoInner {
364+
source: EncodeError::Io(cause),
365+
..
366+
}) if cause.kind() == io::ErrorKind::NotFound
367+
));
368+
369+
// check that `export_ranges` returns an error.
370+
let res = store
371+
.export_ranges(hash, RangeSet2::all())
372+
.concatenate()
373+
.await;
374+
assert!(res.is_err());
375+
assert!(matches!(
376+
res,
377+
Err(RequestError::Inner{
378+
source: crate::api::Error::Io(cause),
379+
..
380+
}) if cause.kind() == io::ErrorKind::NotFound
381+
));
382+
383+
// check that `export_bao` returns an error.
384+
let res = store
385+
.export_bao(hash, ChunkRanges::all())
386+
.bao_to_vec()
387+
.await;
388+
assert!(res.is_err());
389+
println!("export_bao res {res:?}");
390+
assert!(matches!(
391+
res,
392+
Err(RequestError::Inner{
393+
source: crate::api::Error::Io(cause),
394+
..
395+
}) if cause.kind() == io::ErrorKind::NotFound
396+
));
397+
398+
// check that `export` returns an error.
399+
let target = tempfile::NamedTempFile::new()?;
400+
let path = target.path();
401+
let res = store.export(hash, path).await;
402+
assert!(res.is_err());
403+
assert!(matches!(
404+
res,
405+
Err(RequestError::Inner{
406+
source: crate::api::Error::Io(cause),
407+
..
408+
}) if cause.kind() == io::ErrorKind::NotFound
409+
));
410+
Ok(())
411+
}
329412
}

src/store/fs/meta.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,7 @@ impl Actor {
463463
} = cmd;
464464
for hash in hashes {
465465
if !force && protected.contains(&hash) {
466+
trace!("delete {hash}: skip (protected)");
466467
continue;
467468
}
468469
if let Some(entry) = tables.blobs.remove(hash).context(StorageSnafu)? {
@@ -471,6 +472,7 @@ impl Actor {
471472
data_location,
472473
outboard_location,
473474
} => {
475+
trace!("delete {hash}: currently complete. will be deleted.");
474476
match data_location {
475477
DataLocation::Inline(_) => {
476478
tables.inline_data.remove(hash).context(StorageSnafu)?;
@@ -493,6 +495,7 @@ impl Actor {
493495
}
494496
}
495497
EntryState::Partial { .. } => {
498+
trace!("delete {hash}: currently partial. will be deleted.");
496499
tables.ftx.delete(
497500
hash,
498501
[

0 commit comments

Comments
 (0)