@@ -35,7 +35,6 @@ use vss_client_ng::util::retry::{
3535use vss_client_ng:: util:: storable_builder:: { EntropySource , StorableBuilder } ;
3636
3737use crate :: io:: utils:: check_namespace_key_validity;
38- use crate :: runtime:: Runtime ;
3938
4039type CustomRetryPolicy = FilteredRetryPolicy <
4140 JitteredRetryPolicy <
@@ -66,7 +65,6 @@ pub struct VssStore {
6665 // Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
6766 // operations aren't sensitive to the order of execution.
6867 next_version : AtomicU64 ,
69- runtime : Arc < Runtime > ,
7068 // A VSS-internal runtime we use to avoid any deadlocks we could hit when waiting on a spawned
7169 // blocking task to finish while the blocked thread had acquired the reactor. In particular,
7270 // this works around a previously-hit case where a concurrent call to
@@ -79,7 +77,7 @@ pub struct VssStore {
7977impl VssStore {
8078 pub ( crate ) fn new (
8179 base_url : String , store_id : String , vss_seed : [ u8 ; 32 ] ,
82- header_provider : Arc < dyn VssHeaderProvider > , runtime : Arc < Runtime > ,
80+ header_provider : Arc < dyn VssHeaderProvider > ,
8381 ) -> Self {
8482 let next_version = AtomicU64 :: new ( 1 ) ;
8583 let internal_runtime = Some (
@@ -123,7 +121,7 @@ impl VssStore {
123121 key_obfuscator,
124122 ) ) ;
125123
126- Self { inner, next_version, runtime , internal_runtime }
124+ Self { inner, next_version, internal_runtime }
127125 }
128126
129127 // Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys
@@ -170,13 +168,14 @@ impl KVStoreSync for VssStore {
170168 async move { inner. read_internal ( primary_namespace, secondary_namespace, key) . await } ;
171169 // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
172170 // times out.
173- let spawned_fut = internal_runtime. spawn ( async move {
174- tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
175- let msg = "VssStore::read timed out" ;
176- Error :: new ( ErrorKind :: Other , msg)
177- } )
178- } ) ;
179- self . runtime . block_on ( spawned_fut) . expect ( "We should always finish" ) ?
171+ tokio:: task:: block_in_place ( move || {
172+ internal_runtime. block_on ( async move {
173+ tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
174+ let msg = "VssStore::read timed out" ;
175+ Error :: new ( ErrorKind :: Other , msg)
176+ } )
177+ } ) ?
178+ } )
180179 }
181180
182181 fn write (
@@ -208,13 +207,14 @@ impl KVStoreSync for VssStore {
208207 } ;
209208 // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
210209 // times out.
211- let spawned_fut = internal_runtime. spawn ( async move {
212- tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
213- let msg = "VssStore::write timed out" ;
214- Error :: new ( ErrorKind :: Other , msg)
215- } )
216- } ) ;
217- self . runtime . block_on ( spawned_fut) . expect ( "We should always finish" ) ?
210+ tokio:: task:: block_in_place ( move || {
211+ internal_runtime. block_on ( async move {
212+ tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
213+ let msg = "VssStore::write timed out" ;
214+ Error :: new ( ErrorKind :: Other , msg)
215+ } )
216+ } ) ?
217+ } )
218218 }
219219
220220 fn remove (
@@ -245,13 +245,14 @@ impl KVStoreSync for VssStore {
245245 } ;
246246 // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
247247 // times out.
248- let spawned_fut = internal_runtime. spawn ( async move {
249- tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
250- let msg = "VssStore::remove timed out" ;
251- Error :: new ( ErrorKind :: Other , msg)
252- } )
253- } ) ;
254- self . runtime . block_on ( spawned_fut) . expect ( "We should always finish" ) ?
248+ tokio:: task:: block_in_place ( move || {
249+ internal_runtime. block_on ( async move {
250+ tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
251+ let msg = "VssStore::remove timed out" ;
252+ Error :: new ( ErrorKind :: Other , msg)
253+ } )
254+ } ) ?
255+ } )
255256 }
256257
257258 fn list ( & self , primary_namespace : & str , secondary_namespace : & str ) -> io:: Result < Vec < String > > {
@@ -266,13 +267,14 @@ impl KVStoreSync for VssStore {
266267 let fut = async move { inner. list_internal ( primary_namespace, secondary_namespace) . await } ;
267268 // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
268269 // times out.
269- let spawned_fut = internal_runtime. spawn ( async move {
270- tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
271- let msg = "VssStore::list timed out" ;
272- Error :: new ( ErrorKind :: Other , msg)
273- } )
274- } ) ;
275- self . runtime . block_on ( spawned_fut) . expect ( "We should always finish" ) ?
270+ tokio:: task:: block_in_place ( move || {
271+ internal_runtime. block_on ( async move {
272+ tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
273+ let msg = "VssStore::list timed out" ;
274+ Error :: new ( ErrorKind :: Other , msg)
275+ } )
276+ } ) ?
277+ } )
276278 }
277279}
278280
@@ -660,7 +662,6 @@ mod tests {
660662
661663 use super :: * ;
662664 use crate :: io:: test_utils:: do_read_write_remove_list_persist;
663- use crate :: logger:: Logger ;
664665
665666 #[ test]
666667 fn vss_read_write_remove_list_persist ( ) {
@@ -670,11 +671,7 @@ mod tests {
670671 let mut vss_seed = [ 0u8 ; 32 ] ;
671672 rng. fill_bytes ( & mut vss_seed) ;
672673 let header_provider = Arc :: new ( FixedHeaders :: new ( HashMap :: new ( ) ) ) ;
673- let logger = Arc :: new ( Logger :: new_log_facade ( ) ) ;
674- let runtime = Arc :: new ( Runtime :: new ( logger) . unwrap ( ) ) ;
675- let vss_store =
676- VssStore :: new ( vss_base_url, rand_store_id, vss_seed, header_provider, runtime) ;
677-
674+ let vss_store = VssStore :: new ( vss_base_url, rand_store_id, vss_seed, header_provider) ;
678675 do_read_write_remove_list_persist ( & vss_store) ;
679676 }
680677
@@ -686,10 +683,7 @@ mod tests {
686683 let mut vss_seed = [ 0u8 ; 32 ] ;
687684 rng. fill_bytes ( & mut vss_seed) ;
688685 let header_provider = Arc :: new ( FixedHeaders :: new ( HashMap :: new ( ) ) ) ;
689- let logger = Arc :: new ( Logger :: new_log_facade ( ) ) ;
690- let runtime = Arc :: new ( Runtime :: new ( logger) . unwrap ( ) ) ;
691- let vss_store =
692- VssStore :: new ( vss_base_url, rand_store_id, vss_seed, header_provider, runtime) ;
686+ let vss_store = VssStore :: new ( vss_base_url, rand_store_id, vss_seed, header_provider) ;
693687
694688 do_read_write_remove_list_persist ( & vss_store) ;
695689 drop ( vss_store)
0 commit comments