@@ -51,7 +51,7 @@ use crate::{
5151 ImportByteStreamMsg , ImportByteStreamUpdate , ImportBytesMsg , ImportBytesRequest ,
5252 ImportPathMsg , ImportPathRequest , ListBlobsMsg , ListTagsMsg , ListTagsRequest ,
5353 ObserveMsg , ObserveRequest , RenameTagMsg , RenameTagRequest , Scope , SetTagMsg ,
54- SetTagRequest , ShutdownMsg , SyncDbMsg ,
54+ SetTagRequest , ShutdownMsg , SyncDbMsg , WaitIdleMsg ,
5555 } ,
5656 tags:: TagInfo ,
5757 ApiClient ,
@@ -122,6 +122,7 @@ impl MemStore {
122122 options : Arc :: new ( Options :: default ( ) ) ,
123123 temp_tags : Default :: default ( ) ,
124124 protected : Default :: default ( ) ,
125+ idle_waiters : Default :: default ( ) ,
125126 }
126127 . run ( ) ,
127128 ) ;
@@ -137,6 +138,8 @@ struct Actor {
137138 options : Arc < Options > ,
138139 // temp tags
139140 temp_tags : TempTags ,
141+ // idle waiters
142+ idle_waiters : Vec < irpc:: channel:: oneshot:: Sender < ( ) > > ,
140143 protected : HashSet < Hash > ,
141144}
142145
@@ -162,6 +165,16 @@ impl Actor {
162165 let entry = self . get_or_create_entry ( hash) ;
163166 self . spawn ( import_bao ( entry, size, data, tx) ) ;
164167 }
168+ Command :: WaitIdle ( WaitIdleMsg { tx, .. } ) => {
169+ trace ! ( "wait idle" ) ;
170+ if self . tasks . is_empty ( ) {
171+ // we are currently idle
172+ tx. send ( ( ) ) . await . ok ( ) ;
173+ } else {
174+ // wait for idle state
175+ self . idle_waiters . push ( tx) ;
176+ }
177+ }
165178 Command :: Observe ( ObserveMsg {
166179 inner : ObserveRequest { hash } ,
167180 tx,
@@ -485,6 +498,12 @@ impl Actor {
485498 }
486499 TaskResult :: Unit ( _) => { }
487500 }
501+ if self . tasks. is_empty( ) {
502+ // we are idle now
503+ for tx in self . idle_waiters. drain( ..) {
504+ tx. send( ( ) ) . await . ok( ) ;
505+ }
506+ }
488507 }
489508 }
490509 } ;
0 commit comments