11//! Adaptation of `iroh-blobs` as an `iroh` protocol.
2-
3- // TODO: reduce API surface and add documentation
4- #! [ allow ( missing_docs ) ]
5-
2+ //!
3+ //! A blobs protocol handler wraps a store, so you must first create a store.
4+ //!
5+ //! The entry point to create a blobs protocol handler is [`Blobs::builder`].
66use std:: { collections:: BTreeSet , fmt:: Debug , ops:: DerefMut , sync:: Arc } ;
77
88use anyhow:: { bail, Result } ;
99use futures_lite:: future:: Boxed as BoxedFuture ;
1010use futures_util:: future:: BoxFuture ;
11- use iroh:: { endpoint:: Connecting , protocol:: ProtocolHandler , Endpoint , NodeAddr } ;
12- use serde:: { Deserialize , Serialize } ;
11+ use iroh:: { endpoint:: Connecting , protocol:: ProtocolHandler , Endpoint } ;
1312use tracing:: debug;
1413
1514use crate :: {
1615 downloader:: Downloader ,
1716 provider:: EventSender ,
1817 store:: GcConfig ,
19- util:: {
20- local_pool:: { self , LocalPoolHandle } ,
21- SetTagOption ,
22- } ,
23- BlobFormat , Hash ,
18+ util:: local_pool:: { self , LocalPoolHandle } ,
19+ Hash ,
2420} ;
2521
2622/// A callback that blobs can ask about a set of hashes that should not be garbage collected.
@@ -50,67 +46,77 @@ pub(crate) struct BlobsInner<S> {
5046 pub ( crate ) endpoint : Endpoint ,
5147 gc_state : std:: sync:: Mutex < GcState > ,
5248 #[ cfg( feature = "rpc" ) ]
53- pub ( crate ) batches : tokio:: sync:: Mutex < BlobBatches > ,
49+ pub ( crate ) batches : tokio:: sync:: Mutex < batches :: BlobBatches > ,
5450}
5551
52+ /// Blobs protocol handler.
5653#[ derive( Debug , Clone ) ]
5754pub struct Blobs < S > {
5855 pub ( crate ) inner : Arc < BlobsInner < S > > ,
5956 #[ cfg( feature = "rpc" ) ]
6057 pub ( crate ) rpc_handler : Arc < std:: sync:: OnceLock < crate :: rpc:: RpcHandler > > ,
6158}
6259
63- /// Keeps track of all the currently active batch operations of the blobs api.
64- #[ cfg( feature = "rpc" ) ]
65- #[ derive( Debug , Default ) ]
66- pub ( crate ) struct BlobBatches {
67- /// Currently active batches
68- batches : std:: collections:: BTreeMap < BatchId , BlobBatch > ,
69- /// Used to generate new batch ids.
70- max : u64 ,
71- }
60+ pub ( crate ) mod batches {
61+ use anyhow:: Result ;
62+ use serde:: { Deserialize , Serialize } ;
7263
73- /// A single batch of blob operations
74- #[ cfg( feature = "rpc" ) ]
75- #[ derive( Debug , Default ) ]
76- struct BlobBatch {
77- /// The tags in this batch.
78- tags : std:: collections:: BTreeMap < crate :: HashAndFormat , Vec < crate :: TempTag > > ,
79- }
64+ /// Newtype for a batch id
65+ #[ derive( Debug , PartialEq , Eq , PartialOrd , Serialize , Deserialize , Ord , Clone , Copy , Hash ) ]
66+ pub struct BatchId ( pub u64 ) ;
8067
81- #[ cfg( feature = "rpc" ) ]
82- impl BlobBatches {
83- /// Create a new unique batch id.
84- pub fn create ( & mut self ) -> BatchId {
85- let id = self . max ;
86- self . max += 1 ;
87- BatchId ( id)
68+ /// Keeps track of all the currently active batch operations of the blobs api.
69+ #[ cfg( feature = "rpc" ) ]
70+ #[ derive( Debug , Default ) ]
71+ pub ( crate ) struct BlobBatches {
72+ /// Currently active batches
73+ batches : std:: collections:: BTreeMap < BatchId , BlobBatch > ,
74+ /// Used to generate new batch ids.
75+ max : u64 ,
8876 }
8977
90- /// Store a temp tag in a batch identified by a batch id.
91- pub fn store ( & mut self , batch : BatchId , tt : crate :: TempTag ) {
92- let entry = self . batches . entry ( batch) . or_default ( ) ;
93- entry. tags . entry ( tt. hash_and_format ( ) ) . or_default ( ) . push ( tt) ;
78+ /// A single batch of blob operations
79+ #[ cfg( feature = "rpc" ) ]
80+ #[ derive( Debug , Default ) ]
81+ struct BlobBatch {
82+ /// The tags in this batch.
83+ tags : std:: collections:: BTreeMap < crate :: HashAndFormat , Vec < crate :: TempTag > > ,
9484 }
9585
96- /// Remove a tag from a batch.
97- pub fn remove_one ( & mut self , batch : BatchId , content : & crate :: HashAndFormat ) -> Result < ( ) > {
98- if let Some ( batch) = self . batches . get_mut ( & batch) {
99- if let Some ( tags) = batch. tags . get_mut ( content) {
100- tags. pop ( ) ;
101- if tags. is_empty ( ) {
102- batch. tags . remove ( content) ;
86+ #[ cfg( feature = "rpc" ) ]
87+ impl BlobBatches {
88+ /// Create a new unique batch id.
89+ pub fn create ( & mut self ) -> BatchId {
90+ let id = self . max ;
91+ self . max += 1 ;
92+ BatchId ( id)
93+ }
94+
95+ /// Store a temp tag in a batch identified by a batch id.
96+ pub fn store ( & mut self , batch : BatchId , tt : crate :: TempTag ) {
97+ let entry = self . batches . entry ( batch) . or_default ( ) ;
98+ entry. tags . entry ( tt. hash_and_format ( ) ) . or_default ( ) . push ( tt) ;
99+ }
100+
101+ /// Remove a tag from a batch.
102+ pub fn remove_one ( & mut self , batch : BatchId , content : & crate :: HashAndFormat ) -> Result < ( ) > {
103+ if let Some ( batch) = self . batches . get_mut ( & batch) {
104+ if let Some ( tags) = batch. tags . get_mut ( content) {
105+ tags. pop ( ) ;
106+ if tags. is_empty ( ) {
107+ batch. tags . remove ( content) ;
108+ }
109+ return Ok ( ( ) ) ;
103110 }
104- return Ok ( ( ) ) ;
105111 }
112+ // this can happen if we try to upgrade a tag from an expired batch
113+ anyhow:: bail!( "tag not found in batch" ) ;
106114 }
107- // this can happen if we try to upgrade a tag from an expired batch
108- anyhow:: bail!( "tag not found in batch" ) ;
109- }
110115
111- /// Remove an entire batch.
112- pub fn remove ( & mut self , batch : BatchId ) {
113- self . batches . remove ( & batch) ;
116+ /// Remove an entire batch.
117+ pub fn remove ( & mut self , batch : BatchId ) {
118+ self . batches . remove ( & batch) ;
119+ }
114120 }
115121}
116122
@@ -169,6 +175,10 @@ impl Blobs<crate::store::fs::Store> {
169175}
170176
171177impl < S : crate :: store:: Store > Blobs < S > {
178+ /// Create a new Blobs protocol handler.
179+ ///
180+ /// This is the low-level constructor that allows you to customize
181+ /// everything. If you don't need that, consider using [`Blobs::builder`].
172182 pub fn new (
173183 store : S ,
174184 rt : LocalPoolHandle ,
@@ -192,22 +202,27 @@ impl<S: crate::store::Store> Blobs<S> {
192202 }
193203 }
194204
205+ /// Get the store.
195206 pub fn store ( & self ) -> & S {
196207 & self . inner . store
197208 }
198209
210+ /// Get the event sender.
199211 pub fn events ( & self ) -> & EventSender {
200212 & self . inner . events
201213 }
202214
215+ /// Get the local pool handle.
203216 pub fn rt ( & self ) -> & LocalPoolHandle {
204217 & self . inner . rt
205218 }
206219
220+ /// Get the downloader.
207221 pub fn downloader ( & self ) -> & Downloader {
208222 & self . inner . downloader
209223 }
210224
225+ /// Get the endpoint.
211226 pub fn endpoint ( & self ) -> & Endpoint {
212227 & self . inner . endpoint
213228 }
@@ -274,42 +289,3 @@ impl<S: crate::store::Store> ProtocolHandler for Blobs<S> {
274289 } )
275290 }
276291}
277-
278- /// A request to the node to download and share the data specified by the hash.
279- #[ derive( Debug , Clone , Serialize , Deserialize ) ]
280- pub struct BlobDownloadRequest {
281- /// This mandatory field contains the hash of the data to download and share.
282- pub hash : Hash ,
283- /// If the format is [`BlobFormat::HashSeq`], all children are downloaded and shared as
284- /// well.
285- pub format : BlobFormat ,
286- /// This mandatory field specifies the nodes to download the data from.
287- ///
288- /// If set to more than a single node, they will all be tried. If `mode` is set to
289- /// [`DownloadMode::Direct`], they will be tried sequentially until a download succeeds.
290- /// If `mode` is set to [`DownloadMode::Queued`], the nodes may be dialed in parallel,
291- /// if the concurrency limits permit.
292- pub nodes : Vec < NodeAddr > ,
293- /// Optional tag to tag the data with.
294- pub tag : SetTagOption ,
295- /// Whether to directly start the download or add it to the download queue.
296- pub mode : DownloadMode ,
297- }
298-
299- /// Set the mode for whether to directly start the download or add it to the download queue.
300- #[ derive( Debug , Clone , Serialize , Deserialize ) ]
301- pub enum DownloadMode {
302- /// Start the download right away.
303- ///
304- /// No concurrency limits or queuing will be applied. It is up to the user to manage download
305- /// concurrency.
306- Direct ,
307- /// Queue the download.
308- ///
309- /// The download queue will be processed in-order, while respecting the downloader concurrency limits.
310- Queued ,
311- }
312-
313- /// Newtype for a batch id
314- #[ derive( Debug , PartialEq , Eq , PartialOrd , Serialize , Deserialize , Ord , Clone , Copy , Hash ) ]
315- pub struct BatchId ( pub u64 ) ;
0 commit comments