@@ -84,7 +84,7 @@ use bao_tree::{
8484} ;
8585use bytes:: Bytes ;
8686use delete_set:: { BaoFilePart , ProtectHandle } ;
87- use entity_manager:: { EntityManager , Options as EntityManagerOptions } ;
87+ use entity_manager:: { EntityManager , Options as EntityManagerOptions , SpawnArg } ;
8888use entry_state:: { DataLocation , OutboardLocation } ;
8989use gc:: run_gc;
9090use import:: { ImportEntry , ImportSource } ;
@@ -201,11 +201,10 @@ impl TaskContext {
201201 }
202202}
203203
204- #[ derive( Debug , Clone , Default ) ]
205- struct EntityState ;
206-
207- impl entity_manager:: Reset for EntityState {
208- fn reset ( & mut self ) { }
204+ impl entity_manager:: Reset for Slot {
205+ fn reset ( & mut self ) {
206+ self . 0 = Arc :: new ( tokio:: sync:: Mutex :: new ( None ) ) ;
207+ }
209208}
210209
211210#[ derive( Debug ) ]
@@ -216,7 +215,7 @@ impl entity_manager::Params for EmParams {
216215
217216 type GlobalState = Arc < TaskContext > ;
218217
219- type EntityState = EntityState ;
218+ type EntityState = Slot ;
220219
221220 async fn on_shutdown (
222221 state : entity_manager:: ActiveEntityState < Self > ,
@@ -235,11 +234,7 @@ struct Actor {
235234 fs_cmd_rx : tokio:: sync:: mpsc:: Receiver < InternalCommand > ,
236235 // Tasks for import and export operations.
237236 tasks : JoinSet < ( ) > ,
238- // Running tasks
239- running : HashSet < Id > ,
240- // handles
241- handles : HashMap < Hash , Slot > ,
242-
237+ // Entity handler
243238 handles2 : EntityManager < EmParams > ,
244239 // temp tags
245240 temp_tags : TempTags ,
@@ -293,14 +288,6 @@ impl HashContext {
293288 self . db ( ) . set ( hash, state) . await
294289 }
295290
296- pub async fn get_maybe_create ( & self , hash : Hash , create : bool ) -> api:: Result < BaoFileHandle > {
297- if create {
298- self . get_or_create ( hash) . await
299- } else {
300- self . get ( hash) . await
301- }
302- }
303-
304291 pub async fn get ( & self , hash : Hash ) -> api:: Result < BaoFileHandle > {
305292 if hash == Hash :: EMPTY {
306293 return Ok ( self . ctx . empty . clone ( ) ) ;
@@ -433,17 +420,12 @@ impl Actor {
433420
434421 fn spawn ( & mut self , fut : impl Future < Output = ( ) > + Send + ' static ) {
435422 let span = tracing:: Span :: current ( ) ;
436- let id = self . tasks . spawn ( fut. instrument ( span) ) . id ( ) ;
437- self . running . insert ( id) ;
423+ self . tasks . spawn ( fut. instrument ( span) ) ;
438424 }
439425
440- fn log_task_result ( & mut self , res : Result < ( Id , ( ) ) , JoinError > ) {
426+ fn log_task_result ( & mut self , res : Result < ( ) , JoinError > ) {
441427 match res {
442- Ok ( ( id, _) ) => {
443- // println!("task {id} finished");
444- self . running . remove ( & id) ;
445- // println!("{:?}", self.running);
446- }
428+ Ok ( _) => { }
447429 Err ( e) => {
448430 error ! ( "task failed: {e}" ) ;
449431 }
@@ -459,26 +441,6 @@ impl Actor {
459441 tx. send ( tt) . await . ok ( ) ;
460442 }
461443
462- async fn clear_dead_handles ( & mut self ) {
463- let mut to_remove = Vec :: new ( ) ;
464- for ( hash, slot) in & self . handles {
465- if !slot. is_live ( ) . await {
466- to_remove. push ( * hash) ;
467- }
468- }
469- for hash in to_remove {
470- if let Some ( slot) = self . handles . remove ( & hash) {
471- // do a quick check if the handle has become alive in the meantime, and reinsert it
472- let guard = slot. 0 . lock ( ) . await ;
473- let is_live = guard. as_ref ( ) . map ( |x| !x. is_dead ( ) ) . unwrap_or_default ( ) ;
474- if is_live {
475- drop ( guard) ;
476- self . handles . insert ( hash, slot) ;
477- }
478- }
479- }
480- }
481-
482444 async fn handle_command ( & mut self , cmd : Command ) {
483445 let span = cmd. parent_span ( ) ;
484446 let _entered = span. enter ( ) ;
@@ -513,7 +475,6 @@ impl Actor {
513475 }
514476 Command :: ClearProtected ( cmd) => {
515477 trace ! ( "{cmd:?}" ) ;
516- self . clear_dead_handles ( ) . await ;
517478 self . db ( ) . send ( cmd. into ( ) ) . await . ok ( ) ;
518479 }
519480 Command :: BlobStatus ( cmd) => {
@@ -569,40 +530,112 @@ impl Actor {
569530 }
570531 Command :: ExportPath ( cmd) => {
571532 trace ! ( "{cmd:?}" ) ;
572- let ctx = self . hash_context ( cmd. hash ) ;
573- self . spawn ( export_path ( cmd, ctx) ) ;
533+ let ctx = self . context . clone ( ) ;
534+ self . handles2
535+ . spawn ( cmd. hash , |state| async move {
536+ match state {
537+ SpawnArg :: Active ( state) => {
538+ let ctx = HashContext {
539+ slot : state. state ,
540+ ctx,
541+ } ;
542+ export_path ( cmd, ctx) . await
543+ }
544+ _ => { }
545+ }
546+ } )
547+ . await
548+ . ok ( ) ;
549+ // let ctx = self.hash_context(cmd.hash);
550+ // self.spawn(export_path(cmd, ctx));
574551 }
575552 Command :: ExportBao ( cmd) => {
576553 trace ! ( "{cmd:?}" ) ;
577- let ctx = self . hash_context ( cmd. hash ) ;
578- self . spawn ( export_bao ( cmd, ctx) ) ;
554+ let ctx = self . context . clone ( ) ;
555+ self . handles2
556+ . spawn ( cmd. hash , |state| async move {
557+ match state {
558+ SpawnArg :: Active ( state) => {
559+ let ctx = HashContext {
560+ slot : state. state ,
561+ ctx,
562+ } ;
563+ export_bao ( cmd, ctx) . await
564+ }
565+ _ => { }
566+ }
567+ } )
568+ . await
569+ . ok ( ) ;
570+ // let ctx = self.hash_context(cmd.hash);
571+ // self.spawn(export_bao(cmd, ctx));
579572 }
580573 Command :: ExportRanges ( cmd) => {
581574 trace ! ( "{cmd:?}" ) ;
582- let ctx = self . hash_context ( cmd. hash ) ;
583- self . spawn ( export_ranges ( cmd, ctx) ) ;
575+ let ctx = self . context . clone ( ) ;
576+ self . handles2
577+ . spawn ( cmd. hash , |state| async move {
578+ match state {
579+ SpawnArg :: Active ( state) => {
580+ let ctx = HashContext {
581+ slot : state. state ,
582+ ctx,
583+ } ;
584+ export_ranges ( cmd, ctx) . await
585+ }
586+ _ => { }
587+ }
588+ } )
589+ . await
590+ . ok ( ) ;
591+ // let ctx = self.hash_context(cmd.hash);
592+ // self.spawn(export_ranges(cmd, ctx));
584593 }
585594 Command :: ImportBao ( cmd) => {
586595 trace ! ( "{cmd:?}" ) ;
587- let ctx = self . hash_context ( cmd. hash ) ;
588- self . spawn ( import_bao ( cmd, ctx) ) ;
596+ let ctx = self . context . clone ( ) ;
597+ self . handles2
598+ . spawn ( cmd. hash , |state| async move {
599+ match state {
600+ SpawnArg :: Active ( state) => {
601+ let ctx = HashContext {
602+ slot : state. state ,
603+ ctx,
604+ } ;
605+ import_bao ( cmd, ctx) . await
606+ }
607+ _ => { }
608+ }
609+ } )
610+ . await
611+ . ok ( ) ;
612+ // let ctx = self.hash_context(cmd.hash);
613+ // self.spawn(import_bao(cmd, ctx));
589614 }
590615 Command :: Observe ( cmd) => {
591616 trace ! ( "{cmd:?}" ) ;
592- let ctx = self . hash_context ( cmd. hash ) ;
593- self . spawn ( observe ( cmd, ctx) ) ;
617+ let ctx = self . context . clone ( ) ;
618+ self . handles2
619+ . spawn ( cmd. hash , |state| async move {
620+ match state {
621+ SpawnArg :: Active ( state) => {
622+ let ctx = HashContext {
623+ slot : state. state ,
624+ ctx,
625+ } ;
626+ observe ( cmd, ctx) . await
627+ }
628+ _ => { }
629+ }
630+ } )
631+ . await
632+ . ok ( ) ;
633+ // let ctx = self.hash_context(cmd.hash);
634+ // self.spawn(observe(cmd, ctx));
594635 }
595636 }
596637 }
597638
598- /// Create a hash context for a given hash.
599- fn hash_context ( & mut self , hash : Hash ) -> HashContext {
600- HashContext {
601- slot : self . handles . entry ( hash) . or_default ( ) . clone ( ) ,
602- ctx : self . context . clone ( ) ,
603- }
604- }
605-
606639 async fn handle_fs_command ( & mut self , cmd : InternalCommand ) {
607640 let span = cmd. parent_span ( ) ;
608641 let _entered = span. enter ( ) ;
@@ -630,8 +663,22 @@ impl Actor {
630663 format : cmd. format ,
631664 } ,
632665 ) ;
633- let ctx = self . hash_context ( cmd. hash ) ;
634- self . spawn ( finish_import ( cmd, tt, ctx) ) ;
666+ let ctx = self . context . clone ( ) ;
667+ self . handles2
668+ . spawn ( cmd. hash , |state| async move {
669+ match state {
670+ SpawnArg :: Active ( state) => {
671+ let ctx = HashContext {
672+ slot : state. state ,
673+ ctx,
674+ } ;
675+ finish_import ( cmd, tt, ctx) . await
676+ }
677+ _ => { }
678+ }
679+ } )
680+ . await
681+ . ok ( ) ;
635682 }
636683 }
637684 }
@@ -649,7 +696,7 @@ impl Actor {
649696 Some ( cmd) = self . fs_cmd_rx. recv( ) => {
650697 self . handle_fs_command( cmd) . await ;
651698 }
652- Some ( res) = self . tasks. join_next_with_id ( ) , if !self . tasks. is_empty( ) => {
699+ Some ( res) = self . tasks. join_next ( ) , if !self . tasks. is_empty( ) => {
653700 self . log_task_result( res) ;
654701 }
655702 }
@@ -700,8 +747,6 @@ impl Actor {
700747 cmd_rx,
701748 fs_cmd_rx : fs_commands_rx,
702749 tasks : JoinSet :: new ( ) ,
703- running : HashSet :: new ( ) ,
704- handles : Default :: default ( ) ,
705750 handles2 : EntityManager :: new ( slot_context, EntityManagerOptions :: default ( ) ) ,
706751 temp_tags : Default :: default ( ) ,
707752 _rt : rt,
@@ -1017,7 +1062,7 @@ async fn export_ranges_impl(
10171062
10181063#[ instrument( skip_all, fields( hash = %cmd. hash_short( ) ) ) ]
10191064async fn export_bao ( mut cmd : ExportBaoMsg , ctx : HashContext ) {
1020- match ctx. get_maybe_create ( cmd. hash , false ) . await {
1065+ match ctx. get ( cmd. hash ) . await {
10211066 Ok ( handle) => {
10221067 if let Err ( cause) = export_bao_impl ( cmd. inner , & mut cmd. tx , handle) . await {
10231068 cmd. tx
0 commit comments