44#![ allow( missing_docs) ]
55
66use std:: {
7- collections:: BTreeMap ,
7+ collections:: { BTreeMap , BTreeSet } ,
88 fmt:: Debug ,
9+ ops:: DerefMut ,
910 sync:: { Arc , OnceLock } ,
1011} ;
1112
12- use anyhow:: { anyhow, Result } ;
13+ use anyhow:: { anyhow, bail , Result } ;
1314use futures_lite:: future:: Boxed as BoxedFuture ;
15+ use futures_util:: future:: BoxFuture ;
1416use iroh:: { endpoint:: Connecting , protocol:: ProtocolHandler , Endpoint , NodeAddr } ;
1517use iroh_base:: hash:: { BlobFormat , Hash } ;
1618use serde:: { Deserialize , Serialize } ;
@@ -23,27 +25,32 @@ use crate::{
2325 Stats ,
2426 } ,
2527 provider:: EventSender ,
28+ store:: GcConfig ,
2629 util:: {
27- local_pool:: LocalPoolHandle ,
30+ local_pool:: { self , LocalPoolHandle } ,
2831 progress:: { AsyncChannelProgressSender , ProgressSender } ,
2932 SetTagOption ,
3033 } ,
3134 HashAndFormat , TempTag ,
3235} ;
3336
34- // pub type ProtectCb = Box<dyn Fn(&mut BTreeSet<Hash>) -> BoxFuture<()> + Send + Sync>;
35- //
36- // #[derive(derive_more::Debug)]
37- // enum GcState {
38- // Initial(#[debug(skip)] Vec<ProtectCb>),
39- // Started(#[allow(dead_code)] Option<local_pool::Run<()>>),
40- // }
41- //
42- // impl Default for GcState {
43- // fn default() -> Self {
44- // Self::Initial(Vec::new())
45- // }
46- // }
37+ /// A callback that blobs can ask about a set of hashes that should not be garbage collected.
38+ pub type ProtectCb = Box < dyn Fn ( & mut BTreeSet < Hash > ) -> BoxFuture < ( ) > + Send + Sync > ;
39+
40+ /// The state of the gc loop.
41+ #[ derive( derive_more:: Debug ) ]
42+ enum GcState {
43+ // Gc loop is not yet running. Other protocols can add protect callbacks
44+ Initial ( #[ debug( skip) ] Vec < ProtectCb > ) ,
45+ // Gc loop is running. No more protect callbacks can be added.
46+ Started ( #[ allow( dead_code) ] Option < local_pool:: Run < ( ) > > ) ,
47+ }
48+
49+ impl Default for GcState {
50+ fn default ( ) -> Self {
51+ Self :: Initial ( Vec :: new ( ) )
52+ }
53+ }
4754
4855#[ derive( Debug ) ]
4956pub struct Blobs < S > {
@@ -53,6 +60,7 @@ pub struct Blobs<S> {
5360 downloader : Downloader ,
5461 batches : tokio:: sync:: Mutex < BlobBatches > ,
5562 endpoint : Endpoint ,
63+ gc_state : Arc < std:: sync:: Mutex < GcState > > ,
5664 #[ cfg( feature = "rpc" ) ]
5765 pub ( crate ) rpc_handler : Arc < OnceLock < crate :: rpc:: RpcHandler > > ,
5866}
@@ -184,6 +192,7 @@ impl<S: crate::store::Store> Blobs<S> {
184192 downloader,
185193 endpoint,
186194 batches : Default :: default ( ) ,
195+ gc_state : Default :: default ( ) ,
187196 #[ cfg( feature = "rpc" ) ]
188197 rpc_handler : Arc :: new ( OnceLock :: new ( ) ) ,
189198 }
@@ -205,43 +214,47 @@ impl<S: crate::store::Store> Blobs<S> {
205214 & self . endpoint
206215 }
207216
208- // pub fn add_protected(&self, cb: ProtectCb) -> Result<()> {
209- // let mut state = self.gc_state.lock().unwrap();
210- // match &mut *state {
211- // GcState::Initial(cbs) => {
212- // cbs.push(cb);
213- // }
214- // GcState::Started(_) => {
215- // anyhow::bail!("cannot add protected blobs after gc has started");
216- // }
217- // }
218- // Ok(())
219- // }
220- //
221- // pub fn start_gc(&self, config: GcConfig) -> Result<()> {
222- // let mut state = self.gc_state.lock().unwrap();
223- // let protected = match state.deref_mut() {
224- // GcState::Initial(items) => std::mem::take(items),
225- // GcState::Started(_) => anyhow::bail!("gc already started"),
226- // };
227- // let protected = Arc::new(protected);
228- // let protected_cb = move || {
229- // let protected = protected.clone();
230- // async move {
231- // let mut set = BTreeSet::new();
232- // for cb in protected.iter() {
233- // cb(&mut set).await;
234- // }
235- // set
236- // }
237- // };
238- // let store = self.store.clone();
239- // let run = self
240- // .rt
241- // .spawn(move || async move { store.gc_run(config, protected_cb).await });
242- // *state = GcState::Started(Some(run));
243- // Ok(())
244- // }
217+ /// Add a callback that will be called before the garbage collector runs.
218+ ///
219+ /// This can only be called before the garbage collector has started, otherwise it will return an error.
220+ pub fn add_protected ( & self , cb : ProtectCb ) -> Result < ( ) > {
221+ let mut state = self . gc_state . lock ( ) . unwrap ( ) ;
222+ match & mut * state {
223+ GcState :: Initial ( cbs) => {
224+ cbs. push ( cb) ;
225+ }
226+ GcState :: Started ( _) => {
227+ anyhow:: bail!( "cannot add protected blobs after gc has started" ) ;
228+ }
229+ }
230+ Ok ( ( ) )
231+ }
232+
233+ /// Start garbage collection with the given settings.
234+ pub fn start_gc ( & self , config : GcConfig ) -> Result < ( ) > {
235+ let mut state = self . gc_state . lock ( ) . unwrap ( ) ;
236+ let protected = match state. deref_mut ( ) {
237+ GcState :: Initial ( items) => std:: mem:: take ( items) ,
238+ GcState :: Started ( _) => bail ! ( "gc already started" ) ,
239+ } ;
240+ let protected = Arc :: new ( protected) ;
241+ let protected_cb = move || {
242+ let protected = protected. clone ( ) ;
243+ async move {
244+ let mut set = BTreeSet :: new ( ) ;
245+ for cb in protected. iter ( ) {
246+ cb ( & mut set) . await ;
247+ }
248+ set
249+ }
250+ } ;
251+ let store = self . store . clone ( ) ;
252+ let run = self
253+ . rt
254+ . spawn ( move || async move { store. gc_run ( config, protected_cb) . await } ) ;
255+ * state = GcState :: Started ( Some ( run) ) ;
256+ Ok ( ( ) )
257+ }
245258
246259 pub ( crate ) async fn batches ( & self ) -> tokio:: sync:: MutexGuard < ' _ , BlobBatches > {
247260 self . batches . lock ( ) . await
0 commit comments