@@ -243,6 +243,8 @@ struct Actor {
243243 handles : EntityManagerState < EmParams > ,
244244 // temp tags
245245 temp_tags : TempTags ,
246+ // waiters for idle state.
247+ idle_waiters : Vec < irpc:: channel:: oneshot:: Sender < ( ) > > ,
246248 // our private tokio runtime. It has to live somewhere.
247249 _rt : RtWrapper ,
248250}
@@ -456,6 +458,16 @@ impl Actor {
456458 trace ! ( "{cmd:?}" ) ;
457459 self . db ( ) . send ( cmd. into ( ) ) . await . ok ( ) ;
458460 }
461+ Command :: WaitIdle ( cmd) => {
462+ trace ! ( "{cmd:?}" ) ;
463+ if self . tasks . is_empty ( ) {
464+ // we are currently idle
465+ cmd. tx . send ( ( ) ) . await . ok ( ) ;
466+ } else {
467+ // wait for idle state
468+ self . idle_waiters . push ( cmd. tx ) ;
469+ }
470+ }
459471 Command :: Shutdown ( cmd) => {
460472 trace ! ( "{cmd:?}" ) ;
461473 self . db ( ) . send ( cmd. into ( ) ) . await . ok ( ) ;
@@ -599,6 +611,11 @@ impl Actor {
599611 }
600612 Some ( res) = self . tasks. join_next( ) , if !self . tasks. is_empty( ) => {
601613 Self :: log_task_result( res) ;
614+ if self . tasks. is_empty( ) {
615+ for tx in self . idle_waiters. drain( ..) {
616+ let _ = tx. send( ( ) ) ;
617+ }
618+ }
602619 }
603620 }
604621 }
@@ -648,6 +665,7 @@ impl Actor {
648665 tasks : JoinSet :: new ( ) ,
649666 handles : EntityManagerState :: new ( slot_context, 1024 , 32 , 32 , 2 ) ,
650667 temp_tags : Default :: default ( ) ,
668+ idle_waiters : Vec :: new ( ) ,
651669 _rt : rt,
652670 } )
653671 }
@@ -1969,6 +1987,7 @@ pub mod tests {
19691987 println ! ( "dropping batch" ) ;
19701988 drop ( batch) ;
19711989 store. sync_db ( ) . await ?;
1990+ store. wait_idle ( ) . await ?;
19721991 println ! ( "reading temp tags after batch drop" ) ;
19731992 let tts = store
19741993 . tags ( )
0 commit comments