55
66use std:: {
77 collections:: BTreeMap ,
8+ fmt:: Debug ,
89 sync:: { Arc , OnceLock } ,
910} ;
1011
@@ -31,6 +32,20 @@ use crate::{
3132 HashAndFormat , TempTag ,
3233} ;
3334
35+ // pub type ProtectCb = Box<dyn Fn(&mut BTreeSet<Hash>) -> BoxFuture<()> + Send + Sync>;
36+ //
37+ // #[derive(derive_more::Debug)]
38+ // enum GcState {
39+ // Initial(#[debug(skip)] Vec<ProtectCb>),
40+ // Started(#[allow(dead_code)] Option<local_pool::Run<()>>),
41+ // }
42+ //
43+ // impl Default for GcState {
44+ // fn default() -> Self {
45+ // Self::Initial(Vec::new())
46+ // }
47+ // }
48+
3449#[ derive( Debug ) ]
3550pub struct Blobs < S > {
3651 rt : LocalPoolHandle ,
@@ -97,8 +112,66 @@ impl BlobBatches {
97112 }
98113}
99114
115+ /// Builder for the Blobs protocol handler
116+ #[ derive( Debug ) ]
117+ pub struct Builder < S > {
118+ store : S ,
119+ events : Option < EventSender > ,
120+ gc_config : Option < crate :: store:: GcConfig > ,
121+ }
122+
123+ impl < S : crate :: store:: Store > Builder < S > {
124+ /// Set the event sender for the blobs protocol.
125+ pub fn events ( mut self , value : EventSender ) -> Self {
126+ self . events = Some ( value) ;
127+ self
128+ }
129+
130+ pub fn gc_config ( mut self , value : crate :: store:: GcConfig ) -> Self {
131+ self . gc_config = Some ( value) ;
132+ self
133+ }
134+
135+ /// Build the Blobs protocol handler.
136+ /// You need to provide a local pool handle and an endpoint.
137+ pub fn build ( self , rt : & LocalPoolHandle , endpoint : & Endpoint ) -> Arc < Blobs < S > > {
138+ let downloader = Downloader :: new ( self . store . clone ( ) , endpoint. clone ( ) , rt. clone ( ) ) ;
139+ Arc :: new ( Blobs :: new (
140+ self . store ,
141+ rt. clone ( ) ,
142+ self . events . unwrap_or_default ( ) ,
143+ downloader,
144+ endpoint. clone ( ) ,
145+ ) )
146+ }
147+ }
148+
149+ impl Blobs < crate :: store:: mem:: Store > {
150+ /// Create a new memory-backed Blobs protocol handler.
151+ pub fn memory ( ) -> Builder < crate :: store:: mem:: Store > {
152+ Builder {
153+ store : crate :: store:: mem:: Store :: new ( ) ,
154+ events : None ,
155+ gc_config : None ,
156+ }
157+ }
158+ }
159+
160+ impl Blobs < crate :: store:: fs:: Store > {
161+ /// Load a persistent Blobs protocol handler from a path.
162+ pub async fn persistent (
163+ path : impl AsRef < std:: path:: Path > ,
164+ ) -> anyhow:: Result < Builder < crate :: store:: fs:: Store > > {
165+ Ok ( Builder {
166+ store : crate :: store:: fs:: Store :: load ( path) . await ?,
167+ events : None ,
168+ gc_config : None ,
169+ } )
170+ }
171+ }
172+
100173impl < S : crate :: store:: Store > Blobs < S > {
101- pub fn new_with_events (
174+ pub fn new (
102175 store : S ,
103176 rt : LocalPoolHandle ,
104177 events : EventSender ,
@@ -133,6 +206,44 @@ impl<S: crate::store::Store> Blobs<S> {
133206 & self . endpoint
134207 }
135208
209+ // pub fn add_protected(&self, cb: ProtectCb) -> Result<()> {
210+ // let mut state = self.gc_state.lock().unwrap();
211+ // match &mut *state {
212+ // GcState::Initial(cbs) => {
213+ // cbs.push(cb);
214+ // }
215+ // GcState::Started(_) => {
216+ // anyhow::bail!("cannot add protected blobs after gc has started");
217+ // }
218+ // }
219+ // Ok(())
220+ // }
221+ //
222+ // pub fn start_gc(&self, config: GcConfig) -> Result<()> {
223+ // let mut state = self.gc_state.lock().unwrap();
224+ // let protected = match state.deref_mut() {
225+ // GcState::Initial(items) => std::mem::take(items),
226+ // GcState::Started(_) => anyhow::bail!("gc already started"),
227+ // };
228+ // let protected = Arc::new(protected);
229+ // let protected_cb = move || {
230+ // let protected = protected.clone();
231+ // async move {
232+ // let mut set = BTreeSet::new();
233+ // for cb in protected.iter() {
234+ // cb(&mut set).await;
235+ // }
236+ // set
237+ // }
238+ // };
239+ // let store = self.store.clone();
240+ // let run = self
241+ // .rt
242+ // .spawn(move || async move { store.gc_run(config, protected_cb).await });
243+ // *state = GcState::Started(Some(run));
244+ // Ok(())
245+ // }
246+
136247 pub ( crate ) async fn batches ( & self ) -> tokio:: sync:: MutexGuard < ' _ , BlobBatches > {
137248 self . batches . lock ( ) . await
138249 }
@@ -271,6 +382,65 @@ impl<S: crate::store::Store> Blobs<S> {
271382 }
272383}
273384
385+ // trait BlobsInner: Debug + Send + Sync + 'static {
386+ // fn shutdown(self: Arc<Self>) -> BoxedFuture<()>;
387+ // fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>>;
388+ // fn client(self: Arc<Self>) -> MemClient;
389+ // fn local_pool_handle(&self) -> &LocalPoolHandle;
390+ // fn downloader(&self) -> &Downloader;
391+ // }
392+
393+ // #[derive(Debug)]
394+ // struct Blobs2 {
395+ // inner: Arc<dyn BlobsInner>,
396+ // }
397+
398+ // impl Blobs2 {
399+ // fn client(&self) -> MemClient {
400+ // self.inner.clone().client()
401+ // }
402+
403+ // fn local_pool_handle(&self) -> &LocalPoolHandle {
404+ // self.inner.local_pool_handle()
405+ // }
406+
407+ // fn downloader(&self) -> &Downloader {
408+ // self.inner.downloader()
409+ // }
410+ // }
411+
412+ // impl<S: crate::store::Store> BlobsInner for Blobs<S> {
413+ // fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
414+ // ProtocolHandler::shutdown(self)
415+ // }
416+
417+ // fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
418+ // ProtocolHandler::accept(self, conn)
419+ // }
420+
421+ // fn client(self: Arc<Self>) -> MemClient {
422+ // Blobs::client(self)
423+ // }
424+
425+ // fn local_pool_handle(&self) -> &LocalPoolHandle {
426+ // self.rt()
427+ // }
428+
429+ // fn downloader(&self) -> &Downloader {
430+ // self.downloader()
431+ // }
432+ // }
433+
434+ // impl ProtocolHandler for Blobs2 {
435+ // fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
436+ // self.inner.clone().accept(conn)
437+ // }
438+
439+ // fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
440+ // self.inner.clone().shutdown()
441+ // }
442+ // }
443+
274444impl < S : crate :: store:: Store > ProtocolHandler for Blobs < S > {
275445 fn accept ( self : Arc < Self > , conn : Connecting ) -> BoxedFuture < Result < ( ) > > {
276446 Box :: pin ( async move {
0 commit comments