@@ -31,7 +31,6 @@ use vss_client_ng::util::key_obfuscator::KeyObfuscator;
3131use vss_client_ng:: util:: storable_builder:: { EntropySource , StorableBuilder } ;
3232
3333use crate :: io:: utils:: check_namespace_key_validity;
34- use crate :: runtime:: Runtime ;
3534
3635// We set this to a small number of threads that would still allow to make some progress if one
3736// would hit a blocking case
@@ -44,7 +43,6 @@ pub struct VssStore {
4443 // Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
4544 // operations aren't sensitive to the order of execution.
4645 next_version : AtomicU64 ,
47- runtime : Arc < Runtime > ,
4846 // A VSS-internal runtime we use to avoid any deadlocks we could hit when waiting on a spawned
4947 // blocking task to finish while the blocked thread had acquired the reactor. In particular,
5048 // this works around a previously-hit case where a concurrent call to
@@ -57,7 +55,7 @@ pub struct VssStore {
5755impl VssStore {
5856 pub ( crate ) fn new (
5957 base_url : String , store_id : String , vss_seed : [ u8 ; 32 ] ,
60- header_provider : Arc < dyn VssHeaderProvider > , runtime : Arc < Runtime > ,
58+ header_provider : Arc < dyn VssHeaderProvider > ,
6159 ) -> io:: Result < Self > {
6260 let inner = Arc :: new ( VssStoreInner :: new ( base_url, store_id, vss_seed, header_provider) ?) ;
6361 let next_version = AtomicU64 :: new ( 1 ) ;
@@ -75,7 +73,7 @@ impl VssStore {
7573 . unwrap ( ) ,
7674 ) ;
7775
78- Ok ( Self { inner, next_version, runtime , internal_runtime } )
76+ Ok ( Self { inner, next_version, internal_runtime } )
7977 }
8078
8179 // Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys
@@ -122,13 +120,14 @@ impl KVStoreSync for VssStore {
122120 async move { inner. read_internal ( primary_namespace, secondary_namespace, key) . await } ;
123121 // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
124122 // times out.
125- let spawned_fut = internal_runtime. spawn ( async move {
126- tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
127- let msg = "VssStore::read timed out" ;
128- Error :: new ( ErrorKind :: Other , msg)
129- } )
130- } ) ;
131- self . runtime . block_on ( spawned_fut) . expect ( "We should always finish" ) ?
123+ tokio:: task:: block_in_place ( move || {
124+ internal_runtime. block_on ( async move {
125+ tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
126+ let msg = "VssStore::read timed out" ;
127+ Error :: new ( ErrorKind :: Other , msg)
128+ } )
129+ } ) ?
130+ } )
132131 }
133132
134133 fn write (
@@ -160,13 +159,14 @@ impl KVStoreSync for VssStore {
160159 } ;
161160 // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
162161 // times out.
163- let spawned_fut = internal_runtime. spawn ( async move {
164- tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
165- let msg = "VssStore::write timed out" ;
166- Error :: new ( ErrorKind :: Other , msg)
167- } )
168- } ) ;
169- self . runtime . block_on ( spawned_fut) . expect ( "We should always finish" ) ?
162+ tokio:: task:: block_in_place ( move || {
163+ internal_runtime. block_on ( async move {
164+ tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
165+ let msg = "VssStore::write timed out" ;
166+ Error :: new ( ErrorKind :: Other , msg)
167+ } )
168+ } ) ?
169+ } )
170170 }
171171
172172 fn remove (
@@ -197,13 +197,14 @@ impl KVStoreSync for VssStore {
197197 } ;
198198 // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
199199 // times out.
200- let spawned_fut = internal_runtime. spawn ( async move {
201- tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
202- let msg = "VssStore::remove timed out" ;
203- Error :: new ( ErrorKind :: Other , msg)
204- } )
205- } ) ;
206- self . runtime . block_on ( spawned_fut) . expect ( "We should always finish" ) ?
200+ tokio:: task:: block_in_place ( move || {
201+ internal_runtime. block_on ( async move {
202+ tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
203+ let msg = "VssStore::remove timed out" ;
204+ Error :: new ( ErrorKind :: Other , msg)
205+ } )
206+ } ) ?
207+ } )
207208 }
208209
209210 fn list ( & self , primary_namespace : & str , secondary_namespace : & str ) -> io:: Result < Vec < String > > {
@@ -218,13 +219,14 @@ impl KVStoreSync for VssStore {
218219 let fut = async move { inner. list_internal ( primary_namespace, secondary_namespace) . await } ;
219220 // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
220221 // times out.
221- let spawned_fut = internal_runtime. spawn ( async move {
222- tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
223- let msg = "VssStore::list timed out" ;
224- Error :: new ( ErrorKind :: Other , msg)
225- } )
226- } ) ;
227- self . runtime . block_on ( spawned_fut) . expect ( "We should always finish" ) ?
222+ tokio:: task:: block_in_place ( move || {
223+ internal_runtime. block_on ( async move {
224+ tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
225+ let msg = "VssStore::list timed out" ;
226+ Error :: new ( ErrorKind :: Other , msg)
227+ } )
228+ } ) ?
229+ } )
228230 }
229231}
230232
@@ -597,7 +599,6 @@ mod tests {
597599
598600 use super :: * ;
599601 use crate :: io:: test_utils:: do_read_write_remove_list_persist;
600- use crate :: logger:: Logger ;
601602
602603 #[ test]
603604 fn vss_read_write_remove_list_persist ( ) {
@@ -607,10 +608,8 @@ mod tests {
607608 let mut vss_seed = [ 0u8 ; 32 ] ;
608609 rng. fill_bytes ( & mut vss_seed) ;
609610 let header_provider = Arc :: new ( FixedHeaders :: new ( HashMap :: new ( ) ) ) ;
610- let logger = Arc :: new ( Logger :: new_log_facade ( ) ) ;
611- let runtime = Arc :: new ( Runtime :: new ( logger) . unwrap ( ) ) ;
612611 let vss_store =
613- VssStore :: new ( vss_base_url, rand_store_id, vss_seed, header_provider, runtime ) . unwrap ( ) ;
612+ VssStore :: new ( vss_base_url, rand_store_id, vss_seed, header_provider) . unwrap ( ) ;
614613
615614 do_read_write_remove_list_persist ( & vss_store) ;
616615 }
@@ -623,10 +622,8 @@ mod tests {
623622 let mut vss_seed = [ 0u8 ; 32 ] ;
624623 rng. fill_bytes ( & mut vss_seed) ;
625624 let header_provider = Arc :: new ( FixedHeaders :: new ( HashMap :: new ( ) ) ) ;
626- let logger = Arc :: new ( Logger :: new_log_facade ( ) ) ;
627- let runtime = Arc :: new ( Runtime :: new ( logger) . unwrap ( ) ) ;
628625 let vss_store =
629- VssStore :: new ( vss_base_url, rand_store_id, vss_seed, header_provider, runtime ) . unwrap ( ) ;
626+ VssStore :: new ( vss_base_url, rand_store_id, vss_seed, header_provider) . unwrap ( ) ;
630627
631628 do_read_write_remove_list_persist ( & vss_store) ;
632629 drop ( vss_store)
0 commit comments