@@ -35,7 +35,6 @@ use vss_client::util::retry::{
3535use vss_client:: util:: storable_builder:: { EntropySource , StorableBuilder } ;
3636
3737use crate :: io:: utils:: check_namespace_key_validity;
38- use crate :: runtime:: Runtime ;
3938
4039type CustomRetryPolicy = FilteredRetryPolicy <
4140 JitteredRetryPolicy <
@@ -71,7 +70,6 @@ pub struct VssStore {
7170 // Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
7271 // operations aren't sensitive to the order of execution.
7372 next_version : AtomicU64 ,
74- runtime : Arc < Runtime > ,
7573 // A VSS-internal runtime we use to avoid any deadlocks we could hit when waiting on a spawned
7674 // blocking task to finish while the blocked thread had acquired the reactor. In particular,
7775 // this works around a previously-hit case where a concurrent call to
@@ -84,7 +82,7 @@ pub struct VssStore {
8482impl VssStore {
8583 pub ( crate ) fn new (
8684 base_url : String , store_id : String , vss_seed : [ u8 ; 32 ] ,
87- header_provider : Arc < dyn VssHeaderProvider > , runtime : Arc < Runtime > ,
85+ header_provider : Arc < dyn VssHeaderProvider > ,
8886 ) -> Self {
8987 let next_version = AtomicU64 :: new ( 1 ) ;
9088 let internal_runtime = Some (
@@ -128,7 +126,7 @@ impl VssStore {
128126 key_obfuscator,
129127 ) ) ;
130128
131- Self { inner, next_version, runtime , internal_runtime }
129+ Self { inner, next_version, internal_runtime }
132130 }
133131
134132 // Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys
@@ -175,13 +173,14 @@ impl KVStoreSync for VssStore {
175173 async move { inner. read_internal ( primary_namespace, secondary_namespace, key) . await } ;
176174 // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
177175 // times out.
178- let spawned_fut = internal_runtime. spawn ( async move {
179- tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
180- let msg = "VssStore::read timed out" ;
181- Error :: new ( ErrorKind :: Other , msg)
182- } )
183- } ) ;
184- self . runtime . block_on ( spawned_fut) . expect ( "We should always finish" ) ?
176+ tokio:: task:: block_in_place ( move || {
177+ internal_runtime. block_on ( async move {
178+ tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
179+ let msg = "VssStore::read timed out" ;
180+ Error :: new ( ErrorKind :: Other , msg)
181+ } )
182+ } ) ?
183+ } )
185184 }
186185
187186 fn write (
@@ -213,13 +212,14 @@ impl KVStoreSync for VssStore {
213212 } ;
214213 // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
215214 // times out.
216- let spawned_fut = internal_runtime. spawn ( async move {
217- tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
218- let msg = "VssStore::write timed out" ;
219- Error :: new ( ErrorKind :: Other , msg)
220- } )
221- } ) ;
222- self . runtime . block_on ( spawned_fut) . expect ( "We should always finish" ) ?
215+ tokio:: task:: block_in_place ( move || {
216+ internal_runtime. block_on ( async move {
217+ tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
218+ let msg = "VssStore::write timed out" ;
219+ Error :: new ( ErrorKind :: Other , msg)
220+ } )
221+ } ) ?
222+ } )
223223 }
224224
225225 fn remove (
@@ -250,13 +250,14 @@ impl KVStoreSync for VssStore {
250250 } ;
251251 // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
252252 // times out.
253- let spawned_fut = internal_runtime. spawn ( async move {
254- tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
255- let msg = "VssStore::remove timed out" ;
256- Error :: new ( ErrorKind :: Other , msg)
257- } )
258- } ) ;
259- self . runtime . block_on ( spawned_fut) . expect ( "We should always finish" ) ?
253+ tokio:: task:: block_in_place ( move || {
254+ internal_runtime. block_on ( async move {
255+ tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
256+ let msg = "VssStore::remove timed out" ;
257+ Error :: new ( ErrorKind :: Other , msg)
258+ } )
259+ } ) ?
260+ } )
260261 }
261262
262263 fn list ( & self , primary_namespace : & str , secondary_namespace : & str ) -> io:: Result < Vec < String > > {
@@ -271,13 +272,14 @@ impl KVStoreSync for VssStore {
271272 let fut = async move { inner. list_internal ( primary_namespace, secondary_namespace) . await } ;
272273 // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
273274 // times out.
274- let spawned_fut = internal_runtime. spawn ( async move {
275- tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
276- let msg = "VssStore::list timed out" ;
277- Error :: new ( ErrorKind :: Other , msg)
278- } )
279- } ) ;
280- self . runtime . block_on ( spawned_fut) . expect ( "We should always finish" ) ?
275+ tokio:: task:: block_in_place ( move || {
276+ internal_runtime. block_on ( async move {
277+ tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
278+ let msg = "VssStore::list timed out" ;
279+ Error :: new ( ErrorKind :: Other , msg)
280+ } )
281+ } ) ?
282+ } )
281283 }
282284}
283285
@@ -669,7 +671,6 @@ mod tests {
669671
670672 use super :: * ;
671673 use crate :: io:: test_utils:: do_read_write_remove_list_persist;
672- use crate :: logger:: Logger ;
673674
674675 #[ test]
675676 fn vss_read_write_remove_list_persist ( ) {
@@ -679,11 +680,7 @@ mod tests {
679680 let mut vss_seed = [ 0u8 ; 32 ] ;
680681 rng. fill_bytes ( & mut vss_seed) ;
681682 let header_provider = Arc :: new ( FixedHeaders :: new ( HashMap :: new ( ) ) ) ;
682- let logger = Arc :: new ( Logger :: new_log_facade ( ) ) ;
683- let runtime = Arc :: new ( Runtime :: new ( logger) . unwrap ( ) ) ;
684- let vss_store =
685- VssStore :: new ( vss_base_url, rand_store_id, vss_seed, header_provider, runtime) ;
686-
683+ let vss_store = VssStore :: new ( vss_base_url, rand_store_id, vss_seed, header_provider) ;
687684 do_read_write_remove_list_persist ( & vss_store) ;
688685 }
689686
@@ -695,10 +692,7 @@ mod tests {
695692 let mut vss_seed = [ 0u8 ; 32 ] ;
696693 rng. fill_bytes ( & mut vss_seed) ;
697694 let header_provider = Arc :: new ( FixedHeaders :: new ( HashMap :: new ( ) ) ) ;
698- let logger = Arc :: new ( Logger :: new_log_facade ( ) ) ;
699- let runtime = Arc :: new ( Runtime :: new ( logger) . unwrap ( ) ) ;
700- let vss_store =
701- VssStore :: new ( vss_base_url, rand_store_id, vss_seed, header_provider, runtime) ;
695+ let vss_store = VssStore :: new ( vss_base_url, rand_store_id, vss_seed, header_provider) ;
702696
703697 do_read_write_remove_list_persist ( & vss_store) ;
704698 drop ( vss_store)
0 commit comments