Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 30 additions & 5 deletions vortex-io/src/file/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use std::io;
use std::sync::Arc;
use std::time::Duration;

use async_compat::Compat;
use futures::FutureExt;
Expand Down Expand Up @@ -94,8 +95,7 @@ impl VortexReadAt for ObjectStoreSource {
let store = self.store.clone();
let path = self.path.clone();
Compat::new(async move {
store
.head(&path)
hedge_request(|| store.head(&path))
.await
.map(|h| h.size)
.map_err(VortexError::from)
Expand All @@ -117,15 +117,16 @@ impl VortexReadAt for ObjectStoreSource {
Compat::new(async move {
let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment);

let response = store
.get_opts(
let response = hedge_request(|| {
store.get_opts(
&path,
GetOptions {
range: Some(GetRange::Bounded(range.clone())),
..Default::default()
},
)
.await?;
})
.await?;

let buffer = match response.payload {
#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -166,3 +167,27 @@ impl VortexReadAt for ObjectStoreSource {
.boxed()
}
}

async fn hedge_request<F, FUT, T>(f: F) -> T
where
F: Fn() -> FUT,
FUT: Future<Output = T>,
{
let head = f();
let hedged = async move {
// See "the AnyBlob paper": Durner et al., "Exploiting Cloud Object Storage for
// High-Performance Analytics". VLDB Vol 16 Iss 11.
// https://www.durner.dev/app/media/papers/anyblob-vldb23.pdf
tokio::time::sleep(Duration::from_millis(200)).await;
f().await
};

tokio::select! {
r = head => {
r
}
r = hedged => {
r
}
}
}
Loading