From 8aa47ff69715da3ad5f1f50b7ffb777bf2b9bf86 Mon Sep 17 00:00:00 2001
From: Ruediger Klaehn
Date: Fri, 29 Nov 2024 15:09:57 +0200
Subject: [PATCH 1/2] Add back tests that somehow ended up in iroh-docs

---
 src/net_protocol.rs    |   7 -
 src/rpc/client/tags.rs |   3 +
 tests/gc.rs            | 534 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 537 insertions(+), 7 deletions(-)
 create mode 100644 tests/gc.rs

diff --git a/src/net_protocol.rs b/src/net_protocol.rs
index 83d4c72ab..b7234669c 100644
--- a/src/net_protocol.rs
+++ b/src/net_protocol.rs
@@ -124,7 +124,6 @@ impl BlobBatches {
 pub struct Builder {
     store: S,
     events: Option,
-    gc_config: Option,
 }
 
 impl Builder {
@@ -134,11 +133,6 @@ impl Builder {
         self
     }
 
-    pub fn gc_config(mut self, value: crate::store::GcConfig) -> Self {
-        self.gc_config = Some(value);
-        self
-    }
-
     /// Build the Blobs protocol handler.
     /// You need to provide a local pool handle and an endpoint.
     pub fn build(self, rt: &LocalPoolHandle, endpoint: &Endpoint) -> Arc> {
@@ -159,7 +153,6 @@ impl Blobs {
         Builder {
             store,
             events: None,
-            gc_config: None,
         }
     }
 }
diff --git a/src/rpc/client/tags.rs b/src/rpc/client/tags.rs
index 2b7cbc04d..103ecc618 100644
--- a/src/rpc/client/tags.rs
+++ b/src/rpc/client/tags.rs
@@ -30,6 +30,9 @@ pub struct Client> {
     pub(super) rpc: RpcClient,
 }
 
+/// A client that uses the memory connector.
+pub type MemClient = Client;
+
 impl Client
 where
     C: Connector,
diff --git a/tests/gc.rs b/tests/gc.rs
new file mode 100644
index 000000000..e8b853943
--- /dev/null
+++ b/tests/gc.rs
@@ -0,0 +1,534 @@
+#![cfg(feature = "rpc")]
+use std::{
+    io::{Cursor, Write},
+    path::PathBuf,
+    sync::Arc,
+    time::Duration,
+};
+
+use anyhow::Result;
+use bao_tree::{blake3, io::sync::Outboard, ChunkRanges};
+use bytes::Bytes;
+use iroh::{protocol::Router, Endpoint, NodeAddr, NodeId};
+use iroh_blobs::{
+    hashseq::HashSeq,
+    net_protocol::Blobs,
+    rpc::client::{blobs, tags},
+    store::{bao_tree, EntryStatus, GcConfig, MapMut, Store},
+    util::{local_pool::LocalPool, Tag},
+    BlobFormat, HashAndFormat, IROH_BLOCK_SIZE,
+};
+use rand::RngCore;
+
+/// An iroh node that just has the blobs transport
+#[derive(Debug)]
+pub struct Node<S> {
+    pub router: iroh::protocol::Router,
+    pub blobs: Arc<Blobs<S>>,
+    pub store: S,
+    pub _local_pool: LocalPool,
+}
+
+impl<S: Store> Node<S> {
+    pub fn blob_store(&self) -> &S {
+        &self.store
+    }
+
+    /// Returns the node id
+    pub fn node_id(&self) -> NodeId {
+        self.router.endpoint().node_id()
+    }
+
+    /// Returns the node address
+    pub async fn node_addr(&self) -> anyhow::Result<NodeAddr> {
+        self.router.endpoint().node_addr().await
+    }
+
+    /// Shuts down the node
+    pub async fn shutdown(self) -> anyhow::Result<()> {
+        self.router.shutdown().await
+    }
+
+    /// Returns an in-memory blobs client
+    pub fn blobs(&self) -> blobs::MemClient {
+        self.blobs.clone().client()
+    }
+
+    /// Returns an in-memory tags client
+    pub fn tags(&self) -> tags::MemClient {
+        self.blobs().tags()
+    }
+}
+
+pub fn create_test_data(size: usize) -> Bytes {
+    let mut rand = rand::thread_rng();
+    let mut res = vec![0u8; size];
+    rand.fill_bytes(&mut res);
+    res.into()
+}
+
+/// Take some data and encode it
+pub fn simulate_remote(data: &[u8]) -> (blake3::Hash, Cursor<Bytes>) {
+    let outboard = bao_tree::io::outboard::PostOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
+    let mut encoded = Vec::new();
+    encoded
+        .write_all(outboard.tree.size().to_le_bytes().as_ref())
+        .unwrap();
+    bao_tree::io::sync::encode_ranges_validated(data, &outboard, &ChunkRanges::all(), &mut encoded)
+        .unwrap();
+    let hash = outboard.root();
+    (hash, Cursor::new(encoded.into()))
+}
+
+/// Wrap a bao store in a node that has gc enabled.
+async fn node<S: Store>(store: S, gc_period: Duration) -> (Node<S>, async_channel::Receiver<()>) {
+    let (gc_send, gc_recv) = async_channel::unbounded();
+    let endpoint = Endpoint::builder().discovery_n0().bind().await.unwrap();
+    let local_pool = LocalPool::single();
+    let blobs = Blobs::builder(store.clone()).build(&local_pool, &endpoint);
+    let router = Router::builder(endpoint)
+        .accept(iroh_blobs::ALPN, blobs.clone())
+        .spawn()
+        .await
+        .unwrap();
+    blobs
+        .start_gc(GcConfig {
+            period: gc_period,
+            done_callback: Some(Box::new(move || {
+                gc_send.send_blocking(()).ok();
+            })),
+        })
+        .unwrap();
+    (
+        Node {
+            store,
+            router,
+            blobs,
+            _local_pool: local_pool,
+        },
+        gc_recv,
+    )
+}
+
+/// Wrap a bao store in a node that has gc enabled.
+async fn persistent_node(
+    path: PathBuf,
+    gc_period: Duration,
+) -> (
+    Node<iroh_blobs::store::fs::Store>,
+    async_channel::Receiver<()>,
+) {
+    let store = iroh_blobs::store::fs::Store::load(path).await.unwrap();
+    node(store, gc_period).await
+}
+
+async fn mem_node(
+    gc_period: Duration,
+) -> (
+    Node<iroh_blobs::store::mem::Store>,
+    async_channel::Receiver<()>,
+) {
+    let store = iroh_blobs::store::mem::Store::new();
+    node(store, gc_period).await
+}
+
+async fn gc_test_node() -> (
+    Node<iroh_blobs::store::mem::Store>,
+    iroh_blobs::store::mem::Store,
+    async_channel::Receiver<()>,
+) {
+    let (node, gc_recv) = mem_node(Duration::from_millis(500)).await;
+    let store = node.blob_store().clone();
+    (node, store, gc_recv)
+}
+
+async fn step(evs: &async_channel::Receiver<()>) {
+    // drain the event queue, we want a new GC
+    while evs.try_recv().is_ok() {}
+    // wait for several GC cycles
+    for _ in 0..3 {
+        evs.recv().await.unwrap();
+    }
+}
+
+/// Test the absolute basics of gc, temp tags and tags for blobs.
+#[tokio::test]
+async fn gc_basics() -> Result<()> {
+    let _ = tracing_subscriber::fmt::try_init();
+    let (node, bao_store, evs) = gc_test_node().await;
+    let data1 = create_test_data(1234);
+    let tt1 = bao_store.import_bytes(data1, BlobFormat::Raw).await?;
+    let data2 = create_test_data(5678);
+    let tt2 = bao_store.import_bytes(data2, BlobFormat::Raw).await?;
+    let h1 = *tt1.hash();
+    let h2 = *tt2.hash();
+    // temp tags are still there, so the entries should be there
+    step(&evs).await;
+    assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::Complete);
+    assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::Complete);
+
+    // drop the first tag, the entry should be gone after some time
+    drop(tt1);
+    step(&evs).await;
+    assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::NotFound);
+    assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::Complete);
+
+    // create an explicit tag for h2 (as raw) and then delete the temp tag. Entry should still be there.
+    let tag = Tag::from("test");
+    bao_store
+        .set_tag(tag.clone(), Some(HashAndFormat::raw(h2)))
+        .await?;
+    drop(tt2);
+    tracing::info!("dropped tt2");
+    step(&evs).await;
+    assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::Complete);
+
+    // delete the explicit tag, entry should be gone
+    bao_store.set_tag(tag, None).await?;
+    step(&evs).await;
+    assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::NotFound);
+
+    node.shutdown().await?;
+    Ok(())
+}
+
+/// Test gc for sequences of hashes that protect their children from deletion.
+#[tokio::test] +async fn gc_hashseq_impl() -> Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + let (node, bao_store, evs) = gc_test_node().await; + let data1 = create_test_data(1234); + let tt1 = bao_store.import_bytes(data1, BlobFormat::Raw).await?; + let data2 = create_test_data(5678); + let tt2 = bao_store.import_bytes(data2, BlobFormat::Raw).await?; + let seq = vec![*tt1.hash(), *tt2.hash()] + .into_iter() + .collect::(); + let ttr = bao_store + .import_bytes(seq.into_inner(), BlobFormat::HashSeq) + .await?; + let h1 = *tt1.hash(); + let h2 = *tt2.hash(); + let hr = *ttr.hash(); + drop(tt1); + drop(tt2); + + // there is a temp tag for the link seq, so it and its entries should be there + step(&evs).await; + assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::Complete); + assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::Complete); + assert_eq!(bao_store.entry_status(&hr).await?, EntryStatus::Complete); + + // make a permanent tag for the link seq, then delete the temp tag. Entries should still be there. + let tag = Tag::from("test"); + bao_store + .set_tag(tag.clone(), Some(HashAndFormat::hash_seq(hr))) + .await?; + drop(ttr); + step(&evs).await; + assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::Complete); + assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::Complete); + assert_eq!(bao_store.entry_status(&hr).await?, EntryStatus::Complete); + + // change the permanent tag to be just for the linkseq itself as a blob. Only the linkseq should be there, not the entries. + bao_store + .set_tag(tag.clone(), Some(HashAndFormat::raw(hr))) + .await?; + step(&evs).await; + assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::NotFound); + assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::NotFound); + assert_eq!(bao_store.entry_status(&hr).await?, EntryStatus::Complete); + + // delete the permanent tag, everything should be gone + bao_store.set_tag(tag, None).await?; + step(&evs).await; + assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::NotFound); + assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::NotFound); + assert_eq!(bao_store.entry_status(&hr).await?, EntryStatus::NotFound); + + node.shutdown().await?; + Ok(()) +} + +mod file { + use std::{io, path::PathBuf}; + + use bao_tree::{ + io::fsm::{BaoContentItem, ResponseDecoderNext}, + BaoTree, + }; + use iroh_blobs::{ + store::{BaoBatchWriter, ConsistencyCheckProgress, MapEntryMut, ReportLevel}, + util::progress::{AsyncChannelProgressSender, ProgressSender as _}, + TempTag, + }; + use testdir::testdir; + use tokio::io::AsyncReadExt; + + use super::*; + + fn path(root: PathBuf, suffix: &'static str) -> impl Fn(&iroh_blobs::Hash) -> PathBuf { + move |hash| root.join(format!("{}.{}", hash.to_hex(), suffix)) + } + + fn data_path(root: PathBuf) -> impl Fn(&iroh_blobs::Hash) -> PathBuf { + // this assumes knowledge of the internal directory structure of the flat store + path(root.join("data"), "data") + } + + fn outboard_path(root: PathBuf) -> impl Fn(&iroh_blobs::Hash) -> PathBuf { + // this assumes knowledge of the internal directory structure of the flat store + path(root.join("data"), "obao4") + } + + async fn check_consistency(store: &impl Store) -> anyhow::Result { + let mut max_level = ReportLevel::Trace; + let (tx, rx) = async_channel::bounded(1); + let task = tokio::task::spawn(async move { + while let Ok(ev) = rx.recv().await { + if let ConsistencyCheckProgress::Update { level, .. 
} = &ev { + max_level = max_level.max(*level); + } + } + }); + store + .consistency_check(false, AsyncChannelProgressSender::new(tx).boxed()) + .await?; + task.await?; + Ok(max_level) + } + + /// Test gc for sequences of hashes that protect their children from deletion. + #[tokio::test] + async fn gc_file_basics() -> Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + let dir = testdir!(); + let path = data_path(dir.clone()); + let outboard_path = outboard_path(dir.clone()); + let (node, evs) = persistent_node(dir.clone(), Duration::from_millis(100)).await; + let bao_store = node.blob_store().clone(); + let data1 = create_test_data(10000000); + let tt1 = bao_store + .import_bytes(data1.clone(), BlobFormat::Raw) + .await?; + let data2 = create_test_data(1000000); + let tt2 = bao_store + .import_bytes(data2.clone(), BlobFormat::Raw) + .await?; + let seq = vec![*tt1.hash(), *tt2.hash()] + .into_iter() + .collect::(); + let ttr = bao_store + .import_bytes(seq.into_inner(), BlobFormat::HashSeq) + .await?; + + let h1 = *tt1.hash(); + let h2 = *tt2.hash(); + let hr = *ttr.hash(); + + // data is protected by the temp tag + step(&evs).await; + bao_store.sync().await?; + assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); + // h1 is for a giant file, so we will have both data and outboard files + assert!(path(&h1).exists()); + assert!(outboard_path(&h1).exists()); + // h2 is for a mid sized file, so we will have just the data file + assert!(path(&h2).exists()); + assert!(!outboard_path(&h2).exists()); + // hr so small that data will be inlined and outboard will not exist at all + assert!(!path(&hr).exists()); + assert!(!outboard_path(&hr).exists()); + + drop(tt1); + drop(tt2); + let tag = Tag::from("test"); + bao_store + .set_tag(tag.clone(), Some(HashAndFormat::hash_seq(*ttr.hash()))) + .await?; + drop(ttr); + + // data is now protected by a normal tag, nothing should be gone + step(&evs).await; + bao_store.sync().await?; + assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); + // h1 is for a giant file, so we will have both data and outboard files + assert!(path(&h1).exists()); + assert!(outboard_path(&h1).exists()); + // h2 is for a mid sized file, so we will have just the data file + assert!(path(&h2).exists()); + assert!(!outboard_path(&h2).exists()); + // hr so small that data will be inlined and outboard will not exist at all + assert!(!path(&hr).exists()); + assert!(!outboard_path(&hr).exists()); + + tracing::info!("changing tag from hashseq to raw, this should orphan the children"); + bao_store + .set_tag(tag.clone(), Some(HashAndFormat::raw(hr))) + .await?; + + // now only hr itself should be protected, but not its children + step(&evs).await; + bao_store.sync().await?; + assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); + // h1 should be gone + assert!(!path(&h1).exists()); + assert!(!outboard_path(&h1).exists()); + // h2 should still not be there + assert!(!path(&h2).exists()); + assert!(!outboard_path(&h2).exists()); + // hr should still not be there + assert!(!path(&hr).exists()); + assert!(!outboard_path(&hr).exists()); + + bao_store.set_tag(tag, None).await?; + step(&evs).await; + bao_store.sync().await?; + assert!(check_consistency(&bao_store).await? 
<= ReportLevel::Info); + // h1 should be gone + assert!(!path(&h1).exists()); + assert!(!outboard_path(&h1).exists()); + // h2 should still not be there + assert!(!path(&h2).exists()); + assert!(!outboard_path(&h2).exists()); + // hr should still not be there + assert!(!path(&hr).exists()); + assert!(!outboard_path(&hr).exists()); + + node.shutdown().await?; + + Ok(()) + } + + /// Add a file to the store in the same way a download works. + /// + /// we know the hash in advance, create a partial entry, write the data to it and + /// the outboard file, then commit it to a complete entry. + /// + /// During this time, the partial entry is protected by a temp tag. + async fn simulate_download_partial( + bao_store: &S, + data: Bytes, + ) -> io::Result<(S::EntryMut, TempTag)> { + // simulate the remote side. + let (hash, mut response) = simulate_remote(data.as_ref()); + // simulate the local side. + // we got a hash and a response from the remote side. + let tt = bao_store.temp_tag(HashAndFormat::raw(hash.into())); + // get the size + let size = response.read_u64_le().await?; + // start reading the response + let mut reading = bao_tree::io::fsm::ResponseDecoder::new( + hash, + ChunkRanges::all(), + BaoTree::new(size, IROH_BLOCK_SIZE), + response, + ); + // create the partial entry + let entry = bao_store.get_or_create(hash.into(), size).await?; + // create the + let mut bw = entry.batch_writer().await?; + let mut buf = Vec::new(); + while let ResponseDecoderNext::More((next, res)) = reading.next().await { + let item = res?; + match &item { + BaoContentItem::Parent(_) => { + buf.push(item); + } + BaoContentItem::Leaf(_) => { + buf.push(item); + let batch = std::mem::take(&mut buf); + bw.write_batch(size, batch).await?; + } + } + reading = next; + } + bw.sync().await?; + drop(bw); + Ok((entry, tt)) + } + + async fn simulate_download_complete( + bao_store: &S, + data: Bytes, + ) -> io::Result { + let (entry, tt) = simulate_download_partial(bao_store, data).await?; + // commit the entry + bao_store.insert_complete(entry).await?; + Ok(tt) + } + + /// Test that partial files are deleted. + #[tokio::test] + async fn gc_file_partial() -> Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + let dir = testdir!(); + let path = data_path(dir.clone()); + let outboard_path = outboard_path(dir.clone()); + + let (node, evs) = persistent_node(dir.clone(), Duration::from_millis(10)).await; + let bao_store = node.blob_store().clone(); + + let data1: Bytes = create_test_data(10000000); + let (_entry, tt1) = simulate_download_partial(&bao_store, data1.clone()).await?; + drop(_entry); + let h1 = *tt1.hash(); + // partial data and outboard files should be there + step(&evs).await; + bao_store.sync().await?; + assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); + assert!(path(&h1).exists()); + assert!(outboard_path(&h1).exists()); + + drop(tt1); + // partial data and outboard files should be gone + step(&evs).await; + bao_store.sync().await?; + assert!(check_consistency(&bao_store).await? 
<= ReportLevel::Info); + assert!(!path(&h1).exists()); + assert!(!outboard_path(&h1).exists()); + + node.shutdown().await?; + Ok(()) + } + + #[tokio::test] + async fn gc_file_stress() -> Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + let dir = testdir!(); + + let (node, evs) = persistent_node(dir.clone(), Duration::from_secs(1)).await; + let bao_store = node.blob_store().clone(); + + let mut deleted = Vec::new(); + let mut live = Vec::new(); + // download + for i in 0..100 { + let data: Bytes = create_test_data(16 * 1024 * 3 + 1); + let tt = simulate_download_complete(&bao_store, data).await.unwrap(); + if i % 100 == 0 { + let tag = Tag::from(format!("test{}", i)); + bao_store + .set_tag(tag.clone(), Some(HashAndFormat::raw(*tt.hash()))) + .await?; + live.push(*tt.hash()); + } else { + deleted.push(*tt.hash()); + } + } + step(&evs).await; + + for h in deleted.iter() { + assert_eq!(bao_store.entry_status(h).await?, EntryStatus::NotFound); + assert!(!dir.join(format!("data/{}.data", h.to_hex())).exists()); + } + + for h in live.iter() { + assert_eq!(bao_store.entry_status(h).await?, EntryStatus::Complete); + assert!(dir.join(format!("data/{}.data", h.to_hex())).exists()); + } + + node.shutdown().await?; + Ok(()) + } +} From 1846ac2e3a713579f6b61a5ef21d51fca07c50e6 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 29 Nov 2024 15:20:50 +0200 Subject: [PATCH 2/2] flatten tests --- tests/gc.rs | 535 ++++++++++++++++++++++++++-------------------------- 1 file changed, 267 insertions(+), 268 deletions(-) diff --git a/tests/gc.rs b/tests/gc.rs index e8b853943..10cc74e6e 100644 --- a/tests/gc.rs +++ b/tests/gc.rs @@ -1,5 +1,6 @@ #![cfg(feature = "rpc")] use std::{ + io, io::{Cursor, Write}, path::PathBuf, sync::Arc, @@ -7,18 +8,34 @@ use std::{ }; use anyhow::Result; -use bao_tree::{blake3, io::sync::Outboard, ChunkRanges}; +use bao_tree::{ + blake3, + io::{ + fsm::{BaoContentItem, ResponseDecoderNext}, + sync::Outboard, + }, + BaoTree, ChunkRanges, +}; use bytes::Bytes; use iroh::{protocol::Router, Endpoint, NodeAddr, NodeId}; use iroh_blobs::{ hashseq::HashSeq, net_protocol::Blobs, rpc::client::{blobs, tags}, - store::{bao_tree, EntryStatus, GcConfig, MapMut, Store}, - util::{local_pool::LocalPool, Tag}, - BlobFormat, HashAndFormat, IROH_BLOCK_SIZE, + store::{ + bao_tree, BaoBatchWriter, ConsistencyCheckProgress, EntryStatus, GcConfig, MapEntryMut, + MapMut, ReportLevel, Store, + }, + util::{ + local_pool::LocalPool, + progress::{AsyncChannelProgressSender, ProgressSender as _}, + Tag, + }, + BlobFormat, HashAndFormat, TempTag, IROH_BLOCK_SIZE, }; use rand::RngCore; +use testdir::testdir; +use tokio::io::AsyncReadExt; /// An iroh node that just has the blobs transport #[derive(Debug)] @@ -250,285 +267,267 @@ async fn gc_hashseq_impl() -> Result<()> { Ok(()) } -mod file { - use std::{io, path::PathBuf}; - - use bao_tree::{ - io::fsm::{BaoContentItem, ResponseDecoderNext}, - BaoTree, - }; - use iroh_blobs::{ - store::{BaoBatchWriter, ConsistencyCheckProgress, MapEntryMut, ReportLevel}, - util::progress::{AsyncChannelProgressSender, ProgressSender as _}, - TempTag, - }; - use testdir::testdir; - use tokio::io::AsyncReadExt; - - use super::*; - - fn path(root: PathBuf, suffix: &'static str) -> impl Fn(&iroh_blobs::Hash) -> PathBuf { - move |hash| root.join(format!("{}.{}", hash.to_hex(), suffix)) - } +fn path(root: PathBuf, suffix: &'static str) -> impl Fn(&iroh_blobs::Hash) -> PathBuf { + move |hash| root.join(format!("{}.{}", hash.to_hex(), suffix)) +} - fn data_path(root: 
PathBuf) -> impl Fn(&iroh_blobs::Hash) -> PathBuf { - // this assumes knowledge of the internal directory structure of the flat store - path(root.join("data"), "data") - } +fn data_path(root: PathBuf) -> impl Fn(&iroh_blobs::Hash) -> PathBuf { + // this assumes knowledge of the internal directory structure of the flat store + path(root.join("data"), "data") +} - fn outboard_path(root: PathBuf) -> impl Fn(&iroh_blobs::Hash) -> PathBuf { - // this assumes knowledge of the internal directory structure of the flat store - path(root.join("data"), "obao4") - } +fn outboard_path(root: PathBuf) -> impl Fn(&iroh_blobs::Hash) -> PathBuf { + // this assumes knowledge of the internal directory structure of the flat store + path(root.join("data"), "obao4") +} - async fn check_consistency(store: &impl Store) -> anyhow::Result { - let mut max_level = ReportLevel::Trace; - let (tx, rx) = async_channel::bounded(1); - let task = tokio::task::spawn(async move { - while let Ok(ev) = rx.recv().await { - if let ConsistencyCheckProgress::Update { level, .. } = &ev { - max_level = max_level.max(*level); - } +async fn check_consistency(store: &impl Store) -> anyhow::Result { + let mut max_level = ReportLevel::Trace; + let (tx, rx) = async_channel::bounded(1); + let task = tokio::task::spawn(async move { + while let Ok(ev) = rx.recv().await { + if let ConsistencyCheckProgress::Update { level, .. } = &ev { + max_level = max_level.max(*level); } - }); - store - .consistency_check(false, AsyncChannelProgressSender::new(tx).boxed()) - .await?; - task.await?; - Ok(max_level) - } + } + }); + store + .consistency_check(false, AsyncChannelProgressSender::new(tx).boxed()) + .await?; + task.await?; + Ok(max_level) +} - /// Test gc for sequences of hashes that protect their children from deletion. - #[tokio::test] - async fn gc_file_basics() -> Result<()> { - let _ = tracing_subscriber::fmt::try_init(); - let dir = testdir!(); - let path = data_path(dir.clone()); - let outboard_path = outboard_path(dir.clone()); - let (node, evs) = persistent_node(dir.clone(), Duration::from_millis(100)).await; - let bao_store = node.blob_store().clone(); - let data1 = create_test_data(10000000); - let tt1 = bao_store - .import_bytes(data1.clone(), BlobFormat::Raw) - .await?; - let data2 = create_test_data(1000000); - let tt2 = bao_store - .import_bytes(data2.clone(), BlobFormat::Raw) - .await?; - let seq = vec![*tt1.hash(), *tt2.hash()] - .into_iter() - .collect::(); - let ttr = bao_store - .import_bytes(seq.into_inner(), BlobFormat::HashSeq) - .await?; - - let h1 = *tt1.hash(); - let h2 = *tt2.hash(); - let hr = *ttr.hash(); - - // data is protected by the temp tag - step(&evs).await; - bao_store.sync().await?; - assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); - // h1 is for a giant file, so we will have both data and outboard files - assert!(path(&h1).exists()); - assert!(outboard_path(&h1).exists()); - // h2 is for a mid sized file, so we will have just the data file - assert!(path(&h2).exists()); - assert!(!outboard_path(&h2).exists()); - // hr so small that data will be inlined and outboard will not exist at all - assert!(!path(&hr).exists()); - assert!(!outboard_path(&hr).exists()); - - drop(tt1); - drop(tt2); - let tag = Tag::from("test"); - bao_store - .set_tag(tag.clone(), Some(HashAndFormat::hash_seq(*ttr.hash()))) - .await?; - drop(ttr); - - // data is now protected by a normal tag, nothing should be gone - step(&evs).await; - bao_store.sync().await?; - assert!(check_consistency(&bao_store).await? 
<= ReportLevel::Info); - // h1 is for a giant file, so we will have both data and outboard files - assert!(path(&h1).exists()); - assert!(outboard_path(&h1).exists()); - // h2 is for a mid sized file, so we will have just the data file - assert!(path(&h2).exists()); - assert!(!outboard_path(&h2).exists()); - // hr so small that data will be inlined and outboard will not exist at all - assert!(!path(&hr).exists()); - assert!(!outboard_path(&hr).exists()); - - tracing::info!("changing tag from hashseq to raw, this should orphan the children"); - bao_store - .set_tag(tag.clone(), Some(HashAndFormat::raw(hr))) - .await?; - - // now only hr itself should be protected, but not its children - step(&evs).await; - bao_store.sync().await?; - assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); - // h1 should be gone - assert!(!path(&h1).exists()); - assert!(!outboard_path(&h1).exists()); - // h2 should still not be there - assert!(!path(&h2).exists()); - assert!(!outboard_path(&h2).exists()); - // hr should still not be there - assert!(!path(&hr).exists()); - assert!(!outboard_path(&hr).exists()); - - bao_store.set_tag(tag, None).await?; - step(&evs).await; - bao_store.sync().await?; - assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); - // h1 should be gone - assert!(!path(&h1).exists()); - assert!(!outboard_path(&h1).exists()); - // h2 should still not be there - assert!(!path(&h2).exists()); - assert!(!outboard_path(&h2).exists()); - // hr should still not be there - assert!(!path(&hr).exists()); - assert!(!outboard_path(&hr).exists()); - - node.shutdown().await?; - - Ok(()) - } +/// Test gc for sequences of hashes that protect their children from deletion. +#[tokio::test] +async fn gc_file_basics() -> Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + let dir = testdir!(); + let path = data_path(dir.clone()); + let outboard_path = outboard_path(dir.clone()); + let (node, evs) = persistent_node(dir.clone(), Duration::from_millis(100)).await; + let bao_store = node.blob_store().clone(); + let data1 = create_test_data(10000000); + let tt1 = bao_store + .import_bytes(data1.clone(), BlobFormat::Raw) + .await?; + let data2 = create_test_data(1000000); + let tt2 = bao_store + .import_bytes(data2.clone(), BlobFormat::Raw) + .await?; + let seq = vec![*tt1.hash(), *tt2.hash()] + .into_iter() + .collect::(); + let ttr = bao_store + .import_bytes(seq.into_inner(), BlobFormat::HashSeq) + .await?; + + let h1 = *tt1.hash(); + let h2 = *tt2.hash(); + let hr = *ttr.hash(); + + // data is protected by the temp tag + step(&evs).await; + bao_store.sync().await?; + assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); + // h1 is for a giant file, so we will have both data and outboard files + assert!(path(&h1).exists()); + assert!(outboard_path(&h1).exists()); + // h2 is for a mid sized file, so we will have just the data file + assert!(path(&h2).exists()); + assert!(!outboard_path(&h2).exists()); + // hr so small that data will be inlined and outboard will not exist at all + assert!(!path(&hr).exists()); + assert!(!outboard_path(&hr).exists()); + + drop(tt1); + drop(tt2); + let tag = Tag::from("test"); + bao_store + .set_tag(tag.clone(), Some(HashAndFormat::hash_seq(*ttr.hash()))) + .await?; + drop(ttr); - /// Add a file to the store in the same way a download works. - /// - /// we know the hash in advance, create a partial entry, write the data to it and - /// the outboard file, then commit it to a complete entry. 
- /// - /// During this time, the partial entry is protected by a temp tag. - async fn simulate_download_partial( - bao_store: &S, - data: Bytes, - ) -> io::Result<(S::EntryMut, TempTag)> { - // simulate the remote side. - let (hash, mut response) = simulate_remote(data.as_ref()); - // simulate the local side. - // we got a hash and a response from the remote side. - let tt = bao_store.temp_tag(HashAndFormat::raw(hash.into())); - // get the size - let size = response.read_u64_le().await?; - // start reading the response - let mut reading = bao_tree::io::fsm::ResponseDecoder::new( - hash, - ChunkRanges::all(), - BaoTree::new(size, IROH_BLOCK_SIZE), - response, - ); - // create the partial entry - let entry = bao_store.get_or_create(hash.into(), size).await?; - // create the - let mut bw = entry.batch_writer().await?; - let mut buf = Vec::new(); - while let ResponseDecoderNext::More((next, res)) = reading.next().await { - let item = res?; - match &item { - BaoContentItem::Parent(_) => { - buf.push(item); - } - BaoContentItem::Leaf(_) => { - buf.push(item); - let batch = std::mem::take(&mut buf); - bw.write_batch(size, batch).await?; - } + // data is now protected by a normal tag, nothing should be gone + step(&evs).await; + bao_store.sync().await?; + assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); + // h1 is for a giant file, so we will have both data and outboard files + assert!(path(&h1).exists()); + assert!(outboard_path(&h1).exists()); + // h2 is for a mid sized file, so we will have just the data file + assert!(path(&h2).exists()); + assert!(!outboard_path(&h2).exists()); + // hr so small that data will be inlined and outboard will not exist at all + assert!(!path(&hr).exists()); + assert!(!outboard_path(&hr).exists()); + + tracing::info!("changing tag from hashseq to raw, this should orphan the children"); + bao_store + .set_tag(tag.clone(), Some(HashAndFormat::raw(hr))) + .await?; + + // now only hr itself should be protected, but not its children + step(&evs).await; + bao_store.sync().await?; + assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); + // h1 should be gone + assert!(!path(&h1).exists()); + assert!(!outboard_path(&h1).exists()); + // h2 should still not be there + assert!(!path(&h2).exists()); + assert!(!outboard_path(&h2).exists()); + // hr should still not be there + assert!(!path(&hr).exists()); + assert!(!outboard_path(&hr).exists()); + + bao_store.set_tag(tag, None).await?; + step(&evs).await; + bao_store.sync().await?; + assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); + // h1 should be gone + assert!(!path(&h1).exists()); + assert!(!outboard_path(&h1).exists()); + // h2 should still not be there + assert!(!path(&h2).exists()); + assert!(!outboard_path(&h2).exists()); + // hr should still not be there + assert!(!path(&hr).exists()); + assert!(!outboard_path(&hr).exists()); + + node.shutdown().await?; + + Ok(()) +} + +/// Add a file to the store in the same way a download works. +/// +/// we know the hash in advance, create a partial entry, write the data to it and +/// the outboard file, then commit it to a complete entry. +/// +/// During this time, the partial entry is protected by a temp tag. +async fn simulate_download_partial( + bao_store: &S, + data: Bytes, +) -> io::Result<(S::EntryMut, TempTag)> { + // simulate the remote side. + let (hash, mut response) = simulate_remote(data.as_ref()); + // simulate the local side. + // we got a hash and a response from the remote side. 
+ let tt = bao_store.temp_tag(HashAndFormat::raw(hash.into())); + // get the size + let size = response.read_u64_le().await?; + // start reading the response + let mut reading = bao_tree::io::fsm::ResponseDecoder::new( + hash, + ChunkRanges::all(), + BaoTree::new(size, IROH_BLOCK_SIZE), + response, + ); + // create the partial entry + let entry = bao_store.get_or_create(hash.into(), size).await?; + // create the + let mut bw = entry.batch_writer().await?; + let mut buf = Vec::new(); + while let ResponseDecoderNext::More((next, res)) = reading.next().await { + let item = res?; + match &item { + BaoContentItem::Parent(_) => { + buf.push(item); + } + BaoContentItem::Leaf(_) => { + buf.push(item); + let batch = std::mem::take(&mut buf); + bw.write_batch(size, batch).await?; } - reading = next; } - bw.sync().await?; - drop(bw); - Ok((entry, tt)) + reading = next; } + bw.sync().await?; + drop(bw); + Ok((entry, tt)) +} - async fn simulate_download_complete( - bao_store: &S, - data: Bytes, - ) -> io::Result { - let (entry, tt) = simulate_download_partial(bao_store, data).await?; - // commit the entry - bao_store.insert_complete(entry).await?; - Ok(tt) - } +async fn simulate_download_complete( + bao_store: &S, + data: Bytes, +) -> io::Result { + let (entry, tt) = simulate_download_partial(bao_store, data).await?; + // commit the entry + bao_store.insert_complete(entry).await?; + Ok(tt) +} - /// Test that partial files are deleted. - #[tokio::test] - async fn gc_file_partial() -> Result<()> { - let _ = tracing_subscriber::fmt::try_init(); - let dir = testdir!(); - let path = data_path(dir.clone()); - let outboard_path = outboard_path(dir.clone()); - - let (node, evs) = persistent_node(dir.clone(), Duration::from_millis(10)).await; - let bao_store = node.blob_store().clone(); - - let data1: Bytes = create_test_data(10000000); - let (_entry, tt1) = simulate_download_partial(&bao_store, data1.clone()).await?; - drop(_entry); - let h1 = *tt1.hash(); - // partial data and outboard files should be there - step(&evs).await; - bao_store.sync().await?; - assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); - assert!(path(&h1).exists()); - assert!(outboard_path(&h1).exists()); - - drop(tt1); - // partial data and outboard files should be gone - step(&evs).await; - bao_store.sync().await?; - assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); - assert!(!path(&h1).exists()); - assert!(!outboard_path(&h1).exists()); - - node.shutdown().await?; - Ok(()) - } +/// Test that partial files are deleted. 
+#[tokio::test] +async fn gc_file_partial() -> Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + let dir = testdir!(); + let path = data_path(dir.clone()); + let outboard_path = outboard_path(dir.clone()); - #[tokio::test] - async fn gc_file_stress() -> Result<()> { - let _ = tracing_subscriber::fmt::try_init(); - let dir = testdir!(); - - let (node, evs) = persistent_node(dir.clone(), Duration::from_secs(1)).await; - let bao_store = node.blob_store().clone(); - - let mut deleted = Vec::new(); - let mut live = Vec::new(); - // download - for i in 0..100 { - let data: Bytes = create_test_data(16 * 1024 * 3 + 1); - let tt = simulate_download_complete(&bao_store, data).await.unwrap(); - if i % 100 == 0 { - let tag = Tag::from(format!("test{}", i)); - bao_store - .set_tag(tag.clone(), Some(HashAndFormat::raw(*tt.hash()))) - .await?; - live.push(*tt.hash()); - } else { - deleted.push(*tt.hash()); - } - } - step(&evs).await; + let (node, evs) = persistent_node(dir.clone(), Duration::from_millis(10)).await; + let bao_store = node.blob_store().clone(); - for h in deleted.iter() { - assert_eq!(bao_store.entry_status(h).await?, EntryStatus::NotFound); - assert!(!dir.join(format!("data/{}.data", h.to_hex())).exists()); - } + let data1: Bytes = create_test_data(10000000); + let (_entry, tt1) = simulate_download_partial(&bao_store, data1.clone()).await?; + drop(_entry); + let h1 = *tt1.hash(); + // partial data and outboard files should be there + step(&evs).await; + bao_store.sync().await?; + assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); + assert!(path(&h1).exists()); + assert!(outboard_path(&h1).exists()); + + drop(tt1); + // partial data and outboard files should be gone + step(&evs).await; + bao_store.sync().await?; + assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); + assert!(!path(&h1).exists()); + assert!(!outboard_path(&h1).exists()); + + node.shutdown().await?; + Ok(()) +} - for h in live.iter() { - assert_eq!(bao_store.entry_status(h).await?, EntryStatus::Complete); - assert!(dir.join(format!("data/{}.data", h.to_hex())).exists()); +#[tokio::test] +async fn gc_file_stress() -> Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + let dir = testdir!(); + + let (node, evs) = persistent_node(dir.clone(), Duration::from_secs(1)).await; + let bao_store = node.blob_store().clone(); + + let mut deleted = Vec::new(); + let mut live = Vec::new(); + // download + for i in 0..100 { + let data: Bytes = create_test_data(16 * 1024 * 3 + 1); + let tt = simulate_download_complete(&bao_store, data).await.unwrap(); + if i % 100 == 0 { + let tag = Tag::from(format!("test{}", i)); + bao_store + .set_tag(tag.clone(), Some(HashAndFormat::raw(*tt.hash()))) + .await?; + live.push(*tt.hash()); + } else { + deleted.push(*tt.hash()); } + } + step(&evs).await; - node.shutdown().await?; - Ok(()) + for h in deleted.iter() { + assert_eq!(bao_store.entry_status(h).await?, EntryStatus::NotFound); + assert!(!dir.join(format!("data/{}.data", h.to_hex())).exists()); } + + for h in live.iter() { + assert_eq!(bao_store.entry_status(h).await?, EntryStatus::Complete); + assert!(dir.join(format!("data/{}.data", h.to_hex())).exists()); + } + + node.shutdown().await?; + Ok(()) }
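
Note on migrating off the removed builder option: since `gc_config` is gone from
`net_protocol::Builder`, garbage collection is now enabled on the built `Blobs`
handle via `start_gc`, exactly as the `node()` helper in tests/gc.rs does above.
A minimal sketch of that call order (the store choice, period and callback are
illustrative; everything else mirrors the test helper):

    use std::time::Duration;

    use iroh::{protocol::Router, Endpoint};
    use iroh_blobs::{net_protocol::Blobs, store::GcConfig, util::local_pool::LocalPool};

    async fn spawn_node_with_gc() {
        let endpoint = Endpoint::builder().discovery_n0().bind().await.unwrap();
        let local_pool = LocalPool::single();
        let store = iroh_blobs::store::mem::Store::new();
        // build the protocol handler and register it on a router, as before
        let blobs = Blobs::builder(store).build(&local_pool, &endpoint);
        let router = Router::builder(endpoint)
            .accept(iroh_blobs::ALPN, blobs.clone())
            .spawn()
            .await
            .unwrap();
        // gc is now opted into on the built handle instead of being set on the builder
        blobs
            .start_gc(GcConfig {
                period: Duration::from_secs(300),
                done_callback: None,
            })
            .unwrap();
        // keep the router and the local pool alive for as long as the node should run
        let _ = (router, local_pool);
    }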
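
The hashseq tests above boil down to one protection rule: a tag whose format is
HashSeq keeps the sequence blob and every hash listed in it alive, while a Raw tag
on the same hash keeps only the sequence blob itself. A compressed restatement of
that rule, using only the store API exercised in this patch (`pin_children` and the
tag name are illustrative):

    use iroh_blobs::{store::Store, util::Tag, HashAndFormat};

    async fn pin_children<S: Store>(store: &S, hr: iroh_blobs::Hash) -> anyhow::Result<()> {
        // keeps the hash sequence blob *and* every child blob it references
        store
            .set_tag(Tag::from("pin"), Some(HashAndFormat::hash_seq(hr)))
            .await?;
        // keeps only the sequence blob; children become eligible for gc on the next run
        store
            .set_tag(Tag::from("pin"), Some(HashAndFormat::raw(hr)))
            .await?;
        Ok(())
    }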