@@ -25,7 +25,8 @@ use crate::{
2525 Stats ,
2626 } ,
2727 provider:: EventSender ,
28- store:: GcConfig ,
28+ rpc:: { client:: blobs:: MemClient , RpcHandler } ,
29+ store:: { GcConfig , Store } ,
2930 util:: {
3031 local_pool:: { self , LocalPoolHandle } ,
3132 progress:: { AsyncChannelProgressSender , ProgressSender } ,
@@ -53,8 +54,8 @@ impl Default for GcState {
5354}
5455
5556#[ derive( Debug ) ]
56- pub struct Blobs < S > {
57- rt : LocalPoolHandle ,
57+ pub ( crate ) struct BlobsInner < S > {
58+ pub ( crate ) rt : LocalPoolHandle ,
5859 pub ( crate ) store : S ,
5960 events : EventSender ,
6061 downloader : Downloader ,
@@ -135,36 +136,33 @@ impl<S: crate::store::Store> Builder<S> {
135136
136137 /// Build the Blobs protocol handler.
137138 /// You need to provide a local pool handle and an endpoint.
138- pub fn build ( self , rt : & LocalPoolHandle , endpoint : & Endpoint ) -> Arc < Blobs < S > > {
139+ pub fn build ( self , rt : & LocalPoolHandle , endpoint : & Endpoint ) -> Arc < Blobs > {
139140 let downloader = Downloader :: new ( self . store . clone ( ) , endpoint. clone ( ) , rt. clone ( ) ) ;
140- Arc :: new ( Blobs :: new (
141+ let inner = Arc :: new ( BlobsInner :: new (
141142 self . store ,
142143 rt. clone ( ) ,
143144 self . events . unwrap_or_default ( ) ,
144145 downloader,
145146 endpoint. clone ( ) ,
146- ) )
147+ ) ) ;
148+ Arc :: new ( Blobs { inner } )
147149 }
148150}
149151
150- impl < S > Blobs < S > {
152+ impl Blobs {
151153 /// Create a new Blobs protocol handler builder, given a store.
152- pub fn builder ( store : S ) -> Builder < S > {
154+ pub fn builder < S > ( store : S ) -> Builder < S > {
153155 Builder {
154156 store,
155157 events : None ,
156158 }
157159 }
158- }
159160
160- impl Blobs < crate :: store:: mem:: Store > {
161161 /// Create a new memory-backed Blobs protocol handler.
162162 pub fn memory ( ) -> Builder < crate :: store:: mem:: Store > {
163163 Self :: builder ( crate :: store:: mem:: Store :: new ( ) )
164164 }
165- }
166165
167- impl Blobs < crate :: store:: fs:: Store > {
168166 /// Load a persistent Blobs protocol handler from a path.
169167 pub async fn persistent (
170168 path : impl AsRef < std:: path:: Path > ,
@@ -173,8 +171,8 @@ impl Blobs<crate::store::fs::Store> {
173171 }
174172}
175173
176- impl < S : crate :: store:: Store > Blobs < S > {
177- pub fn new (
174+ impl < S : crate :: store:: Store > BlobsInner < S > {
175+ fn new (
178176 store : S ,
179177 rt : LocalPoolHandle ,
180178 events : EventSender ,
@@ -194,14 +192,6 @@ impl<S: crate::store::Store> Blobs<S> {
194192 }
195193 }
196194
197- pub fn rt ( & self ) -> & LocalPoolHandle {
198- & self . rt
199- }
200-
201- pub fn downloader ( & self ) -> & Downloader {
202- & self . downloader
203- }
204-
205195 pub fn endpoint ( & self ) -> & Endpoint {
206196 & self . endpoint
207197 }
@@ -386,66 +376,67 @@ impl<S: crate::store::Store> Blobs<S> {
386376 }
387377}
388378
389- // trait BlobsInner: Debug + Send + Sync + 'static {
390- // fn shutdown(self: Arc<Self>) -> BoxedFuture<()>;
391- // fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>>;
392- // fn client(self: Arc<Self>) -> MemClient;
393- // fn local_pool_handle(&self) -> &LocalPoolHandle;
394- // fn downloader(&self) -> &Downloader;
395- // }
396-
397- // #[derive(Debug)]
398- // struct Blobs2 {
399- // inner: Arc<dyn BlobsInner>,
400- // }
401-
402- // impl Blobs2 {
403- // fn client(&self) -> MemClient {
404- // self.inner.clone().client()
405- // }
406-
407- // fn local_pool_handle(&self) -> &LocalPoolHandle {
408- // self.inner.local_pool_handle()
409- // }
410-
411- // fn downloader(&self) -> &Downloader {
412- // self.inner.downloader()
413- // }
414- // }
415-
416- // impl<S: crate::store::Store> BlobsInner for Blobs<S> {
417- // fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
418- // ProtocolHandler::shutdown(self)
419- // }
420-
421- // fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
422- // ProtocolHandler::accept(self, conn)
423- // }
424-
425- // fn client(self: Arc<Self>) -> MemClient {
426- // Blobs::client(self)
427- // }
428-
429- // fn local_pool_handle(&self) -> &LocalPoolHandle {
430- // self.rt()
431- // }
432-
433- // fn downloader(&self) -> &Downloader {
434- // self.downloader()
435- // }
436- // }
437-
438- // impl ProtocolHandler for Blobs2 {
439- // fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
440- // self.inner.clone().accept(conn)
441- // }
442-
443- // fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
444- // self.inner.clone().shutdown()
445- // }
446- // }
447-
448- impl < S : crate :: store:: Store > ProtocolHandler for Blobs < S > {
379+ trait DynBlobs : Debug + Send + Sync + ' static {
380+ fn shutdown ( self : Arc < Self > ) -> BoxedFuture < ( ) > ;
381+ fn accept ( self : Arc < Self > , conn : Connecting ) -> BoxedFuture < Result < ( ) > > ;
382+ fn client ( self : Arc < Self > ) -> MemClient ;
383+ fn local_pool_handle ( & self ) -> & LocalPoolHandle ;
384+ fn downloader ( & self ) -> & Downloader ;
385+ fn endpoint ( & self ) -> & Endpoint ;
386+ fn start_gc ( & self , config : GcConfig ) -> Result < ( ) > ;
387+ fn add_protected ( & self , cb : ProtectCb ) -> Result < ( ) > ;
388+ fn stop_rpc_task ( & self ) ;
389+ }
390+
391+ #[ derive( Debug ) ]
392+ pub struct Blobs {
393+ inner : Arc < dyn DynBlobs > ,
394+ }
395+
396+ impl Blobs {
397+ pub fn client ( & self ) -> MemClient {
398+ self . inner . clone ( ) . client ( )
399+ }
400+
401+ pub fn local_pool_handle ( & self ) -> & LocalPoolHandle {
402+ self . inner . local_pool_handle ( )
403+ }
404+
405+ pub fn downloader ( & self ) -> & Downloader {
406+ self . inner . downloader ( )
407+ }
408+
409+ pub fn endpoint ( & self ) -> & Endpoint {
410+ self . inner . endpoint ( )
411+ }
412+
413+ pub fn add_protected ( & self , cb : ProtectCb ) -> Result < ( ) > {
414+ self . inner . add_protected ( cb)
415+ }
416+
417+ pub fn start_gc ( & self , config : GcConfig ) -> Result < ( ) > {
418+ self . inner . start_gc ( config)
419+ }
420+
421+ pub fn new < S : Store > (
422+ store : S ,
423+ rt : LocalPoolHandle ,
424+ events : EventSender ,
425+ downloader : Downloader ,
426+ endpoint : Endpoint ,
427+ ) -> Self {
428+ let inner = Arc :: new ( BlobsInner :: new ( store, rt, events, downloader, endpoint) ) ;
429+ Self { inner }
430+ }
431+ }
432+
433+ impl Drop for Blobs {
434+ fn drop ( & mut self ) {
435+ self . inner . stop_rpc_task ( ) ;
436+ }
437+ }
438+
439+ impl < S : crate :: store:: Store > DynBlobs for BlobsInner < S > {
449440 fn accept ( self : Arc < Self > , conn : Connecting ) -> BoxedFuture < Result < ( ) > > {
450441 Box :: pin ( async move {
451442 crate :: provider:: handle_connection (
@@ -461,9 +452,55 @@ impl<S: crate::store::Store> ProtocolHandler for Blobs<S> {
461452
462453 fn shutdown ( self : Arc < Self > ) -> BoxedFuture < ( ) > {
463454 Box :: pin ( async move {
455+ self . stop_rpc_task ( ) ;
464456 self . store . shutdown ( ) . await ;
465457 } )
466458 }
459+
460+ fn stop_rpc_task ( & self ) {
461+ if let Some ( rpc_handler) = self . rpc_handler . get ( ) {
462+ rpc_handler. shutdown ( ) ;
463+ }
464+ }
465+
466+ fn client ( self : Arc < Self > ) -> MemClient {
467+ let client = self
468+ . rpc_handler
469+ . get_or_init ( || RpcHandler :: new ( & self ) )
470+ . client
471+ . clone ( ) ;
472+ MemClient :: new ( client)
473+ }
474+
475+ fn local_pool_handle ( & self ) -> & LocalPoolHandle {
476+ & self . rt
477+ }
478+
479+ fn downloader ( & self ) -> & Downloader {
480+ & self . downloader
481+ }
482+
483+ fn start_gc ( & self , config : GcConfig ) -> Result < ( ) > {
484+ self . start_gc ( config)
485+ }
486+
487+ fn add_protected ( & self , cb : ProtectCb ) -> Result < ( ) > {
488+ self . add_protected ( cb)
489+ }
490+
491+ fn endpoint ( & self ) -> & Endpoint {
492+ & self . endpoint
493+ }
494+ }
495+
496+ impl ProtocolHandler for Blobs {
497+ fn accept ( self : Arc < Self > , conn : Connecting ) -> BoxedFuture < Result < ( ) > > {
498+ self . inner . clone ( ) . accept ( conn)
499+ }
500+
501+ fn shutdown ( self : Arc < Self > ) -> BoxedFuture < ( ) > {
502+ self . inner . clone ( ) . shutdown ( )
503+ }
467504}
468505
469506/// A request to the node to download and share the data specified by the hash.
0 commit comments