@@ -70,7 +70,10 @@ use std::{
7070 num:: NonZeroU64 ,
7171 ops:: Deref ,
7272 path:: { Path , PathBuf } ,
73- sync:: Arc ,
73+ sync:: {
74+ atomic:: { AtomicU64 , Ordering } ,
75+ Arc ,
76+ } ,
7477} ;
7578
7679use bao_tree:: {
@@ -711,31 +714,31 @@ trait HashSpecificCommand: HashSpecific + Send + 'static {
711714
712715impl HashSpecificCommand for ObserveMsg {
713716 async fn handle ( self , ctx : HashContext ) {
714- observe ( & ctx , self ) . await
717+ ctx . observe ( self ) . await
715718 }
716719 async fn on_error ( self , _arg : SpawnArg < EmParams > ) { }
717720}
718721impl HashSpecificCommand for ExportPathMsg {
719722 async fn handle ( self , ctx : HashContext ) {
720- export_path ( & ctx , self ) . await
723+ ctx . export_path ( self ) . await
721724 }
722725 async fn on_error ( self , _arg : SpawnArg < EmParams > ) { }
723726}
724727impl HashSpecificCommand for ExportBaoMsg {
725728 async fn handle ( self , ctx : HashContext ) {
726- export_bao ( & ctx , self ) . await
729+ ctx . export_bao ( self ) . await
727730 }
728731 async fn on_error ( self , _arg : SpawnArg < EmParams > ) { }
729732}
730733impl HashSpecificCommand for ExportRangesMsg {
731734 async fn handle ( self , ctx : HashContext ) {
732- export_ranges ( & ctx , self ) . await
735+ ctx . export_ranges ( self ) . await
733736 }
734737 async fn on_error ( self , _arg : SpawnArg < EmParams > ) { }
735738}
736739impl HashSpecificCommand for ImportBaoMsg {
737740 async fn handle ( self , ctx : HashContext ) {
738- import_bao ( & ctx , self ) . await
741+ ctx . import_bao ( self ) . await
739742 }
740743 async fn on_error ( self , _arg : SpawnArg < EmParams > ) { }
741744}
@@ -747,7 +750,7 @@ impl HashSpecific for (TempTag, ImportEntryMsg) {
747750impl HashSpecificCommand for ( TempTag , ImportEntryMsg ) {
748751 async fn handle ( self , ctx : HashContext ) {
749752 let ( tt, cmd) = self ;
750- finish_import ( & ctx , cmd, tt) . await
753+ ctx . finish_import ( cmd, tt) . await
751754 }
752755 async fn on_error ( self , _arg : SpawnArg < EmParams > ) { }
753756}
@@ -806,80 +809,87 @@ async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc<TempTagScope>)
806809 Ok ( ( ) )
807810}
808811
809- #[ instrument( skip_all, fields( hash = %cmd. hash_short( ) ) ) ]
810- async fn import_bao ( ctx : & HashContext , cmd : ImportBaoMsg ) {
811- trace ! ( "{cmd:?}" ) ;
812- let ImportBaoMsg {
813- inner : ImportBaoRequest { size, .. } ,
814- rx,
815- tx,
816- ..
817- } = cmd;
818- ctx. load ( ) . await ;
819- let res = import_bao_impl ( ctx, size, rx) . await ;
820- trace ! ( "{res:?}" ) ;
821- tx. send ( res) . await . ok ( ) ;
822- }
823-
824- #[ instrument( skip_all, fields( hash = %cmd. hash_short( ) ) ) ]
825- async fn observe ( ctx : & HashContext , cmd : ObserveMsg ) {
826- trace ! ( "{cmd:?}" ) ;
827- ctx. load ( ) . await ;
828- BaoFileStorageSubscriber :: new ( ctx. state . subscribe ( ) )
829- . forward ( cmd. tx )
830- . await
831- . ok ( ) ;
832- }
833-
834- #[ instrument( skip_all, fields( hash = %cmd. hash_short( ) ) ) ]
835- async fn export_ranges ( ctx : & HashContext , mut cmd : ExportRangesMsg ) {
836- trace ! ( "{cmd:?}" ) ;
837- ctx. load ( ) . await ;
838- if let Err ( cause) = export_ranges_impl ( ctx, cmd. inner , & mut cmd. tx ) . await {
839- cmd. tx
840- . send ( ExportRangesItem :: Error ( cause. into ( ) ) )
812+ /// The high level entry point per entry.
813+ impl HashContext {
814+ #[ instrument( skip_all, fields( hash = %cmd. hash_short( ) ) ) ]
815+ async fn import_bao ( & self , cmd : ImportBaoMsg ) {
816+ trace ! ( "{cmd:?}" ) ;
817+ self . load ( ) . await ;
818+ let ImportBaoMsg {
819+ inner : ImportBaoRequest { size, .. } ,
820+ rx,
821+ tx,
822+ ..
823+ } = cmd;
824+ let res = import_bao_impl ( self , size, rx) . await ;
825+ trace ! ( "{res:?}" ) ;
826+ tx. send ( res) . await . ok ( ) ;
827+ }
828+
829+ #[ instrument( skip_all, fields( hash = %cmd. hash_short( ) ) ) ]
830+ async fn observe ( & self , cmd : ObserveMsg ) {
831+ trace ! ( "{cmd:?}" ) ;
832+ self . load ( ) . await ;
833+ BaoFileStorageSubscriber :: new ( self . state . subscribe ( ) )
834+ . forward ( cmd. tx )
841835 . await
842836 . ok ( ) ;
843837 }
844- }
845838
846- #[ instrument( skip_all, fields( hash = %cmd. hash_short( ) ) ) ]
847- async fn export_bao ( ctx : & HashContext , mut cmd : ExportBaoMsg ) {
848- ctx . load ( ) . await ;
849- if let Err ( cause ) = export_bao_impl ( ctx , cmd . inner , & mut cmd . tx ) . await {
850- // if the entry is in state NonExisting, this will be an io error with
851- // kind NotFound. So we must not wrap this somehow but pass it on directly.
852- cmd . tx
853- . send ( bao_tree :: io :: EncodeError :: Io ( cause ) . into ( ) )
854- . await
855- . ok ( ) ;
839+ #[ instrument( skip_all, fields( hash = %cmd. hash_short( ) ) ) ]
840+ async fn export_ranges ( & self , mut cmd : ExportRangesMsg ) {
841+ trace ! ( "{cmd:?}" ) ;
842+ self . load ( ) . await ;
843+ if let Err ( cause ) = export_ranges_impl ( self , cmd . inner , & mut cmd . tx ) . await {
844+ cmd . tx
845+ . send ( ExportRangesItem :: Error ( cause . into ( ) ) )
846+ . await
847+ . ok ( ) ;
848+ }
856849 }
857- }
858850
859- #[ instrument( skip_all, fields( hash = %cmd. hash_short( ) ) ) ]
860- async fn export_path ( ctx : & HashContext , cmd : ExportPathMsg ) {
861- let ExportPathMsg { inner, mut tx, .. } = cmd;
862- if let Err ( cause) = export_path_impl ( ctx, inner, & mut tx) . await {
863- tx. send ( cause. into ( ) ) . await . ok ( ) ;
851+ #[ instrument( skip_all, fields( hash = %cmd. hash_short( ) ) ) ]
852+ async fn export_bao ( & self , mut cmd : ExportBaoMsg ) {
853+ trace ! ( "{cmd:?}" ) ;
854+ self . load ( ) . await ;
855+ if let Err ( cause) = export_bao_impl ( self , cmd. inner , & mut cmd. tx ) . await {
856+ // if the entry is in state NonExisting, this will be an io error with
857+ // kind NotFound. So we must not wrap this somehow but pass it on directly.
858+ cmd. tx
859+ . send ( bao_tree:: io:: EncodeError :: Io ( cause) . into ( ) )
860+ . await
861+ . ok ( ) ;
862+ }
864863 }
865- }
866864
867- #[ instrument( skip_all, fields( hash = %cmd. hash_short( ) ) ) ]
868- async fn finish_import ( ctx : & HashContext , cmd : ImportEntryMsg , mut tt : TempTag ) {
869- trace ! ( "{cmd:?}" ) ;
870- let res = match finish_import_impl ( ctx, cmd. inner ) . await {
871- Ok ( ( ) ) => {
872- // for a remote call, we can't have the on_drop callback, so we have to leak the temp tag
873- // it will be cleaned up when either the process exits or scope ends
874- if cmd. tx . is_rpc ( ) {
875- trace ! ( "leaking temp tag {}" , tt. hash_and_format( ) ) ;
876- tt. leak ( ) ;
877- }
878- AddProgressItem :: Done ( tt)
865+ #[ instrument( skip_all, fields( hash = %cmd. hash_short( ) ) ) ]
866+ async fn export_path ( & self , cmd : ExportPathMsg ) {
867+ trace ! ( "{cmd:?}" ) ;
868+ self . load ( ) . await ;
869+ let ExportPathMsg { inner, mut tx, .. } = cmd;
870+ if let Err ( cause) = export_path_impl ( self , inner, & mut tx) . await {
871+ tx. send ( cause. into ( ) ) . await . ok ( ) ;
879872 }
880- Err ( cause) => AddProgressItem :: Error ( cause) ,
881- } ;
882- cmd. tx . send ( res) . await . ok ( ) ;
873+ }
874+
875+ #[ instrument( skip_all, fields( hash = %cmd. hash_short( ) ) ) ]
876+ async fn finish_import ( & self , cmd : ImportEntryMsg , mut tt : TempTag ) {
877+ trace ! ( "{cmd:?}" ) ;
878+ self . load ( ) . await ;
879+ let res = match finish_import_impl ( self , cmd. inner ) . await {
880+ Ok ( ( ) ) => {
881+ // for a remote call, we can't have the on_drop callback, so we have to leak the temp tag
882+ // it will be cleaned up when either the process exits or scope ends
883+ if cmd. tx . is_rpc ( ) {
884+ trace ! ( "leaking temp tag {}" , tt. hash_and_format( ) ) ;
885+ tt. leak ( ) ;
886+ }
887+ AddProgressItem :: Done ( tt)
888+ }
889+ Err ( cause) => AddProgressItem :: Error ( cause) ,
890+ } ;
891+ cmd. tx . send ( res) . await . ok ( ) ;
892+ }
883893}
884894
885895async fn finish_import_impl ( ctx : & HashContext , import_data : ImportEntry ) -> io:: Result < ( ) > {
@@ -1213,8 +1223,14 @@ impl FsStore {
12131223
12141224 /// Load or create a new store with custom options, returning an additional sender for file store specific commands.
12151225 pub async fn load_with_opts ( db_path : PathBuf , options : Options ) -> anyhow:: Result < FsStore > {
1226+ static THREAD_NR : AtomicU64 = AtomicU64 :: new ( 0 ) ;
12161227 let rt = tokio:: runtime:: Builder :: new_multi_thread ( )
1217- . thread_name ( "iroh-blob-store" )
1228+ . thread_name_fn ( || {
1229+ format ! (
1230+ "iroh-blob-store-{}" ,
1231+ THREAD_NR . fetch_add( 1 , Ordering :: SeqCst )
1232+ )
1233+ } )
12181234 . enable_time ( )
12191235 . build ( ) ?;
12201236 let handle = rt. handle ( ) . clone ( ) ;
0 commit comments