6464//! safely shut down as well. Any store refs you are holding will be inoperable
6565//! after this.
6666use std:: {
67- fmt, fs,
67+ fmt:: { self , Debug } ,
68+ fs,
6869 future:: Future ,
6970 io:: Write ,
7071 num:: NonZeroU64 ,
@@ -224,7 +225,7 @@ impl entity_manager::Params for EmParams {
224225 state : entity_manager:: ActiveEntityState < Self > ,
225226 _cause : entity_manager:: ShutdownCause ,
226227 ) {
227- state. persist ( ) ;
228+ state. persist ( ) . await ;
228229 }
229230}
230231
@@ -248,13 +249,13 @@ struct Actor {
248249
249250type HashContext = ActiveEntityState < EmParams > ;
250251
251- impl HashContext {
252+ impl SyncEntityApi for HashContext {
252253 /// Load the state from the database.
253254 ///
254255 /// If the state is Initial, this will start the load.
255256 /// If it is Loading, it will wait until loading is done.
256257 /// If it is any other state, it will be a noop.
257- pub async fn load ( & self ) {
258+ async fn load ( & self ) {
258259 enum Action {
259260 Load ,
260261 Wait ,
@@ -304,32 +305,8 @@ impl HashContext {
304305 }
305306 }
306307
307- pub ( super ) fn persist ( & self ) {
308- self . state . send_if_modified ( |guard| {
309- let hash = & self . id ;
310- let BaoFileStorage :: Partial ( fs) = guard. take ( ) else {
311- return false ;
312- } ;
313- let path = self . global . options . path . bitfield_path ( hash) ;
314- trace ! ( "writing bitfield for hash {} to {}" , hash, path. display( ) ) ;
315- if let Err ( cause) = fs. sync_all ( & path) {
316- error ! (
317- "failed to write bitfield for {} at {}: {:?}" ,
318- hash,
319- path. display( ) ,
320- cause
321- ) ;
322- }
323- false
324- } ) ;
325- }
326-
327308 /// Write a batch and notify the db
328- pub ( super ) async fn write_batch (
329- & self ,
330- batch : & [ BaoContentItem ] ,
331- bitfield : & Bitfield ,
332- ) -> io:: Result < ( ) > {
309+ async fn write_batch ( & self , batch : & [ BaoContentItem ] , bitfield : & Bitfield ) -> io:: Result < ( ) > {
333310 trace ! ( "write_batch bitfield={:?} batch={}" , bitfield, batch. len( ) ) ;
334311 let mut res = Ok ( None ) ;
335312 self . state . send_if_modified ( |state| {
@@ -351,44 +328,48 @@ impl HashContext {
351328 ///
352329 /// Caution: this is a reader for the unvalidated data file. Reading this
353330 /// can produce data that does not match the hash.
354- pub fn data_reader ( & self ) -> DataReader {
331+ #[ allow( refining_impl_trait_internal) ]
332+ fn data_reader ( & self ) -> DataReader {
355333 DataReader ( self . state . clone ( ) )
356334 }
357335
358336 /// An AsyncSliceReader for the outboard file.
359337 ///
360338 /// The outboard file is used to validate the data file. It is not guaranteed
361339 /// to be complete.
362- pub fn outboard_reader ( & self ) -> OutboardReader {
340+ #[ allow( refining_impl_trait_internal) ]
341+ fn outboard_reader ( & self ) -> OutboardReader {
363342 OutboardReader ( self . state . clone ( ) )
364343 }
365344
366345 /// The most precise known total size of the data file.
367- pub fn current_size ( & self ) -> io:: Result < u64 > {
346+ fn current_size ( & self ) -> io:: Result < u64 > {
368347 match self . state . borrow ( ) . deref ( ) {
369348 BaoFileStorage :: Complete ( mem) => Ok ( mem. size ( ) ) ,
370349 BaoFileStorage :: PartialMem ( mem) => Ok ( mem. current_size ( ) ) ,
371350 BaoFileStorage :: Partial ( file) => file. current_size ( ) ,
372- BaoFileStorage :: Poisoned => io :: Result :: Err ( io:: Error :: other ( "poisoned storage" ) ) ,
373- BaoFileStorage :: Initial => io :: Result :: Err ( io:: Error :: other ( "initial" ) ) ,
374- BaoFileStorage :: Loading => io :: Result :: Err ( io:: Error :: other ( "loading" ) ) ,
375- BaoFileStorage :: NonExisting => io :: Result :: Err ( io:: ErrorKind :: NotFound . into ( ) ) ,
351+ BaoFileStorage :: Poisoned => Err ( io:: Error :: other ( "poisoned storage" ) ) ,
352+ BaoFileStorage :: Initial => Err ( io:: Error :: other ( "initial" ) ) ,
353+ BaoFileStorage :: Loading => Err ( io:: Error :: other ( "loading" ) ) ,
354+ BaoFileStorage :: NonExisting => Err ( io:: ErrorKind :: NotFound . into ( ) ) ,
376355 }
377356 }
378357
379358 /// The most precise known total size of the data file.
380- pub fn bitfield ( & self ) -> io:: Result < Bitfield > {
359+ fn bitfield ( & self ) -> io:: Result < Bitfield > {
381360 match self . state . borrow ( ) . deref ( ) {
382361 BaoFileStorage :: Complete ( mem) => Ok ( mem. bitfield ( ) ) ,
383362 BaoFileStorage :: PartialMem ( mem) => Ok ( mem. bitfield ( ) . clone ( ) ) ,
384363 BaoFileStorage :: Partial ( file) => Ok ( file. bitfield ( ) . clone ( ) ) ,
385- BaoFileStorage :: Poisoned => io :: Result :: Err ( io:: Error :: other ( "poisoned storage" ) ) ,
386- BaoFileStorage :: Initial => io :: Result :: Err ( io:: Error :: other ( "initial" ) ) ,
387- BaoFileStorage :: Loading => io :: Result :: Err ( io:: Error :: other ( "loading" ) ) ,
388- BaoFileStorage :: NonExisting => io :: Result :: Err ( io:: ErrorKind :: NotFound . into ( ) ) ,
364+ BaoFileStorage :: Poisoned => Err ( io:: Error :: other ( "poisoned storage" ) ) ,
365+ BaoFileStorage :: Initial => Err ( io:: Error :: other ( "initial" ) ) ,
366+ BaoFileStorage :: Loading => Err ( io:: Error :: other ( "loading" ) ) ,
367+ BaoFileStorage :: NonExisting => Err ( io:: ErrorKind :: NotFound . into ( ) ) ,
389368 }
390369 }
370+ }
391371
372+ impl HashContext {
392373 /// The outboard for the file.
393374 pub fn outboard ( & self ) -> io:: Result < PreOrderOutboard < OutboardReader > > {
394375 let tree = BaoTree :: new ( self . current_size ( ) ?, IROH_BLOCK_SIZE ) ;
@@ -722,25 +703,62 @@ impl HashSpecificCommand for ExportPathMsg {
722703 async fn handle ( self , ctx : HashContext ) {
723704 ctx. export_path ( self ) . await
724705 }
725- async fn on_error ( self , _arg : SpawnArg < EmParams > ) { }
706+ async fn on_error ( self , arg : SpawnArg < EmParams > ) {
707+ let err = match arg {
708+ SpawnArg :: Busy => io:: ErrorKind :: ResourceBusy . into ( ) ,
709+ SpawnArg :: Dead => io:: Error :: other ( "entity is dead" ) ,
710+ _ => unreachable ! ( ) ,
711+ } ;
712+ self . tx
713+ . send ( ExportProgressItem :: Error ( api:: Error :: Io ( err) ) )
714+ . await
715+ . ok ( ) ;
716+ }
726717}
727718impl HashSpecificCommand for ExportBaoMsg {
728719 async fn handle ( self , ctx : HashContext ) {
729720 ctx. export_bao ( self ) . await
730721 }
731- async fn on_error ( self , _arg : SpawnArg < EmParams > ) { }
722+ async fn on_error ( self , arg : SpawnArg < EmParams > ) {
723+ let err = match arg {
724+ SpawnArg :: Busy => io:: ErrorKind :: ResourceBusy . into ( ) ,
725+ SpawnArg :: Dead => io:: Error :: other ( "entity is dead" ) ,
726+ _ => unreachable ! ( ) ,
727+ } ;
728+ self . tx
729+ . send ( EncodedItem :: Error ( bao_tree:: io:: EncodeError :: Io ( err) ) )
730+ . await
731+ . ok ( ) ;
732+ }
732733}
733734impl HashSpecificCommand for ExportRangesMsg {
734735 async fn handle ( self , ctx : HashContext ) {
735736 ctx. export_ranges ( self ) . await
736737 }
737- async fn on_error ( self , _arg : SpawnArg < EmParams > ) { }
738+ async fn on_error ( self , arg : SpawnArg < EmParams > ) {
739+ let err = match arg {
740+ SpawnArg :: Busy => io:: ErrorKind :: ResourceBusy . into ( ) ,
741+ SpawnArg :: Dead => io:: Error :: other ( "entity is dead" ) ,
742+ _ => unreachable ! ( ) ,
743+ } ;
744+ self . tx
745+ . send ( ExportRangesItem :: Error ( api:: Error :: Io ( err) ) )
746+ . await
747+ . ok ( ) ;
748+ }
738749}
739750impl HashSpecificCommand for ImportBaoMsg {
740751 async fn handle ( self , ctx : HashContext ) {
741752 ctx. import_bao ( self ) . await
742753 }
743- async fn on_error ( self , _arg : SpawnArg < EmParams > ) { }
754+ async fn on_error ( self , arg : SpawnArg < EmParams > ) {
755+ let err = match arg {
756+ SpawnArg :: Busy => io:: ErrorKind :: ResourceBusy . into ( ) ,
757+ SpawnArg :: Dead => io:: Error :: other ( "entity is dead" ) ,
758+ _ => unreachable ! ( ) ,
759+ } ;
760+ self . tx . send ( Err ( api:: Error :: Io ( err) ) ) . await . ok ( ) ;
761+ }
744762}
745763impl HashSpecific for ( TempTag , ImportEntryMsg ) {
746764 fn hash ( & self ) -> Hash {
@@ -752,7 +770,14 @@ impl HashSpecificCommand for (TempTag, ImportEntryMsg) {
752770 let ( tt, cmd) = self ;
753771 ctx. finish_import ( cmd, tt) . await
754772 }
755- async fn on_error ( self , _arg : SpawnArg < EmParams > ) { }
773+ async fn on_error ( self , arg : SpawnArg < EmParams > ) {
774+ let err = match arg {
775+ SpawnArg :: Busy => io:: ErrorKind :: ResourceBusy . into ( ) ,
776+ SpawnArg :: Dead => io:: Error :: other ( "entity is dead" ) ,
777+ _ => unreachable ! ( ) ,
778+ } ;
779+ self . 1 . tx . send ( AddProgressItem :: Error ( err) ) . await . ok ( ) ;
780+ }
756781}
757782
758783struct RtWrapper ( Option < tokio:: runtime:: Runtime > ) ;
@@ -809,8 +834,50 @@ async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>)
809834 Ok ( ( ) )
810835}
811836
837+ /// The minimal API you need to implement for an entity for a store to work.
838+ trait EntityApi {
839+ /// Import from a stream of n0 bao encoded data.
840+ async fn import_bao ( & self , cmd : ImportBaoMsg ) ;
841+ /// Finish an import from a local file or memory.
842+ async fn finish_import ( & self , cmd : ImportEntryMsg , tt : TempTag ) ;
843+ /// Observe the bitfield of the entry.
844+ async fn observe ( & self , cmd : ObserveMsg ) ;
845+ /// Export byte ranges of the entry as data
846+ async fn export_ranges ( & self , cmd : ExportRangesMsg ) ;
847+ /// Export chunk ranges of the entry as a n0 bao encoded stream.
848+ async fn export_bao ( & self , cmd : ExportBaoMsg ) ;
849+ /// Export the entry to a local file.
850+ async fn export_path ( & self , cmd : ExportPathMsg ) ;
851+ /// Persist the entry at the end of its lifecycle.
852+ async fn persist ( & self ) ;
853+ }
854+
855+ /// A more opinionated API that can be used as a helper to save implementation
856+ /// effort when implementing the EntityApi trait.
857+ trait SyncEntityApi : EntityApi {
858+ /// Load the entry state from the database. This must make sure that it is
859+ /// not run concurrently, so if load is called multiple times, all but one
860+ /// must wait. You can use a tokio::sync::OnceCell or similar to achieve this.
861+ async fn load ( & self ) ;
862+
863+ /// Get a synchronous reader for the data file.
864+ fn data_reader ( & self ) -> impl ReadBytesAt ;
865+
866+ /// Get a synchronous reader for the outboard file.
867+ fn outboard_reader ( & self ) -> impl ReadAt ;
868+
869+ /// Get the best known size of the data file.
870+ fn current_size ( & self ) -> io:: Result < u64 > ;
871+
872+ /// Get the bitfield of the entry.
873+ fn bitfield ( & self ) -> io:: Result < Bitfield > ;
874+
875+ /// Write a batch of content items to the entry.
876+ async fn write_batch ( & self , batch : & [ BaoContentItem ] , bitfield : & Bitfield ) -> io:: Result < ( ) > ;
877+ }
878+
812879/// The high level entry point per entry.
813- impl HashContext {
880+ impl EntityApi for HashContext {
814881 #[ instrument( skip_all, fields( hash = %cmd. hash_short( ) ) ) ]
815882 async fn import_bao ( & self , cmd : ImportBaoMsg ) {
816883 trace ! ( "{cmd:?}" ) ;
@@ -890,6 +957,27 @@ impl HashContext {
890957 } ;
891958 cmd. tx . send ( res) . await . ok ( ) ;
892959 }
960+
961+ #[ instrument( skip_all, fields( hash = %self . id. fmt_short( ) ) ) ]
962+ async fn persist ( & self ) {
963+ self . state . send_if_modified ( |guard| {
964+ let hash = & self . id ;
965+ let BaoFileStorage :: Partial ( fs) = guard. take ( ) else {
966+ return false ;
967+ } ;
968+ let path = self . global . options . path . bitfield_path ( hash) ;
969+ trace ! ( "writing bitfield for hash {} to {}" , hash, path. display( ) ) ;
970+ if let Err ( cause) = fs. sync_all ( & path) {
971+ error ! (
972+ "failed to write bitfield for {} at {}: {:?}" ,
973+ hash,
974+ path. display( ) ,
975+ cause
976+ ) ;
977+ }
978+ false
979+ } ) ;
980+ }
893981}
894982
895983async fn finish_import_impl ( ctx : & HashContext , import_data : ImportEntry ) -> io:: Result < ( ) > {
0 commit comments