@@ -47,16 +47,21 @@ impl Default for GcState {
4747 }
4848}
4949
50- #[ derive( Debug , Clone ) ]
51- pub struct Blobs < S > {
50+ #[ derive( Debug ) ]
51+ struct BlobsInner < S > {
5252 rt : LocalPoolHandle ,
5353 pub ( crate ) store : S ,
5454 events : EventSender ,
5555 downloader : Downloader ,
56- #[ cfg( feature = "rpc" ) ]
57- batches : Arc < tokio:: sync:: Mutex < BlobBatches > > ,
5856 endpoint : Endpoint ,
59- gc_state : Arc < std:: sync:: Mutex < GcState > > ,
57+ gc_state : std:: sync:: Mutex < GcState > ,
58+ #[ cfg( feature = "rpc" ) ]
59+ batches : tokio:: sync:: Mutex < BlobBatches > ,
60+ }
61+
62+ #[ derive( Debug , Clone ) ]
63+ pub struct Blobs < S > {
64+ inner : Arc < BlobsInner < S > > ,
6065 #[ cfg( feature = "rpc" ) ]
6166 pub ( crate ) rpc_handler : Arc < std:: sync:: OnceLock < crate :: rpc:: RpcHandler > > ,
6267}
@@ -178,40 +183,46 @@ impl<S: crate::store::Store> Blobs<S> {
178183 endpoint : Endpoint ,
179184 ) -> Self {
180185 Self {
181- rt,
182- store,
183- events,
184- downloader,
185- endpoint,
186- #[ cfg( feature = "rpc" ) ]
187- batches : Default :: default ( ) ,
188- gc_state : Default :: default ( ) ,
186+ inner : Arc :: new ( BlobsInner {
187+ rt,
188+ store,
189+ events,
190+ downloader,
191+ endpoint,
192+ #[ cfg( feature = "rpc" ) ]
193+ batches : Default :: default ( ) ,
194+ gc_state : Default :: default ( ) ,
195+ } ) ,
189196 #[ cfg( feature = "rpc" ) ]
190197 rpc_handler : Default :: default ( ) ,
191198 }
192199 }
193200
194201 pub fn store ( & self ) -> & S {
195- & self . store
202+ & self . inner . store
203+ }
204+
205+ pub fn events ( & self ) -> & EventSender {
206+ & self . inner . events
196207 }
197208
198209 pub fn rt ( & self ) -> & LocalPoolHandle {
199- & self . rt
210+ & self . inner . rt
200211 }
201212
202213 pub fn downloader ( & self ) -> & Downloader {
203- & self . downloader
214+ & self . inner . downloader
204215 }
205216
206217 pub fn endpoint ( & self ) -> & Endpoint {
207- & self . endpoint
218+ & self . inner . endpoint
208219 }
209220
210221 /// Add a callback that will be called before the garbage collector runs.
211222 ///
212223 /// This can only be called before the garbage collector has started, otherwise it will return an error.
213224 pub fn add_protected ( & self , cb : ProtectCb ) -> Result < ( ) > {
214- let mut state = self . gc_state . lock ( ) . unwrap ( ) ;
225+ let mut state = self . inner . gc_state . lock ( ) . unwrap ( ) ;
215226 match & mut * state {
216227 GcState :: Initial ( cbs) => {
217228 cbs. push ( cb) ;
@@ -225,7 +236,7 @@ impl<S: crate::store::Store> Blobs<S> {
225236
226237 /// Start garbage collection with the given settings.
227238 pub fn start_gc ( & self , config : GcConfig ) -> Result < ( ) > {
228- let mut state = self . gc_state . lock ( ) . unwrap ( ) ;
239+ let mut state = self . inner . gc_state . lock ( ) . unwrap ( ) ;
229240 let protected = match state. deref_mut ( ) {
230241 GcState :: Initial ( items) => std:: mem:: take ( items) ,
231242 GcState :: Started ( _) => bail ! ( "gc already started" ) ,
@@ -241,17 +252,17 @@ impl<S: crate::store::Store> Blobs<S> {
241252 set
242253 }
243254 } ;
244- let store = self . store . clone ( ) ;
255+ let store = self . store ( ) . clone ( ) ;
245256 let run = self
246- . rt
257+ . rt ( )
247258 . spawn ( move || async move { store. gc_run ( config, protected_cb) . await } ) ;
248259 * state = GcState :: Started ( Some ( run) ) ;
249260 Ok ( ( ) )
250261 }
251262
252263 #[ cfg( feature = "rpc" ) ]
253264 pub ( crate ) async fn batches ( & self ) -> tokio:: sync:: MutexGuard < ' _ , BlobBatches > {
254- self . batches . lock ( ) . await
265+ self . inner . batches . lock ( ) . await
255266 }
256267
257268 pub ( crate ) async fn download (
@@ -268,7 +279,7 @@ impl<S: crate::store::Store> Blobs<S> {
268279 mode,
269280 } = req;
270281 let hash_and_format = HashAndFormat { hash, format } ;
271- let temp_tag = self . store . temp_tag ( hash_and_format) ;
282+ let temp_tag = self . store ( ) . temp_tag ( hash_and_format) ;
272283 let stats = match mode {
273284 DownloadMode :: Queued => {
274285 self . download_queued ( endpoint, hash_and_format, nodes, progress. clone ( ) )
@@ -283,10 +294,10 @@ impl<S: crate::store::Store> Blobs<S> {
283294 progress. send ( DownloadProgress :: AllDone ( stats) ) . await . ok ( ) ;
284295 match tag {
285296 SetTagOption :: Named ( tag) => {
286- self . store . set_tag ( tag, Some ( hash_and_format) ) . await ?;
297+ self . store ( ) . set_tag ( tag, Some ( hash_and_format) ) . await ?;
287298 }
288299 SetTagOption :: Auto => {
289- self . store . create_tag ( hash_and_format) . await ?;
300+ self . store ( ) . create_tag ( hash_and_format) . await ?;
290301 }
291302 }
292303 drop ( temp_tag) ;
@@ -316,7 +327,7 @@ impl<S: crate::store::Store> Blobs<S> {
316327 let can_download = !node_ids. is_empty ( ) && ( any_added || endpoint. discovery ( ) . is_some ( ) ) ;
317328 anyhow:: ensure!( can_download, "no way to reach a node for download" ) ;
318329 let req = DownloadRequest :: new ( hash_and_format, node_ids) . progress_sender ( progress) ;
319- let handle = self . downloader . queue ( req) . await ;
330+ let handle = self . downloader ( ) . queue ( req) . await ;
320331 let stats = handle. await ?;
321332 Ok ( stats)
322333 }
@@ -334,7 +345,7 @@ impl<S: crate::store::Store> Blobs<S> {
334345 let mut nodes_iter = nodes. into_iter ( ) ;
335346 ' outer: loop {
336347 match crate :: get:: db:: get_to_db_in_steps (
337- self . store . clone ( ) ,
348+ self . store ( ) . clone ( ) ,
338349 hash_and_format,
339350 progress. clone ( ) ,
340351 )
@@ -393,9 +404,9 @@ impl<S: crate::store::Store> Blobs<S> {
393404
394405impl < S : crate :: store:: Store > ProtocolHandler for Blobs < S > {
395406 fn accept ( & self , conn : Connecting ) -> BoxedFuture < Result < ( ) > > {
396- let db = self . store . clone ( ) ;
397- let events = self . events . clone ( ) ;
398- let rt = self . rt . clone ( ) ;
407+ let db = self . store ( ) . clone ( ) ;
408+ let events = self . events ( ) . clone ( ) ;
409+ let rt = self . rt ( ) . clone ( ) ;
399410
400411 Box :: pin ( async move {
401412 crate :: provider:: handle_connection ( conn. await ?, db, events, rt) . await ;
@@ -404,7 +415,7 @@ impl<S: crate::store::Store> ProtocolHandler for Blobs<S> {
404415 }
405416
406417 fn shutdown ( & self ) -> BoxedFuture < ( ) > {
407- let store = self . store . clone ( ) ;
418+ let store = self . store ( ) . clone ( ) ;
408419 Box :: pin ( async move {
409420 store. shutdown ( ) . await ;
410421 } )
0 commit comments