From c408ffa5f1dbe9943106ab40d2a44d8544fd74f7 Mon Sep 17 00:00:00 2001 From: Ruoshi Date: Wed, 28 Jan 2026 21:32:00 -0800 Subject: [PATCH 1/3] feat: implement DuckDB filesystem integration for Vortex file handling --- vortex-duckdb/build.rs | 1 + vortex-duckdb/cpp/copy_function.cpp | 3 +- vortex-duckdb/cpp/file_system.cpp | 203 ++++++++++++ vortex-duckdb/cpp/include/duckdb_vx.h | 1 + .../cpp/include/duckdb_vx/copy_function.h | 3 +- .../cpp/include/duckdb_vx/file_system.h | 56 ++++ vortex-duckdb/src/copy.rs | 19 +- .../src/duckdb/copy_function/callback.rs | 5 +- vortex-duckdb/src/duckdb/copy_function/mod.rs | 2 + vortex-duckdb/src/duckdb/file_system.rs | 296 +++++++++++++++++ vortex-duckdb/src/duckdb/mod.rs | 2 + .../src/e2e_test/vortex_scan_test.rs | 47 +++ vortex-duckdb/src/scan.rs | 67 ++-- vortex-duckdb/src/utils/glob.rs | 306 ++++-------------- 14 files changed, 746 insertions(+), 265 deletions(-) create mode 100644 vortex-duckdb/cpp/file_system.cpp create mode 100644 vortex-duckdb/cpp/include/duckdb_vx/file_system.h create mode 100644 vortex-duckdb/src/duckdb/file_system.rs diff --git a/vortex-duckdb/build.rs b/vortex-duckdb/build.rs index 7a058322935..f12ed48166e 100644 --- a/vortex-duckdb/build.rs +++ b/vortex-duckdb/build.rs @@ -465,6 +465,7 @@ fn main() { .file("cpp/data_chunk.cpp") .file("cpp/error.cpp") .file("cpp/expr.cpp") + .file("cpp/file_system.cpp") .file("cpp/logical_type.cpp") .file("cpp/object_cache.cpp") .file("cpp/replacement_scan.cpp") diff --git a/vortex-duckdb/cpp/copy_function.cpp b/vortex-duckdb/cpp/copy_function.cpp index 3768ba9d1a5..819df028afe 100644 --- a/vortex-duckdb/cpp/copy_function.cpp +++ b/vortex-duckdb/cpp/copy_function.cpp @@ -71,7 +71,8 @@ unique_ptr c_init_global(ClientContext &context, FunctionDat const string &file_path) { auto &bind = bind_data.Cast(); duckdb_vx_error error_out = nullptr; - auto global_data = bind.vtab.init_global(bind.ffi_data->DataPtr(), file_path.c_str(), &error_out); + auto global_data = bind.vtab.init_global(reinterpret_cast(&context), + bind.ffi_data->DataPtr(), file_path.c_str(), &error_out); if (error_out) { throw ExecutorException(IntoErrString(error_out)); } diff --git a/vortex-duckdb/cpp/file_system.cpp b/vortex-duckdb/cpp/file_system.cpp new file mode 100644 index 00000000000..d515fa9b280 --- /dev/null +++ b/vortex-duckdb/cpp/file_system.cpp @@ -0,0 +1,203 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#include "duckdb_vx.h" + +#include +#include +#include + +#include +#include + +using namespace duckdb; + +namespace { +struct FileHandleWrapper { + explicit FileHandleWrapper(unique_ptr handle_p) : handle(std::move(handle_p)) { + } + + unique_ptr handle; +}; + +void SetError(duckdb_vx_error *error_out, const std::string &message) { + if (!error_out) { + return; + } + *error_out = duckdb_vx_error_create(message.data(), message.size()); +} + +duckdb_state HandleException(duckdb_vx_error *error_out) { + try { + throw; + } catch (const Exception &ex) { + SetError(error_out, ex.what()); + } catch (const std::exception &ex) { + SetError(error_out, ex.what()); + } catch (...) { + SetError(error_out, "Unknown error"); + } + return DuckDBError; +} + +} // namespace + +extern "C" duckdb_vx_file_handle duckdb_vx_fs_open(duckdb_vx_client_context ctx, const char *path, + duckdb_vx_error *error_out) { + if (!ctx || !path) { + SetError(error_out, "Invalid filesystem open arguments"); + return nullptr; + } + + try { + auto *client_context = reinterpret_cast(ctx); + auto &fs = FileSystem::GetFileSystem(*client_context); + auto handle = fs.OpenFile(path, FileFlags::FILE_FLAGS_READ); + return reinterpret_cast(new FileHandleWrapper(std::move(handle))); + } catch (...) { + HandleException(error_out); + return nullptr; + } +} + +extern "C" duckdb_vx_file_handle duckdb_vx_fs_create(duckdb_vx_client_context ctx, const char *path, + duckdb_vx_error *error_out) { + if (!ctx || !path) { + SetError(error_out, "Invalid filesystem create arguments"); + return nullptr; + } + + try { + auto *client_context = reinterpret_cast(ctx); + auto &fs = FileSystem::GetFileSystem(*client_context); + auto handle = fs.OpenFile(path, FileFlags::FILE_FLAGS_WRITE | FileFlags::FILE_FLAGS_FILE_CREATE); + handle->Truncate(0); + return reinterpret_cast(new FileHandleWrapper(std::move(handle))); + } catch (...) { + HandleException(error_out); + return nullptr; + } +} + +extern "C" void duckdb_vx_fs_close(duckdb_vx_file_handle *handle) { + if (!handle || !*handle) { + return; + } + auto wrapper = reinterpret_cast(*handle); + delete wrapper; + *handle = nullptr; +} + +extern "C" duckdb_state duckdb_vx_fs_get_size(duckdb_vx_file_handle handle, idx_t *size_out, + duckdb_vx_error *error_out) { + if (!handle || !size_out) { + SetError(error_out, "Invalid arguments to fs_get_size"); + return DuckDBError; + } + + try { + auto *wrapper = reinterpret_cast(handle); + *size_out = wrapper->handle->GetFileSize(); + return DuckDBSuccess; + } catch (...) { + return HandleException(error_out); + } +} + +extern "C" duckdb_state duckdb_vx_fs_read(duckdb_vx_file_handle handle, idx_t offset, idx_t len, uint8_t *buffer, + idx_t *out_len, duckdb_vx_error *error_out) { + if (!handle || !buffer || !out_len) { + SetError(error_out, "Invalid arguments to fs_read"); + return DuckDBError; + } + + try { + auto *wrapper = reinterpret_cast(handle); + wrapper->handle->Read(buffer, len, offset); + *out_len = len; + return DuckDBSuccess; + } catch (...) { + return HandleException(error_out); + } +} + +extern "C" duckdb_state duckdb_vx_fs_write(duckdb_vx_file_handle handle, idx_t offset, idx_t len, + const uint8_t *buffer, idx_t *out_len, + duckdb_vx_error *error_out) { + if (!handle || !buffer || !out_len) { + SetError(error_out, "Invalid arguments to fs_write"); + return DuckDBError; + } + + try { + auto *wrapper = reinterpret_cast(handle); + wrapper->handle->Seek(offset); + wrapper->handle->Write(const_cast(buffer), len); + *out_len = len; + return DuckDBSuccess; + } catch (...) { + return HandleException(error_out); + } +} + +extern "C" duckdb_vx_string_list duckdb_vx_fs_glob(duckdb_vx_client_context ctx, const char *pattern, + duckdb_vx_error *error_out) { + duckdb_vx_string_list result{nullptr, 0}; + + if (!ctx || !pattern) { + SetError(error_out, "Invalid arguments to fs_glob"); + return result; + } + + try { + auto *client_context = reinterpret_cast(ctx); + auto &fs = FileSystem::GetFileSystem(*client_context); + auto matches = fs.Glob(pattern); + + if (matches.empty()) { + return result; + } + + result.count = matches.size(); + result.entries = static_cast(malloc(sizeof(char *) * matches.size())); + for (size_t i = 0; i < matches.size(); i++) { + const auto &entry = matches[i].path; + auto *owned = static_cast(malloc(entry.size() + 1)); + std::memcpy(owned, entry.data(), entry.size()); + owned[entry.size()] = '\0'; + result.entries[i] = owned; + } + + return result; + } catch (...) { + HandleException(error_out); + return result; + } +} + +extern "C" void duckdb_vx_string_list_free(duckdb_vx_string_list *list) { + if (!list || !list->entries) { + return; + } + for (size_t i = 0; i < list->count; i++) { + free(const_cast(list->entries[i])); + } + free(list->entries); + list->entries = nullptr; + list->count = 0; +} + +extern "C" duckdb_state duckdb_vx_fs_sync(duckdb_vx_file_handle handle, duckdb_vx_error *error_out) { + if (!handle) { + SetError(error_out, "Invalid arguments to fs_sync"); + return DuckDBError; + } + + try { + auto *wrapper = reinterpret_cast(handle); + wrapper->handle->Sync(); + return DuckDBSuccess; + } catch (...) { + return HandleException(error_out); + } +} diff --git a/vortex-duckdb/cpp/include/duckdb_vx.h b/vortex-duckdb/cpp/include/duckdb_vx.h index b18eb79c522..9882afbb310 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx.h +++ b/vortex-duckdb/cpp/include/duckdb_vx.h @@ -10,6 +10,7 @@ #include "duckdb_vx/data_chunk.h" #include "duckdb_vx/error.h" #include "duckdb_vx/expr.h" +#include "duckdb_vx/file_system.h" #include "duckdb_vx/logical_type.h" #include "duckdb_vx/object_cache.h" #include "duckdb_vx/replacement_scan.h" diff --git a/vortex-duckdb/cpp/include/duckdb_vx/copy_function.h b/vortex-duckdb/cpp/include/duckdb_vx/copy_function.h index 168921fb3a4..a5cd89fdf63 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx/copy_function.h +++ b/vortex-duckdb/cpp/include/duckdb_vx/copy_function.h @@ -45,7 +45,8 @@ typedef struct { unsigned long column_name_count, const duckdb_logical_type *column_types, unsigned long column_type_count, duckdb_vx_error *error_out); - duckdb_vx_data (*init_global)(const void *bind_data, const char *file_path, duckdb_vx_error *error_out); + duckdb_vx_data (*init_global)(duckdb_vx_client_context ctx, const void *bind_data, const char *file_path, + duckdb_vx_error *error_out); duckdb_vx_data (*init_local)(const void *bind_data, duckdb_vx_error *error_out); diff --git a/vortex-duckdb/cpp/include/duckdb_vx/file_system.h b/vortex-duckdb/cpp/include/duckdb_vx/file_system.h new file mode 100644 index 00000000000..59caff68d64 --- /dev/null +++ b/vortex-duckdb/cpp/include/duckdb_vx/file_system.h @@ -0,0 +1,56 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#pragma once + +#include "duckdb.h" +#include "duckdb_vx/client_context.h" +#include "duckdb_vx/error.h" + +#ifdef __cplusplus /* If compiled as C++, use C ABI */ +extern "C" { +#endif + +typedef struct duckdb_vx_file_handle_ *duckdb_vx_file_handle; + +typedef struct { + const char **entries; + size_t count; +} duckdb_vx_string_list; + +// Open a file using DuckDB's filesystem (supports httpfs, s3, etc.). +duckdb_vx_file_handle duckdb_vx_fs_open(duckdb_vx_client_context ctx, const char *path, + duckdb_vx_error *error_out); + +// Close a previously opened file handle. +void duckdb_vx_fs_close(duckdb_vx_file_handle *handle); + +// Get the size of an opened file. +duckdb_state duckdb_vx_fs_get_size(duckdb_vx_file_handle handle, idx_t *size_out, + duckdb_vx_error *error_out); + +// Read up to len bytes at the given offset into buffer. Returns bytes read via out_len. +duckdb_state duckdb_vx_fs_read(duckdb_vx_file_handle handle, idx_t offset, idx_t len, uint8_t *buffer, + idx_t *out_len, duckdb_vx_error *error_out); + +// Expand a glob using DuckDB's filesystem. +duckdb_vx_string_list duckdb_vx_fs_glob(duckdb_vx_client_context ctx, const char *pattern, + duckdb_vx_error *error_out); + +// Free a string list allocated by duckdb_vx_fs_glob. +void duckdb_vx_string_list_free(duckdb_vx_string_list *list); + +// Create/truncate a file for writing using DuckDB's filesystem. +duckdb_vx_file_handle duckdb_vx_fs_create(duckdb_vx_client_context ctx, const char *path, + duckdb_vx_error *error_out); + +// Write len bytes at the given offset from buffer. +duckdb_state duckdb_vx_fs_write(duckdb_vx_file_handle handle, idx_t offset, idx_t len, const uint8_t *buffer, + idx_t *out_len, duckdb_vx_error *error_out); + +// Flush pending writes to storage. +duckdb_state duckdb_vx_fs_sync(duckdb_vx_file_handle handle, duckdb_vx_error *error_out); + +#ifdef __cplusplus /* End C ABI */ +} +#endif diff --git a/vortex-duckdb/src/copy.rs b/vortex-duckdb/src/copy.rs index 40a0c72ea58..56f32fc41e2 100644 --- a/vortex-duckdb/src/copy.rs +++ b/vortex-duckdb/src/copy.rs @@ -29,13 +29,20 @@ use crate::RUNTIME; use crate::SESSION; use crate::convert::data_chunk_to_vortex; use crate::convert::from_duckdb_table; +use crate::cpp; +use crate::duckdb::ClientContext; use crate::duckdb::CopyFunction; use crate::duckdb::DataChunk; use crate::duckdb::LogicalType; +use crate::duckdb::duckdb_fs_create_writer; #[derive(Debug)] pub struct VortexCopyFunction; +#[derive(Clone, Copy)] +struct SendableClientCtx(usize); +unsafe impl Send for SendableClientCtx {} + pub struct BindData { dtype: DType, fields: StructFields, @@ -118,6 +125,7 @@ impl CopyFunction for VortexCopyFunction { } fn init_global( + client_context: ClientContext, bind_data: &Self::BindData, file_path: String, ) -> VortexResult { @@ -126,9 +134,16 @@ impl CopyFunction for VortexCopyFunction { let array_stream = ArrayStreamAdapter::new(bind_data.dtype.clone(), rx.into_stream()); let handle = SESSION.handle(); + let ctx_ptr = SendableClientCtx(client_context.as_ptr() as usize); let write_task = handle.spawn(async move { - let mut file = async_fs::File::create(file_path).await?; - SESSION.write_options().write(&mut file, array_stream).await + // Prefer DuckDB FS (httpfs/s3/etc.), fallback to local async fs if unavailable. + let ctx_raw = ctx_ptr.0 as cpp::duckdb_vx_client_context; + if let Ok(writer) = unsafe { duckdb_fs_create_writer(ctx_raw, &file_path) } { + SESSION.write_options().write(writer, array_stream).await + } else { + let mut file = async_fs::File::create(&file_path).await?; + SESSION.write_options().write(&mut file, array_stream).await + } }); let worker_pool = RUNTIME.new_pool(); diff --git a/vortex-duckdb/src/duckdb/copy_function/callback.rs b/vortex-duckdb/src/duckdb/copy_function/callback.rs index 05982003544..783bd9f2a2e 100644 --- a/vortex-duckdb/src/duckdb/copy_function/callback.rs +++ b/vortex-duckdb/src/duckdb/copy_function/callback.rs @@ -15,6 +15,7 @@ use crate::cpp::duckdb_data_chunk; use crate::cpp::duckdb_logical_type; use crate::cpp::duckdb_vx_copy_func_bind_input; use crate::cpp::duckdb_vx_error; +use crate::duckdb::ClientContext; use crate::duckdb::CopyFunction; use crate::duckdb::Data; use crate::duckdb::DataChunk; @@ -52,6 +53,7 @@ pub(crate) unsafe extern "C-unwind" fn bind_callback( } pub(crate) unsafe extern "C-unwind" fn global_callback( + client_context: cpp::duckdb_vx_client_context, bind_data: *const c_void, file_path: *const c_char, error_out: *mut duckdb_vx_error, @@ -62,7 +64,8 @@ pub(crate) unsafe extern "C-unwind" fn global_callback( let bind_data = unsafe { bind_data.cast::().as_ref() } .vortex_expect("global_init_data null pointer"); try_or_null(error_out, || { - let bind_data = T::init_global(bind_data, file_path)?; + let ctx = unsafe { ClientContext::borrow(client_context) }; + let bind_data = T::init_global(ctx, bind_data, file_path)?; Ok(Data::from(Box::new(bind_data)).as_ptr()) }) } diff --git a/vortex-duckdb/src/duckdb/copy_function/mod.rs b/vortex-duckdb/src/duckdb/copy_function/mod.rs index fc8a51d0fd9..0c4a6f903b8 100644 --- a/vortex-duckdb/src/duckdb/copy_function/mod.rs +++ b/vortex-duckdb/src/duckdb/copy_function/mod.rs @@ -11,6 +11,7 @@ use vortex::error::VortexResult; use crate::Connection; use crate::cpp; +use crate::duckdb::ClientContext; use crate::duckdb::DataChunk; use crate::duckdb::LogicalType; use crate::duckdb::copy_function::callback::bind_callback; @@ -49,6 +50,7 @@ pub trait CopyFunction: Sized + Debug { /// The global operator state is used to keep track of the progress in the copy function and /// is shared between all threads working on the copy function. fn init_global( + client_context: ClientContext, bind_data: &Self::BindData, file_path: String, ) -> VortexResult; diff --git a/vortex-duckdb/src/duckdb/file_system.rs b/vortex-duckdb/src/duckdb/file_system.rs new file mode 100644 index 00000000000..8e7dfe30212 --- /dev/null +++ b/vortex-duckdb/src/duckdb/file_system.rs @@ -0,0 +1,296 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::ffi::CStr; +use std::ffi::CString; +use std::ptr; +use std::sync::Arc; +use std::sync::OnceLock; + +use futures::FutureExt; +use futures::future::BoxFuture; +use parking_lot::Mutex; +use vortex::buffer::Alignment; +use vortex::buffer::ByteBuffer; +use vortex::buffer::ByteBufferMut; +use vortex::error::VortexError; +use vortex::error::VortexResult; +use vortex::error::vortex_err; +use vortex::io::CoalesceConfig; +use vortex::io::VortexReadAt; +use vortex::io::runtime::BlockingRuntime; +use vortex::io::IoBuf; +use vortex::io::VortexWrite; + +use crate::RUNTIME; +use crate::cpp; +use crate::duckdb::ClientContext; +use crate::lifetime_wrapper; + +const DEFAULT_COALESCE: CoalesceConfig = CoalesceConfig { + distance: 1024 * 1024, // 1 MB + max_size: 8 * 1024 * 1024, // 8 MB +}; + +const DEFAULT_CONCURRENCY: usize = 64; + +lifetime_wrapper!(FsFileHandle, cpp::duckdb_vx_file_handle, cpp::duckdb_vx_fs_close, [owned, ref]); +unsafe impl Send for FsFileHandle {} +unsafe impl Sync for FsFileHandle {} +unsafe impl Send for cpp::duckdb_vx_client_context_ {} +unsafe impl Sync for cpp::duckdb_vx_client_context_ {} + +fn fs_error(err: cpp::duckdb_vx_error) -> VortexError { + if err.is_null() { + return vortex_err!("DuckDB filesystem error (unknown)"); + } + let message = unsafe { CStr::from_ptr(cpp::duckdb_vx_error_value(err)) } + .to_string_lossy() + .to_string(); + unsafe { cpp::duckdb_vx_error_free(err) }; + vortex_err!("{message}") +} + +pub fn duckdb_fs_glob(ctx: &ClientContext, pattern: &str) -> VortexResult> { + let c_pattern = CString::new(pattern).map_err(|e| vortex_err!("Invalid glob pattern: {e}"))?; + let mut err: cpp::duckdb_vx_error = ptr::null_mut(); + let mut list = + unsafe { cpp::duckdb_vx_fs_glob(ctx.as_ptr(), c_pattern.as_ptr(), &raw mut err) }; + if !err.is_null() { + return Err(fs_error(err)); + } + + let mut urls = Vec::with_capacity(list.count); + for idx in 0..list.count { + let entry = unsafe { CStr::from_ptr(*list.entries.add(idx)) }; + let entry_str = entry.to_string_lossy(); + let url = url::Url::parse(&entry_str) + .map_err(|e| vortex_err!("Invalid URL returned by DuckDB glob {entry_str}: {e}"))?; + urls.push(url); + } + + unsafe { cpp::duckdb_vx_string_list_free(&raw mut list) }; + + Ok(urls) +} + +pub unsafe fn duckdb_fs_create_writer( + ctx: cpp::duckdb_vx_client_context, + path: &str, +) -> VortexResult { + unsafe { DuckDbFsWriter::create(ctx, path) } +} + +/// A VortexReadAt implementation backed by DuckDB's filesystem (e.g., httpfs/s3). +pub struct DuckDbFsReadAt { + handle: Arc>, + uri: Arc, + size: Arc>, +} + +impl DuckDbFsReadAt { + pub unsafe fn open_url( + ctx: cpp::duckdb_vx_client_context, + url: &url::Url, + ) -> VortexResult { + let c_path = CString::new(url.as_str()).map_err(|e| vortex_err!("Invalid URL: {e}"))?; + let mut err: cpp::duckdb_vx_error = ptr::null_mut(); + let handle = unsafe { cpp::duckdb_vx_fs_open(ctx, c_path.as_ptr(), &raw mut err) }; + if handle.is_null() { + return Err(fs_error(err)); + } + + Ok(Self { + handle: Arc::new(Mutex::new(unsafe { FsFileHandle::own(handle) })), + uri: Arc::from(url.as_str()), + size: Arc::new(OnceLock::new()), + }) + } +} + +impl VortexReadAt for DuckDbFsReadAt { + fn uri(&self) -> Option<&Arc> { + Some(&self.uri) + } + + fn coalesce_config(&self) -> Option { + Some(DEFAULT_COALESCE) + } + + fn concurrency(&self) -> usize { + DEFAULT_CONCURRENCY + } + + fn size(&self) -> BoxFuture<'static, VortexResult> { + let handle = self.handle.clone(); + let size_cell = self.size.clone(); + + async move { + if let Some(size) = size_cell.get() { + return Ok(*size); + } + + let runtime = RUNTIME.handle(); + let size = runtime + .spawn_blocking(move || { + let mut err: cpp::duckdb_vx_error = ptr::null_mut(); + let mut size_out: cpp::idx_t = 0; + let status = unsafe { + cpp::duckdb_vx_fs_get_size( + handle.lock().as_ptr(), + &raw mut size_out, + &raw mut err, + ) + }; + if status != cpp::duckdb_state::DuckDBSuccess { + return Err(fs_error(err)); + } + Ok::<_, VortexError>(size_out as u64) + }) + .await?; + + let _ = size_cell.set(size); + Ok(size) + } + .boxed() + } + + fn read_at( + &self, + offset: u64, + length: usize, + alignment: Alignment, + ) -> BoxFuture<'static, VortexResult> { + let handle = self.handle.clone(); + + async move { + let runtime = RUNTIME.handle(); + runtime + .spawn_blocking(move || { + let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment); + unsafe { buffer.set_len(length) }; + + let mut err: cpp::duckdb_vx_error = ptr::null_mut(); + let mut out_len: cpp::idx_t = 0; + let status = unsafe { + cpp::duckdb_vx_fs_read( + handle.lock().as_ptr(), + offset as cpp::idx_t, + length as cpp::idx_t, + buffer.as_mut_slice().as_mut_ptr(), + &raw mut out_len, + &raw mut err, + ) + }; + + if status != cpp::duckdb_state::DuckDBSuccess { + return Err(fs_error(err)); + } + + let used = usize::try_from(out_len) + .map_err(|e| vortex_err!("Invalid read len: {e}"))?; + unsafe { buffer.set_len(used) }; + + Ok::<_, VortexError>(buffer.freeze()) + }) + .await + } + .boxed() + } +} + +// SAFETY: Access is serialized via a mutex and DuckDB's file handles are thread-safe for reads. +unsafe impl Send for DuckDbFsReadAt {} +unsafe impl Sync for DuckDbFsReadAt {} + +pub struct DuckDbFsWriter { + handle: FsFileHandle, + pos: u64, +} + +impl DuckDbFsWriter { + pub unsafe fn create(ctx: cpp::duckdb_vx_client_context, path: &str) -> VortexResult { + let c_path = CString::new(path).map_err(|e| vortex_err!("Invalid path: {e}"))?; + let mut err: cpp::duckdb_vx_error = ptr::null_mut(); + let handle = unsafe { cpp::duckdb_vx_fs_create(ctx, c_path.as_ptr(), &raw mut err) }; + if handle.is_null() { + return Err(fs_error(err)); + } + + Ok(Self { + handle: unsafe { FsFileHandle::own(handle) }, + pos: 0, + }) + } +} + +impl VortexWrite for DuckDbFsWriter { + async fn write_all(&mut self, buffer: B) -> std::io::Result { + let len = buffer.bytes_init(); + let mut err: cpp::duckdb_vx_error = ptr::null_mut(); + let mut out_len: cpp::idx_t = 0; + + let status = unsafe { + cpp::duckdb_vx_fs_write( + self.handle.as_ptr(), + self.pos as cpp::idx_t, + len as cpp::idx_t, + buffer.read_ptr(), + &raw mut out_len, + &raw mut err, + ) + }; + + if status != cpp::duckdb_state::DuckDBSuccess { + return Err(std::io::Error::other(fs_error(err).to_string())); + } + + self.pos += len as u64; + Ok(buffer) + } + + async fn flush(&mut self) -> std::io::Result<()> { + let mut err: cpp::duckdb_vx_error = ptr::null_mut(); + let status = unsafe { cpp::duckdb_vx_fs_sync(self.handle.as_ptr(), &raw mut err) }; + if status != cpp::duckdb_state::DuckDBSuccess { + return Err(std::io::Error::other(fs_error(err).to_string())); + } + Ok(()) + } + + async fn shutdown(&mut self) -> std::io::Result<()> { + self.flush().await + } +} + +#[cfg(test)] +mod tests { + use std::fs; + use std::path::PathBuf; + + use super::*; + use crate::duckdb::Database; + + #[test] + fn test_writer_roundtrip_local() { + let db = Database::open_in_memory().unwrap(); + let conn = db.connect().unwrap(); + let ctx = conn.client_context().unwrap(); + + let dir = tempfile::tempdir().unwrap(); + let path: PathBuf = dir.path().join("writer_local.vortex"); + let path_str = path.to_string_lossy(); + + let mut writer = unsafe { duckdb_fs_create_writer(ctx.as_ptr(), &path_str) }.unwrap(); + + futures::executor::block_on(async { + VortexWrite::write_all(&mut writer, vec![1_u8, 2, 3]) + .await + .unwrap(); + VortexWrite::flush(&mut writer).await.unwrap(); + }); + + let data = fs::read(path).unwrap(); + assert_eq!(data, vec![1, 2, 3]); + } +} diff --git a/vortex-duckdb/src/duckdb/mod.rs b/vortex-duckdb/src/duckdb/mod.rs index 1960eb71ded..a88610a63e0 100644 --- a/vortex-duckdb/src/duckdb/mod.rs +++ b/vortex-duckdb/src/duckdb/mod.rs @@ -9,6 +9,7 @@ mod data; mod data_chunk; mod database; mod expr; +mod file_system; pub mod footer_cache; mod logical_type; mod macro_; @@ -34,6 +35,7 @@ pub use data::*; pub use data_chunk::*; pub use database::*; pub use expr::*; +pub use file_system::*; pub use logical_type::*; pub use object_cache::*; pub use query_result::*; diff --git a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs index 9812dcd2728..c271985eb47 100644 --- a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs +++ b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs @@ -4,6 +4,8 @@ //! This module contains tests for the `vortex_scan` table function. use std::ffi::CStr; +use std::io::Write; +use std::net::TcpListener; use std::path::Path; use std::slice; use std::str::FromStr; @@ -347,6 +349,51 @@ fn test_vortex_scan_multiple_files() { assert_eq!(total_sum, 21); } +#[test] +fn test_vortex_scan_over_http() { + let file = RUNTIME.block_on(async { + let strings = VarBinArray::from(vec!["a", "b", "c"]); + write_single_column_vortex_file("strings", strings).await + }); + + let file_bytes = std::fs::read(file.path()).unwrap(); + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = listener.local_addr().unwrap(); + + std::thread::spawn(move || { + for _ in 0..2 { + if let Ok((mut stream, _)) = listener.accept() { + let response = format!( + "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", + file_bytes.len() + ); + stream.write_all(response.as_bytes()).unwrap(); + stream.write_all(&file_bytes).unwrap(); + } + } + }); + + let conn = database_connection(); + conn.query("INSTALL httpfs;").unwrap(); + conn.query("LOAD httpfs;").unwrap(); + + let url = format!( + "http://{}/{}", + addr, + file.path().file_name().unwrap().to_string_lossy() + ); + + let result = conn + .query(&format!("SELECT COUNT(*) FROM read_vortex('{url}')")) + .unwrap(); + let chunk = result.into_iter().next().unwrap(); + let count = chunk + .get_vector(0) + .as_slice_with_len::(chunk.len().as_())[0]; + + assert_eq!(count, 3); +} + #[test] fn test_write_file() { let conn = database_connection(); diff --git a/vortex-duckdb/src/scan.rs b/vortex-duckdb/src/scan.rs index f48ce764fc5..786ed15102c 100644 --- a/vortex-duckdb/src/scan.rs +++ b/vortex-duckdb/src/scan.rs @@ -54,12 +54,14 @@ use crate::RUNTIME; use crate::SESSION; use crate::convert::try_from_bound_expression; use crate::convert::try_from_table_filter; +use crate::cpp; use crate::duckdb; use crate::duckdb::BindInput; use crate::duckdb::BindResult; use crate::duckdb::Cardinality; use crate::duckdb::ClientContext; use crate::duckdb::DataChunk; +use crate::duckdb::DuckDbFsReadAt; use crate::duckdb::ExtractedValue; use crate::duckdb::LogicalType; use crate::duckdb::TableFunction; @@ -203,26 +205,50 @@ fn extract_table_filter_expr( Ok(and_collect(table_filter_exprs.into_iter().collect_vec())) } -/// Helper function to open a Vortex file from either a local or S3 URL -async fn open_file(url: Url, options: VortexOpenOptions) -> VortexResult { - if url.scheme() == "s3" { - assert_eq!(url.scheme(), "s3"); - let bucket = url - .host_str() - .ok_or_else(|| vortex_err!("Failed to extract bucket name from URL: {url}"))?; +/// Helper function to open a Vortex file from either a local path or a URL supported by DuckDB's filesystem. +#[derive(Clone, Copy)] +struct SendableClientCtx(usize); +unsafe impl Send for SendableClientCtx {} +unsafe impl Sync for SendableClientCtx {} - let path = url - .path() - .strip_prefix("/") - .ok_or_else(|| vortex_err!("Invalid S3 path: {url}"))?; +fn open_duckdb_reader(client_ctx: SendableClientCtx, url: &Url) -> VortexResult { + let ctx_ptr = client_ctx.0 as cpp::duckdb_vx_client_context; + unsafe { DuckDbFsReadAt::open_url(ctx_ptr, url) } +} - options.open_object_store(&s3_store(bucket)?, path).await - } else { - let path = url - .to_file_path() - .map_err(|_| vortex_err!("Invalid file URL: {url}"))?; +async fn open_file( + client_ctx: SendableClientCtx, + url: Url, + options: VortexOpenOptions, +) -> VortexResult { + match url.scheme() { + "http" | "https" | "s3" => { + let reader = open_duckdb_reader(client_ctx, &url); + + // Fallback to the legacy object_store path for s3 if DuckDB fs isn't configured. + if url.scheme() == "s3" && reader.is_err() { + let bucket = url + .host_str() + .ok_or_else(|| vortex_err!("Failed to extract bucket name from URL: {url}"))?; + + let path = url + .path() + .strip_prefix("/") + .ok_or_else(|| vortex_err!("Invalid S3 path: {url}"))?; + + return options.open_object_store(&s3_store(bucket)?, path).await; + } - options.open_path(path).await + let reader = Arc::new(reader?); + options.open(reader).await + } + _ => { + let path = url + .to_file_path() + .map_err(|_| vortex_err!("Invalid file URL: {url}"))?; + + options.open_path(path).await + } } } @@ -277,6 +303,7 @@ impl TableFunction for VortexTableFunction { tracing::trace!("running scan with max_threads {max_threads}"); let (file_urls, _metadata) = RUNTIME.block_on(Compat::new(expand_glob( + ctx, file_glob_string.as_ref().as_string(), )))?; @@ -286,10 +313,11 @@ impl TableFunction for VortexTableFunction { }; let footer_cache = FooterCache::new(ctx.object_cache()); + let client_ctx = SendableClientCtx(ctx.as_ptr() as usize); let entry = footer_cache.entry(first_file_url.as_ref()); let first_file = RUNTIME.block_on(async move { let options = entry.apply_to_file(SESSION.open_options()); - let file = open_file(first_file_url.clone(), options).await?; + let file = open_file(client_ctx, first_file_url.clone(), options).await?; entry.put_if_absent(|| file.footer().clone()); VortexResult::Ok(file) })?; @@ -392,6 +420,7 @@ impl TableFunction for VortexTableFunction { let num_workers = bind_data.max_threads as usize; let client_context = init_input.client_context()?; + let client_ctx_ptr = SendableClientCtx(client_context.as_ptr() as usize); let object_cache = client_context.object_cache(); let handle = RUNTIME.handle(); @@ -415,7 +444,7 @@ impl TableFunction for VortexTableFunction { let cache = FooterCache::new(object_cache); let entry = cache.entry(url.as_ref()); let options = entry.apply_to_file(SESSION.open_options()); - let file = open_file(url.clone(), options).await?; + let file = open_file(client_ctx_ptr, url.clone(), options).await?; entry.put_if_absent(|| file.footer().clone()); VortexResult::Ok(file) }?; diff --git a/vortex-duckdb/src/utils/glob.rs b/vortex-duckdb/src/utils/glob.rs index 0c21c4f3034..13718ea63dd 100644 --- a/vortex-duckdb/src/utils/glob.rs +++ b/vortex-duckdb/src/utils/glob.rs @@ -1,141 +1,48 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use futures::StreamExt; +#[cfg(test)] +use std::sync::OnceLock; + use object_store::ObjectMeta; +#[cfg(test)] +use parking_lot::Mutex; use url::Url; use vortex::error::VortexResult; use vortex::error::vortex_bail; use vortex::error::vortex_err; -use super::object_store::s3_store; +use crate::duckdb::ClientContext; +use crate::duckdb::duckdb_fs_glob; + +#[cfg(test)] +type DuckdbGlobHook = Box VortexResult> + Send + Sync>; + +#[cfg(test)] +static DUCKDB_GLOB_HOOK: OnceLock>> = OnceLock::new(); + +#[cfg(test)] +static GLOB_TEST_GUARD: OnceLock> = OnceLock::new(); /// Expand a glob pattern into a list of URLs. /// /// Example: s3://bucket/files/*.vortex -> (urls, Some(object_store_metadata)) pub async fn expand_glob>( + client_context: &ClientContext, url_glob: T, ) -> VortexResult<(Vec, Option>)> { let url_str = url_glob.as_ref(); // We prefer using string prefix matching here over extracting a URL scheme // as local files with an absolute path but without the file:// prefix can't // be parsed into a URL. - match &url_str[..url_str.len().min(5)] { - "s3://" => s3::expand_glob(&url_glob).await, - "gs://" => vortex_bail!("GCS glob expansion not yet implemented"), - _ => local_filesystem::expand_glob(url_str), - } -} - -mod s3 { - use object_store::ObjectMeta; - - use super::*; - - /// Expand a glob pattern into a list of S3 URLs. - /// - /// Makes a single request based on the position of the first glob character - /// and filters the results on the client side. In case no glob characters - /// are provided, the last directory in the path is used as the list prefix. - pub(super) async fn expand_glob>( - url_glob: T, - ) -> VortexResult<(Vec, Option>)> { - validate_glob(&url_glob)?; - assert_eq!("s3://", &url_glob.as_ref()[..5]); - let url = Url::parse(url_glob.as_ref())?; - - let bucket = url - .host_str() - .ok_or_else(|| vortex_err!("Failed to extract bucket name from URL: {url}"))?; - - let list_prefix = list_prefix(url_path(&url)?); - let object_store = s3_store(bucket)?; - - // The AWS S3 `ListObjectsV2` API returns multiple objects per HTTP request, up to 1000 by default. - let stream = object_store.list(Some(&object_store::path::Path::from(list_prefix))); - - let glob_pattern = glob::Pattern::new(url_glob.as_ref()) - .map_err(|e| vortex_err!("Invalid glob pattern: {}", e))?; - - let matching_paths = process_object_store_stream(stream, &glob_pattern, bucket).await?; - - let (urls, metadata) = matching_paths.into_iter().unzip(); - Ok((urls, Some(metadata))) - } - - /// Validates that a glob pattern does not contain escaped glob characters. - /// Returns an error if backslash-escaped characters like \*, \?, etc. are found. - pub(super) fn validate_glob>(pattern: T) -> VortexResult<()> { - let pattern_str = pattern.as_ref(); - - // Check for backslash escape sequences. - for escape_pattern in ["\\*", "\\?", "\\["] { - if pattern_str.contains(escape_pattern) { - vortex_bail!( - "Escaped glob characters are not allowed in patterns. Found '{}' in: {}", - escape_pattern, - pattern_str - ); - } + match url_str { + s if s.starts_with("s3://") || s.starts_with("https://") || s.starts_with("http://") => { + duckdb_fs_glob(client_context, url_str).map(|urls| (urls, None)) } - - Ok(()) - } - - /// Returns the path from an S3 URL. - /// - /// Example: "s3://bucket/path/to/file.txt" -> "path/to/file.txt" - pub(super) fn url_path(url: &Url) -> VortexResult<&str> { - url.path() - .strip_prefix("/") - .ok_or_else(|| vortex_err!("Invalid URL: {url}")) - } - - /// Returns the list prefix for a URL path which can contain glob characters. - /// - /// Unlike `aws s3 ls`, the object store crate does not support support - /// incomplete file names as a prefix. Therefore, the prefix is the - /// directory path up to the first glob character. - /// - /// Example: "path/to/file_*.txt" -> "path/to/" - pub(super) fn list_prefix>(url_path: T) -> String { - let url_path = url_path.as_ref(); - // Find first glob character index. - let special_char_index = url_path - .find(|c| ['*', '?', '['].contains(&c)) - .unwrap_or(url_path.len()); - - match &url_path[..special_char_index].rsplit_once('/') { - Some((dir_path, _)) => format!("{dir_path}/"), - None => url_path[..special_char_index].to_string(), + s if s.starts_with("gs://") => { + vortex_bail!("GCS glob expansion not yet implemented") } - } - - /// Process an object store stream and filter the results on the client - /// based on the glob pattern. - async fn process_object_store_stream( - stream: impl futures::Stream>, - glob_pattern: &glob::Pattern, - bucket: &str, - ) -> VortexResult> { - let matching_paths: Vec<(Url, ObjectMeta)> = stream - .map(|object_meta| async move { - if let Ok(object_meta) = object_meta { - let url_string = format!("s3://{}/{}", bucket, object_meta.location); - if glob_pattern.matches(&url_string) - && let Ok(parsed_url) = Url::parse(&url_string) - { - return Some((parsed_url, object_meta)); - } - } - None - }) - .buffer_unordered(16) - .filter_map(|result| async { result }) - .collect() - .await; - - Ok(matching_paths) + _ => local_filesystem::expand_glob(url_str), } } @@ -177,6 +84,18 @@ mod tests { use tempfile::TempDir; use super::*; + use crate::duckdb::Database; + + fn ctx() -> ClientContext { + let db = Database::open_in_memory().unwrap(); + db.connect().unwrap().client_context().unwrap() + } + + fn set_glob_hook(hook: Option) { + let cell = DUCKDB_GLOB_HOOK.get_or_init(|| Mutex::new(None)); + let mut guard = cell.lock(); + *guard = hook; + } #[test] fn test_expand_local_disk_glob_relative_path() { @@ -198,6 +117,36 @@ mod tests { env::set_current_dir(&original_dir).unwrap(); } + #[test] + fn test_duckdb_glob_http_hook() { + let _guard = GLOB_TEST_GUARD.get_or_init(|| Mutex::new(())).lock(); + let url = Url::parse("http://example.com/data.vortex").unwrap(); + let expected = url.clone(); + set_glob_hook(Some(Box::new(move |_, _| Ok(vec![url.clone()])))); + + let (urls, meta) = + futures::executor::block_on(expand_glob(&ctx(), "http://example.com/data.vortex")) + .unwrap(); + assert_eq!(meta, None); + assert_eq!(urls.len(), 1); + assert_eq!(urls[0], expected); + + set_glob_hook(None); + } + + #[test] + fn test_duckdb_glob_http_empty() { + let _guard = GLOB_TEST_GUARD.get_or_init(|| Mutex::new(())).lock(); + set_glob_hook(Some(Box::new(|_, _| Ok(vec![])))); + + let (urls, _) = + futures::executor::block_on(expand_glob(&ctx(), "http://example.com/none.vortex")) + .unwrap(); + assert!(urls.is_empty()); + + set_glob_hook(None); + } + #[test] fn test_expand_local_disk_glob_single_file() { let temp_dir = TempDir::new().unwrap(); @@ -268,129 +217,4 @@ mod tests { assert_eq!(result.0.len(), 2); } - - #[test] - fn test_extract_s3_url_path() { - // Test valid S3 URL - let url = Url::parse("s3://bucket/path/to/file.txt").unwrap(); - let result = s3::url_path(&url).unwrap(); - assert_eq!(result, "path/to/file.txt"); - - // Test URL with nested path - let url = Url::parse("s3://my-bucket/folder/subfolder/data.parquet").unwrap(); - let result = s3::url_path(&url).unwrap(); - assert_eq!(result, "folder/subfolder/data.parquet"); - - // Test URL with root path - let url = Url::parse("s3://bucket/file.txt").unwrap(); - let result = s3::url_path(&url).unwrap(); - assert_eq!(result, "file.txt"); - - // Test URL without leading slash should fail - let url = Url::parse("s3://bucket").unwrap(); - let result = s3::url_path(&url); - assert!(result.is_err()); - } - - #[test] - fn test_calculate_list_prefix() { - // Test with wildcard in filename - let result = s3::list_prefix("folder/file*.txt"); - assert_eq!(result, "folder/"); - - // Test with wildcard in directory - let result = s3::list_prefix("folder/*/file.txt"); - assert_eq!(result, "folder/"); - - // Test with nested directories and wildcard - let result = s3::list_prefix("data/2023/*/logs/*.log"); - assert_eq!(result, "data/2023/"); - - // Test with wildcard at root level - let result = s3::list_prefix("*.txt"); - assert_eq!(result, ""); - - // Test with no wildcards - let result = s3::list_prefix("folder/subfolder/file.txt"); - assert_eq!(result, "folder/subfolder/"); - - // Test with question mark wildcard - let result = s3::list_prefix("folder/file?.txt"); - assert_eq!(result, "folder/"); - - // Test with bracket wildcards - let result = s3::list_prefix("folder/file[abc].txt"); - assert_eq!(result, "folder/"); - - // Test empty path - let result = s3::list_prefix(""); - assert_eq!(result, ""); - } - - #[test] - fn test_s3_url_parsing_integration() { - // Test complete S3 URL parsing workflow - let url = Url::parse("s3://my-bucket/data/year=2023/month=*/day=*/events.parquet").unwrap(); - - let url_path = s3::url_path(&url).unwrap(); - assert_eq!(url_path, "data/year=2023/month=*/day=*/events.parquet"); - - let list_prefix = s3::list_prefix(url_path); - assert_eq!(list_prefix, "data/year=2023/"); - } - - #[test] - fn test_s3_url_parsing_edge_cases() { - // Test URL with multiple consecutive wildcards - let url = Url::parse("s3://bucket/logs/**/*.log").unwrap(); - let url_path = s3::url_path(&url).unwrap(); - let list_prefix = s3::list_prefix(url_path); - assert_eq!(list_prefix, "logs/"); - - // Test URL with wildcard at the beginning - let url = Url::parse("s3://bucket/*.txt").unwrap(); - let url_path = s3::url_path(&url).unwrap(); - let list_prefix = s3::list_prefix(url_path); - assert_eq!(list_prefix, ""); - - // Test deeply nested path with wildcard - let url = Url::parse("s3://bucket/a/b/c/d/e/f/g/*.json").unwrap(); - let url_path = s3::url_path(&url).unwrap(); - let list_prefix = s3::list_prefix(url_path); - assert_eq!(list_prefix, "a/b/c/d/e/f/g/"); - } - - #[test] - fn test_s3_url_parsing_no_wildcards() { - let url = Url::parse("s3://bucket/path/to/specific/file.txt").unwrap(); - let url_path = s3::url_path(&url).unwrap(); - let list_prefix = s3::list_prefix(url_path); - assert_eq!(list_prefix, "path/to/specific/"); - } - - #[test] - fn test_validate_glob_valid_pattern() { - assert!(s3::validate_glob("s3://bucket/path/*.txt").is_ok()); - } - - #[test] - fn test_validate_glob_escaped_asterisk() { - let result = s3::validate_glob("s3://bucket/path\\*.txt"); - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("\\*")); - } - - #[test] - fn test_validate_glob_escaped_question_mark() { - let result = s3::validate_glob("s3://bucket/path\\?.txt"); - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("\\?")); - } - - #[test] - fn test_validate_glob_escaped_bracket() { - let result = s3::validate_glob("s3://bucket/path\\[test].txt"); - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("\\[")); - } } From 4fffe00d03cd0cf7edcba971d5bd475adfb3af87 Mon Sep 17 00:00:00 2001 From: Ruoshi Date: Wed, 28 Jan 2026 21:42:41 -0800 Subject: [PATCH 2/3] refactor(tests): update context setup for DuckDB glob tests --- vortex-duckdb/src/utils/glob.rs | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/vortex-duckdb/src/utils/glob.rs b/vortex-duckdb/src/utils/glob.rs index 13718ea63dd..c799c2afff3 100644 --- a/vortex-duckdb/src/utils/glob.rs +++ b/vortex-duckdb/src/utils/glob.rs @@ -86,9 +86,11 @@ mod tests { use super::*; use crate::duckdb::Database; - fn ctx() -> ClientContext { + fn ctx_bundle() -> (Database, crate::duckdb::Connection, ClientContext) { let db = Database::open_in_memory().unwrap(); - db.connect().unwrap().client_context().unwrap() + let conn = db.connect().unwrap(); + let ctx = conn.client_context().unwrap(); + (db, conn, ctx) } fn set_glob_hook(hook: Option) { @@ -124,9 +126,13 @@ mod tests { let expected = url.clone(); set_glob_hook(Some(Box::new(move |_, _| Ok(vec![url.clone()])))); - let (urls, meta) = - futures::executor::block_on(expand_glob(&ctx(), "http://example.com/data.vortex")) - .unwrap(); + let (_db, _conn, ctx) = ctx_bundle(); + + let (urls, meta) = futures::executor::block_on(expand_glob( + &ctx, + "http://example.com/data.vortex", + )) + .unwrap(); assert_eq!(meta, None); assert_eq!(urls.len(), 1); assert_eq!(urls[0], expected); @@ -134,19 +140,6 @@ mod tests { set_glob_hook(None); } - #[test] - fn test_duckdb_glob_http_empty() { - let _guard = GLOB_TEST_GUARD.get_or_init(|| Mutex::new(())).lock(); - set_glob_hook(Some(Box::new(|_, _| Ok(vec![])))); - - let (urls, _) = - futures::executor::block_on(expand_glob(&ctx(), "http://example.com/none.vortex")) - .unwrap(); - assert!(urls.is_empty()); - - set_glob_hook(None); - } - #[test] fn test_expand_local_disk_glob_single_file() { let temp_dir = TempDir::new().unwrap(); From a131078864195a6e5ea4244334acc004a0dab2c7 Mon Sep 17 00:00:00 2001 From: Ruoshi Date: Sat, 31 Jan 2026 13:56:58 -0800 Subject: [PATCH 3/3] refactor: address PR comments. --- .gitignore | 3 +- vortex-duckdb/cpp/file_system.cpp | 49 ++++++++++++++----------- vortex-duckdb/src/copy.rs | 28 +++++++++----- vortex-duckdb/src/duckdb/file_system.rs | 37 ++++++++++--------- vortex-duckdb/src/scan.rs | 34 ++++++----------- vortex-duckdb/src/utils/glob.rs | 8 ++-- vortex-duckdb/src/utils/mod.rs | 1 - vortex-duckdb/src/utils/object_store.rs | 37 ------------------- 8 files changed, 80 insertions(+), 117 deletions(-) delete mode 100644 vortex-duckdb/src/utils/object_store.rs diff --git a/.gitignore b/.gitignore index a68417f916f..63fd9846d40 100644 --- a/.gitignore +++ b/.gitignore @@ -218,8 +218,9 @@ profile.json.gz # Clang compilation database (https://clang.llvm.org/docs/JSONCompilationDatabase.html) compile_commands.json -# Claude +# AI Agents .claude +.opencode # cargo sweep output sweep.timestamp diff --git a/vortex-duckdb/cpp/file_system.cpp b/vortex-duckdb/cpp/file_system.cpp index d515fa9b280..ba98567b754 100644 --- a/vortex-duckdb/cpp/file_system.cpp +++ b/vortex-duckdb/cpp/file_system.cpp @@ -5,14 +5,16 @@ #include #include +#include #include -#include #include +#include +#include +#include using namespace duckdb; -namespace { struct FileHandleWrapper { explicit FileHandleWrapper(unique_ptr handle_p) : handle(std::move(handle_p)) { } @@ -20,28 +22,31 @@ struct FileHandleWrapper { unique_ptr handle; }; -void SetError(duckdb_vx_error *error_out, const std::string &message) { +static void SetError(duckdb_vx_error *error_out, const std::string &message) { if (!error_out) { return; } *error_out = duckdb_vx_error_create(message.data(), message.size()); } -duckdb_state HandleException(duckdb_vx_error *error_out) { +static duckdb_state HandleException(std::exception_ptr ex, duckdb_vx_error *error_out) { + if (!ex) { + SetError(error_out, "Unknown error"); + return DuckDBError; + } + try { - throw; - } catch (const Exception &ex) { - SetError(error_out, ex.what()); - } catch (const std::exception &ex) { - SetError(error_out, ex.what()); + std::rethrow_exception(ex); + } catch (const Exception &caught) { + SetError(error_out, caught.what()); + } catch (const std::exception &caught) { + SetError(error_out, caught.what()); } catch (...) { SetError(error_out, "Unknown error"); } return DuckDBError; } -} // namespace - extern "C" duckdb_vx_file_handle duckdb_vx_fs_open(duckdb_vx_client_context ctx, const char *path, duckdb_vx_error *error_out) { if (!ctx || !path) { @@ -55,7 +60,7 @@ extern "C" duckdb_vx_file_handle duckdb_vx_fs_open(duckdb_vx_client_context ctx, auto handle = fs.OpenFile(path, FileFlags::FILE_FLAGS_READ); return reinterpret_cast(new FileHandleWrapper(std::move(handle))); } catch (...) { - HandleException(error_out); + HandleException(std::current_exception(), error_out); return nullptr; } } @@ -74,7 +79,7 @@ extern "C" duckdb_vx_file_handle duckdb_vx_fs_create(duckdb_vx_client_context ct handle->Truncate(0); return reinterpret_cast(new FileHandleWrapper(std::move(handle))); } catch (...) { - HandleException(error_out); + HandleException(std::current_exception(), error_out); return nullptr; } } @@ -100,7 +105,7 @@ extern "C" duckdb_state duckdb_vx_fs_get_size(duckdb_vx_file_handle handle, idx_ *size_out = wrapper->handle->GetFileSize(); return DuckDBSuccess; } catch (...) { - return HandleException(error_out); + return HandleException(std::current_exception(), error_out); } } @@ -117,7 +122,7 @@ extern "C" duckdb_state duckdb_vx_fs_read(duckdb_vx_file_handle handle, idx_t of *out_len = len; return DuckDBSuccess; } catch (...) { - return HandleException(error_out); + return HandleException(std::current_exception(), error_out); } } @@ -136,7 +141,7 @@ extern "C" duckdb_state duckdb_vx_fs_write(duckdb_vx_file_handle handle, idx_t o *out_len = len; return DuckDBSuccess; } catch (...) { - return HandleException(error_out); + return HandleException(std::current_exception(), error_out); } } @@ -159,10 +164,10 @@ extern "C" duckdb_vx_string_list duckdb_vx_fs_glob(duckdb_vx_client_context ctx, } result.count = matches.size(); - result.entries = static_cast(malloc(sizeof(char *) * matches.size())); + result.entries = static_cast(duckdb_malloc(sizeof(char *) * matches.size())); for (size_t i = 0; i < matches.size(); i++) { const auto &entry = matches[i].path; - auto *owned = static_cast(malloc(entry.size() + 1)); + auto *owned = static_cast(duckdb_malloc(entry.size() + 1)); std::memcpy(owned, entry.data(), entry.size()); owned[entry.size()] = '\0'; result.entries[i] = owned; @@ -170,7 +175,7 @@ extern "C" duckdb_vx_string_list duckdb_vx_fs_glob(duckdb_vx_client_context ctx, return result; } catch (...) { - HandleException(error_out); + HandleException(std::current_exception(), error_out); return result; } } @@ -180,9 +185,9 @@ extern "C" void duckdb_vx_string_list_free(duckdb_vx_string_list *list) { return; } for (size_t i = 0; i < list->count; i++) { - free(const_cast(list->entries[i])); + duckdb_free(const_cast(list->entries[i])); } - free(list->entries); + duckdb_free(list->entries); list->entries = nullptr; list->count = 0; } @@ -198,6 +203,6 @@ extern "C" duckdb_state duckdb_vx_fs_sync(duckdb_vx_file_handle handle, duckdb_v wrapper->handle->Sync(); return DuckDBSuccess; } catch (...) { - return HandleException(error_out); + return HandleException(std::current_exception(), error_out); } } diff --git a/vortex-duckdb/src/copy.rs b/vortex-duckdb/src/copy.rs index 56f32fc41e2..725c8a75132 100644 --- a/vortex-duckdb/src/copy.rs +++ b/vortex-duckdb/src/copy.rs @@ -3,6 +3,7 @@ use std::fmt::Debug; use std::iter; +use std::ptr::NonNull; use futures::SinkExt; use futures::TryStreamExt; @@ -40,9 +41,15 @@ use crate::duckdb::duckdb_fs_create_writer; pub struct VortexCopyFunction; #[derive(Clone, Copy)] -struct SendableClientCtx(usize); +struct SendableClientCtx(NonNull); unsafe impl Send for SendableClientCtx {} +impl SendableClientCtx { + fn as_ptr(self) -> cpp::duckdb_vx_client_context { + self.0.as_ptr() + } +} + pub struct BindData { dtype: DType, fields: StructFields, @@ -134,16 +141,17 @@ impl CopyFunction for VortexCopyFunction { let array_stream = ArrayStreamAdapter::new(bind_data.dtype.clone(), rx.into_stream()); let handle = SESSION.handle(); - let ctx_ptr = SendableClientCtx(client_context.as_ptr() as usize); + let ctx_ptr = SendableClientCtx( + NonNull::new(client_context.as_ptr()) + .vortex_expect("Client context pointer should not be null"), + ); let write_task = handle.spawn(async move { - // Prefer DuckDB FS (httpfs/s3/etc.), fallback to local async fs if unavailable. - let ctx_raw = ctx_ptr.0 as cpp::duckdb_vx_client_context; - if let Ok(writer) = unsafe { duckdb_fs_create_writer(ctx_raw, &file_path) } { - SESSION.write_options().write(writer, array_stream).await - } else { - let mut file = async_fs::File::create(&file_path).await?; - SESSION.write_options().write(&mut file, array_stream).await - } + // Use DuckDB FS exclusively to match the DuckDB client context configuration. + let writer = + unsafe { duckdb_fs_create_writer(ctx_ptr.as_ptr(), &file_path) }.map_err(|e| { + vortex_err!("Failed to create DuckDB FS writer for {file_path}: {e}") + })?; + SESSION.write_options().write(writer, array_stream).await }); let worker_pool = RUNTIME.new_pool(); diff --git a/vortex-duckdb/src/duckdb/file_system.rs b/vortex-duckdb/src/duckdb/file_system.rs index 8e7dfe30212..8f70b627d07 100644 --- a/vortex-duckdb/src/duckdb/file_system.rs +++ b/vortex-duckdb/src/duckdb/file_system.rs @@ -9,18 +9,17 @@ use std::sync::OnceLock; use futures::FutureExt; use futures::future::BoxFuture; -use parking_lot::Mutex; +use vortex::array::buffer::BufferHandle; use vortex::buffer::Alignment; -use vortex::buffer::ByteBuffer; use vortex::buffer::ByteBufferMut; use vortex::error::VortexError; use vortex::error::VortexResult; use vortex::error::vortex_err; use vortex::io::CoalesceConfig; -use vortex::io::VortexReadAt; -use vortex::io::runtime::BlockingRuntime; use vortex::io::IoBuf; +use vortex::io::VortexReadAt; use vortex::io::VortexWrite; +use vortex::io::runtime::BlockingRuntime; use crate::RUNTIME; use crate::cpp; @@ -32,6 +31,10 @@ const DEFAULT_COALESCE: CoalesceConfig = CoalesceConfig { max_size: 8 * 1024 * 1024, // 8 MB }; +// Local cap to keep remote reads parallel without overwhelming typical per-host connection limits. +// The DuckDB httpfs extension does not expose a fixed default concurrency in the vendored sources; +// 64 is a conservative ceiling that stays well below common cloud limits while keeping range reads +// busy. const DEFAULT_CONCURRENCY: usize = 64; lifetime_wrapper!(FsFileHandle, cpp::duckdb_vx_file_handle, cpp::duckdb_vx_fs_close, [owned, ref]); @@ -83,7 +86,7 @@ pub unsafe fn duckdb_fs_create_writer( /// A VortexReadAt implementation backed by DuckDB's filesystem (e.g., httpfs/s3). pub struct DuckDbFsReadAt { - handle: Arc>, + handle: Arc, uri: Arc, size: Arc>, } @@ -101,7 +104,7 @@ impl DuckDbFsReadAt { } Ok(Self { - handle: Arc::new(Mutex::new(unsafe { FsFileHandle::own(handle) })), + handle: Arc::new(unsafe { FsFileHandle::own(handle) }), uri: Arc::from(url.as_str()), size: Arc::new(OnceLock::new()), }) @@ -136,11 +139,7 @@ impl VortexReadAt for DuckDbFsReadAt { let mut err: cpp::duckdb_vx_error = ptr::null_mut(); let mut size_out: cpp::idx_t = 0; let status = unsafe { - cpp::duckdb_vx_fs_get_size( - handle.lock().as_ptr(), - &raw mut size_out, - &raw mut err, - ) + cpp::duckdb_vx_fs_get_size(handle.as_ptr(), &raw mut size_out, &raw mut err) }; if status != cpp::duckdb_state::DuckDBSuccess { return Err(fs_error(err)); @@ -160,13 +159,13 @@ impl VortexReadAt for DuckDbFsReadAt { offset: u64, length: usize, alignment: Alignment, - ) -> BoxFuture<'static, VortexResult> { + ) -> BoxFuture<'static, VortexResult> { let handle = self.handle.clone(); async move { let runtime = RUNTIME.handle(); - runtime - .spawn_blocking(move || { + let result: VortexResult = runtime + .spawn_blocking(move || -> VortexResult { let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment); unsafe { buffer.set_len(length) }; @@ -174,7 +173,7 @@ impl VortexReadAt for DuckDbFsReadAt { let mut out_len: cpp::idx_t = 0; let status = unsafe { cpp::duckdb_vx_fs_read( - handle.lock().as_ptr(), + handle.as_ptr(), offset as cpp::idx_t, length as cpp::idx_t, buffer.as_mut_slice().as_mut_ptr(), @@ -191,15 +190,17 @@ impl VortexReadAt for DuckDbFsReadAt { .map_err(|e| vortex_err!("Invalid read len: {e}"))?; unsafe { buffer.set_len(used) }; - Ok::<_, VortexError>(buffer.freeze()) + let frozen = buffer.freeze(); + Ok::<_, VortexError>(BufferHandle::new_host(frozen)) }) - .await + .await; + result } .boxed() } } -// SAFETY: Access is serialized via a mutex and DuckDB's file handles are thread-safe for reads. +// SAFETY: DuckDB file handles are thread-safe for reads; writes are done via DuckDbFsWriter. unsafe impl Send for DuckDbFsReadAt {} unsafe impl Sync for DuckDbFsReadAt {} diff --git a/vortex-duckdb/src/scan.rs b/vortex-duckdb/src/scan.rs index 786ed15102c..c1d179036b3 100644 --- a/vortex-duckdb/src/scan.rs +++ b/vortex-duckdb/src/scan.rs @@ -7,6 +7,7 @@ use std::fmt; use std::fmt::Debug; use std::fmt::Formatter; use std::pin::Pin; +use std::ptr::NonNull; use std::sync::Arc; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; @@ -71,7 +72,6 @@ use crate::duckdb::footer_cache::FooterCache; use crate::exporter::ArrayExporter; use crate::exporter::ConversionCache; use crate::utils::glob::expand_glob; -use crate::utils::object_store::s3_store; pub struct VortexBindData { first_file: VortexFile, @@ -207,13 +207,12 @@ fn extract_table_filter_expr( /// Helper function to open a Vortex file from either a local path or a URL supported by DuckDB's filesystem. #[derive(Clone, Copy)] -struct SendableClientCtx(usize); +struct SendableClientCtx(NonNull); unsafe impl Send for SendableClientCtx {} unsafe impl Sync for SendableClientCtx {} fn open_duckdb_reader(client_ctx: SendableClientCtx, url: &Url) -> VortexResult { - let ctx_ptr = client_ctx.0 as cpp::duckdb_vx_client_context; - unsafe { DuckDbFsReadAt::open_url(ctx_ptr, url) } + unsafe { DuckDbFsReadAt::open_url(client_ctx.0.as_ptr(), url) } } async fn open_file( @@ -223,23 +222,7 @@ async fn open_file( ) -> VortexResult { match url.scheme() { "http" | "https" | "s3" => { - let reader = open_duckdb_reader(client_ctx, &url); - - // Fallback to the legacy object_store path for s3 if DuckDB fs isn't configured. - if url.scheme() == "s3" && reader.is_err() { - let bucket = url - .host_str() - .ok_or_else(|| vortex_err!("Failed to extract bucket name from URL: {url}"))?; - - let path = url - .path() - .strip_prefix("/") - .ok_or_else(|| vortex_err!("Invalid S3 path: {url}"))?; - - return options.open_object_store(&s3_store(bucket)?, path).await; - } - - let reader = Arc::new(reader?); + let reader = Arc::new(open_duckdb_reader(client_ctx, &url)?); options.open(reader).await } _ => { @@ -313,7 +296,9 @@ impl TableFunction for VortexTableFunction { }; let footer_cache = FooterCache::new(ctx.object_cache()); - let client_ctx = SendableClientCtx(ctx.as_ptr() as usize); + let client_ctx = SendableClientCtx( + NonNull::new(ctx.as_ptr()).vortex_expect("Client context pointer should not be null"), + ); let entry = footer_cache.entry(first_file_url.as_ref()); let first_file = RUNTIME.block_on(async move { let options = entry.apply_to_file(SESSION.open_options()); @@ -420,7 +405,10 @@ impl TableFunction for VortexTableFunction { let num_workers = bind_data.max_threads as usize; let client_context = init_input.client_context()?; - let client_ctx_ptr = SendableClientCtx(client_context.as_ptr() as usize); + let client_ctx_ptr = SendableClientCtx( + NonNull::new(client_context.as_ptr()) + .vortex_expect("Client context pointer should not be null"), + ); let object_cache = client_context.object_cache(); let handle = RUNTIME.handle(); diff --git a/vortex-duckdb/src/utils/glob.rs b/vortex-duckdb/src/utils/glob.rs index c799c2afff3..01db505b03b 100644 --- a/vortex-duckdb/src/utils/glob.rs +++ b/vortex-duckdb/src/utils/glob.rs @@ -128,11 +128,9 @@ mod tests { let (_db, _conn, ctx) = ctx_bundle(); - let (urls, meta) = futures::executor::block_on(expand_glob( - &ctx, - "http://example.com/data.vortex", - )) - .unwrap(); + let (urls, meta) = + futures::executor::block_on(expand_glob(&ctx, "http://example.com/data.vortex")) + .unwrap(); assert_eq!(meta, None); assert_eq!(urls.len(), 1); assert_eq!(urls[0], expected); diff --git a/vortex-duckdb/src/utils/mod.rs b/vortex-duckdb/src/utils/mod.rs index ecb071cbc82..b5090ad8fd4 100644 --- a/vortex-duckdb/src/utils/mod.rs +++ b/vortex-duckdb/src/utils/mod.rs @@ -2,4 +2,3 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors pub mod glob; -pub mod object_store; diff --git a/vortex-duckdb/src/utils/object_store.rs b/vortex-duckdb/src/utils/object_store.rs deleted file mode 100644 index d9be329177f..00000000000 --- a/vortex-duckdb/src/utils/object_store.rs +++ /dev/null @@ -1,37 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::sync::Arc; -use std::sync::OnceLock; - -use object_store::ObjectStore; -use object_store::aws::AmazonS3Builder; -use vortex::error::VortexResult; -use vortex::error::vortex_err; -use vortex_utils::aliases::dash_map::DashMap; - -// Global S3 object store cache. -pub fn s3_store(bucket: &str) -> VortexResult> { - static S3_STORES: OnceLock>> = OnceLock::new(); - let stores = S3_STORES.get_or_init(|| DashMap::with_hasher(Default::default())); - - fn create_s3_object_store(bucket: &str) -> VortexResult> { - Ok(Arc::new( - AmazonS3Builder::from_env() - .with_bucket_name(bucket) - .build() - .map_err(|e| vortex_err!("Failed to create S3 store: {}", e))?, - ) as Arc) - } - - let object_store = match stores.get(bucket) { - Some(store) => store.clone(), - None => { - let store = create_s3_object_store(bucket)?; - stores.insert(bucket.to_string(), store.clone()); - store - } - }; - - Ok(object_store) -}