diff --git a/vortex-io/src/file/object_store.rs b/vortex-io/src/file/object_store.rs index 0d09cbdcd2b..e3d58734fb2 100644 --- a/vortex-io/src/file/object_store.rs +++ b/vortex-io/src/file/object_store.rs @@ -3,6 +3,7 @@ use std::io; use std::sync::Arc; +use std::time::Duration; use async_compat::Compat; use futures::FutureExt; @@ -94,8 +95,7 @@ impl VortexReadAt for ObjectStoreSource { let store = self.store.clone(); let path = self.path.clone(); Compat::new(async move { - store - .head(&path) + hedge_request(|| store.head(&path)) .await .map(|h| h.size) .map_err(VortexError::from) @@ -117,15 +117,16 @@ impl VortexReadAt for ObjectStoreSource { Compat::new(async move { let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment); - let response = store - .get_opts( + let response = hedge_request(|| { + store.get_opts( &path, GetOptions { range: Some(GetRange::Bounded(range.clone())), ..Default::default() }, ) - .await?; + }) + .await?; let buffer = match response.payload { #[cfg(not(target_arch = "wasm32"))] @@ -166,3 +167,27 @@ impl VortexReadAt for ObjectStoreSource { .boxed() } } + +async fn hedge_request(f: F) -> T +where + F: Fn() -> FUT, + FUT: Future, +{ + let head = f(); + let hedged = async move { + // See "the AnyBlob paper": Durner et al., "Exploiting Cloud Object Storage for + // High-Performance Analytics". VLDB Vol 16 Iss 11. + // https://www.durner.dev/app/media/papers/anyblob-vldb23.pdf + tokio::time::sleep(Duration::from_millis(200)).await; + f().await + }; + + tokio::select! { + r = head => { + r + } + r = hedged => { + r + } + } +}