diff --git a/Cargo.lock b/Cargo.lock index 94803e0db70..68d292b8d26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2485,6 +2485,7 @@ dependencies = [ "ref-cast", "regex", "serde", + "serde-error", "serde_json", "strum 0.25.0", "tempfile", @@ -2521,7 +2522,6 @@ dependencies = [ "rand_core", "redb 2.1.1", "serde", - "serde-error", "serde_json", "serde_test", "ssh-key", @@ -2547,8 +2547,7 @@ dependencies = [ [[package]] name = "iroh-blobs" version = "0.27.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69e83475d6c8cc312a224d4c6bdf287f862d30d2b176768ef71f188a0d8aa7cd" +source = "git+https://github.com/n0-computer/iroh-blobs?branch=main#f87c3bf59e5040382d29d872ba543351a908f500" dependencies = [ "anyhow", "async-channel", @@ -2578,6 +2577,7 @@ dependencies = [ "reflink-copy", "self_cell", "serde", + "serde-error", "smallvec", "tempfile", "thiserror", @@ -4949,9 +4949,9 @@ dependencies = [ [[package]] name = "serde-error" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e988182713aeed6a619a88bca186f6d6407483485ffe44c869ee264f8eabd13f" +checksum = "342110fb7a5d801060c885da03bf91bfa7c7ca936deafcc64bb6706375605d47" dependencies = [ "serde", ] diff --git a/Cargo.toml b/Cargo.toml index 6c80d6fe817..e49e7967d22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,3 +51,5 @@ iroh-net = { path = "./iroh-net" } iroh-metrics = { path = "./iroh-metrics" } iroh-test = { path = "./iroh-test" } iroh-router = { path = "./iroh-router" } + +iroh-blobs = { git = "https://github.com/n0-computer/iroh-blobs", branch = "main" } diff --git a/deny.toml b/deny.toml index 7db03d12962..e39fab7b216 100644 --- a/deny.toml +++ b/deny.toml @@ -34,3 +34,8 @@ license-files = [ ignore = [ "RUSTSEC-2024-0370", # unmaintained, no upgrade available ] + +[sources] +allow-git = [ + "https://github.com/n0-computer/iroh-blobs.git", +] diff --git a/iroh-base/Cargo.toml b/iroh-base/Cargo.toml index 9c00b1e9d88..9fb13add6e7 100644 --- a/iroh-base/Cargo.toml +++ b/iroh-base/Cargo.toml @@ -22,7 +22,6 @@ hex = "0.4.3" postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"], optional = true } redb = { version = "2.0.0", optional = true } serde = { version = "1", features = ["derive"] } -serde-error = "0.1.2" thiserror = "1" # key module diff --git a/iroh-base/src/lib.rs b/iroh-base/src/lib.rs index cd726a60066..2c8ec03fdc0 100644 --- a/iroh-base/src/lib.rs +++ b/iroh-base/src/lib.rs @@ -13,7 +13,6 @@ pub mod key; #[cfg(feature = "key")] #[cfg_attr(iroh_docsrs, doc(cfg(feature = "key")))] pub mod node_addr; -pub mod rpc; #[cfg(feature = "base32")] #[cfg_attr(iroh_docsrs, doc(cfg(feature = "base32")))] pub mod ticket; diff --git a/iroh-base/src/rpc.rs b/iroh-base/src/rpc.rs deleted file mode 100644 index 845ee7438d4..00000000000 --- a/iroh-base/src/rpc.rs +++ /dev/null @@ -1,34 +0,0 @@ -use std::fmt; - -use serde::{Deserialize, Serialize}; - -/// A serializable error type for use in RPC responses. -#[derive(Serialize, Deserialize, Debug, thiserror::Error)] -pub struct RpcError(serde_error::Error); - -impl fmt::Display for RpcError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Display::fmt(&self.0, f) - } -} - -impl From for RpcError { - fn from(e: anyhow::Error) -> Self { - RpcError(serde_error::Error::new(&*e)) - } -} - -impl From for RpcError { - fn from(e: std::io::Error) -> Self { - RpcError(serde_error::Error::new(&e)) - } -} - -impl std::clone::Clone for RpcError { - fn clone(&self) -> Self { - RpcError(serde_error::Error::new(self)) - } -} - -/// A serializable result type for use in RPC responses. -pub type RpcResult = std::result::Result; diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 05b22e193c5..b553f384db1 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -61,6 +61,7 @@ console = { version = "0.15.5", optional = true } # Documentation tests url = { version = "2.5.0", features = ["serde"] } +serde-error = "0.1.3" [features] default = ["metrics", "fs-store"] diff --git a/iroh/src/client/docs.rs b/iroh/src/client/docs.rs index 73f0b65ce28..cdceaede8b6 100644 --- a/iroh/src/client/docs.rs +++ b/iroh/src/client/docs.rs @@ -15,7 +15,7 @@ use anyhow::{anyhow, Context as _, Result}; use bytes::Bytes; use derive_more::{Display, FromStr}; use futures_lite::{Stream, StreamExt}; -use iroh_base::{key::PublicKey, node_addr::AddrInfoOptions, rpc::RpcError}; +use iroh_base::{key::PublicKey, node_addr::AddrInfoOptions}; use iroh_blobs::{export::ExportProgress, store::ExportMode, Hash}; #[doc(inline)] pub use iroh_docs::engine::{Origin, SyncEvent, SyncReason}; @@ -527,7 +527,7 @@ pub enum ImportProgress { /// We got an error and need to abort. /// /// This will be the last message in the stream. - Abort(RpcError), + Abort(serde_error::Error), } /// Intended capability for document share tickets diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 73767fe24b6..f665ebcb14a 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -73,6 +73,7 @@ mod protocol; mod rpc; mod rpc_status; +pub(crate) use self::rpc::{RpcError, RpcResult}; pub use self::{ builder::{ Builder, DiscoveryConfig, DocsStorage, GcPolicy, ProtocolBuilder, StorageConfig, diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 868b168e7bc..e7002bdb5a8 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -10,7 +10,6 @@ use futures_buffered::BufferedStreamExt; use futures_lite::{Stream, StreamExt}; use futures_util::FutureExt; use genawaiter::sync::{Co, Gen}; -use iroh_base::rpc::{RpcError, RpcResult}; use iroh_blobs::{ export::ExportProgress, format::collection::Collection, @@ -84,6 +83,9 @@ const RPC_BLOB_GET_CHUNK_SIZE: usize = 1024 * 64; /// Channel cap for getting blobs over RPC const RPC_BLOB_GET_CHANNEL_CAP: usize = 2; +pub(crate) type RpcError = serde_error::Error; +pub(crate) type RpcResult = Result; + #[derive(Debug, Clone)] pub(crate) struct Handler { pub(crate) inner: Arc>, @@ -265,7 +267,7 @@ impl Handler { }, Box::pin(updates), ); - futures_util::TryStreamExt::map_err(stream, RpcError::from) + futures_util::TryStreamExt::map_err(stream, |e| RpcError::new(&*e)) }) .await } @@ -484,7 +486,11 @@ impl Handler { async fn blob_status(self, msg: BlobStatusRequest) -> RpcResult { let blobs = self.blobs(); - let entry = blobs.store().get(&msg.hash).await?; + let entry = blobs + .store() + .get(&msg.hash) + .await + .map_err(|e| RpcError::new(&e))?; Ok(BlobStatusResponse(match entry { Some(entry) => { if entry.is_complete() { @@ -549,7 +555,7 @@ impl Handler { ) -> impl Stream> + Send + 'static { Gen::new(|co| async move { if let Err(e) = self.blob_list_impl(&co).await { - co.yield_(Err(e.into())).await; + co.yield_(Err(RpcError::new(&e))).await; } }) } @@ -560,18 +566,24 @@ impl Handler { ) -> impl Stream> + Send + 'static { Gen::new(move |co| async move { if let Err(e) = self.blob_list_incomplete_impl(&co).await { - co.yield_(Err(e.into())).await; + co.yield_(Err(RpcError::new(&e))).await; } }) } async fn blob_delete_tag(self, msg: TagDeleteRequest) -> RpcResult<()> { - self.blobs_store().set_tag(msg.name, None).await?; + self.blobs_store() + .set_tag(msg.name, None) + .await + .map_err(|e| RpcError::new(&e))?; Ok(()) } async fn blob_delete_blob(self, msg: DeleteRequest) -> RpcResult<()> { - self.blobs_store().delete(vec![msg.hash]).await?; + self.blobs_store() + .delete(vec![msg.hash]) + .await + .map_err(|e| RpcError::new(&e))?; Ok(()) } @@ -605,7 +617,9 @@ impl Handler { .validate(msg.repair, AsyncChannelProgressSender::new(tx).boxed()) .await { - tx2.send(ValidateProgress::Abort(e.into())).await.ok(); + tx2.send(ValidateProgress::Abort(RpcError::new(&e))) + .await + .ok(); } }); rx @@ -625,7 +639,7 @@ impl Handler { .consistency_check(msg.repair, AsyncChannelProgressSender::new(tx).boxed()) .await { - tx2.send(ConsistencyCheckProgress::Abort(e.into())) + tx2.send(ConsistencyCheckProgress::Abort(RpcError::new(&e))) .await .ok(); } @@ -639,7 +653,7 @@ impl Handler { let tx2 = tx.clone(); self.local_pool_handle().spawn_detached(|| async move { if let Err(e) = self.blob_add_from_path0(msg, tx).await { - tx2.send(AddProgress::Abort(e.into())).await.ok(); + tx2.send(AddProgress::Abort(RpcError::new(&*e))).await.ok(); } }); rx.map(AddPathResponse) @@ -651,9 +665,11 @@ impl Handler { let tx2 = tx.clone(); self.local_pool_handle().spawn_detached(|| async move { if let Err(e) = self.doc_import_file0(msg, tx).await { - tx2.send(crate::client::docs::ImportProgress::Abort(e.into())) - .await - .ok(); + tx2.send(crate::client::docs::ImportProgress::Abort(RpcError::new( + &*e, + ))) + .await + .ok(); } }); rx.map(ImportFileResponse) @@ -737,7 +753,9 @@ impl Handler { let tx2 = tx.clone(); self.local_pool_handle().spawn_detached(|| async move { if let Err(e) = self.doc_export_file0(msg, tx).await { - tx2.send(ExportProgress::Abort(e.into())).await.ok(); + tx2.send(ExportProgress::Abort(RpcError::new(&*e))) + .await + .ok(); } }); rx.map(ExportFileResponse) @@ -789,7 +807,7 @@ impl Handler { .await { progress - .send(DownloadProgress::Abort(err.into())) + .send(DownloadProgress::Abort(RpcError::new(&*err))) .await .ok(); } @@ -813,7 +831,10 @@ impl Handler { .await; match res { Ok(()) => progress.send(ExportProgress::AllDone).await.ok(), - Err(err) => progress.send(ExportProgress::Abort(err.into())).await.ok(), + Err(err) => progress + .send(ExportProgress::Abort(RpcError::new(&*err))) + .await + .ok(), }; }); rx.map(ExportResponse) @@ -942,18 +963,23 @@ impl Handler { async fn node_stats(self, _req: StatsRequest) -> RpcResult { #[cfg(feature = "metrics")] let res = Ok(StatsResponse { - stats: crate::metrics::get_metrics()?, + stats: crate::metrics::get_metrics().map_err(|e| RpcError::new(&*e))?, }); #[cfg(not(feature = "metrics"))] - let res = Err(anyhow::anyhow!("metrics are disabled").into()); + let res = Err(RpcError::new(&*anyhow::anyhow!("metrics are disabled"))); res } async fn node_status(self, _: StatusRequest) -> RpcResult { Ok(NodeStatus { - addr: self.inner.endpoint.node_addr().await?, + addr: self + .inner + .endpoint + .node_addr() + .await + .map_err(|e| RpcError::new(&*e))?, listen_addrs: self .inner .local_endpoint_addresses() @@ -970,7 +996,12 @@ impl Handler { } async fn node_addr(self, _: AddrRequest) -> RpcResult { - let addr = self.inner.endpoint.node_addr().await?; + let addr = self + .inner + .endpoint + .node_addr() + .await + .map_err(|e| RpcError::new(&*e))?; Ok(addr) } @@ -993,13 +1024,21 @@ impl Handler { async fn tags_set(self, msg: tags::SetRequest) -> RpcResult<()> { let blobs = self.blobs(); - blobs.store().set_tag(msg.name, msg.value).await?; + blobs + .store() + .set_tag(msg.name, msg.value) + .await + .map_err(|e| RpcError::new(&e))?; if let SyncMode::Full = msg.sync { - blobs.store().sync().await?; + blobs.store().sync().await.map_err(|e| RpcError::new(&e))?; } if let Some(batch) = msg.batch { if let Some(content) = msg.value.as_ref() { - blobs.batches().await.remove_one(batch, content)?; + blobs + .batches() + .await + .remove_one(batch, content) + .map_err(|e| RpcError::new(&*e))?; } } Ok(()) @@ -1007,12 +1046,20 @@ impl Handler { async fn tags_create(self, msg: tags::CreateRequest) -> RpcResult { let blobs = self.blobs(); - let tag = blobs.store().create_tag(msg.value).await?; + let tag = blobs + .store() + .create_tag(msg.value) + .await + .map_err(|e| RpcError::new(&e))?; if let SyncMode::Full = msg.sync { - blobs.store().sync().await?; + blobs.store().sync().await.map_err(|e| RpcError::new(&e))?; } if let Some(batch) = msg.batch { - blobs.batches().await.remove_one(batch, &msg.value)?; + blobs + .batches() + .await + .remove_one(batch, &msg.value) + .map_err(|e| RpcError::new(&*e))?; } Ok(tag) } @@ -1046,7 +1093,7 @@ impl Handler { self.local_pool_handle().spawn_detached(|| async move { if let Err(err) = this.batch_add_stream0(msg, stream, tx.clone()).await { - tx.send(BatchAddStreamResponse::Abort(err.into())) + tx.send(BatchAddStreamResponse::Abort(RpcError::new(&*err))) .await .ok(); } @@ -1063,7 +1110,9 @@ impl Handler { let tx2 = tx.clone(); self.local_pool_handle().spawn_detached(|| async move { if let Err(e) = self.batch_add_from_path0(msg, tx).await { - tx2.send(BatchAddPathProgress::Abort(e.into())).await.ok(); + tx2.send(BatchAddPathProgress::Abort(RpcError::new(&*e))) + .await + .ok(); } }); rx.map(BatchAddPathResponse) @@ -1153,7 +1202,7 @@ impl Handler { self.local_pool_handle().spawn_detached(|| async move { if let Err(err) = this.blob_add_stream0(msg, stream, tx.clone()).await { - tx.send(AddProgress::Abort(err.into())).await.ok(); + tx.send(AddProgress::Abort(RpcError::new(&*err))).await.ok(); } }); @@ -1222,7 +1271,7 @@ impl Handler { let db = self.blobs_store(); self.local_pool_handle().spawn_detached(move || async move { if let Err(err) = read_loop(req, db, tx.clone(), RPC_BLOB_GET_CHUNK_SIZE).await { - tx.send(RpcResult::Err(err.into())).await.ok(); + tx.send(RpcResult::Err(RpcError::new(&*err))).await.ok(); } }); @@ -1349,7 +1398,10 @@ impl Handler { #[allow(clippy::unused_async)] async fn node_add_addr(self, req: AddAddrRequest) -> RpcResult<()> { let AddAddrRequest { addr } = req; - self.inner.endpoint.add_node_addr(addr)?; + self.inner + .endpoint + .add_node_addr(addr) + .map_err(|e| RpcError::new(&*e))?; Ok(()) } @@ -1365,7 +1417,10 @@ impl Handler { let blobs = self.blobs(); - let temp_tag = collection.store(blobs.store()).await?; + let temp_tag = collection + .store(blobs.store()) + .await + .map_err(|e| RpcError::new(&*e))?; let hash_and_format = temp_tag.inner(); let HashAndFormat { hash, .. } = *hash_and_format; let tag = match tag { @@ -1373,14 +1428,23 @@ impl Handler { blobs .store() .set_tag(tag.clone(), Some(*hash_and_format)) - .await?; + .await + .map_err(|e| RpcError::new(&e))?; tag } - SetTagOption::Auto => blobs.store().create_tag(*hash_and_format).await?, + SetTagOption::Auto => blobs + .store() + .create_tag(*hash_and_format) + .await + .map_err(|e| RpcError::new(&e))?, }; for tag in tags_to_delete { - blobs.store().set_tag(tag, None).await?; + blobs + .store() + .set_tag(tag, None) + .await + .map_err(|e| RpcError::new(&e))?; } Ok(CreateCollectionResponse { hash, tag }) @@ -1388,5 +1452,5 @@ impl Handler { } fn docs_disabled() -> RpcError { - anyhow!("docs are disabled").into() + RpcError::new(&*anyhow!("docs are disabled")) } diff --git a/iroh/src/node/rpc/docs.rs b/iroh/src/node/rpc/docs.rs index ec2d04f6458..0ab0c019f03 100644 --- a/iroh/src/node/rpc/docs.rs +++ b/iroh/src/node/rpc/docs.rs @@ -2,10 +2,10 @@ use anyhow::anyhow; use futures_lite::{Stream, StreamExt}; -use iroh_base::rpc::RpcResult; use iroh_blobs::{store::Store as BaoStore, BlobFormat}; use iroh_docs::{Author, DocTicket, NamespaceSecret}; +use super::{RpcError, RpcResult}; use crate::{ client::docs::ShareMode, node::protocol::docs::DocsProtocol, @@ -39,7 +39,10 @@ impl DocsProtocol { pub async fn author_create(&self, _req: CreateRequest) -> RpcResult { // TODO: pass rng let author = Author::new(&mut rand::rngs::OsRng {}); - self.sync.import_author(author.clone()).await?; + self.sync + .import_author(author.clone()) + .await + .map_err(|e| RpcError::new(&*e))?; Ok(CreateResponse { author_id: author.id(), }) @@ -54,7 +57,10 @@ impl DocsProtocol { &self, req: SetDefaultRequest, ) -> RpcResult { - self.default_author.set(req.author_id, &self.sync).await?; + self.default_author + .set(req.author_id, &self.sync) + .await + .map_err(|e| RpcError::new(&*e))?; Ok(SetDefaultResponse) } @@ -74,41 +80,65 @@ impl DocsProtocol { }); rx.boxed().map(|r| { r.map(|author_id| AuthorListResponse { author_id }) - .map_err(Into::into) + .map_err(|e| RpcError::new(&*e)) }) } pub async fn author_import(&self, req: ImportRequest) -> RpcResult { - let author_id = self.sync.import_author(req.author).await?; + let author_id = self + .sync + .import_author(req.author) + .await + .map_err(|e| RpcError::new(&*e))?; Ok(ImportResponse { author_id }) } pub async fn author_export(&self, req: ExportRequest) -> RpcResult { - let author = self.sync.export_author(req.author).await?; + let author = self + .sync + .export_author(req.author) + .await + .map_err(|e| RpcError::new(&*e))?; Ok(ExportResponse { author }) } pub async fn author_delete(&self, req: DeleteRequest) -> RpcResult { if req.author == self.default_author.get() { - return Err(anyhow!("Deleting the default author is not supported").into()); + return Err(RpcError::new(&*anyhow!( + "Deleting the default author is not supported" + ))); } - self.sync.delete_author(req.author).await?; + self.sync + .delete_author(req.author) + .await + .map_err(|e| RpcError::new(&*e))?; Ok(DeleteResponse) } pub async fn doc_create(&self, _req: DocCreateRequest) -> RpcResult { let namespace = NamespaceSecret::new(&mut rand::rngs::OsRng {}); let id = namespace.id(); - self.sync.import_namespace(namespace.into()).await?; - self.sync.open(id, Default::default()).await?; + self.sync + .import_namespace(namespace.into()) + .await + .map_err(|e| RpcError::new(&*e))?; + self.sync + .open(id, Default::default()) + .await + .map_err(|e| RpcError::new(&*e))?; Ok(DocCreateResponse { id }) } pub async fn doc_drop(&self, req: DropRequest) -> RpcResult { let DropRequest { doc_id } = req; - self.leave(doc_id, true).await?; - self.sync.drop_replica(doc_id).await?; + self.leave(doc_id, true) + .await + .map_err(|e| RpcError::new(&*e))?; + self.sync + .drop_replica(doc_id) + .await + .map_err(|e| RpcError::new(&*e))?; Ok(DropResponse {}) } @@ -128,22 +158,32 @@ impl DocsProtocol { }); rx.boxed().map(|r| { r.map(|(id, capability)| DocListResponse { id, capability }) - .map_err(Into::into) + .map_err(|e| RpcError::new(&*e)) }) } pub async fn doc_open(&self, req: OpenRequest) -> RpcResult { - self.sync.open(req.doc_id, Default::default()).await?; + self.sync + .open(req.doc_id, Default::default()) + .await + .map_err(|e| RpcError::new(&*e))?; Ok(OpenResponse {}) } pub async fn doc_close(&self, req: CloseRequest) -> RpcResult { - self.sync.close(req.doc_id).await?; + self.sync + .close(req.doc_id) + .await + .map_err(|e| RpcError::new(&*e))?; Ok(CloseResponse {}) } pub async fn doc_status(&self, req: StatusRequest) -> RpcResult { - let status = self.sync.get_state(req.doc_id).await?; + let status = self + .sync + .get_state(req.doc_id) + .await + .map_err(|e| RpcError::new(&*e))?; Ok(StatusResponse { status }) } @@ -153,17 +193,27 @@ impl DocsProtocol { mode, addr_options, } = req; - let mut me = self.endpoint.node_addr().await?; + let mut me = self + .endpoint + .node_addr() + .await + .map_err(|e| RpcError::new(&*e))?; me.apply_options(addr_options); let capability = match mode { ShareMode::Read => iroh_docs::Capability::Read(doc_id), ShareMode::Write => { - let secret = self.sync.export_secret_key(doc_id).await?; + let secret = self + .sync + .export_secret_key(doc_id) + .await + .map_err(|e| RpcError::new(&*e))?; iroh_docs::Capability::Write(secret) } }; - self.start_sync(doc_id, vec![]).await?; + self.start_sync(doc_id, vec![]) + .await + .map_err(|e| RpcError::new(&*e))?; Ok(ShareResponse(DocTicket { capability, @@ -175,30 +225,44 @@ impl DocsProtocol { &self, req: DocSubscribeRequest, ) -> RpcResult>> { - let stream = self.subscribe(req.doc_id).await?; + let stream = self + .subscribe(req.doc_id) + .await + .map_err(|e| RpcError::new(&*e))?; Ok(stream.map(|el| { el.map(|event| DocSubscribeResponse { event }) - .map_err(Into::into) + .map_err(|e| RpcError::new(&*e)) })) } pub async fn doc_import(&self, req: DocImportRequest) -> RpcResult { let DocImportRequest { capability } = req; - let doc_id = self.sync.import_namespace(capability).await?; - self.sync.open(doc_id, Default::default()).await?; + let doc_id = self + .sync + .import_namespace(capability) + .await + .map_err(|e| RpcError::new(&*e))?; + self.sync + .open(doc_id, Default::default()) + .await + .map_err(|e| RpcError::new(&*e))?; Ok(DocImportResponse { doc_id }) } pub async fn doc_start_sync(&self, req: StartSyncRequest) -> RpcResult { let StartSyncRequest { doc_id, peers } = req; - self.start_sync(doc_id, peers).await?; + self.start_sync(doc_id, peers) + .await + .map_err(|e| RpcError::new(&*e))?; Ok(StartSyncResponse {}) } pub async fn doc_leave(&self, req: LeaveRequest) -> RpcResult { let LeaveRequest { doc_id } = req; - self.leave(doc_id, false).await?; + self.leave(doc_id, false) + .await + .map_err(|e| RpcError::new(&*e))?; Ok(LeaveResponse {}) } @@ -214,15 +278,20 @@ impl DocsProtocol { value, } = req; let len = value.len(); - let tag = bao_store.import_bytes(value, BlobFormat::Raw).await?; + let tag = bao_store + .import_bytes(value, BlobFormat::Raw) + .await + .map_err(|e| RpcError::new(&e))?; self.sync .insert_local(doc_id, author_id, key.clone(), *tag.hash(), len as u64) - .await?; + .await + .map_err(|e| RpcError::new(&*e))?; let entry = self .sync .get_exact(doc_id, author_id, key, false) - .await? - .ok_or_else(|| anyhow!("failed to get entry after insertion"))?; + .await + .map_err(|e| RpcError::new(&*e))? + .ok_or_else(|| RpcError::new(&*anyhow!("failed to get entry after insertion")))?; Ok(SetResponse { entry }) } @@ -232,7 +301,11 @@ impl DocsProtocol { author_id, prefix, } = req; - let removed = self.sync.delete_prefix(doc_id, author_id, prefix).await?; + let removed = self + .sync + .delete_prefix(doc_id, author_id, prefix) + .await + .map_err(|e| RpcError::new(&*e))?; Ok(DelResponse { removed }) } @@ -246,7 +319,8 @@ impl DocsProtocol { } = req; self.sync .insert_local(doc_id, author_id, key.clone(), hash, size) - .await?; + .await + .map_err(|e| RpcError::new(&*e))?; Ok(SetHashResponse {}) } @@ -265,8 +339,10 @@ impl DocsProtocol { tx2.send(Err(err)).await.ok(); } }); - rx.boxed() - .map(|r| r.map(|entry| GetManyResponse { entry }).map_err(Into::into)) + rx.boxed().map(|r| { + r.map(|entry| GetManyResponse { entry }) + .map_err(|e| RpcError::new(&*e)) + }) } pub async fn doc_get_exact(&self, req: GetExactRequest) -> RpcResult { @@ -279,7 +355,8 @@ impl DocsProtocol { let entry = self .sync .get_exact(doc_id, author, key, include_empty) - .await?; + .await + .map_err(|e| RpcError::new(&*e))?; Ok(GetExactResponse { entry }) } @@ -289,14 +366,19 @@ impl DocsProtocol { ) -> RpcResult { self.sync .set_download_policy(req.doc_id, req.policy) - .await?; + .await + .map_err(|e| RpcError::new(&*e))?; Ok(SetDownloadPolicyResponse {}) } pub async fn doc_get_download_policy( &self, req: GetDownloadPolicyRequest, ) -> RpcResult { - let policy = self.sync.get_download_policy(req.doc_id).await?; + let policy = self + .sync + .get_download_policy(req.doc_id) + .await + .map_err(|e| RpcError::new(&*e))?; Ok(GetDownloadPolicyResponse { policy }) } @@ -304,7 +386,11 @@ impl DocsProtocol { &self, req: GetSyncPeersRequest, ) -> RpcResult { - let peers = self.sync.get_sync_peers(req.doc_id).await?; + let peers = self + .sync + .get_sync_peers(req.doc_id) + .await + .map_err(|e| RpcError::new(&*e))?; Ok(GetSyncPeersResponse { peers }) } } diff --git a/iroh/src/rpc_protocol/authors.rs b/iroh/src/rpc_protocol/authors.rs index fec3b93ddbb..2afb1d1f910 100644 --- a/iroh/src/rpc_protocol/authors.rs +++ b/iroh/src/rpc_protocol/authors.rs @@ -1,10 +1,10 @@ -use iroh_base::rpc::RpcResult; use iroh_docs::{Author, AuthorId}; use nested_enum_utils::enum_conversions; use quic_rpc_derive::rpc_requests; use serde::{Deserialize, Serialize}; use super::RpcService; +use crate::node::RpcResult; #[allow(missing_docs)] #[derive(strum::Display, Debug, Serialize, Deserialize)] diff --git a/iroh/src/rpc_protocol/blobs.rs b/iroh/src/rpc_protocol/blobs.rs index d167dceb81f..e7fd876f121 100644 --- a/iroh/src/rpc_protocol/blobs.rs +++ b/iroh/src/rpc_protocol/blobs.rs @@ -1,10 +1,7 @@ use std::path::PathBuf; use bytes::Bytes; -use iroh_base::{ - hash::Hash, - rpc::{RpcError, RpcResult}, -}; +use iroh_base::hash::Hash; use iroh_blobs::{ export::ExportProgress, format::collection::Collection, @@ -23,8 +20,11 @@ use quic_rpc_derive::rpc_requests; use serde::{Deserialize, Serialize}; use super::RpcService; -use crate::client::blobs::{ - BlobInfo, BlobStatus, DownloadMode, IncompleteBlobInfo, ReadAtLen, WrapOption, +use crate::{ + client::blobs::{ + BlobInfo, BlobStatus, DownloadMode, IncompleteBlobInfo, ReadAtLen, WrapOption, + }, + node::{RpcError, RpcResult}, }; #[allow(missing_docs)] diff --git a/iroh/src/rpc_protocol/docs.rs b/iroh/src/rpc_protocol/docs.rs index 83975b6f301..45d2c8aacf9 100644 --- a/iroh/src/rpc_protocol/docs.rs +++ b/iroh/src/rpc_protocol/docs.rs @@ -1,10 +1,7 @@ use std::path::PathBuf; use bytes::Bytes; -use iroh_base::{ - node_addr::AddrInfoOptions, - rpc::{RpcError, RpcResult}, -}; +use iroh_base::node_addr::AddrInfoOptions; use iroh_blobs::{export::ExportProgress, store::ExportMode, Hash}; use iroh_docs::{ actor::OpenState, @@ -19,7 +16,10 @@ use quic_rpc_derive::rpc_requests; use serde::{Deserialize, Serialize}; use super::RpcService; -use crate::client::docs::{ImportProgress, ShareMode}; +use crate::{ + client::docs::{ImportProgress, ShareMode}, + node::{RpcError, RpcResult}, +}; #[allow(missing_docs)] #[derive(strum::Display, Debug, Serialize, Deserialize)] diff --git a/iroh/src/rpc_protocol/gossip.rs b/iroh/src/rpc_protocol/gossip.rs index f2666335ed7..dfc70ff177f 100644 --- a/iroh/src/rpc_protocol/gossip.rs +++ b/iroh/src/rpc_protocol/gossip.rs @@ -1,6 +1,5 @@ use std::collections::BTreeSet; -use iroh_base::rpc::RpcResult; pub use iroh_gossip::net::{Command as SubscribeUpdate, Event as SubscribeResponse}; use iroh_gossip::proto::TopicId; use iroh_net::NodeId; @@ -9,6 +8,7 @@ use quic_rpc_derive::rpc_requests; use serde::{Deserialize, Serialize}; use super::RpcService; +use crate::node::RpcResult; #[allow(missing_docs)] #[derive(strum::Display, Debug, Serialize, Deserialize)] diff --git a/iroh/src/rpc_protocol/net.rs b/iroh/src/rpc_protocol/net.rs index d9cb4e1e40b..35e0768ede7 100644 --- a/iroh/src/rpc_protocol/net.rs +++ b/iroh/src/rpc_protocol/net.rs @@ -1,10 +1,10 @@ -use iroh_base::rpc::RpcResult; use iroh_net::{endpoint::RemoteInfo, key::PublicKey, relay::RelayUrl, NodeAddr, NodeId}; use nested_enum_utils::enum_conversions; use quic_rpc_derive::rpc_requests; use serde::{Deserialize, Serialize}; use super::RpcService; +use crate::node::RpcResult; #[allow(missing_docs)] #[derive(strum::Display, Debug, Serialize, Deserialize)] diff --git a/iroh/src/rpc_protocol/node.rs b/iroh/src/rpc_protocol/node.rs index e0e6d415c55..b615ea47d81 100644 --- a/iroh/src/rpc_protocol/node.rs +++ b/iroh/src/rpc_protocol/node.rs @@ -1,12 +1,11 @@ use std::collections::BTreeMap; -use iroh_base::rpc::RpcResult; use nested_enum_utils::enum_conversions; use quic_rpc_derive::rpc_requests; use serde::{Deserialize, Serialize}; use super::RpcService; -use crate::client::NodeStatus; +use crate::{client::NodeStatus, node::RpcResult}; #[allow(missing_docs)] #[derive(strum::Display, Debug, Serialize, Deserialize)] diff --git a/iroh/src/rpc_protocol/tags.rs b/iroh/src/rpc_protocol/tags.rs index c63243d381e..ecc20775f2e 100644 --- a/iroh/src/rpc_protocol/tags.rs +++ b/iroh/src/rpc_protocol/tags.rs @@ -1,11 +1,10 @@ -use iroh_base::rpc::RpcResult; use iroh_blobs::{HashAndFormat, Tag}; use nested_enum_utils::enum_conversions; use quic_rpc_derive::rpc_requests; use serde::{Deserialize, Serialize}; use super::{blobs::BatchId, RpcService}; -use crate::client::tags::TagInfo; +use crate::{client::tags::TagInfo, node::RpcResult}; #[allow(missing_docs)] #[derive(strum::Display, Debug, Serialize, Deserialize)]