diff --git a/Cargo.lock b/Cargo.lock index 78fd707db17..40417ede1b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -567,9 +567,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.34" +version = "1.1.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67b9470d453346108f93a59222a9a1a5724db32d0a4727b7ab7ace4b4d822dc9" +checksum = "0f57c4b4da2a9d619dd035f27316d7a426305b75be93d09e92f2b9229c34feaf" dependencies = [ "shlex", ] @@ -2654,7 +2654,6 @@ dependencies = [ "tracing", "tracing-subscriber", "url", - "walkdir", ] [[package]] @@ -2703,7 +2702,7 @@ dependencies = [ [[package]] name = "iroh-blobs" version = "0.28.1" -source = "git+https://github.com/n0-computer/iroh-blobs?branch=main#191cd2a1c25885f8ef0d58d83df150017bc4c8bb" +source = "git+https://github.com/n0-computer/iroh-blobs?branch=main#2337e46d89d6f2f2fda512439890ab74bab24fb4" dependencies = [ "anyhow", "async-channel", @@ -2713,6 +2712,7 @@ dependencies = [ "derive_more", "futures-buffered", "futures-lite 2.4.0", + "futures-util", "genawaiter", "hashlink", "hex", @@ -2722,26 +2722,33 @@ dependencies = [ "iroh-net", "iroh-quinn", "iroh-router", + "nested_enum_utils", "num_cpus", "oneshot", "parking_lot", "pin-project", + "portable-atomic", "postcard", + "quic-rpc", + "quic-rpc-derive", "rand", "range-collections", "redb 1.5.1", "redb 2.2.0", + "ref-cast", "reflink-copy", "self_cell", "serde", "serde-error", "smallvec", + "strum 0.26.3", "tempfile", "thiserror", "tokio", "tokio-util", "tracing", "tracing-futures", + "walkdir", ] [[package]] @@ -2893,7 +2900,7 @@ dependencies = [ [[package]] name = "iroh-gossip" version = "0.28.1" -source = "git+https://github.com/n0-computer/iroh-gossip?branch=main#efce3e1dc991c15a7f1fc6f579f04876a22a7b1e" +source = "git+https://github.com/n0-computer/iroh-gossip?branch=main#3d6659daf326f57cbafec189be996b89c7f441bd" dependencies = [ "anyhow", "async-channel", @@ -4419,9 +4426,9 @@ dependencies = [ [[package]] name = "quic-rpc" -version = "0.14.0" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d8431b2e7c22929347b61a354d4936d71fe7ab1e6b0475dc50e98276970dfec" +checksum = "61e131f594054d27d077162815db3b5e9ddd76a28fbb9091b68095971e75c286" dependencies = [ "anyhow", "bincode", @@ -4444,9 +4451,9 @@ dependencies = [ [[package]] name = "quic-rpc-derive" -version = "0.14.0" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "403bc8506c847468e00170dbbbfe2c54d13b090031bcbe474cd3faea021cbd9f" +checksum = "cbef4c942978f74ef296ae40d43d4375c9d730b65a582688a358108cfd5c0cf7" dependencies = [ "proc-macro2", "quic-rpc", @@ -4497,9 +4504,9 @@ dependencies = [ [[package]] name = "quinn-udp" -version = "0.5.6" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e346e016eacfff12233c243718197ca12f148c84e1e84268a896699b41c71780" +checksum = "7d5a626c6807713b15cac82a6acaccd6043c9a5408c24baae07611fec3f243da" dependencies = [ "cfg_aliases", "libc", @@ -4917,9 +4924,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.38" +version = "0.38.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa260229e6538e52293eeb577aabd09945a09d6d9cc0fc550ed7529056c2e32a" +checksum = "375116bee2be9ed569afe2154ea6a99dfdffd257f533f187498c2a8f5feaf4ee" dependencies = [ "bitflags 2.6.0", "errno", diff --git a/Cargo.toml b/Cargo.toml index c069d76c1a9..a7d2a811049 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,6 @@ iroh-metrics 
= { path = "./iroh-metrics" } iroh-test = { path = "./iroh-test" } iroh-router = { path = "./iroh-router" } -iroh-blobs = { git = "https://github.com/n0-computer/iroh-blobs", branch = "main" } iroh-gossip = { git = "https://github.com/n0-computer/iroh-gossip", branch = "main" } iroh-docs = { git = "https://github.com/n0-computer/iroh-docs", branch = "main" } +iroh-blobs = { git = "https://github.com/n0-computer/iroh-blobs", branch = "main" } diff --git a/iroh-cli/Cargo.toml b/iroh-cli/Cargo.toml index d33c68e5e39..5a76c09c1b5 100644 --- a/iroh-cli/Cargo.toml +++ b/iroh-cli/Cargo.toml @@ -48,7 +48,7 @@ pkarr = { version = "2.2.0", default-features = false } portable-atomic = "1" portmapper = { version = "0.1.0", path = "../net-tools/portmapper" } postcard = "1.0.8" -quic-rpc = { version = "0.14", features = ["flume-transport", "quinn-transport"] } +quic-rpc = { version = "0.15", features = ["flume-transport", "quinn-transport"] } rand = "0.8.5" ratatui = "0.26.2" reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } diff --git a/iroh-cli/src/commands/blobs.rs b/iroh-cli/src/commands/blobs.rs index 0be5657a87f..f945f9d788b 100644 --- a/iroh-cli/src/commands/blobs.rs +++ b/iroh-cli/src/commands/blobs.rs @@ -19,6 +19,7 @@ use iroh::{ base::{node_addr::AddrInfoOptions, ticket::BlobTicket}, blobs::{ get::{db::DownloadProgress, progress::BlobProgress, Stats}, + net_protocol::DownloadMode, provider::AddProgress, store::{ ConsistencyCheckProgress, ExportFormat, ExportMode, ReportLevel, ValidateProgress, @@ -28,8 +29,7 @@ use iroh::{ }, client::{ blobs::{ - BlobInfo, BlobStatus, CollectionInfo, DownloadMode, DownloadOptions, - IncompleteBlobInfo, WrapOption, + BlobInfo, BlobStatus, CollectionInfo, DownloadOptions, IncompleteBlobInfo, WrapOption, }, Iroh, }, @@ -370,7 +370,9 @@ impl BlobCommands { BlobFormat::Raw }; let status = iroh.blobs().status(hash).await?; - let ticket = iroh.blobs().share(hash, format, addr_options).await?; + let mut addr: NodeAddr = iroh.net().node_addr().await?; + addr.apply_options(addr_options); + let ticket = BlobTicket::new(addr, hash, format)?; let (blob_status, size) = match (status, format) { (BlobStatus::Complete { size }, BlobFormat::Raw) => ("blob", size), diff --git a/iroh-net/src/relay/client.rs b/iroh-net/src/relay/client.rs index a5ee9b645ac..e7c449b7739 100644 --- a/iroh-net/src/relay/client.rs +++ b/iroh-net/src/relay/client.rs @@ -1029,7 +1029,7 @@ fn host_header_value(relay_url: RelayUrl) -> Result { .host_str() .ok_or_else(|| ClientError::InvalidUrl(relay_url.to_string()))?; // strip the trailing dot, if present: example.com. 
-> example.com - let relay_url_host = relay_url_host.strip_suffix(".").unwrap_or(relay_url_host); + let relay_url_host = relay_url_host.strip_suffix('.').unwrap_or(relay_url_host); // build the host header value (reserve up to 6 chars for the ":" and port digits): let mut host_header_value = String::with_capacity(relay_url_host.len() + 6); host_header_value += relay_url_host; diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index cb3f298d5db..d0d1062352a 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -39,8 +39,8 @@ iroh-docs = { version = "0.28.0" } iroh-gossip = "0.28.1" parking_lot = "0.12.1" postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } -quic-rpc = { version = "0.14", default-features = false, features = ["flume-transport", "quinn-transport"] } -quic-rpc-derive = { version = "0.14" } +quic-rpc = { version = "0.15", default-features = false, features = ["flume-transport", "quinn-transport"] } +quic-rpc-derive = { version = "0.15" } quinn = { package = "iroh-quinn", version = "0.12" } rand = "0.8" serde = { version = "1", features = ["derive"] } @@ -51,7 +51,6 @@ tokio = { version = "1", features = ["io-util", "rt"] } tokio-stream = "0.1" tokio-util = { version = "0.7", features = ["codec", "io-util", "io", "time"] } tracing = "0.1" -walkdir = "2" # Examples clap = { version = "4", features = ["derive"], optional = true } diff --git a/iroh/examples/collection-provide.rs b/iroh/examples/collection-provide.rs index 429288a2f2e..06d9f6dcfee 100644 --- a/iroh/examples/collection-provide.rs +++ b/iroh/examples/collection-provide.rs @@ -7,7 +7,7 @@ //! run this example from the project root: //! $ cargo run --example collection-provide use iroh::blobs::{format::collection::Collection, util::SetTagOption, BlobFormat}; -use iroh_base::node_addr::AddrInfoOptions; +use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket}; use tracing_subscriber::{prelude::*, EnvFilter}; // set the RUST_LOG env var to one of {debug,info,warn} to see logging info @@ -44,14 +44,9 @@ async fn main() -> anyhow::Result<()> { // create a ticket // tickets wrap all details needed to get a collection - let ticket = node - .blobs() - .share( - hash, - BlobFormat::HashSeq, - AddrInfoOptions::RelayAndAddresses, - ) - .await?; + let mut addr = node.net().node_addr().await?; + addr.apply_options(AddrInfoOptions::RelayAndAddresses); + let ticket = BlobTicket::new(addr, hash, BlobFormat::HashSeq)?; // print some info about the node println!("serving hash: {}", ticket.hash()); diff --git a/iroh/examples/custom-protocol.rs b/iroh/examples/custom-protocol.rs index c6aeae5a5ff..b6bdd56d387 100644 --- a/iroh/examples/custom-protocol.rs +++ b/iroh/examples/custom-protocol.rs @@ -118,7 +118,7 @@ async fn main() -> Result<()> { // Print out our query results. for hash in hashes { - read_and_print(node.blobs(), hash).await?; + read_and_print(&node.blobs(), hash).await?; } } } diff --git a/iroh/examples/hello-world-provide.rs b/iroh/examples/hello-world-provide.rs index 15c21801dea..a74bc3bae35 100644 --- a/iroh/examples/hello-world-provide.rs +++ b/iroh/examples/hello-world-provide.rs @@ -3,7 +3,7 @@ //! This is using an in memory database and a random node id. //! run this example from the project root: //! 
$ cargo run --example hello-world-provide -use iroh_base::node_addr::AddrInfoOptions; +use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket}; use tracing_subscriber::{prelude::*, EnvFilter}; // set the RUST_LOG env var to one of {debug,info,warn} to see logging info @@ -27,10 +27,9 @@ async fn main() -> anyhow::Result<()> { let res = node.blobs().add_bytes("Hello, world!").await?; // create a ticket - let ticket = node - .blobs() - .share(res.hash, res.format, AddrInfoOptions::RelayAndAddresses) - .await?; + let mut addr = node.net().node_addr().await?; + addr.apply_options(AddrInfoOptions::RelayAndAddresses); + let ticket = BlobTicket::new(addr, res.hash, res.format)?; // print some info about the node println!("serving hash: {}", ticket.hash()); diff --git a/iroh/src/client.rs b/iroh/src/client.rs index 54505682a03..c74d2082814 100644 --- a/iroh/src/client.rs +++ b/iroh/src/client.rs @@ -6,7 +6,6 @@ use std::collections::BTreeMap; use anyhow::Result; use futures_lite::{Stream, StreamExt}; -use quic_rpc::client::BoxedServiceConnection; use ref_cast::RefCast; use crate::rpc_protocol::node::{CounterStats, ShutdownRequest, StatsRequest, StatusRequest}; @@ -19,13 +18,10 @@ pub(crate) use self::quic::{connect_raw as quic_connect_raw, RPC_ALPN}; pub use self::{docs::Doc, net::NodeStatus}; pub mod authors; -pub mod blobs; +pub use iroh_blobs::rpc::client::{blobs, tags}; +pub use iroh_gossip::rpc::client as gossip; pub mod docs; pub mod net; -pub mod tags; - -/// Iroh rpc connection - boxed so that we can have a concrete type. -pub(crate) type RpcConnection = BoxedServiceConnection; // Keep this type exposed, otherwise every occurrence of `RpcClient` in the API // will show up as `RpcClient>` in the docs. @@ -60,8 +56,8 @@ impl Iroh { } /// Returns the blobs client. - pub fn blobs(&self) -> &blobs::Client { - blobs::Client::ref_cast(&self.rpc) + pub fn blobs(&self) -> blobs::Client { + blobs::Client::new(self.rpc.clone().map().boxed()) } /// Returns the docs client. @@ -75,14 +71,13 @@ impl Iroh { } /// Returns the tags client. - pub fn tags(&self) -> &tags::Client { - tags::Client::ref_cast(&self.rpc) + pub fn tags(&self) -> tags::Client { + tags::Client::new(self.rpc.clone().map().boxed()) } /// Returns the gossip client. - pub fn gossip(&self) -> iroh_gossip::RpcClient { - let channel = self.rpc.clone().map::(); - iroh_gossip::RpcClient::new(channel) + pub fn gossip(&self) -> gossip::Client { + gossip::Client::new(self.rpc.clone().map().boxed()) } /// Returns the net client. diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs deleted file mode 100644 index 7bc4d889ad3..00000000000 --- a/iroh/src/client/blobs.rs +++ /dev/null @@ -1,1677 +0,0 @@ -//! API for blobs management. -//! -//! The main entry point is the [`Client`]. -//! -//! You obtain a [`Client`] via [`Iroh::blobs()`](crate::client::Iroh::blobs). -//! -//! ## Interacting with the local blob store -//! -//! ### Importing data -//! -//! There are several ways to import data into the local blob store: -//! -//! - [`add_bytes`](Client::add_bytes) -//! imports in memory data. -//! - [`add_stream`](Client::add_stream) -//! imports data from a stream of bytes. -//! - [`add_reader`](Client::add_reader) -//! imports data from an [async reader](tokio::io::AsyncRead). -//! - [`add_from_path`](Client::add_from_path) -//! imports data from a file. -//! -//! The last method imports data from a file on the local filesystem. -//! This is the most efficient way to import large amounts of data. -//! -//! ### Exporting data -//! 
-//! There are several ways to export data from the local blob store: -//! -//! - [`read_to_bytes`](Client::read_to_bytes) reads data into memory. -//! - [`read`](Client::read) creates a [reader](Reader) to read data from. -//! - [`export`](Client::export) eports data to a file on the local filesystem. -//! -//! ## Interacting with remote nodes -//! -//! - [`download`](Client::download) downloads data from a remote node. -//! - [`share`](Client::share) allows creating a ticket to share data with a -//! remote node. -//! -//! ## Interacting with the blob store itself -//! -//! These are more advanced operations that are usually not needed in normal -//! operation. -//! -//! - [`consistency_check`](Client::consistency_check) checks the internal -//! consistency of the local blob store. -//! - [`validate`](Client::validate) validates the locally stored data against -//! their BLAKE3 hashes. -//! - [`delete_blob`](Client::delete_blob) deletes a blob from the local store. -//! -//! ### Batch operations -//! -//! For complex update operations, there is a [`batch`](Client::batch) API that -//! allows you to add multiple blobs in a single logical batch. -//! -//! Operations in a batch return [temporary tags](crate::blobs::TempTag) that -//! protect the added data from garbage collection as long as the batch is -//! alive. -//! -//! To store the data permanently, a temp tag needs to be upgraded to a -//! permanent tag using [`persist`](crate::client::blobs::Batch::persist) or -//! [`persist_to`](crate::client::blobs::Batch::persist_to). -use std::{ - future::Future, - io, - path::PathBuf, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; - -use anyhow::{anyhow, Context as _, Result}; -use bytes::Bytes; -use futures_lite::{Stream, StreamExt}; -use futures_util::SinkExt; -use genawaiter::sync::{Co, Gen}; -use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket}; -pub use iroh_blobs::net_protocol::DownloadMode; -use iroh_blobs::{ - export::ExportProgress as BytesExportProgress, - format::collection::{Collection, SimpleStore}, - get::db::DownloadProgress as BytesDownloadProgress, - net_protocol::BlobDownloadRequest, - store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress}, - util::SetTagOption, - BlobFormat, Hash, Tag, -}; -use iroh_net::NodeAddr; -use portable_atomic::{AtomicU64, Ordering}; -use quic_rpc::client::BoxStreamSync; -use ref_cast::RefCast; -use serde::{Deserialize, Serialize}; -use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; -use tokio_util::io::{ReaderStream, StreamReader}; -use tracing::warn; - -mod batch; -pub use batch::{AddDirOpts, AddFileOpts, AddReaderOpts, Batch}; - -use super::{flatten, tags, Iroh, RpcClient}; -use crate::rpc_protocol::{ - blobs::{ - AddPathRequest, AddStreamRequest, AddStreamUpdate, BatchCreateRequest, BatchCreateResponse, - BlobStatusRequest, ConsistencyCheckRequest, CreateCollectionRequest, - CreateCollectionResponse, DeleteRequest, ExportRequest, ListIncompleteRequest, ListRequest, - ReadAtRequest, ReadAtResponse, ValidateRequest, - }, - node::StatusRequest, -}; - -/// Iroh blobs client. -#[derive(Debug, Clone, RefCast)] -#[repr(transparent)] -pub struct Client { - pub(super) rpc: RpcClient, -} - -impl<'a> From<&'a Iroh> for &'a RpcClient { - fn from(client: &'a Iroh) -> &'a RpcClient { - &client.blobs().rpc - } -} - -impl Client { - /// Check if a blob is completely stored on the node. - /// - /// Note that this will return false for blobs that are partially stored on - /// the node. 
- pub async fn status(&self, hash: Hash) -> Result { - let status = self.rpc.rpc(BlobStatusRequest { hash }).await??; - Ok(status.0) - } - - /// Check if a blob is completely stored on the node. - /// - /// This is just a convenience wrapper around `status` that returns a boolean. - pub async fn has(&self, hash: Hash) -> Result { - match self.status(hash).await { - Ok(BlobStatus::Complete { .. }) => Ok(true), - Ok(_) => Ok(false), - Err(err) => Err(err), - } - } - - /// Create a new batch for adding data. - /// - /// A batch is a context in which temp tags are created and data is added to the node. Temp tags - /// are automatically deleted when the batch is dropped, leading to the data being garbage collected - /// unless a permanent tag is created for it. - pub async fn batch(&self) -> Result { - let (updates, mut stream) = self.rpc.bidi(BatchCreateRequest).await?; - let BatchCreateResponse::Id(batch) = stream.next().await.context("expected scope id")??; - let rpc = self.rpc.clone(); - Ok(Batch::new(batch, rpc, updates, 1024)) - } - - /// Stream the contents of a a single blob. - /// - /// Returns a [`Reader`], which can report the size of the blob before reading it. - pub async fn read(&self, hash: Hash) -> Result { - Reader::from_rpc_read(&self.rpc, hash).await - } - - /// Read offset + len from a single blob. - /// - /// If `len` is `None` it will read the full blob. - pub async fn read_at(&self, hash: Hash, offset: u64, len: ReadAtLen) -> Result { - Reader::from_rpc_read_at(&self.rpc, hash, offset, len).await - } - - /// Read all bytes of single blob. - /// - /// This allocates a buffer for the full blob. Use only if you know that the blob you're - /// reading is small. If not sure, use [`Self::read`] and check the size with - /// [`Reader::size`] before calling [`Reader::read_to_bytes`]. - pub async fn read_to_bytes(&self, hash: Hash) -> Result { - Reader::from_rpc_read(&self.rpc, hash) - .await? - .read_to_bytes() - .await - } - - /// Read all bytes of single blob at `offset` for length `len`. - /// - /// This allocates a buffer for the full length. - pub async fn read_at_to_bytes(&self, hash: Hash, offset: u64, len: ReadAtLen) -> Result { - Reader::from_rpc_read_at(&self.rpc, hash, offset, len) - .await? - .read_to_bytes() - .await - } - - /// Import a blob from a filesystem path. - /// - /// `path` should be an absolute path valid for the file system on which - /// the node runs. - /// If `in_place` is true, Iroh will assume that the data will not change and will share it in - /// place without copying to the Iroh data directory. - pub async fn add_from_path( - &self, - path: PathBuf, - in_place: bool, - tag: SetTagOption, - wrap: WrapOption, - ) -> Result { - let stream = self - .rpc - .server_streaming(AddPathRequest { - path, - in_place, - tag, - wrap, - }) - .await?; - Ok(AddProgress::new(stream)) - } - - /// Create a collection from already existing blobs. - /// - /// For automatically clearing the tags for the passed in blobs you can set - /// `tags_to_delete` to those tags, and they will be deleted once the collection is created. - pub async fn create_collection( - &self, - collection: Collection, - tag: SetTagOption, - tags_to_delete: Vec, - ) -> anyhow::Result<(Hash, Tag)> { - let CreateCollectionResponse { hash, tag } = self - .rpc - .rpc(CreateCollectionRequest { - collection, - tag, - tags_to_delete, - }) - .await??; - Ok((hash, tag)) - } - - /// Write a blob by passing an async reader. 
- pub async fn add_reader( - &self, - reader: impl AsyncRead + Unpin + Send + 'static, - tag: SetTagOption, - ) -> anyhow::Result { - const CAP: usize = 1024 * 64; // send 64KB per request by default - let input = ReaderStream::with_capacity(reader, CAP); - self.add_stream(input, tag).await - } - - /// Write a blob by passing a stream of bytes. - pub async fn add_stream( - &self, - input: impl Stream> + Send + Unpin + 'static, - tag: SetTagOption, - ) -> anyhow::Result { - let (mut sink, progress) = self.rpc.bidi(AddStreamRequest { tag }).await?; - let mut input = input.map(|chunk| match chunk { - Ok(chunk) => Ok(AddStreamUpdate::Chunk(chunk)), - Err(err) => { - warn!("Abort send, reason: failed to read from source stream: {err:?}"); - Ok(AddStreamUpdate::Abort) - } - }); - tokio::spawn(async move { - // TODO: Is it important to catch this error? It should also result in an error on the - // response stream. If we deem it important, we could one-shot send it into the - // BlobAddProgress and return from there. Not sure. - if let Err(err) = sink.send_all(&mut input).await { - warn!("Failed to send input stream to remote: {err:?}"); - } - }); - - Ok(AddProgress::new(progress)) - } - - /// Write a blob by passing bytes. - pub async fn add_bytes(&self, bytes: impl Into) -> anyhow::Result { - let input = futures_lite::stream::once(Ok(bytes.into())); - self.add_stream(input, SetTagOption::Auto).await?.await - } - - /// Write a blob by passing bytes, setting an explicit tag name. - pub async fn add_bytes_named( - &self, - bytes: impl Into, - name: impl Into, - ) -> anyhow::Result { - let input = futures_lite::stream::once(Ok(bytes.into())); - self.add_stream(input, SetTagOption::Named(name.into())) - .await? - .await - } - - /// Validate hashes on the running node. - /// - /// If `repair` is true, repair the store by removing invalid data. - pub async fn validate( - &self, - repair: bool, - ) -> Result>> { - let stream = self - .rpc - .server_streaming(ValidateRequest { repair }) - .await?; - Ok(stream.map(|res| res.map_err(anyhow::Error::from))) - } - - /// Validate hashes on the running node. - /// - /// If `repair` is true, repair the store by removing invalid data. - pub async fn consistency_check( - &self, - repair: bool, - ) -> Result>> { - let stream = self - .rpc - .server_streaming(ConsistencyCheckRequest { repair }) - .await?; - Ok(stream.map(|r| r.map_err(anyhow::Error::from))) - } - - /// Download a blob from another node and add it to the local database. - pub async fn download(&self, hash: Hash, node: NodeAddr) -> Result { - self.download_with_opts( - hash, - DownloadOptions { - format: BlobFormat::Raw, - nodes: vec![node], - tag: SetTagOption::Auto, - mode: DownloadMode::Queued, - }, - ) - .await - } - - /// Download a hash sequence from another node and add it to the local database. - pub async fn download_hash_seq(&self, hash: Hash, node: NodeAddr) -> Result { - self.download_with_opts( - hash, - DownloadOptions { - format: BlobFormat::HashSeq, - nodes: vec![node], - tag: SetTagOption::Auto, - mode: DownloadMode::Queued, - }, - ) - .await - } - - /// Download a blob, with additional options. 
- pub async fn download_with_opts( - &self, - hash: Hash, - opts: DownloadOptions, - ) -> Result { - let DownloadOptions { - format, - nodes, - tag, - mode, - } = opts; - let stream = self - .rpc - .server_streaming(BlobDownloadRequest { - hash, - format, - nodes, - tag, - mode, - }) - .await?; - Ok(DownloadProgress::new( - stream.map(|res| res.map_err(anyhow::Error::from)), - )) - } - - /// Export a blob from the internal blob store to a path on the node's filesystem. - /// - /// `destination` should be an writeable, absolute path on the local node's filesystem. - /// - /// If `format` is set to [`ExportFormat::Collection`], and the `hash` refers to a collection, - /// all children of the collection will be exported. See [`ExportFormat`] for details. - /// - /// The `mode` argument defines if the blob should be copied to the target location or moved out of - /// the internal store into the target location. See [`ExportMode`] for details. - pub async fn export( - &self, - hash: Hash, - destination: PathBuf, - format: ExportFormat, - mode: ExportMode, - ) -> Result { - let req = ExportRequest { - hash, - path: destination, - format, - mode, - }; - let stream = self.rpc.server_streaming(req).await?; - Ok(ExportProgress::new( - stream.map(|r| r.map_err(anyhow::Error::from)), - )) - } - - /// List all complete blobs. - pub async fn list(&self) -> Result>> { - let stream = self.rpc.server_streaming(ListRequest).await?; - Ok(flatten(stream)) - } - - /// List all incomplete (partial) blobs. - pub async fn list_incomplete(&self) -> Result>> { - let stream = self.rpc.server_streaming(ListIncompleteRequest).await?; - Ok(flatten(stream)) - } - - /// Read the content of a collection. - pub async fn get_collection(&self, hash: Hash) -> Result { - Collection::load(hash, self).await - } - - /// List all collections. - pub fn list_collections(&self) -> Result>> { - let this = self.clone(); - Ok(Gen::new(|co| async move { - if let Err(cause) = this.list_collections_impl(&co).await { - co.yield_(Err(cause)).await; - } - })) - } - - async fn list_collections_impl(&self, co: &Co>) -> Result<()> { - let tags = self.tags_client(); - let mut tags = tags.list_hash_seq().await?; - while let Some(tag) = tags.next().await { - let tag = tag?; - if let Ok(collection) = self.get_collection(tag.hash).await { - let info = CollectionInfo { - tag: tag.name, - hash: tag.hash, - total_blobs_count: Some(collection.len() as u64 + 1), - total_blobs_size: Some(0), - }; - co.yield_(Ok(info)).await; - } - } - Ok(()) - } - - /// Delete a blob. - /// - /// **Warning**: this operation deletes the blob from the local store even - /// if it is tagged. You should usually not do this manually, but rely on the - /// node to remove data that is not tagged. - pub async fn delete_blob(&self, hash: Hash) -> Result<()> { - self.rpc.rpc(DeleteRequest { hash }).await??; - Ok(()) - } - - /// Share a blob. - pub async fn share( - &self, - hash: Hash, - blob_format: BlobFormat, - addr_options: AddrInfoOptions, - ) -> Result { - let mut addr = self.rpc.rpc(StatusRequest).await??.addr; - addr.apply_options(addr_options); - let ticket = BlobTicket::new(addr, hash, blob_format).expect("correct ticket"); - - Ok(ticket) - } - - fn tags_client(&self) -> tags::Client { - tags::Client { - rpc: self.rpc.clone(), - } - } -} - -impl SimpleStore for Client { - async fn load(&self, hash: Hash) -> anyhow::Result { - self.read_to_bytes(hash).await - } -} - -/// Defines the way to read bytes. 
-#[derive(Debug, Serialize, Deserialize, Default, Clone, Copy)] -pub enum ReadAtLen { - /// Reads all available bytes. - #[default] - All, - /// Reads exactly this many bytes, erroring out on larger or smaller. - Exact(u64), - /// Reads at most this many bytes. - AtMost(u64), -} - -impl ReadAtLen { - pub(crate) fn as_result_len(&self, size_remaining: u64) -> u64 { - match self { - ReadAtLen::All => size_remaining, - ReadAtLen::Exact(len) => *len, - ReadAtLen::AtMost(len) => std::cmp::min(*len, size_remaining), - } - } -} - -/// Whether to wrap the added data in a collection. -#[derive(Debug, Serialize, Deserialize, Default, Clone)] -pub enum WrapOption { - /// Do not wrap the file or directory. - #[default] - NoWrap, - /// Wrap the file or directory in a collection. - Wrap { - /// Override the filename in the wrapping collection. - name: Option, - }, -} - -/// Status information about a blob. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub enum BlobStatus { - /// The blob is not stored at all. - NotFound, - /// The blob is only stored partially. - Partial { - /// The size of the currently stored partial blob. - size: BaoBlobSize, - }, - /// The blob is stored completely. - Complete { - /// The size of the blob. - size: u64, - }, -} - -/// Outcome of a blob add operation. -#[derive(Debug, Clone)] -pub struct AddOutcome { - /// The hash of the blob - pub hash: Hash, - /// The format the blob - pub format: BlobFormat, - /// The size of the blob - pub size: u64, - /// The tag of the blob - pub tag: Tag, -} - -/// Information about a stored collection. -#[derive(Debug, Serialize, Deserialize)] -pub struct CollectionInfo { - /// Tag of the collection - pub tag: Tag, - - /// Hash of the collection - pub hash: Hash, - /// Number of children in the collection - /// - /// This is an optional field, because the data is not always available. - pub total_blobs_count: Option, - /// Total size of the raw data referred to by all links - /// - /// This is an optional field, because the data is not always available. - pub total_blobs_size: Option, -} - -/// Information about a complete blob. -#[derive(Debug, Serialize, Deserialize)] -pub struct BlobInfo { - /// Location of the blob - pub path: String, - /// The hash of the blob - pub hash: Hash, - /// The size of the blob - pub size: u64, -} - -/// Information about an incomplete blob. -#[derive(Debug, Serialize, Deserialize)] -pub struct IncompleteBlobInfo { - /// The size we got - pub size: u64, - /// The size we expect - pub expected_size: u64, - /// The hash of the blob - pub hash: Hash, -} - -/// Progress stream for blob add operations. -#[derive(derive_more::Debug)] -pub struct AddProgress { - #[debug(skip)] - stream: Pin< - Box> + Send + Unpin + 'static>, - >, - current_total_size: Arc, -} - -impl AddProgress { - fn new( - stream: (impl Stream< - Item = Result, impl Into>, - > + Send - + Unpin - + 'static), - ) -> Self { - let current_total_size = Arc::new(AtomicU64::new(0)); - let total_size = current_total_size.clone(); - let stream = stream.map(move |item| match item { - Ok(item) => { - let item = item.into(); - if let iroh_blobs::provider::AddProgress::Found { size, .. } = &item { - total_size.fetch_add(*size, Ordering::Relaxed); - } - Ok(item) - } - Err(err) => Err(err.into()), - }); - Self { - stream: Box::pin(stream), - current_total_size, - } - } - /// Finish writing the stream, ignoring all intermediate progress events. - /// - /// Returns a [`AddOutcome`] which contains a tag, format, hash and a size. 
- /// When importing a single blob, this is the hash and size of that blob. - /// When importing a collection, the hash is the hash of the collection and the size - /// is the total size of all imported blobs (but excluding the size of the collection blob - /// itself). - pub async fn finish(self) -> Result { - self.await - } -} - -impl Stream for AddProgress { - type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.stream).poll_next(cx) - } -} - -impl Future for AddProgress { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - match Pin::new(&mut self.stream).poll_next(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(None) => { - return Poll::Ready(Err(anyhow!("Response stream ended prematurely"))) - } - Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)), - Poll::Ready(Some(Ok(msg))) => match msg { - iroh_blobs::provider::AddProgress::AllDone { hash, format, tag } => { - let outcome = AddOutcome { - hash, - format, - tag, - size: self.current_total_size.load(Ordering::Relaxed), - }; - return Poll::Ready(Ok(outcome)); - } - iroh_blobs::provider::AddProgress::Abort(err) => { - return Poll::Ready(Err(err.into())); - } - _ => {} - }, - } - } - } -} - -/// Outcome of a blob download operation. -#[derive(Debug, Clone)] -pub struct DownloadOutcome { - /// The size of the data we already had locally - pub local_size: u64, - /// The size of the data we downloaded from the network - pub downloaded_size: u64, - /// Statistics about the download - pub stats: iroh_blobs::get::Stats, -} - -/// Progress stream for blob download operations. -#[derive(derive_more::Debug)] -pub struct DownloadProgress { - #[debug(skip)] - stream: Pin> + Send + Unpin + 'static>>, - current_local_size: Arc, - current_network_size: Arc, -} - -impl DownloadProgress { - /// Create a [`DownloadProgress`] that can help you easily poll the [`BytesDownloadProgress`] stream from your download until it is finished or errors. - pub fn new( - stream: (impl Stream, impl Into>> - + Send - + Unpin - + 'static), - ) -> Self { - let current_local_size = Arc::new(AtomicU64::new(0)); - let current_network_size = Arc::new(AtomicU64::new(0)); - - let local_size = current_local_size.clone(); - let network_size = current_network_size.clone(); - - let stream = stream.map(move |item| match item { - Ok(item) => { - let item = item.into(); - match &item { - BytesDownloadProgress::FoundLocal { size, .. } => { - local_size.fetch_add(size.value(), Ordering::Relaxed); - } - BytesDownloadProgress::Found { size, .. } => { - network_size.fetch_add(*size, Ordering::Relaxed); - } - _ => {} - } - - Ok(item) - } - Err(err) => Err(err.into()), - }); - Self { - stream: Box::pin(stream), - current_local_size, - current_network_size, - } - } - - /// Finish writing the stream, ignoring all intermediate progress events. - /// - /// Returns a [`DownloadOutcome`] which contains the size of the content we downloaded and the size of the content we already had locally. - /// When importing a single blob, this is the size of that blob. - /// When importing a collection, this is the total size of all imported blobs (but excluding the size of the collection blob itself). 
- pub async fn finish(self) -> Result { - self.await - } -} - -impl Stream for DownloadProgress { - type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.stream).poll_next(cx) - } -} - -impl Future for DownloadProgress { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - match Pin::new(&mut self.stream).poll_next(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(None) => { - return Poll::Ready(Err(anyhow!("Response stream ended prematurely"))) - } - Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)), - Poll::Ready(Some(Ok(msg))) => match msg { - BytesDownloadProgress::AllDone(stats) => { - let outcome = DownloadOutcome { - local_size: self.current_local_size.load(Ordering::Relaxed), - downloaded_size: self.current_network_size.load(Ordering::Relaxed), - stats, - }; - return Poll::Ready(Ok(outcome)); - } - BytesDownloadProgress::Abort(err) => { - return Poll::Ready(Err(err.into())); - } - _ => {} - }, - } - } - } -} - -/// Outcome of a blob export operation. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ExportOutcome { - /// The total size of the exported data. - total_size: u64, -} - -/// Progress stream for blob export operations. -#[derive(derive_more::Debug)] -pub struct ExportProgress { - #[debug(skip)] - stream: Pin> + Send + Unpin + 'static>>, - current_total_size: Arc, -} - -impl ExportProgress { - /// Create a [`ExportProgress`] that can help you easily poll the [`BytesExportProgress`] stream from your - /// download until it is finished or errors. - pub fn new( - stream: (impl Stream, impl Into>> - + Send - + Unpin - + 'static), - ) -> Self { - let current_total_size = Arc::new(AtomicU64::new(0)); - let total_size = current_total_size.clone(); - let stream = stream.map(move |item| match item { - Ok(item) => { - let item = item.into(); - if let BytesExportProgress::Found { size, .. } = &item { - let size = size.value(); - total_size.fetch_add(size, Ordering::Relaxed); - } - - Ok(item) - } - Err(err) => Err(err.into()), - }); - Self { - stream: Box::pin(stream), - current_total_size, - } - } - - /// Finish writing the stream, ignoring all intermediate progress events. - /// - /// Returns a [`ExportOutcome`] which contains the size of the content we exported. - pub async fn finish(self) -> Result { - self.await - } -} - -impl Stream for ExportProgress { - type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.stream).poll_next(cx) - } -} - -impl Future for ExportProgress { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - match Pin::new(&mut self.stream).poll_next(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(None) => { - return Poll::Ready(Err(anyhow!("Response stream ended prematurely"))) - } - Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)), - Poll::Ready(Some(Ok(msg))) => match msg { - BytesExportProgress::AllDone => { - let outcome = ExportOutcome { - total_size: self.current_total_size.load(Ordering::Relaxed), - }; - return Poll::Ready(Ok(outcome)); - } - BytesExportProgress::Abort(err) => { - return Poll::Ready(Err(err.into())); - } - _ => {} - }, - } - } - } -} - -/// Data reader for a single blob. -/// -/// Implements [`AsyncRead`]. 
-#[derive(derive_more::Debug)] -pub struct Reader { - size: u64, - response_size: u64, - is_complete: bool, - #[debug("StreamReader")] - stream: tokio_util::io::StreamReader>, Bytes>, -} - -impl Reader { - fn new( - size: u64, - response_size: u64, - is_complete: bool, - stream: BoxStreamSync<'static, io::Result>, - ) -> Self { - Self { - size, - response_size, - is_complete, - stream: StreamReader::new(stream), - } - } - - pub(crate) async fn from_rpc_read(rpc: &RpcClient, hash: Hash) -> anyhow::Result { - Self::from_rpc_read_at(rpc, hash, 0, ReadAtLen::All).await - } - - async fn from_rpc_read_at( - rpc: &RpcClient, - hash: Hash, - offset: u64, - len: ReadAtLen, - ) -> anyhow::Result { - let stream = rpc - .server_streaming(ReadAtRequest { hash, offset, len }) - .await?; - let mut stream = flatten(stream); - - let (size, is_complete) = match stream.next().await { - Some(Ok(ReadAtResponse::Entry { size, is_complete })) => (size, is_complete), - Some(Err(err)) => return Err(err), - Some(Ok(_)) => return Err(anyhow!("Expected header frame, but got data frame")), - None => return Err(anyhow!("Expected header frame, but RPC stream was dropped")), - }; - - let stream = stream.map(|item| match item { - Ok(ReadAtResponse::Data { chunk }) => Ok(chunk), - Ok(_) => Err(io::Error::new(io::ErrorKind::Other, "Expected data frame")), - Err(err) => Err(io::Error::new(io::ErrorKind::Other, format!("{err}"))), - }); - let len = len.as_result_len(size.value() - offset); - Ok(Self::new(size.value(), len, is_complete, Box::pin(stream))) - } - - /// Total size of this blob. - pub fn size(&self) -> u64 { - self.size - } - - /// Whether this blob has been downloaded completely. - /// - /// Returns false for partial blobs for which some chunks are missing. - pub fn is_complete(&self) -> bool { - self.is_complete - } - - /// Read all bytes of the blob. - pub async fn read_to_bytes(&mut self) -> anyhow::Result { - let mut buf = Vec::with_capacity(self.response_size as usize); - self.read_to_end(&mut buf).await?; - Ok(buf.into()) - } -} - -impl AsyncRead for Reader { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - Pin::new(&mut self.stream).poll_read(cx, buf) - } -} - -impl Stream for Reader { - type Item = io::Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.stream).get_pin_mut().poll_next(cx) - } - - fn size_hint(&self) -> (usize, Option) { - self.stream.get_ref().size_hint() - } -} - -/// Options to configure a download request. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct DownloadOptions { - /// The format of the data to download. - pub format: BlobFormat, - /// Source nodes to download from. - /// - /// If set to more than a single node, they will all be tried. If `mode` is set to - /// [`DownloadMode::Direct`], they will be tried sequentially until a download succeeds. - /// If `mode` is set to [`DownloadMode::Queued`], the nodes may be dialed in parallel, - /// if the concurrency limits permit. - pub nodes: Vec, - /// Optional tag to tag the data with. - pub tag: SetTagOption, - /// Whether to directly start the download or add it to the download queue. 
- pub mode: DownloadMode, -} - -#[cfg(test)] -mod tests { - use iroh_blobs::hashseq::HashSeq; - use iroh_net::NodeId; - use rand::RngCore; - use testresult::TestResult; - use tokio::{io::AsyncWriteExt, sync::mpsc}; - - use super::*; - - #[tokio::test] - async fn test_blob_create_collection() -> Result<()> { - let _guard = iroh_test::logging::setup(); - - let node = crate::node::Node::memory().spawn().await?; - - // create temp file - let temp_dir = tempfile::tempdir().context("tempdir")?; - - let in_root = temp_dir.path().join("in"); - tokio::fs::create_dir_all(in_root.clone()) - .await - .context("create dir all")?; - - let mut paths = Vec::new(); - for i in 0..5 { - let path = in_root.join(format!("test-{i}")); - let size = 100; - let mut buf = vec![0u8; size]; - rand::thread_rng().fill_bytes(&mut buf); - let mut file = tokio::fs::File::create(path.clone()) - .await - .context("create file")?; - file.write_all(&buf.clone()).await.context("write_all")?; - file.flush().await.context("flush")?; - paths.push(path); - } - - let client = node.client(); - - let mut collection = Collection::default(); - let mut tags = Vec::new(); - // import files - for path in &paths { - let import_outcome = client - .blobs() - .add_from_path( - path.to_path_buf(), - false, - SetTagOption::Auto, - WrapOption::NoWrap, - ) - .await - .context("import file")? - .finish() - .await - .context("import finish")?; - - collection.push( - path.file_name().unwrap().to_str().unwrap().to_string(), - import_outcome.hash, - ); - tags.push(import_outcome.tag); - } - - let (hash, tag) = client - .blobs() - .create_collection(collection, SetTagOption::Auto, tags) - .await?; - - let collections: Vec<_> = client.blobs().list_collections()?.try_collect().await?; - - assert_eq!(collections.len(), 1); - { - let CollectionInfo { - tag, - hash, - total_blobs_count, - .. - } = &collections[0]; - assert_eq!(tag, tag); - assert_eq!(hash, hash); - // 5 blobs + 1 meta - assert_eq!(total_blobs_count, &Some(5 + 1)); - } - - // check that "temp" tags have been deleted - let tags: Vec<_> = client.tags().list().await?.try_collect().await?; - assert_eq!(tags.len(), 1); - assert_eq!(tags[0].hash, hash); - assert_eq!(tags[0].name, tag); - assert_eq!(tags[0].format, BlobFormat::HashSeq); - - Ok(()) - } - - #[tokio::test] - async fn test_blob_read_at() -> Result<()> { - // let _guard = iroh_test::logging::setup(); - - let node = crate::node::Node::memory().spawn().await?; - - // create temp file - let temp_dir = tempfile::tempdir().context("tempdir")?; - - let in_root = temp_dir.path().join("in"); - tokio::fs::create_dir_all(in_root.clone()) - .await - .context("create dir all")?; - - let path = in_root.join("test-blob"); - let size = 1024 * 128; - let buf: Vec = (0..size).map(|i| i as u8).collect(); - let mut file = tokio::fs::File::create(path.clone()) - .await - .context("create file")?; - file.write_all(&buf.clone()).await.context("write_all")?; - file.flush().await.context("flush")?; - - let client = node.client(); - - let import_outcome = client - .blobs() - .add_from_path( - path.to_path_buf(), - false, - SetTagOption::Auto, - WrapOption::NoWrap, - ) - .await - .context("import file")? 
- .finish() - .await - .context("import finish")?; - - let hash = import_outcome.hash; - - // Read everything - let res = client.blobs().read_to_bytes(hash).await?; - assert_eq!(&res, &buf[..]); - - // Read at smaller than blob_get_chunk_size - let res = client - .blobs() - .read_at_to_bytes(hash, 0, ReadAtLen::Exact(100)) - .await?; - assert_eq!(res.len(), 100); - assert_eq!(&res[..], &buf[0..100]); - - let res = client - .blobs() - .read_at_to_bytes(hash, 20, ReadAtLen::Exact(120)) - .await?; - assert_eq!(res.len(), 120); - assert_eq!(&res[..], &buf[20..140]); - - // Read at equal to blob_get_chunk_size - let res = client - .blobs() - .read_at_to_bytes(hash, 0, ReadAtLen::Exact(1024 * 64)) - .await?; - assert_eq!(res.len(), 1024 * 64); - assert_eq!(&res[..], &buf[0..1024 * 64]); - - let res = client - .blobs() - .read_at_to_bytes(hash, 20, ReadAtLen::Exact(1024 * 64)) - .await?; - assert_eq!(res.len(), 1024 * 64); - assert_eq!(&res[..], &buf[20..(20 + 1024 * 64)]); - - // Read at larger than blob_get_chunk_size - let res = client - .blobs() - .read_at_to_bytes(hash, 0, ReadAtLen::Exact(10 + 1024 * 64)) - .await?; - assert_eq!(res.len(), 10 + 1024 * 64); - assert_eq!(&res[..], &buf[0..(10 + 1024 * 64)]); - - let res = client - .blobs() - .read_at_to_bytes(hash, 20, ReadAtLen::Exact(10 + 1024 * 64)) - .await?; - assert_eq!(res.len(), 10 + 1024 * 64); - assert_eq!(&res[..], &buf[20..(20 + 10 + 1024 * 64)]); - - // full length - let res = client - .blobs() - .read_at_to_bytes(hash, 20, ReadAtLen::All) - .await?; - assert_eq!(res.len(), 1024 * 128 - 20); - assert_eq!(&res[..], &buf[20..]); - - // size should be total - let reader = client - .blobs() - .read_at(hash, 0, ReadAtLen::Exact(20)) - .await?; - assert_eq!(reader.size(), 1024 * 128); - assert_eq!(reader.response_size, 20); - - // last chunk - exact - let res = client - .blobs() - .read_at_to_bytes(hash, 1024 * 127, ReadAtLen::Exact(1024)) - .await?; - assert_eq!(res.len(), 1024); - assert_eq!(res, &buf[1024 * 127..]); - - // last chunk - open - let res = client - .blobs() - .read_at_to_bytes(hash, 1024 * 127, ReadAtLen::All) - .await?; - assert_eq!(res.len(), 1024); - assert_eq!(res, &buf[1024 * 127..]); - - // last chunk - larger - let mut res = client - .blobs() - .read_at(hash, 1024 * 127, ReadAtLen::AtMost(2048)) - .await?; - assert_eq!(res.size, 1024 * 128); - assert_eq!(res.response_size, 1024); - let res = res.read_to_bytes().await?; - assert_eq!(res.len(), 1024); - assert_eq!(res, &buf[1024 * 127..]); - - // out of bounds - too long - let res = client - .blobs() - .read_at(hash, 0, ReadAtLen::Exact(1024 * 128 + 1)) - .await; - let err = res.unwrap_err(); - assert!(err.to_string().contains("out of bound")); - - // out of bounds - offset larger than blob - let res = client - .blobs() - .read_at(hash, 1024 * 128 + 1, ReadAtLen::All) - .await; - let err = res.unwrap_err(); - assert!(err.to_string().contains("out of range")); - - // out of bounds - offset + length too large - let res = client - .blobs() - .read_at(hash, 1024 * 127, ReadAtLen::Exact(1025)) - .await; - let err = res.unwrap_err(); - assert!(err.to_string().contains("out of bound")); - - Ok(()) - } - - #[tokio::test] - async fn test_blob_get_collection() -> Result<()> { - let _guard = iroh_test::logging::setup(); - - let node = crate::node::Node::memory().spawn().await?; - - // create temp file - let temp_dir = tempfile::tempdir().context("tempdir")?; - - let in_root = temp_dir.path().join("in"); - tokio::fs::create_dir_all(in_root.clone()) - .await - .context("create 
dir all")?; - - let mut paths = Vec::new(); - for i in 0..5 { - let path = in_root.join(format!("test-{i}")); - let size = 100; - let mut buf = vec![0u8; size]; - rand::thread_rng().fill_bytes(&mut buf); - let mut file = tokio::fs::File::create(path.clone()) - .await - .context("create file")?; - file.write_all(&buf.clone()).await.context("write_all")?; - file.flush().await.context("flush")?; - paths.push(path); - } - - let client = node.client(); - - let mut collection = Collection::default(); - let mut tags = Vec::new(); - // import files - for path in &paths { - let import_outcome = client - .blobs() - .add_from_path( - path.to_path_buf(), - false, - SetTagOption::Auto, - WrapOption::NoWrap, - ) - .await - .context("import file")? - .finish() - .await - .context("import finish")?; - - collection.push( - path.file_name().unwrap().to_str().unwrap().to_string(), - import_outcome.hash, - ); - tags.push(import_outcome.tag); - } - - let (hash, _tag) = client - .blobs() - .create_collection(collection, SetTagOption::Auto, tags) - .await?; - - let collection = client.blobs().get_collection(hash).await?; - - // 5 blobs - assert_eq!(collection.len(), 5); - - Ok(()) - } - - #[tokio::test] - async fn test_blob_share() -> Result<()> { - let _guard = iroh_test::logging::setup(); - - let node = crate::node::Node::memory().spawn().await?; - - // create temp file - let temp_dir = tempfile::tempdir().context("tempdir")?; - - let in_root = temp_dir.path().join("in"); - tokio::fs::create_dir_all(in_root.clone()) - .await - .context("create dir all")?; - - let path = in_root.join("test-blob"); - let size = 1024 * 128; - let buf: Vec = (0..size).map(|i| i as u8).collect(); - let mut file = tokio::fs::File::create(path.clone()) - .await - .context("create file")?; - file.write_all(&buf.clone()).await.context("write_all")?; - file.flush().await.context("flush")?; - - let client = node.client(); - - let import_outcome = client - .blobs() - .add_from_path( - path.to_path_buf(), - false, - SetTagOption::Auto, - WrapOption::NoWrap, - ) - .await - .context("import file")? 
- .finish() - .await - .context("import finish")?; - - let ticket = client - .blobs() - .share(import_outcome.hash, BlobFormat::Raw, Default::default()) - .await?; - assert_eq!(ticket.hash(), import_outcome.hash); - - let status = client.blobs().status(import_outcome.hash).await?; - assert_eq!(status, BlobStatus::Complete { size }); - - Ok(()) - } - - #[derive(Debug, Clone)] - struct BlobEvents { - sender: mpsc::Sender, - } - - impl BlobEvents { - fn new(cap: usize) -> (Self, mpsc::Receiver) { - let (s, r) = mpsc::channel(cap); - (Self { sender: s }, r) - } - } - - impl iroh_blobs::provider::CustomEventSender for BlobEvents { - fn send(&self, event: iroh_blobs::provider::Event) -> futures_lite::future::Boxed<()> { - let sender = self.sender.clone(); - Box::pin(async move { - sender.send(event).await.ok(); - }) - } - - fn try_send(&self, event: iroh_blobs::provider::Event) { - self.sender.try_send(event).ok(); - } - } - - #[tokio::test] - async fn test_blob_provide_events() -> Result<()> { - let _guard = iroh_test::logging::setup(); - - let (node1_events, mut node1_events_r) = BlobEvents::new(16); - let node1 = crate::node::Node::memory() - .blobs_events(node1_events) - .spawn() - .await?; - - let (node2_events, mut node2_events_r) = BlobEvents::new(16); - let node2 = crate::node::Node::memory() - .blobs_events(node2_events) - .spawn() - .await?; - - let import_outcome = node1.blobs().add_bytes(&b"hello world"[..]).await?; - - // Download in node2 - let node1_addr = node1.net().node_addr().await?; - let res = node2 - .blobs() - .download(import_outcome.hash, node1_addr) - .await? - .await?; - dbg!(&res); - assert_eq!(res.local_size, 0); - assert_eq!(res.downloaded_size, 11); - - node1.shutdown().await?; - node2.shutdown().await?; - - let mut ev1 = Vec::new(); - while let Some(ev) = node1_events_r.recv().await { - ev1.push(ev); - } - // assert_eq!(ev1.len(), 3); - assert!(matches!( - ev1[0], - iroh_blobs::provider::Event::ClientConnected { .. } - )); - assert!(matches!( - ev1[1], - iroh_blobs::provider::Event::GetRequestReceived { .. } - )); - assert!(matches!( - ev1[2], - iroh_blobs::provider::Event::TransferProgress { .. } - )); - assert!(matches!( - ev1[3], - iroh_blobs::provider::Event::TransferCompleted { .. } - )); - dbg!(&ev1); - - let mut ev2 = Vec::new(); - while let Some(ev) = node2_events_r.recv().await { - ev2.push(ev); - } - - // Node 2 did not provide anything - assert!(ev2.is_empty()); - Ok(()) - } - /// Download a existing blob from oneself - #[tokio::test] - async fn test_blob_get_self_existing() -> TestResult<()> { - let _guard = iroh_test::logging::setup(); - - let node = crate::node::Node::memory().spawn().await?; - let node_id = node.node_id(); - let client = node.client(); - - let AddOutcome { hash, size, .. } = client.blobs().add_bytes("foo").await?; - - // Direct - let res = client - .blobs() - .download_with_opts( - hash, - DownloadOptions { - format: BlobFormat::Raw, - nodes: vec![node_id.into()], - tag: SetTagOption::Auto, - mode: DownloadMode::Direct, - }, - ) - .await? - .await?; - - assert_eq!(res.local_size, size); - assert_eq!(res.downloaded_size, 0); - - // Queued - let res = client - .blobs() - .download_with_opts( - hash, - DownloadOptions { - format: BlobFormat::Raw, - nodes: vec![node_id.into()], - tag: SetTagOption::Auto, - mode: DownloadMode::Queued, - }, - ) - .await? 
- .await?; - - assert_eq!(res.local_size, size); - assert_eq!(res.downloaded_size, 0); - - Ok(()) - } - - /// Download a missing blob from oneself - #[tokio::test] - async fn test_blob_get_self_missing() -> TestResult<()> { - let _guard = iroh_test::logging::setup(); - - let node = crate::node::Node::memory().spawn().await?; - let node_id = node.node_id(); - let client = node.client(); - - let hash = Hash::from_bytes([0u8; 32]); - - // Direct - let res = client - .blobs() - .download_with_opts( - hash, - DownloadOptions { - format: BlobFormat::Raw, - nodes: vec![node_id.into()], - tag: SetTagOption::Auto, - mode: DownloadMode::Direct, - }, - ) - .await? - .await; - assert!(res.is_err()); - assert_eq!( - res.err().unwrap().to_string().as_str(), - "No nodes to download from provided" - ); - - // Queued - let res = client - .blobs() - .download_with_opts( - hash, - DownloadOptions { - format: BlobFormat::Raw, - nodes: vec![node_id.into()], - tag: SetTagOption::Auto, - mode: DownloadMode::Queued, - }, - ) - .await? - .await; - assert!(res.is_err()); - assert_eq!( - res.err().unwrap().to_string().as_str(), - "No provider nodes found" - ); - - Ok(()) - } - - /// Download a existing collection. Check that things succeed and no download is performed. - #[tokio::test] - async fn test_blob_get_existing_collection() -> TestResult<()> { - let _guard = iroh_test::logging::setup(); - - let node = crate::node::Node::memory().spawn().await?; - // We use a nonexisting node id because we just want to check that this succeeds without - // hitting the network. - let node_id = NodeId::from_bytes(&[0u8; 32])?; - let client = node.client(); - - let mut collection = Collection::default(); - let mut tags = Vec::new(); - let mut size = 0; - for value in ["iroh", "is", "cool"] { - let import_outcome = client.blobs().add_bytes(value).await.context("add bytes")?; - collection.push(value.to_string(), import_outcome.hash); - tags.push(import_outcome.tag); - size += import_outcome.size; - } - - let (hash, _tag) = client - .blobs() - .create_collection(collection, SetTagOption::Auto, tags) - .await?; - - // load the hashseq and collection header manually to calculate our expected size - let hashseq_bytes = client.blobs().read_to_bytes(hash).await?; - size += hashseq_bytes.len() as u64; - let hashseq = HashSeq::try_from(hashseq_bytes)?; - let collection_header_bytes = client - .blobs() - .read_to_bytes(hashseq.into_iter().next().expect("header to exist")) - .await?; - size += collection_header_bytes.len() as u64; - - // Direct - let res = client - .blobs() - .download_with_opts( - hash, - DownloadOptions { - format: BlobFormat::HashSeq, - nodes: vec![node_id.into()], - tag: SetTagOption::Auto, - mode: DownloadMode::Direct, - }, - ) - .await? - .await - .context("direct (download)")?; - - assert_eq!(res.local_size, size); - assert_eq!(res.downloaded_size, 0); - - // Queued - let res = client - .blobs() - .download_with_opts( - hash, - DownloadOptions { - format: BlobFormat::HashSeq, - nodes: vec![node_id.into()], - tag: SetTagOption::Auto, - mode: DownloadMode::Queued, - }, - ) - .await? 
- .await - .context("queued")?; - - assert_eq!(res.local_size, size); - assert_eq!(res.downloaded_size, 0); - - Ok(()) - } - - #[tokio::test] - #[cfg_attr(target_os = "windows", ignore = "flaky")] - async fn test_blob_delete_mem() -> Result<()> { - let _guard = iroh_test::logging::setup(); - - let node = crate::node::Node::memory().spawn().await?; - - let res = node.blobs().add_bytes(&b"hello world"[..]).await?; - - let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?; - assert_eq!(hashes.len(), 1); - assert_eq!(hashes[0].hash, res.hash); - - // delete - node.blobs().delete_blob(res.hash).await?; - - let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?; - assert!(hashes.is_empty()); - - Ok(()) - } - - #[tokio::test] - async fn test_blob_delete_fs() -> Result<()> { - let _guard = iroh_test::logging::setup(); - - let dir = tempfile::tempdir()?; - let node = crate::node::Node::persistent(dir.path()) - .await? - .spawn() - .await?; - - let res = node.blobs().add_bytes(&b"hello world"[..]).await?; - - let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?; - assert_eq!(hashes.len(), 1); - assert_eq!(hashes[0].hash, res.hash); - - // delete - node.blobs().delete_blob(res.hash).await?; - - let hashes: Vec<_> = node.blobs().list().await?.try_collect().await?; - assert!(hashes.is_empty()); - - Ok(()) - } -} diff --git a/iroh/src/client/blobs/batch.rs b/iroh/src/client/blobs/batch.rs deleted file mode 100644 index 9dad9797ba6..00000000000 --- a/iroh/src/client/blobs/batch.rs +++ /dev/null @@ -1,463 +0,0 @@ -use std::{ - io, - path::PathBuf, - sync::{Arc, Mutex}, -}; - -use anyhow::{anyhow, Context, Result}; -use bytes::Bytes; -use futures_buffered::BufferedStreamExt; -use futures_lite::StreamExt; -use futures_util::{sink::Buffer, FutureExt, SinkExt, Stream}; -use iroh_blobs::{ - format::collection::Collection, - net_protocol::BatchId, - provider::BatchAddPathProgress, - store::ImportMode, - util::{SetTagOption, TagDrop}, - BlobFormat, HashAndFormat, Tag, TempTag, -}; -use quic_rpc::client::UpdateSink; -use tokio::io::AsyncRead; -use tokio_util::io::ReaderStream; -use tracing::{debug, warn}; - -use super::WrapOption; -use crate::{ - client::{RpcClient, RpcConnection, RpcService}, - rpc_protocol::{ - blobs::{ - BatchAddPathRequest, BatchAddStreamRequest, BatchAddStreamResponse, - BatchAddStreamUpdate, BatchCreateTempTagRequest, BatchUpdate, - }, - tags::{self, SyncMode}, - }, -}; - -/// A scope in which blobs can be added. -#[derive(derive_more::Debug)] -struct BatchInner { - /// The id of the scope. - batch: BatchId, - /// The rpc client. - rpc: RpcClient, - /// The stream to send drop - #[debug(skip)] - updates: Mutex, BatchUpdate>>, -} - -/// A batch for write operations. -/// -/// This serves mostly as a scope for temporary tags. -/// -/// It is not a transaction, so things in a batch are not atomic. Also, there is -/// no isolation between batches. -#[derive(derive_more::Debug)] -pub struct Batch(Arc); - -impl TagDrop for BatchInner { - fn on_drop(&self, content: &HashAndFormat) { - let mut updates = self.updates.lock().unwrap(); - // make a spirited attempt to notify the server that we are dropping the content - // - // this will occasionally fail, but that's acceptable. The temp tags for the batch - // will be cleaned up as soon as the entire batch is dropped. - // - // E.g. a typical scenario is that you create a large array of temp tags, and then - // store them in a hash sequence and then drop the array. 
You will get many drops - // at the same time, and might get a send failure here. - // - // But that just means that the server will clean up the temp tags when the batch is - // dropped. - updates.feed(BatchUpdate::Drop(*content)).now_or_never(); - updates.flush().now_or_never(); - } -} - -/// Options for adding a file as a blob -#[derive(Debug, Clone, Copy, Default)] -pub struct AddFileOpts { - /// The import mode - pub import_mode: ImportMode, - /// The format of the blob - pub format: BlobFormat, -} - -/// Options for adding a directory as a collection -#[derive(Debug, Clone)] -pub struct AddDirOpts { - /// The import mode - pub import_mode: ImportMode, - /// Whether to preserve the directory name - pub wrap: WrapOption, - /// Io parallelism - pub io_parallelism: usize, -} - -impl Default for AddDirOpts { - fn default() -> Self { - Self { - import_mode: ImportMode::TryReference, - wrap: WrapOption::NoWrap, - io_parallelism: 4, - } - } -} - -/// Options for adding a directory as a collection -#[derive(Debug, Clone)] -pub struct AddReaderOpts { - /// The format of the blob - pub format: BlobFormat, - /// Size of the chunks to send - pub chunk_size: usize, -} - -impl Default for AddReaderOpts { - fn default() -> Self { - Self { - format: BlobFormat::Raw, - chunk_size: 1024 * 64, - } - } -} - -impl Batch { - pub(super) fn new( - batch: BatchId, - rpc: RpcClient, - updates: UpdateSink, - buffer_size: usize, - ) -> Self { - let updates = updates.buffer(buffer_size); - Self(Arc::new(BatchInner { - batch, - rpc, - updates: updates.into(), - })) - } - - /// Write a blob by passing bytes. - pub async fn add_bytes(&self, bytes: impl Into) -> Result { - self.add_bytes_with_opts(bytes, Default::default()).await - } - - /// Import a blob from a filesystem path, using the default options. - /// - /// For more control, use [`Self::add_file_with_opts`]. - pub async fn add_file(&self, path: PathBuf) -> Result<(TempTag, u64)> { - self.add_file_with_opts(path, AddFileOpts::default()).await - } - - /// Add a directory as a hashseq in iroh collection format - pub async fn add_dir(&self, root: PathBuf) -> Result { - self.add_dir_with_opts(root, Default::default()).await - } - - /// Write a blob by passing an async reader. - /// - /// This will consume the stream in 64KB chunks, and use a format of [BlobFormat::Raw]. - /// - /// For more options, see [`Self::add_reader_with_opts`]. - pub async fn add_reader( - &self, - reader: impl AsyncRead + Unpin + Send + 'static, - ) -> anyhow::Result { - self.add_reader_with_opts(reader, Default::default()).await - } - - /// Write a blob by passing a stream of bytes. - pub async fn add_stream( - &self, - input: impl Stream> + Send + Unpin + 'static, - ) -> Result { - self.add_stream_with_opts(input, Default::default()).await - } - - /// Creates a temp tag to protect some content (blob or hashseq) from being deleted. - /// - /// This is a lower-level API. The other functions in [`Batch`] already create [`TempTag`]s automatically. - /// - /// [`TempTag`]s allow you to protect some data from deletion while a download is ongoing, - /// even if you don't want to protect it permanently. 
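As background for the batch client being deleted here (the functionality moves behind iroh-blobs), a minimal usage sketch of the API shown above; the `iroh::client::Iroh` handle and the `blobs().batch()` entry point are assumptions, while the method names are taken from the deleted code.

// Hedged sketch of the (deleted) client-side batch API; `client` is an assumed
// `iroh::client::Iroh` handle and `blobs().batch()` is assumed to open a batch scope.
async fn add_in_batch(client: &iroh::client::Iroh) -> anyhow::Result<()> {
    let batch = client.blobs().batch().await?;
    // add_bytes returns a TempTag that protects the blob while the batch is alive
    let tt = batch.add_bytes(b"hello world".to_vec()).await?;
    // upgrade the temp tag to a persistent, auto-named tag so GC keeps the blob
    let tag = batch.persist(tt).await?;
    println!("blob kept alive under tag {tag}");
    Ok(())
}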
- pub async fn temp_tag(&self, content: HashAndFormat) -> Result { - // Notify the server that we want one temp tag for the given content - self.0 - .rpc - .rpc(BatchCreateTempTagRequest { - batch: self.0.batch, - content, - }) - .await??; - // Only after success of the above call, we can create the corresponding local temp tag - Ok(self.local_temp_tag(content, None)) - } - - /// Write a blob by passing an async reader. - /// - /// This consumes the stream in chunks using `opts.chunk_size`. A good default is 64KB. - pub async fn add_reader_with_opts( - &self, - reader: impl AsyncRead + Unpin + Send + 'static, - opts: AddReaderOpts, - ) -> anyhow::Result { - let AddReaderOpts { format, chunk_size } = opts; - let input = ReaderStream::with_capacity(reader, chunk_size); - self.add_stream_with_opts(input, format).await - } - - /// Write a blob by passing bytes. - pub async fn add_bytes_with_opts( - &self, - bytes: impl Into, - format: BlobFormat, - ) -> Result { - let input = futures_lite::stream::once(Ok(bytes.into())); - self.add_stream_with_opts(input, format).await - } - - /// Import a blob from a filesystem path. - /// - /// `path` should be an absolute path valid for the file system on which - /// the node runs, which refers to a file. - /// - /// If you use [`ImportMode::TryReference`], Iroh will assume that the data will not - /// change and will share it in place without copying to the Iroh data directory - /// if appropriate. However, for tiny files, Iroh will copy the data. - /// - /// If you use [`ImportMode::Copy`], Iroh will always copy the data. - /// - /// Will return a temp tag for the added blob, as well as the size of the file. - pub async fn add_file_with_opts( - &self, - path: PathBuf, - opts: AddFileOpts, - ) -> Result<(TempTag, u64)> { - let AddFileOpts { - import_mode, - format, - } = opts; - anyhow::ensure!( - path.is_absolute(), - "Path must be absolute, but got: {:?}", - path - ); - anyhow::ensure!(path.is_file(), "Path does not refer to a file: {:?}", path); - let mut stream = self - .0 - .rpc - .server_streaming(BatchAddPathRequest { - path, - import_mode, - format, - batch: self.0.batch, - }) - .await?; - let mut res_hash = None; - let mut res_size = None; - while let Some(item) = stream.next().await { - match item?.0 { - BatchAddPathProgress::Abort(cause) => { - Err(cause)?; - } - BatchAddPathProgress::Done { hash } => { - res_hash = Some(hash); - } - BatchAddPathProgress::Found { size } => { - res_size = Some(size); - } - _ => {} - } - } - let hash = res_hash.context("Missing hash")?; - let size = res_size.context("Missing size")?; - Ok(( - self.local_temp_tag(HashAndFormat { hash, format }, Some(size)), - size, - )) - } - - /// Add a directory as a hashseq in iroh collection format - /// - /// This can also be used to add a single file as a collection, if - /// wrap is set to [WrapOption::Wrap]. - /// - /// However, if you want to add a single file as a raw blob, use add_file instead. 
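The import-mode docs above (reference in place vs. copy) can be made concrete with a short, hedged sketch; the `batch` handle and the `Batch`/`AddFileOpts` types are assumed to be in scope from the deleted module, and the path is illustrative.

// Hedged sketch: adding a single file, trying to reference it in place.
use iroh_blobs::{store::ImportMode, BlobFormat};

async fn add_big_file(batch: &Batch) -> anyhow::Result<()> {
    let opts = AddFileOpts {
        import_mode: ImportMode::TryReference, // share in place where possible
        format: BlobFormat::Raw,
    };
    // the path must be absolute and refer to a file, per the checks above
    let (temp_tag, size) = batch
        .add_file_with_opts("/data/video.mp4".into(), opts)
        .await?;
    println!("added {size} bytes as {}", temp_tag.hash());
    Ok(())
}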
- pub async fn add_dir_with_opts(&self, root: PathBuf, opts: AddDirOpts) -> Result { - let AddDirOpts { - import_mode, - wrap, - io_parallelism, - } = opts; - anyhow::ensure!(root.is_absolute(), "Path must be absolute"); - - // let (send, recv) = flume::bounded(32); - // let import_progress = FlumeProgressSender::new(send); - - // import all files below root recursively - let data_sources = crate::util::fs::scan_path(root, wrap)?; - let opts = AddFileOpts { - import_mode, - format: BlobFormat::Raw, - }; - let result: Vec<_> = futures_lite::stream::iter(data_sources) - .map(|source| { - // let import_progress = import_progress.clone(); - async move { - let name = source.name().to_string(); - let (tag, size) = self - .add_file_with_opts(source.path().to_owned(), opts) - .await?; - let hash = *tag.hash(); - anyhow::Ok((name, hash, size, tag)) - } - }) - .buffered_ordered(io_parallelism) - .try_collect() - .await?; - - // create a collection - let (collection, child_tags): (Collection, Vec<_>) = result - .into_iter() - .map(|(name, hash, _, tag)| ((name, hash), tag)) - .unzip(); - - let tag = self.add_collection(collection).await?; - drop(child_tags); - Ok(tag) - } - - /// Write a blob by passing a stream of bytes. - /// - /// For convenient interop with common sources of data, this function takes a stream of `io::Result`. - /// If you have raw bytes, you need to wrap them in `io::Result::Ok`. - pub async fn add_stream_with_opts( - &self, - mut input: impl Stream> + Send + Unpin + 'static, - format: BlobFormat, - ) -> Result { - let (mut sink, mut stream) = self - .0 - .rpc - .bidi(BatchAddStreamRequest { - batch: self.0.batch, - format, - }) - .await?; - let mut size = 0u64; - while let Some(item) = input.next().await { - match item { - Ok(chunk) => { - size += chunk.len() as u64; - sink.send(BatchAddStreamUpdate::Chunk(chunk)) - .await - .map_err(|err| anyhow!("Failed to send input stream to remote: {err:?}"))?; - } - Err(err) => { - warn!("Abort send, reason: failed to read from source stream: {err:?}"); - sink.send(BatchAddStreamUpdate::Abort) - .await - .map_err(|err| anyhow!("Failed to send input stream to remote: {err:?}"))?; - break; - } - } - } - // this is needed for the remote to notice that the stream is closed - drop(sink); - let mut res = None; - while let Some(item) = stream.next().await { - match item? { - BatchAddStreamResponse::Abort(cause) => { - Err(cause)?; - } - BatchAddStreamResponse::Result { hash } => { - res = Some(hash); - } - _ => {} - } - } - let hash = res.context("Missing answer")?; - Ok(self.local_temp_tag(HashAndFormat { hash, format }, Some(size))) - } - - /// Add a collection. - /// - /// This is a convenience function that converts the collection into two blobs - /// (the metadata and the hash sequence) and adds them, returning a temp tag for - /// the hash sequence. - /// - /// Note that this does not guarantee that the data that the collection refers to - /// actually exists. It will just create 2 blobs, the metadata and the hash sequence - /// itself. - pub async fn add_collection(&self, collection: Collection) -> Result { - self.add_blob_seq(collection.to_blobs()).await - } - - /// Add a sequence of blobs, where the last is a hash sequence. - /// - /// It is a common pattern in iroh to have a hash sequence with one or more - /// blobs of metadata, and the remaining blobs being the actual data. E.g. - /// a collection is a hash sequence where the first child is the metadata. 
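Since the comment above describes a collection as a hash sequence whose first child is the metadata blob, here is a small, hedged sketch of assembling one with the deleted helpers; the `batch` handle and the file contents are assumptions.

// Hedged sketch: building a collection from two blobs added in the same batch.
use iroh_blobs::format::collection::Collection;

async fn make_collection(batch: &Batch) -> anyhow::Result<()> {
    let a = batch.add_bytes(b"first file".to_vec()).await?;
    let b = batch.add_bytes(b"second file".to_vec()).await?;
    let mut collection = Collection::default();
    collection.push("a.txt".to_string(), *a.hash());
    collection.push("b.txt".to_string(), *b.hash());
    // stores the metadata blob plus the hash sequence and returns a temp tag
    // for the hash sequence; it does not verify that the children exist
    let tt = batch.add_collection(collection).await?;
    println!("collection hash: {}", tt.hash());
    Ok(())
}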
- pub async fn add_blob_seq(&self, iter: impl Iterator) -> Result { - let mut blobs = iter.peekable(); - // put the tags somewhere - let mut tags = vec![]; - loop { - let blob = blobs.next().context("Failed to get next blob")?; - if blobs.peek().is_none() { - return self.add_bytes_with_opts(blob, BlobFormat::HashSeq).await; - } else { - tags.push(self.add_bytes(blob).await?); - } - } - } - - /// Upgrades a temp tag to a persistent tag. - pub async fn persist(&self, tt: TempTag) -> Result { - let tag = self - .0 - .rpc - .rpc(tags::CreateRequest { - value: tt.hash_and_format(), - batch: Some(self.0.batch), - sync: SyncMode::Full, - }) - .await??; - Ok(tag) - } - - /// Upgrades a temp tag to a persistent tag with a specific name. - pub async fn persist_to(&self, tt: TempTag, tag: Tag) -> Result<()> { - self.0 - .rpc - .rpc(tags::SetRequest { - name: tag, - value: Some(tt.hash_and_format()), - batch: Some(self.0.batch), - sync: SyncMode::Full, - }) - .await??; - Ok(()) - } - - /// Upgrades a temp tag to a persistent tag with either a specific name or - /// an automatically generated name. - pub async fn persist_with_opts(&self, tt: TempTag, opts: SetTagOption) -> Result { - match opts { - SetTagOption::Auto => self.persist(tt).await, - SetTagOption::Named(tag) => { - self.persist_to(tt, tag.clone()).await?; - Ok(tag) - } - } - } - - /// Creates a temp tag for the given hash and format, without notifying the server. - /// - /// Caution: only do this for data for which you know the server side has created a temp tag. - fn local_temp_tag(&self, inner: HashAndFormat, _size: Option) -> TempTag { - let on_drop: Arc = self.0.clone(); - let on_drop = Some(Arc::downgrade(&on_drop)); - TempTag::new(inner, on_drop) - } -} diff --git a/iroh/src/client/docs.rs b/iroh/src/client/docs.rs index cdceaede8b6..a0c2237260f 100644 --- a/iroh/src/client/docs.rs +++ b/iroh/src/client/docs.rs @@ -475,14 +475,19 @@ impl Entry { /// /// You can pass either a [`Doc`] or the `Iroh` client by reference as `client`. pub async fn content_reader(&self, client: impl Into<&RpcClient>) -> Result { - blobs::Reader::from_rpc_read(client.into(), self.content_hash()).await + let client: RpcClient = client.into().clone(); + let client: quic_rpc::RpcClient = client.map(); + blobs::Reader::from_rpc_read(&client, self.content_hash()).await } /// Reads all content of an [`Entry`] into a buffer. /// /// You can pass either a [`Doc`] or the `Iroh` client by reference as `client`. pub async fn content_bytes(&self, client: impl Into<&RpcClient>) -> Result { - blobs::Reader::from_rpc_read(client.into(), self.content_hash()) + let client: RpcClient = client.into().clone(); + let client: quic_rpc::RpcClient = client.map(); + + blobs::Reader::from_rpc_read(&client, self.content_hash()) .await? 
.read_to_bytes() .await diff --git a/iroh/src/client/quic.rs b/iroh/src/client/quic.rs index a5ab2d1351e..f7634cd57df 100644 --- a/iroh/src/client/quic.rs +++ b/iroh/src/client/quic.rs @@ -8,7 +8,7 @@ use std::{ }; use anyhow::{bail, Context}; -use quic_rpc::transport::{boxed::Connection as BoxedConnection, quinn::QuinnConnection}; +use quic_rpc::transport::{boxed::BoxedConnector, quinn::QuinnConnector}; use super::{Iroh, RpcClient}; use crate::{node::RpcStatus, rpc_protocol::node::StatusRequest}; @@ -43,8 +43,8 @@ pub(crate) async fn connect_raw(addr: SocketAddr) -> anyhow::Result { let endpoint = create_quinn_client(bind_addr, vec![RPC_ALPN.to_vec()], false)?; let server_name = "localhost".to_string(); - let connection = QuinnConnection::new(endpoint, addr, server_name); - let connection = BoxedConnection::new(connection); + let connection = QuinnConnector::new(endpoint, addr, server_name); + let connection = BoxedConnector::new(connection); let client = RpcClient::new(connection); // Do a status request to check if the server is running. let _version = tokio::time::timeout(Duration::from_secs(1), client.rpc(StatusRequest)) diff --git a/iroh/src/client/tags.rs b/iroh/src/client/tags.rs deleted file mode 100644 index 1b0accf03aa..00000000000 --- a/iroh/src/client/tags.rs +++ /dev/null @@ -1,60 +0,0 @@ -//! API for tag management. -//! -//! The purpose of tags is to mark information as important to prevent it -//! from being garbage-collected (if the garbage collector is turned on). -//! Currently this is used for blobs. -//! -//! The main entry point is the [`Client`]. -//! -//! You obtain a [`Client`] via [`Iroh::tags()`](crate::client::Iroh::tags). -//! -//! [`Client::list`] can be used to list all tags. -//! [`Client::list_hash_seq`] can be used to list all tags with a hash_seq format. -//! -//! [`Client::delete`] can be used to delete a tag. -use anyhow::Result; -use futures_lite::{Stream, StreamExt}; -use iroh_blobs::{BlobFormat, Hash, Tag}; -use ref_cast::RefCast; -use serde::{Deserialize, Serialize}; - -use super::RpcClient; -use crate::rpc_protocol::tags::{DeleteRequest, ListRequest}; - -/// Iroh tags client. -#[derive(Debug, Clone, RefCast)] -#[repr(transparent)] -pub struct Client { - pub(super) rpc: RpcClient, -} - -impl Client { - /// Lists all tags. - pub async fn list(&self) -> Result>> { - let stream = self.rpc.server_streaming(ListRequest::all()).await?; - Ok(stream.map(|res| res.map_err(anyhow::Error::from))) - } - - /// Lists all tags with a hash_seq format. - pub async fn list_hash_seq(&self) -> Result>> { - let stream = self.rpc.server_streaming(ListRequest::hash_seq()).await?; - Ok(stream.map(|res| res.map_err(anyhow::Error::from))) - } - - /// Deletes a tag. - pub async fn delete(&self, name: Tag) -> Result<()> { - self.rpc.rpc(DeleteRequest { name }).await??; - Ok(()) - } -} - -/// Information about a tag. 
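The tags client removed here keeps the same shape behind iroh-blobs; a hedged usage sketch, assuming `Iroh::tags()` still returns a client with the `list` method and the `TagInfo` fields shown above.

// Hedged sketch: listing all tags and printing their targets.
use futures_lite::StreamExt;

async fn print_tags(client: &iroh::client::Iroh) -> anyhow::Result<()> {
    let mut tags = client.tags().list().await?;
    while let Some(info) = tags.next().await {
        let info = info?;
        println!("{} -> {} ({:?})", info.name, info.hash, info.format);
    }
    Ok(())
}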
-#[derive(Debug, Serialize, Deserialize)] -pub struct TagInfo { - /// Name of the tag - pub name: Tag, - /// Format of the data - pub format: BlobFormat, - /// Hash of the data - pub hash: Hash, -} diff --git a/iroh/src/node.rs b/iroh/src/node.rs index bd2df139442..e04925fa280 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -60,7 +60,7 @@ use iroh_net::{ AddrInfo, Endpoint, NodeAddr, }; use iroh_router::{ProtocolHandler, Router}; -use quic_rpc::{transport::ServerEndpoint as _, RpcServer}; +use quic_rpc::{transport::Listener as _, RpcServer}; use tokio::task::{JoinError, JoinSet}; use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle}; use tracing::{debug, error, info, info_span, trace, warn, Instrument}; @@ -87,7 +87,7 @@ const SAVE_NODES_INTERVAL: Duration = Duration::from_secs(30); /// The quic-rpc server endpoint for the iroh node. /// /// We use a boxed endpoint here to allow having a concrete type for the server endpoint. -pub type IrohServerEndpoint = quic_rpc::transport::boxed::ServerEndpoint< +pub type IrohServerEndpoint = quic_rpc::transport::boxed::BoxedListener< crate::rpc_protocol::Request, crate::rpc_protocol::Response, >; @@ -497,7 +497,7 @@ fn node_address_for_storage(info: RemoteInfo) -> Option { mod tests { use anyhow::{bail, Context}; use bytes::Bytes; - use iroh_base::node_addr::AddrInfoOptions; + use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket}; use iroh_blobs::{provider::AddProgress, util::SetTagOption, BlobFormat}; use iroh_net::{key::SecretKey, relay::RelayMode, test_utils::DnsPkarrServer, NodeAddr}; @@ -518,11 +518,9 @@ mod tests { .hash; let _drop_guard = node.cancel_token().drop_guard(); - let ticket = node - .blobs() - .share(hash, BlobFormat::Raw, AddrInfoOptions::RelayAndAddresses) - .await - .unwrap(); + let mut addr = node.net().node_addr().await.unwrap(); + addr.apply_options(AddrInfoOptions::RelayAndAddresses); + let ticket = BlobTicket::new(addr, hash, BlobFormat::Raw).unwrap(); println!("addrs: {:?}", ticket.node_addr().info); assert!(!ticket.node_addr().info.direct_addresses.is_empty()); } diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index f232ea7af61..e6d380ed038 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -31,7 +31,7 @@ use iroh_net::{ Endpoint, }; use iroh_router::{ProtocolHandler, RouterBuilder}; -use quic_rpc::transport::{boxed::BoxableServerEndpoint, quinn::QuinnServerEndpoint}; +use quic_rpc::transport::{boxed::BoxableListener, quinn::QuinnListener}; use serde::{Deserialize, Serialize}; use tokio::task::JoinError; use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle}; @@ -41,7 +41,6 @@ use super::{rpc_status::RpcStatus, IrohServerEndpoint, JoinErrToStr, Node, NodeI use crate::{ client::RPC_ALPN, node::nodes_storage::load_node_addrs, - rpc_protocol::RpcService, util::{fs::load_secret_key, path::IrohPaths}, }; @@ -222,13 +221,12 @@ impl From> for DiscoveryConfig { #[derive(Debug, Default)] struct DummyServerEndpoint; -impl BoxableServerEndpoint +impl BoxableListener for DummyServerEndpoint { fn clone_box( &self, - ) -> Box> - { + ) -> Box> { Box::new(DummyServerEndpoint) } @@ -247,7 +245,7 @@ impl BoxableServerEndpoint IrohServerEndpoint { - quic_rpc::transport::boxed::ServerEndpoint::new(DummyServerEndpoint) + quic_rpc::transport::boxed::BoxedListener::new(DummyServerEndpoint) } impl Default for Builder { @@ -395,7 +393,7 @@ where let (ep, actual_rpc_port) = make_rpc_endpoint(&self.secret_key, rpc_addr)?; rpc_addr.set_port(actual_rpc_port); - let ep = 
quic_rpc::transport::boxed::ServerEndpoint::new(ep); + let ep = quic_rpc::transport::boxed::BoxedListener::new(ep); if let StorageConfig::Persistent(ref root) = self.storage { // store rpc endpoint RpcStatus::store(root, actual_rpc_port).await?; @@ -691,12 +689,11 @@ where .await?; // Initialize the internal RPC connection. - let (internal_rpc, controller) = - quic_rpc::transport::flume::service_connection::(32); - let internal_rpc = quic_rpc::transport::boxed::ServerEndpoint::new(internal_rpc); + let (internal_rpc, controller) = quic_rpc::transport::flume::channel(32); + let internal_rpc = quic_rpc::transport::boxed::BoxedListener::new(internal_rpc); // box the controller. Boxing has a special case for the flume channel that avoids allocations, // so this has zero overhead. - let controller = quic_rpc::transport::boxed::Connection::new(controller); + let controller = quic_rpc::transport::boxed::BoxedConnector::new(controller); let client = crate::client::Iroh::new(quic_rpc::RpcClient::new(controller.clone())); let inner = Arc::new(NodeInner { @@ -845,6 +842,7 @@ impl ProtocolBuilder { self.local_pool_handle().clone(), blob_events, downloader, + self.endpoint().clone(), ); self = self.accept(iroh_blobs::protocol::ALPN.to_vec(), Arc::new(blobs_proto)); @@ -951,7 +949,7 @@ fn make_rpc_endpoint( secret_key: &SecretKey, mut rpc_addr: SocketAddr, ) -> Result<( - QuinnServerEndpoint, + QuinnListener, u16, )> { let mut transport_config = quinn::TransportConfig::default(); @@ -984,7 +982,7 @@ fn make_rpc_endpoint( }; let actual_rpc_port = rpc_quinn_endpoint.local_addr()?.port(); - let rpc_endpoint = QuinnServerEndpoint::new(rpc_quinn_endpoint)?; + let rpc_endpoint = QuinnListener::new(rpc_quinn_endpoint)?; Ok((rpc_endpoint, actual_rpc_port)) } diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index a6dc9f3a2b1..d18b88a0248 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -1,35 +1,23 @@ use std::{ fmt::Debug, - io, sync::{Arc, Mutex}, time::Duration, }; use anyhow::{anyhow, Result}; -use futures_buffered::BufferedStreamExt; use futures_lite::{Stream, StreamExt}; -use futures_util::FutureExt; -use genawaiter::sync::{Co, Gen}; use iroh_blobs::{ export::ExportProgress, - format::collection::Collection, - get::db::DownloadProgress, - net_protocol::{BlobDownloadRequest, Blobs as BlobsProtocol}, - provider::{AddProgress, BatchAddPathProgress}, - store::{ - ConsistencyCheckProgress, ExportFormat, ImportProgress, MapEntry, Store as BaoStore, - ValidateProgress, - }, + net_protocol::Blobs as BlobsProtocol, + store::{ExportFormat, ImportProgress, Store as BaoStore}, util::{ local_pool::LocalPoolHandle, progress::{AsyncChannelProgressSender, ProgressSender}, - SetTagOption, }, - BlobFormat, HashAndFormat, Tag, + BlobFormat, HashAndFormat, }; use iroh_docs::{engine::Engine, net::DOCS_ALPN}; use iroh_gossip::net::{Gossip, GOSSIP_ALPN}; -use iroh_io::AsyncSliceReader; use iroh_net::{relay::RelayUrl, NodeAddr, NodeId}; use iroh_router::Router; use quic_rpc::server::{RpcChannel, RpcServerError}; @@ -39,23 +27,10 @@ use tracing::{debug, info, warn}; use super::IrohServerEndpoint; use crate::{ - client::{ - blobs::{BlobInfo, BlobStatus, IncompleteBlobInfo, WrapOption}, - tags::TagInfo, - NodeStatus, - }, + client::NodeStatus, node::NodeInner, rpc_protocol::{ authors, - blobs::{ - self, AddPathRequest, AddPathResponse, AddStreamRequest, AddStreamResponse, - AddStreamUpdate, BatchAddPathRequest, BatchAddPathResponse, BatchAddStreamRequest, - BatchAddStreamResponse, BatchAddStreamUpdate, 
BatchCreateRequest, BatchCreateResponse, - BatchCreateTempTagRequest, BatchUpdate, BlobStatusRequest, BlobStatusResponse, - ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse, - DeleteRequest, DownloadResponse, ExportRequest, ExportResponse, ListIncompleteRequest, - ListRequest, ReadAtRequest, ReadAtResponse, ValidateRequest, - }, docs::{ ExportFileRequest, ExportFileResponse, ImportFileRequest, ImportFileResponse, Request as DocsRequest, SetHashRequest, @@ -66,7 +41,6 @@ use crate::{ WatchResponse, }, node::{self, ShutdownRequest, StatsRequest, StatsResponse, StatusRequest}, - tags::{self, DeleteRequest as TagDeleteRequest, ListRequest as ListTagsRequest, SyncMode}, Request, RpcService, }, }; @@ -74,11 +48,6 @@ use crate::{ mod docs; const HEALTH_POLL_WAIT: Duration = Duration::from_secs(1); -/// Chunk size for getting blobs over RPC -const RPC_BLOB_GET_CHUNK_SIZE: usize = 1024 * 64; -/// Channel cap for getting blobs over RPC -const RPC_BLOB_GET_CHANNEL_CAP: usize = 2; - pub(crate) type RpcError = serde_error::Error; pub(crate) type RpcResult = Result; @@ -186,60 +155,15 @@ impl Handler { } } - async fn handle_blobs_request( + async fn handle_blobs_and_tags_request( self, - msg: blobs::Request, - chan: RpcChannel, + msg: iroh_blobs::rpc::proto::Request, + chan: RpcChannel, ) -> Result<(), RpcServerError> { - use blobs::Request::*; - debug!("handling blob request: {msg}"); - match msg { - List(msg) => chan.server_streaming(msg, self, Self::blob_list).await, - ListIncomplete(msg) => { - chan.server_streaming(msg, self, Self::blob_list_incomplete) - .await - } - CreateCollection(msg) => chan.rpc(msg, self, Self::create_collection).await, - Delete(msg) => chan.rpc(msg, self, Self::blob_delete_blob).await, - AddPath(msg) => { - chan.server_streaming(msg, self, Self::blob_add_from_path) - .await - } - Download(msg) => chan.server_streaming(msg, self, Self::blob_download).await, - Export(msg) => chan.server_streaming(msg, self, Self::blob_export).await, - Validate(msg) => chan.server_streaming(msg, self, Self::blob_validate).await, - Fsck(msg) => { - chan.server_streaming(msg, self, Self::blob_consistency_check) - .await - } - ReadAt(msg) => chan.server_streaming(msg, self, Self::blob_read_at).await, - AddStream(msg) => chan.bidi_streaming(msg, self, Self::blob_add_stream).await, - AddStreamUpdate(_msg) => Err(RpcServerError::UnexpectedUpdateMessage), - BlobStatus(msg) => chan.rpc(msg, self, Self::blob_status).await, - BatchCreate(msg) => chan.bidi_streaming(msg, self, Self::batch_create).await, - BatchUpdate(_) => Err(RpcServerError::UnexpectedStartMessage), - BatchAddStream(msg) => chan.bidi_streaming(msg, self, Self::batch_add_stream).await, - BatchAddStreamUpdate(_) => Err(RpcServerError::UnexpectedStartMessage), - BatchAddPath(msg) => { - chan.server_streaming(msg, self, Self::batch_add_from_path) - .await - } - BatchCreateTempTag(msg) => chan.rpc(msg, self, Self::batch_create_temp_tag).await, - } - } - - async fn handle_tags_request( - self, - msg: tags::Request, - chan: RpcChannel, - ) -> Result<(), RpcServerError> { - use tags::Request::*; - match msg { - ListTags(msg) => chan.server_streaming(msg, self, Self::blob_list_tags).await, - DeleteTag(msg) => chan.rpc(msg, self, Self::blob_delete_tag).await, - Create(msg) => chan.rpc(msg, self, Self::tags_create).await, - Set(msg) => chan.rpc(msg, self, Self::tags_set).await, - } + self.blobs() + .handle_rpc_request(msg, chan) + .await + .map_err(|e| e.errors_into()) } async fn handle_gossip_request( @@ -252,7 +176,10 
@@ impl Handler { .get_protocol::(GOSSIP_ALPN) .expect("missing gossip"); let chan = chan.map::(); - gossip.handle_rpc_request(msg, chan).await + gossip + .handle_rpc_request(msg, chan) + .await + .map_err(|e| e.errors_into()) } async fn handle_authors_request( @@ -322,193 +249,16 @@ impl Handler { match msg { Net(msg) => self.handle_net_request(msg, chan).await, Node(msg) => self.handle_node_request(msg, chan).await, - Blobs(msg) => self.handle_blobs_request(msg, chan).await, - Tags(msg) => self.handle_tags_request(msg, chan).await, + BlobsAndTags(msg) => { + self.handle_blobs_and_tags_request(msg, chan.map().boxed()) + .await + } Authors(msg) => self.handle_authors_request(msg, chan).await, Docs(msg) => self.handle_docs_request(msg, chan).await, Gossip(msg) => self.handle_gossip_request(msg, chan).await, } } - fn local_pool_handle(&self) -> LocalPoolHandle { - self.inner.local_pool_handle.clone() - } - - async fn blob_status(self, msg: BlobStatusRequest) -> RpcResult { - let blobs = self.blobs(); - let entry = blobs - .store() - .get(&msg.hash) - .await - .map_err(|e| RpcError::new(&e))?; - Ok(BlobStatusResponse(match entry { - Some(entry) => { - if entry.is_complete() { - BlobStatus::Complete { - size: entry.size().value(), - } - } else { - BlobStatus::Partial { size: entry.size() } - } - } - None => BlobStatus::NotFound, - })) - } - - async fn blob_list_impl(self, co: &Co>) -> io::Result<()> { - use bao_tree::io::fsm::Outboard; - - let blobs = self.blobs(); - let db = blobs.store(); - for blob in db.blobs().await? { - let blob = blob?; - let Some(entry) = db.get(&blob).await? else { - continue; - }; - let hash = entry.hash(); - let size = entry.outboard().await?.tree().size(); - let path = "".to_owned(); - co.yield_(Ok(BlobInfo { hash, size, path })).await; - } - Ok(()) - } - - async fn blob_list_incomplete_impl( - self, - co: &Co>, - ) -> io::Result<()> { - let blobs = self.blobs(); - let db = blobs.store(); - for hash in db.partial_blobs().await? 
{ - let hash = hash?; - let Ok(Some(entry)) = db.get_mut(&hash).await else { - continue; - }; - if entry.is_complete() { - continue; - } - let size = 0; - let expected_size = entry.size().value(); - co.yield_(Ok(IncompleteBlobInfo { - hash, - size, - expected_size, - })) - .await; - } - Ok(()) - } - - fn blob_list( - self, - _msg: ListRequest, - ) -> impl Stream> + Send + 'static { - Gen::new(|co| async move { - if let Err(e) = self.blob_list_impl(&co).await { - co.yield_(Err(RpcError::new(&e))).await; - } - }) - } - - fn blob_list_incomplete( - self, - _msg: ListIncompleteRequest, - ) -> impl Stream> + Send + 'static { - Gen::new(move |co| async move { - if let Err(e) = self.blob_list_incomplete_impl(&co).await { - co.yield_(Err(RpcError::new(&e))).await; - } - }) - } - - async fn blob_delete_tag(self, msg: TagDeleteRequest) -> RpcResult<()> { - self.blobs_store() - .set_tag(msg.name, None) - .await - .map_err(|e| RpcError::new(&e))?; - Ok(()) - } - - async fn blob_delete_blob(self, msg: DeleteRequest) -> RpcResult<()> { - self.blobs_store() - .delete(vec![msg.hash]) - .await - .map_err(|e| RpcError::new(&e))?; - Ok(()) - } - - fn blob_list_tags(self, msg: ListTagsRequest) -> impl Stream + Send + 'static { - tracing::info!("blob_list_tags"); - let blobs = self.blobs(); - Gen::new(|co| async move { - let tags = blobs.store().tags().await.unwrap(); - #[allow(clippy::manual_flatten)] - for item in tags { - if let Ok((name, HashAndFormat { hash, format })) = item { - if (format.is_raw() && msg.raw) || (format.is_hash_seq() && msg.hash_seq) { - co.yield_(TagInfo { name, hash, format }).await; - } - } - } - }) - } - - /// Invoke validate on the database and stream out the result - fn blob_validate( - self, - msg: ValidateRequest, - ) -> impl Stream + Send + 'static { - let (tx, rx) = async_channel::bounded(1); - let tx2 = tx.clone(); - let blobs = self.blobs(); - tokio::task::spawn(async move { - if let Err(e) = blobs - .store() - .validate(msg.repair, AsyncChannelProgressSender::new(tx).boxed()) - .await - { - tx2.send(ValidateProgress::Abort(RpcError::new(&e))) - .await - .ok(); - } - }); - rx - } - - /// Invoke validate on the database and stream out the result - fn blob_consistency_check( - self, - msg: ConsistencyCheckRequest, - ) -> impl Stream + Send + 'static { - let (tx, rx) = async_channel::bounded(1); - let tx2 = tx.clone(); - let blobs = self.blobs(); - tokio::task::spawn(async move { - if let Err(e) = blobs - .store() - .consistency_check(msg.repair, AsyncChannelProgressSender::new(tx).boxed()) - .await - { - tx2.send(ConsistencyCheckProgress::Abort(RpcError::new(&e))) - .await - .ok(); - } - }); - rx - } - - fn blob_add_from_path(self, msg: AddPathRequest) -> impl Stream { - // provide a little buffer so that we don't slow down the sender - let (tx, rx) = async_channel::bounded(32); - let tx2 = tx.clone(); - self.local_pool_handle().spawn_detached(|| async move { - if let Err(e) = self.blob_add_from_path0(msg, tx).await { - tx2.send(AddProgress::Abort(RpcError::new(&*e))).await.ok(); - } - }); - rx.map(AddPathResponse) - } - fn doc_import_file(self, msg: ImportFileRequest) -> impl Stream { // provide a little buffer so that we don't slow down the sender let (tx, rx) = async_channel::bounded(32); @@ -640,174 +390,6 @@ impl Handler { Ok(()) } - fn blob_download(self, msg: BlobDownloadRequest) -> impl Stream { - let (sender, receiver) = async_channel::bounded(1024); - let endpoint = self.inner.endpoint.clone(); - let progress = AsyncChannelProgressSender::new(sender); - - let 
blobs_protocol = self - .router - .get_protocol::>(iroh_blobs::protocol::ALPN) - .expect("missing blobs"); - - self.local_pool_handle().spawn_detached(move || async move { - if let Err(err) = blobs_protocol - .download(endpoint, msg, progress.clone()) - .await - { - progress - .send(DownloadProgress::Abort(RpcError::new(&*err))) - .await - .ok(); - } - }); - - receiver.map(DownloadResponse) - } - - fn blob_export(self, msg: ExportRequest) -> impl Stream { - let (tx, rx) = async_channel::bounded(1024); - let progress = AsyncChannelProgressSender::new(tx); - self.local_pool_handle().spawn_detached(move || async move { - let res = iroh_blobs::export::export( - self.blobs().store(), - msg.hash, - msg.path, - msg.format, - msg.mode, - progress.clone(), - ) - .await; - match res { - Ok(()) => progress.send(ExportProgress::AllDone).await.ok(), - Err(err) => progress - .send(ExportProgress::Abort(RpcError::new(&*err))) - .await - .ok(), - }; - }); - rx.map(ExportResponse) - } - - async fn blob_add_from_path0( - self, - msg: AddPathRequest, - progress: async_channel::Sender, - ) -> anyhow::Result<()> { - use std::collections::BTreeMap; - - use iroh_blobs::store::ImportMode; - - let blobs = self.blobs(); - let progress = AsyncChannelProgressSender::new(progress); - let names = Arc::new(Mutex::new(BTreeMap::new())); - // convert import progress to provide progress - let import_progress = progress.clone().with_filter_map(move |x| match x { - ImportProgress::Found { id, name } => { - names.lock().unwrap().insert(id, name); - None - } - ImportProgress::Size { id, size } => { - let name = names.lock().unwrap().remove(&id)?; - Some(AddProgress::Found { id, name, size }) - } - ImportProgress::OutboardProgress { id, offset } => { - Some(AddProgress::Progress { id, offset }) - } - ImportProgress::OutboardDone { hash, id } => Some(AddProgress::Done { hash, id }), - _ => None, - }); - let AddPathRequest { - wrap, - path: root, - in_place, - tag, - } = msg; - // Check that the path is absolute and exists. - anyhow::ensure!(root.is_absolute(), "path must be absolute"); - anyhow::ensure!( - root.exists(), - "trying to add missing path: {}", - root.display() - ); - - let import_mode = match in_place { - true => ImportMode::TryReference, - false => ImportMode::Copy, - }; - - let create_collection = match wrap { - WrapOption::Wrap { .. } => true, - WrapOption::NoWrap => root.is_dir(), - }; - - let temp_tag = if create_collection { - // import all files below root recursively - let data_sources = crate::util::fs::scan_path(root, wrap)?; - let blobs = self.blobs(); - - const IO_PARALLELISM: usize = 4; - let result: Vec<_> = futures_lite::stream::iter(data_sources) - .map(|source| { - let import_progress = import_progress.clone(); - let blobs = blobs.clone(); - async move { - let name = source.name().to_string(); - let (tag, size) = blobs - .store() - .import_file( - source.path().to_owned(), - import_mode, - BlobFormat::Raw, - import_progress, - ) - .await?; - let hash = *tag.hash(); - io::Result::Ok((name, hash, size, tag)) - } - }) - .buffered_ordered(IO_PARALLELISM) - .try_collect() - .await?; - - // create a collection - let (collection, _child_tags): (Collection, Vec<_>) = result - .into_iter() - .map(|(name, hash, _, tag)| ((name, hash), tag)) - .unzip(); - - collection.store(blobs.store()).await? 
- } else { - // import a single file - let (tag, _size) = blobs - .store() - .import_file(root, import_mode, BlobFormat::Raw, import_progress) - .await?; - tag - }; - - let hash_and_format = temp_tag.inner(); - let HashAndFormat { hash, format } = *hash_and_format; - let tag = match tag { - SetTagOption::Named(tag) => { - blobs - .store() - .set_tag(tag.clone(), Some(*hash_and_format)) - .await?; - tag - } - SetTagOption::Auto => blobs.store().create_tag(*hash_and_format).await?, - }; - progress - .send(AddProgress::AllDone { - hash, - format, - tag: tag.clone(), - }) - .await?; - Ok(()) - } - #[allow(clippy::unused_async)] async fn node_stats(self, _req: StatsRequest) -> RpcResult { #[cfg(feature = "metrics")] @@ -871,48 +453,6 @@ impl Handler { } } - async fn tags_set(self, msg: tags::SetRequest) -> RpcResult<()> { - let blobs = self.blobs(); - blobs - .store() - .set_tag(msg.name, msg.value) - .await - .map_err(|e| RpcError::new(&e))?; - if let SyncMode::Full = msg.sync { - blobs.store().sync().await.map_err(|e| RpcError::new(&e))?; - } - if let Some(batch) = msg.batch { - if let Some(content) = msg.value.as_ref() { - blobs - .batches() - .await - .remove_one(batch, content) - .map_err(|e| RpcError::new(&*e))?; - } - } - Ok(()) - } - - async fn tags_create(self, msg: tags::CreateRequest) -> RpcResult { - let blobs = self.blobs(); - let tag = blobs - .store() - .create_tag(msg.value) - .await - .map_err(|e| RpcError::new(&e))?; - if let SyncMode::Full = msg.sync { - blobs.store().sync().await.map_err(|e| RpcError::new(&e))?; - } - if let Some(batch) = msg.batch { - blobs - .batches() - .await - .remove_one(batch, &msg.value) - .map_err(|e| RpcError::new(&*e))?; - } - Ok(tag) - } - fn node_watch(self, _: NodeWatchRequest) -> impl Stream { futures_lite::stream::unfold((), |()| async move { tokio::time::sleep(HEALTH_POLL_WAIT).await; @@ -925,298 +465,8 @@ impl Handler { }) } - async fn batch_create_temp_tag(self, msg: BatchCreateTempTagRequest) -> RpcResult<()> { - let blobs = self.blobs(); - let tag = blobs.store().temp_tag(msg.content); - blobs.batches().await.store(msg.batch, tag); - Ok(()) - } - - fn batch_add_stream( - self, - msg: BatchAddStreamRequest, - stream: impl Stream + Send + Unpin + 'static, - ) -> impl Stream { - let (tx, rx) = async_channel::bounded(32); - let this = self.clone(); - - self.local_pool_handle().spawn_detached(|| async move { - if let Err(err) = this.batch_add_stream0(msg, stream, tx.clone()).await { - tx.send(BatchAddStreamResponse::Abort(RpcError::new(&*err))) - .await - .ok(); - } - }); - rx - } - - fn batch_add_from_path( - self, - msg: BatchAddPathRequest, - ) -> impl Stream { - // provide a little buffer so that we don't slow down the sender - let (tx, rx) = async_channel::bounded(32); - let tx2 = tx.clone(); - self.local_pool_handle().spawn_detached(|| async move { - if let Err(e) = self.batch_add_from_path0(msg, tx).await { - tx2.send(BatchAddPathProgress::Abort(RpcError::new(&*e))) - .await - .ok(); - } - }); - rx.map(BatchAddPathResponse) - } - - async fn batch_add_stream0( - self, - msg: BatchAddStreamRequest, - stream: impl Stream + Send + Unpin + 'static, - progress: async_channel::Sender, - ) -> anyhow::Result<()> { - let blobs = self.blobs(); - let progress = AsyncChannelProgressSender::new(progress); - - let stream = stream.map(|item| match item { - BatchAddStreamUpdate::Chunk(chunk) => Ok(chunk), - BatchAddStreamUpdate::Abort => { - Err(io::Error::new(io::ErrorKind::Interrupted, "Remote abort")) - } - }); - - let import_progress = 
progress.clone().with_filter_map(move |x| match x { - ImportProgress::OutboardProgress { offset, .. } => { - Some(BatchAddStreamResponse::OutboardProgress { offset }) - } - _ => None, - }); - let (temp_tag, _len) = blobs - .store() - .import_stream(stream, msg.format, import_progress) - .await?; - let hash = temp_tag.inner().hash; - blobs.batches().await.store(msg.batch, temp_tag); - progress - .send(BatchAddStreamResponse::Result { hash }) - .await?; - Ok(()) - } - - async fn batch_add_from_path0( - self, - msg: BatchAddPathRequest, - progress: async_channel::Sender, - ) -> anyhow::Result<()> { - let progress = AsyncChannelProgressSender::new(progress); - // convert import progress to provide progress - let import_progress = progress.clone().with_filter_map(move |x| match x { - ImportProgress::Size { size, .. } => Some(BatchAddPathProgress::Found { size }), - ImportProgress::OutboardProgress { offset, .. } => { - Some(BatchAddPathProgress::Progress { offset }) - } - ImportProgress::OutboardDone { hash, .. } => Some(BatchAddPathProgress::Done { hash }), - _ => None, - }); - let BatchAddPathRequest { - path: root, - import_mode, - format, - batch, - } = msg; - // Check that the path is absolute and exists. - anyhow::ensure!(root.is_absolute(), "path must be absolute"); - anyhow::ensure!( - root.exists(), - "trying to add missing path: {}", - root.display() - ); - let blobs = self.blobs(); - let (tag, _) = blobs - .store() - .import_file(root, import_mode, format, import_progress) - .await?; - let hash = *tag.hash(); - blobs.batches().await.store(batch, tag); - - progress.send(BatchAddPathProgress::Done { hash }).await?; - Ok(()) - } - - fn blob_add_stream( - self, - msg: AddStreamRequest, - stream: impl Stream + Send + Unpin + 'static, - ) -> impl Stream { - let (tx, rx) = async_channel::bounded(32); - let this = self.clone(); - - self.local_pool_handle().spawn_detached(|| async move { - if let Err(err) = this.blob_add_stream0(msg, stream, tx.clone()).await { - tx.send(AddProgress::Abort(RpcError::new(&*err))).await.ok(); - } - }); - - rx.map(AddStreamResponse) - } - - async fn blob_add_stream0( - self, - msg: AddStreamRequest, - stream: impl Stream + Send + Unpin + 'static, - progress: async_channel::Sender, - ) -> anyhow::Result<()> { - let progress = AsyncChannelProgressSender::new(progress); - - let stream = stream.map(|item| match item { - AddStreamUpdate::Chunk(chunk) => Ok(chunk), - AddStreamUpdate::Abort => { - Err(io::Error::new(io::ErrorKind::Interrupted, "Remote abort")) - } - }); - - let name_cache = Arc::new(Mutex::new(None)); - let import_progress = progress.clone().with_filter_map(move |x| match x { - ImportProgress::Found { id: _, name } => { - let _ = name_cache.lock().unwrap().insert(name); - None - } - ImportProgress::Size { id, size } => { - let name = name_cache.lock().unwrap().take()?; - Some(AddProgress::Found { id, name, size }) - } - ImportProgress::OutboardProgress { id, offset } => { - Some(AddProgress::Progress { id, offset }) - } - ImportProgress::OutboardDone { hash, id } => Some(AddProgress::Done { hash, id }), - _ => None, - }); - let blobs = self.blobs(); - let (temp_tag, _len) = blobs - .store() - .import_stream(stream, BlobFormat::Raw, import_progress) - .await?; - let hash_and_format = *temp_tag.inner(); - let HashAndFormat { hash, format } = hash_and_format; - let tag = match msg.tag { - SetTagOption::Named(tag) => { - blobs - .store() - .set_tag(tag.clone(), Some(hash_and_format)) - .await?; - tag - } - SetTagOption::Auto => 
blobs.store().create_tag(hash_and_format).await?, - }; - progress - .send(AddProgress::AllDone { hash, tag, format }) - .await?; - Ok(()) - } - - fn blob_read_at( - self, - req: ReadAtRequest, - ) -> impl Stream> + Send + 'static { - let (tx, rx) = async_channel::bounded(RPC_BLOB_GET_CHANNEL_CAP); - let db = self.blobs_store(); - self.local_pool_handle().spawn_detached(move || async move { - if let Err(err) = read_loop(req, db, tx.clone(), RPC_BLOB_GET_CHUNK_SIZE).await { - tx.send(RpcResult::Err(RpcError::new(&*err))).await.ok(); - } - }); - - async fn read_loop( - req: ReadAtRequest, - db: D, - tx: async_channel::Sender>, - max_chunk_size: usize, - ) -> anyhow::Result<()> { - let entry = db.get(&req.hash).await?; - let entry = entry.ok_or_else(|| anyhow!("Blob not found"))?; - let size = entry.size(); - - anyhow::ensure!( - req.offset <= size.value(), - "requested offset is out of range: {} > {:?}", - req.offset, - size - ); - - let len: usize = req - .len - .as_result_len(size.value() - req.offset) - .try_into()?; - - anyhow::ensure!( - req.offset + len as u64 <= size.value(), - "requested range is out of bounds: offset: {}, len: {} > {:?}", - req.offset, - len, - size - ); - - tx.send(Ok(ReadAtResponse::Entry { - size, - is_complete: entry.is_complete(), - })) - .await?; - let mut reader = entry.data_reader().await?; - - let (num_chunks, chunk_size) = if len <= max_chunk_size { - (1, len) - } else { - let num_chunks = len / max_chunk_size + (len % max_chunk_size != 0) as usize; - (num_chunks, max_chunk_size) - }; - - let mut read = 0u64; - for i in 0..num_chunks { - let chunk_size = if i == num_chunks - 1 { - // last chunk might be smaller - len - read as usize - } else { - chunk_size - }; - let chunk = reader.read_at(req.offset + read, chunk_size).await?; - let chunk_len = chunk.len(); - if !chunk.is_empty() { - tx.send(Ok(ReadAtResponse::Data { chunk })).await?; - } - if chunk_len < chunk_size { - break; - } else { - read += chunk_len as u64; - } - } - Ok(()) - } - - rx - } - - fn batch_create( - self, - _: BatchCreateRequest, - mut updates: impl Stream + Send + Unpin + 'static, - ) -> impl Stream { - let blobs = self.blobs(); - async move { - let batch = blobs.batches().await.create(); - tokio::spawn(async move { - while let Some(item) = updates.next().await { - match item { - BatchUpdate::Drop(content) => { - // this can not fail, since we keep the batch alive. - // therefore it is safe to ignore the result. - let _ = blobs.batches().await.remove_one(batch, &content); - } - BatchUpdate::Ping => {} - } - } - blobs.batches().await.remove(batch); - }); - BatchCreateResponse::Id(batch) - } - .into_stream() + fn local_pool_handle(&self) -> LocalPoolHandle { + self.inner.local_pool_handle.clone() } fn remote_infos_iter( @@ -1253,51 +503,6 @@ impl Handler { .map_err(|e| RpcError::new(&*e))?; Ok(()) } - - async fn create_collection( - self, - req: CreateCollectionRequest, - ) -> RpcResult { - let CreateCollectionRequest { - collection, - tag, - tags_to_delete, - } = req; - - let blobs = self.blobs(); - - let temp_tag = collection - .store(blobs.store()) - .await - .map_err(|e| RpcError::new(&*e))?; - let hash_and_format = temp_tag.inner(); - let HashAndFormat { hash, .. 
} = *hash_and_format; - let tag = match tag { - SetTagOption::Named(tag) => { - blobs - .store() - .set_tag(tag.clone(), Some(*hash_and_format)) - .await - .map_err(|e| RpcError::new(&e))?; - tag - } - SetTagOption::Auto => blobs - .store() - .create_tag(*hash_and_format) - .await - .map_err(|e| RpcError::new(&e))?, - }; - - for tag in tags_to_delete { - blobs - .store() - .set_tag(tag, None) - .await - .map_err(|e| RpcError::new(&e))?; - } - - Ok(CreateCollectionResponse { hash, tag }) - } } fn docs_disabled() -> RpcError { diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index d062c6c4440..88b2b520a74 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -18,11 +18,9 @@ use serde::{Deserialize, Serialize}; pub mod authors; -pub mod blobs; pub mod docs; pub mod net; pub mod node; -pub mod tags; /// The RPC service for the iroh provider process. #[derive(Debug, Clone)] @@ -35,11 +33,10 @@ pub struct RpcService; pub enum Request { Node(node::Request), Net(net::Request), - Blobs(blobs::Request), Docs(docs::Request), - Tags(tags::Request), Authors(authors::Request), Gossip(iroh_gossip::RpcRequest), + BlobsAndTags(iroh_blobs::rpc::proto::Request), } /// The response enum, listing all possible responses. @@ -49,11 +46,10 @@ pub enum Request { pub enum Response { Node(node::Response), Net(net::Response), - Blobs(blobs::Response), - Tags(tags::Response), Docs(docs::Response), Authors(authors::Response), Gossip(iroh_gossip::RpcResponse), + BlobsAndTags(iroh_blobs::rpc::proto::Response), } impl quic_rpc::Service for RpcService { diff --git a/iroh/src/rpc_protocol/blobs.rs b/iroh/src/rpc_protocol/blobs.rs deleted file mode 100644 index b27c83809b9..00000000000 --- a/iroh/src/rpc_protocol/blobs.rs +++ /dev/null @@ -1,319 +0,0 @@ -use std::path::PathBuf; - -use bytes::Bytes; -use iroh_base::hash::Hash; -use iroh_blobs::{ - export::ExportProgress, - format::collection::Collection, - get::db::DownloadProgress, - net_protocol::{BatchId, BlobDownloadRequest}, - provider::{AddProgress, BatchAddPathProgress}, - store::{ - BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ImportMode, - ValidateProgress, - }, - util::SetTagOption, - BlobFormat, HashAndFormat, Tag, -}; -use nested_enum_utils::enum_conversions; -use quic_rpc_derive::rpc_requests; -use serde::{Deserialize, Serialize}; - -use super::RpcService; -use crate::{ - client::blobs::{BlobInfo, BlobStatus, IncompleteBlobInfo, ReadAtLen, WrapOption}, - node::{RpcError, RpcResult}, -}; - -#[allow(missing_docs)] -#[derive(strum::Display, Debug, Serialize, Deserialize)] -#[enum_conversions(super::Request)] -#[rpc_requests(RpcService)] -pub enum Request { - #[server_streaming(response = RpcResult)] - ReadAt(ReadAtRequest), - #[bidi_streaming(update = AddStreamUpdate, response = AddStreamResponse)] - AddStream(AddStreamRequest), - AddStreamUpdate(AddStreamUpdate), - #[server_streaming(response = AddPathResponse)] - AddPath(AddPathRequest), - #[server_streaming(response = DownloadResponse)] - Download(BlobDownloadRequest), - #[server_streaming(response = ExportResponse)] - Export(ExportRequest), - #[server_streaming(response = RpcResult)] - List(ListRequest), - #[server_streaming(response = RpcResult)] - ListIncomplete(ListIncompleteRequest), - #[rpc(response = RpcResult<()>)] - Delete(DeleteRequest), - #[server_streaming(response = ValidateProgress)] - Validate(ValidateRequest), - #[server_streaming(response = ConsistencyCheckProgress)] - Fsck(ConsistencyCheckRequest), - #[rpc(response = RpcResult)] - 
CreateCollection(CreateCollectionRequest), - #[rpc(response = RpcResult)] - BlobStatus(BlobStatusRequest), - - #[bidi_streaming(update = BatchUpdate, response = BatchCreateResponse)] - BatchCreate(BatchCreateRequest), - BatchUpdate(BatchUpdate), - #[bidi_streaming(update = BatchAddStreamUpdate, response = BatchAddStreamResponse)] - BatchAddStream(BatchAddStreamRequest), - BatchAddStreamUpdate(BatchAddStreamUpdate), - #[server_streaming(response = BatchAddPathResponse)] - BatchAddPath(BatchAddPathRequest), - #[rpc(response = RpcResult<()>)] - BatchCreateTempTag(BatchCreateTempTagRequest), -} - -#[allow(missing_docs)] -#[derive(strum::Display, Debug, Serialize, Deserialize)] -#[enum_conversions(super::Response)] -pub enum Response { - ReadAt(RpcResult), - AddStream(AddStreamResponse), - AddPath(AddPathResponse), - List(RpcResult), - ListIncomplete(RpcResult), - Download(DownloadResponse), - Fsck(ConsistencyCheckProgress), - Export(ExportResponse), - Validate(ValidateProgress), - CreateCollection(RpcResult), - BlobStatus(RpcResult), - BatchCreate(BatchCreateResponse), - BatchAddStream(BatchAddStreamResponse), - BatchAddPath(BatchAddPathResponse), -} - -/// A request to the node to provide the data at the given path -/// -/// Will produce a stream of [`AddProgress`] messages. -#[derive(Debug, Serialize, Deserialize)] -pub struct AddPathRequest { - /// The path to the data to provide. - /// - /// This should be an absolute path valid for the file system on which - /// the node runs. Usually the cli will run on the same machine as the - /// node, so this should be an absolute path on the cli machine. - pub path: PathBuf, - /// True if the provider can assume that the data will not change, so it - /// can be shared in place. - pub in_place: bool, - /// Tag to tag the data with. - pub tag: SetTagOption, - /// Whether to wrap the added data in a collection - pub wrap: WrapOption, -} - -/// Wrapper around [`AddProgress`]. -#[derive(Debug, Serialize, Deserialize, derive_more::Into)] -pub struct AddPathResponse(pub AddProgress); - -/// Progress response for [`BlobDownloadRequest`] -#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From, derive_more::Into)] -pub struct DownloadResponse(pub DownloadProgress); - -/// A request to the node to download and share the data specified by the hash. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ExportRequest { - /// The hash of the blob to export. - pub hash: Hash, - /// The filepath to where the data should be saved - /// - /// This should be an absolute path valid for the file system on which - /// the node runs. - pub path: PathBuf, - /// Set to [`ExportFormat::Collection`] if the `hash` refers to a [`Collection`] and you want - /// to export all children of the collection into individual files. - pub format: ExportFormat, - /// The mode of exporting. - /// - /// The default is [`ExportMode::Copy`]. See [`ExportMode`] for details. 
- pub mode: ExportMode, -} - -/// Progress response for [`ExportRequest`] -#[derive(Debug, Clone, Serialize, Deserialize, derive_more::From, derive_more::Into)] -pub struct ExportResponse(pub ExportProgress); - -/// A request to the node to validate the integrity of all provided data -#[derive(Debug, Serialize, Deserialize)] -pub struct ConsistencyCheckRequest { - /// repair the store by dropping inconsistent blobs - pub repair: bool, -} - -/// A request to the node to validate the integrity of all provided data -#[derive(Debug, Serialize, Deserialize)] -pub struct ValidateRequest { - /// repair the store by downgrading blobs from complete to partial - pub repair: bool, -} - -/// List all blobs, including collections -#[derive(Debug, Serialize, Deserialize)] -pub struct ListRequest; - -/// List all blobs, including collections -#[derive(Debug, Serialize, Deserialize)] -pub struct ListIncompleteRequest; - -/// Get the bytes for a hash -#[derive(Serialize, Deserialize, Debug)] -pub struct ReadAtRequest { - /// Hash to get bytes for - pub hash: Hash, - /// Offset to start reading at - pub offset: u64, - /// Length of the data to get - pub len: ReadAtLen, -} - -/// Response to [`ReadAtRequest`] -#[derive(Serialize, Deserialize, Debug)] -pub enum ReadAtResponse { - /// The entry header. - Entry { - /// The size of the blob - size: BaoBlobSize, - /// Whether the blob is complete - is_complete: bool, - }, - /// Chunks of entry data. - Data { - /// The data chunk - chunk: Bytes, - }, -} - -/// Write a blob from a byte stream -#[derive(Serialize, Deserialize, Debug)] -pub struct AddStreamRequest { - /// Tag to tag the data with. - pub tag: SetTagOption, -} - -/// Write a blob from a byte stream -#[derive(Serialize, Deserialize, Debug)] -pub enum AddStreamUpdate { - /// A chunk of stream data - Chunk(Bytes), - /// Abort the request due to an error on the client side - Abort, -} - -/// Wrapper around [`AddProgress`]. -#[derive(Debug, Serialize, Deserialize, derive_more::Into)] -pub struct AddStreamResponse(pub AddProgress); - -/// Delete a blob -#[derive(Debug, Serialize, Deserialize)] -pub struct DeleteRequest { - /// Name of the tag - pub hash: Hash, -} - -/// Create a collection. -#[derive(Debug, Serialize, Deserialize)] -pub struct CreateCollectionRequest { - /// The collection - pub collection: Collection, - /// Tag option. - pub tag: SetTagOption, - /// Tags that should be deleted after creation. - pub tags_to_delete: Vec, -} - -/// A response to a create collection request -#[derive(Debug, Serialize, Deserialize)] -pub struct CreateCollectionResponse { - /// The resulting hash. - pub hash: Hash, - /// The resulting tag. 
- pub tag: Tag, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct BlobStatusRequest { - /// The hash of the blob - pub hash: Hash, -} - -/// The response to a status request -#[derive(Debug, Serialize, Deserialize, derive_more::From, derive_more::Into)] -pub struct BlobStatusResponse(pub BlobStatus); - -/// Request to create a new scope for temp tags -#[derive(Debug, Serialize, Deserialize)] -pub struct BatchCreateRequest; - -/// Update to a temp tag scope -#[derive(Debug, Serialize, Deserialize)] -pub enum BatchUpdate { - /// Drop of a remote temp tag - Drop(HashAndFormat), - /// Message to check that the connection is still alive - Ping, -} - -/// Response to a temp tag scope request -#[derive(Debug, Serialize, Deserialize)] -pub enum BatchCreateResponse { - /// We got the id of the scope - Id(BatchId), -} - -/// Create a temp tag with a given hash and format -#[derive(Debug, Serialize, Deserialize)] -pub struct BatchCreateTempTagRequest { - /// Content to protect - pub content: HashAndFormat, - /// Batch to create the temp tag in - pub batch: BatchId, -} - -/// Write a blob from a byte stream -#[derive(Serialize, Deserialize, Debug)] -pub struct BatchAddStreamRequest { - /// What format to use for the blob - pub format: BlobFormat, - /// Batch to create the temp tag in - pub batch: BatchId, -} - -/// Write a blob from a byte stream -#[derive(Serialize, Deserialize, Debug)] -pub enum BatchAddStreamUpdate { - /// A chunk of stream data - Chunk(Bytes), - /// Abort the request due to an error on the client side - Abort, -} - -/// Wrapper around [`AddProgress`]. -#[derive(Debug, Serialize, Deserialize)] -pub enum BatchAddStreamResponse { - Abort(RpcError), - OutboardProgress { offset: u64 }, - Result { hash: Hash }, -} - -/// Write a blob from a byte stream -#[derive(Serialize, Deserialize, Debug)] -pub struct BatchAddPathRequest { - /// The path to the data to provide. - pub path: PathBuf, - /// Add the data in place - pub import_mode: ImportMode, - /// What format to use for the blob - pub format: BlobFormat, - /// Batch to create the temp tag in - pub batch: BatchId, -} - -/// Response to a batch add path request -#[derive(Serialize, Deserialize, Debug)] -pub struct BatchAddPathResponse(pub BatchAddPathProgress); diff --git a/iroh/src/rpc_protocol/net.rs b/iroh/src/rpc_protocol/net.rs index 35e0768ede7..de04f6d655a 100644 --- a/iroh/src/rpc_protocol/net.rs +++ b/iroh/src/rpc_protocol/net.rs @@ -37,6 +37,7 @@ pub enum Response { RemoteInfosIter(RpcResult), RemoteInfo(RpcResult), Watch(WatchResponse), + Unit(RpcResult<()>), } /// List network path information about all the remote nodes known by this node. 
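Taken together, the rpc_protocol.rs and node/rpc.rs hunks above replace the per-method blobs and tags handlers with a single request variant that is forwarded wholesale to iroh-blobs; a condensed sketch of the new dispatch, with the other arms elided.

// Condensed sketch of the new dispatch, per the handler diff above: blobs and
// tags requests are handed to the iroh-blobs RPC handler instead of being
// matched method by method in the node.
match msg {
    Request::BlobsAndTags(msg) => {
        self.blobs()
            .handle_rpc_request(msg, chan.map().boxed())
            .await
            .map_err(|e| e.errors_into())
    }
    // Net, Node, Docs, Authors and Gossip keep their existing per-service handlers.
    _ => todo!("unchanged arms elided in this sketch"),
}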
diff --git a/iroh/src/rpc_protocol/tags.rs b/iroh/src/rpc_protocol/tags.rs
deleted file mode 100644
index 34e4edd9ae0..00000000000
--- a/iroh/src/rpc_protocol/tags.rs
+++ /dev/null
@@ -1,109 +0,0 @@
-use iroh_blobs::{net_protocol::BatchId, HashAndFormat, Tag};
-use nested_enum_utils::enum_conversions;
-use quic_rpc_derive::rpc_requests;
-use serde::{Deserialize, Serialize};
-
-use super::RpcService;
-use crate::{client::tags::TagInfo, node::RpcResult};
-
-#[allow(missing_docs)]
-#[derive(strum::Display, Debug, Serialize, Deserialize)]
-#[enum_conversions(super::Request)]
-#[rpc_requests(RpcService)]
-pub enum Request {
-    #[rpc(response = RpcResult<Tag>)]
-    Create(CreateRequest),
-    #[rpc(response = RpcResult<()>)]
-    Set(SetRequest),
-    #[rpc(response = RpcResult<()>)]
-    DeleteTag(DeleteRequest),
-    #[server_streaming(response = TagInfo)]
-    ListTags(ListRequest),
-}
-
-#[allow(missing_docs)]
-#[derive(strum::Display, Debug, Serialize, Deserialize)]
-#[enum_conversions(super::Response)]
-pub enum Response {
-    Create(RpcResult<Tag>),
-    ListTags(TagInfo),
-    DeleteTag(RpcResult<()>),
-}
-
-/// Determine how to sync the db after a modification operation
-#[derive(Debug, Serialize, Deserialize, Default)]
-pub enum SyncMode {
-    /// Fully sync the db
-    #[default]
-    Full,
-    /// Do not sync the db
-    None,
-}
-
-/// Create a tag
-#[derive(Debug, Serialize, Deserialize)]
-pub struct CreateRequest {
-    /// Value of the tag
-    pub value: HashAndFormat,
-    /// Batch to use, none for global
-    pub batch: Option<BatchId>,
-    /// Sync mode
-    pub sync: SyncMode,
-}
-
-/// Set or delete a tag
-#[derive(Debug, Serialize, Deserialize)]
-pub struct SetRequest {
-    /// Name of the tag
-    pub name: Tag,
-    /// Value of the tag, None to delete
-    pub value: Option<HashAndFormat>,
-    /// Batch to use, none for global
-    pub batch: Option<BatchId>,
-    /// Sync mode
-    pub sync: SyncMode,
-}
-
-/// List all collections
-///
-/// Lists all collections that have been explicitly added to the database.
-#[derive(Debug, Serialize, Deserialize)]
-pub struct ListRequest {
-    /// List raw tags
-    pub raw: bool,
-    /// List hash seq tags
-    pub hash_seq: bool,
-}
-
-impl ListRequest {
-    /// List all tags
-    pub fn all() -> Self {
-        Self {
-            raw: true,
-            hash_seq: true,
-        }
-    }
-
-    /// List raw tags
-    pub fn raw() -> Self {
-        Self {
-            raw: true,
-            hash_seq: false,
-        }
-    }
-
-    /// List hash seq tags
-    pub fn hash_seq() -> Self {
-        Self {
-            raw: false,
-            hash_seq: true,
-        }
-    }
-}
-
-/// Delete a tag
-#[derive(Debug, Serialize, Deserialize)]
-pub struct DeleteRequest {
-    /// Name of the tag
-    pub name: Tag,
-}
diff --git a/iroh/src/util/fs.rs b/iroh/src/util/fs.rs
index df548964b5f..061be9a7627 100644
--- a/iroh/src/util/fs.rs
+++ b/iroh/src/util/fs.rs
@@ -1,6 +1,5 @@
 //! Utilities for filesystem operations.
 use std::{
-    borrow::Cow,
     fs::read_dir,
     path::{Component, Path, PathBuf},
 };
@@ -9,115 +8,6 @@ use anyhow::{bail, Context};
 use bytes::Bytes;
 use iroh_net::key::SecretKey;
 use tokio::io::AsyncWriteExt;
-use walkdir::WalkDir;
-
-use crate::client::blobs::WrapOption;
-
-/// A data source
-#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
-pub struct DataSource {
-    /// Custom name
-    name: String,
-    /// Path to the file
-    path: PathBuf,
-}
-
-impl DataSource {
-    /// Creates a new [`DataSource`] from a [`PathBuf`].
-    pub fn new(path: PathBuf) -> Self {
-        let name = path
-            .file_name()
-            .map(|s| s.to_string_lossy().to_string())
-            .unwrap_or_default();
-        DataSource { path, name }
-    }
-    /// Creates a new [`DataSource`] from a [`PathBuf`] and a custom name.
-    pub fn with_name(path: PathBuf, name: String) -> Self {
-        DataSource { path, name }
-    }
-
-    /// Returns blob name for this data source.
-    ///
-    /// If no name was provided when created it is derived from the path name.
-    pub fn name(&self) -> Cow<'_, str> {
-        Cow::Borrowed(&self.name)
-    }
-
-    /// Returns the path of this data source.
-    pub fn path(&self) -> &Path {
-        &self.path
-    }
-}
-
-impl From<PathBuf> for DataSource {
-    fn from(value: PathBuf) -> Self {
-        DataSource::new(value)
-    }
-}
-
-impl From<&std::path::Path> for DataSource {
-    fn from(value: &std::path::Path) -> Self {
-        DataSource::new(value.to_path_buf())
-    }
-}
-
-/// Create data sources from a path.
-pub fn scan_path(path: PathBuf, wrap: WrapOption) -> anyhow::Result<Vec<DataSource>> {
-    if path.is_dir() {
-        scan_dir(path, wrap)
-    } else {
-        let name = match wrap {
-            WrapOption::NoWrap => bail!("Cannot scan a file without wrapping"),
-            WrapOption::Wrap { name: None } => file_name(&path)?,
-            WrapOption::Wrap { name: Some(name) } => name,
-        };
-        Ok(vec![DataSource { name, path }])
-    }
-}
-
-fn file_name(path: &Path) -> anyhow::Result<String> {
-    relative_canonicalized_path_to_string(path.file_name().context("path is invalid")?)
-}
-
-/// Create data sources from a directory.
-pub fn scan_dir(root: PathBuf, wrap: WrapOption) -> anyhow::Result<Vec<DataSource>> {
-    if !root.is_dir() {
-        bail!("Expected {} to be a file", root.to_string_lossy());
-    }
-    let prefix = match wrap {
-        WrapOption::NoWrap => None,
-        WrapOption::Wrap { name: None } => Some(file_name(&root)?),
-        WrapOption::Wrap { name: Some(name) } => Some(name),
-    };
-    let files = WalkDir::new(&root).into_iter();
-    let data_sources = files
-        .map(|entry| {
-            let entry = entry?;
-            if !entry.file_type().is_file() {
-                // Skip symlinks. Directories are handled by WalkDir.
-                return Ok(None);
-            }
-            let path = entry.into_path();
-            let mut name = relative_canonicalized_path_to_string(path.strip_prefix(&root)?)?;
-            if let Some(prefix) = &prefix {
-                name = format!("{prefix}/{name}");
-            }
-            anyhow::Ok(Some(DataSource { name, path }))
-        })
-        .filter_map(Result::transpose);
-    let data_sources: Vec<anyhow::Result<DataSource>> = data_sources.collect::<Vec<_>>();
-    data_sources.into_iter().collect::<anyhow::Result<Vec<_>>>()
-}
-
-/// This function converts a canonicalized relative path to a string, returning
-/// an error if the path is not valid unicode.
-///
-/// This function will also fail if the path is non canonical, i.e. contains
-/// `..` or `.`, or if the path components contain any windows or unix path
-/// separators.
-pub fn relative_canonicalized_path_to_string(path: impl AsRef<Path>) -> anyhow::Result<String> {
-    canonicalized_path_to_string(path, true)
-}
 
 /// Loads a [`SecretKey`] from the provided file, or stores a newly generated one
 /// at the given location.
diff --git a/iroh/tests/provide.rs b/iroh/tests/provide.rs
index 7d380dc4bb2..87ae66f3842 100644
--- a/iroh/tests/provide.rs
+++ b/iroh/tests/provide.rs
@@ -10,7 +10,7 @@ use bao_tree::{blake3, ChunkNum, ChunkRanges};
 use bytes::Bytes;
 use futures_lite::FutureExt;
 use iroh::node::{Builder, DocsStorage};
-use iroh_base::node_addr::AddrInfoOptions;
+use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket};
 use iroh_blobs::{
     format::collection::Collection,
     get::{
@@ -19,7 +19,7 @@ use iroh_blobs::{
     },
     protocol::{GetRequest, RangeSpecSeq},
     store::{MapMut, Store},
-    BlobFormat, Hash,
+    Hash,
 };
 use iroh_net::{defaults::staging::default_relay_map, key::SecretKey, NodeAddr, NodeId};
 use rand::RngCore;
@@ -386,15 +386,11 @@ async fn test_run_ticket() {
 
     let node = test_node(db).spawn().await.unwrap();
     let _drop_guard = node.cancel_token().drop_guard();
-    let ticket = node
-        .blobs()
-        .share(
-            hash,
-            BlobFormat::HashSeq,
-            AddrInfoOptions::RelayAndAddresses,
-        )
-        .await
-        .unwrap();
+    let mut addr = node.net().node_addr().await.unwrap();
+    addr.apply_options(AddrInfoOptions::RelayAndAddresses);
+    let ticket = BlobTicket::new(addr, hash, iroh_blobs::BlobFormat::HashSeq)
+        .expect("ticket creation failed");
+
     tokio::time::timeout(Duration::from_secs(10), async move {
         let request = GetRequest::all(hash);
         run_collection_get_request(SecretKey::generate(), ticket.node_addr().clone(), request).await