diff --git a/.gitignore b/.gitignore index a68417f916f..63fd9846d40 100644 --- a/.gitignore +++ b/.gitignore @@ -218,8 +218,9 @@ profile.json.gz # Clang compilation database (https://clang.llvm.org/docs/JSONCompilationDatabase.html) compile_commands.json -# Claude +# AI Agents .claude +.opencode # cargo sweep output sweep.timestamp diff --git a/vortex-duckdb/build.rs b/vortex-duckdb/build.rs index 7a058322935..f12ed48166e 100644 --- a/vortex-duckdb/build.rs +++ b/vortex-duckdb/build.rs @@ -465,6 +465,7 @@ fn main() { .file("cpp/data_chunk.cpp") .file("cpp/error.cpp") .file("cpp/expr.cpp") + .file("cpp/file_system.cpp") .file("cpp/logical_type.cpp") .file("cpp/object_cache.cpp") .file("cpp/replacement_scan.cpp") diff --git a/vortex-duckdb/cpp/copy_function.cpp b/vortex-duckdb/cpp/copy_function.cpp index 3768ba9d1a5..819df028afe 100644 --- a/vortex-duckdb/cpp/copy_function.cpp +++ b/vortex-duckdb/cpp/copy_function.cpp @@ -71,7 +71,8 @@ unique_ptr c_init_global(ClientContext &context, FunctionDat const string &file_path) { auto &bind = bind_data.Cast(); duckdb_vx_error error_out = nullptr; - auto global_data = bind.vtab.init_global(bind.ffi_data->DataPtr(), file_path.c_str(), &error_out); + auto global_data = bind.vtab.init_global(reinterpret_cast(&context), + bind.ffi_data->DataPtr(), file_path.c_str(), &error_out); if (error_out) { throw ExecutorException(IntoErrString(error_out)); } diff --git a/vortex-duckdb/cpp/file_system.cpp b/vortex-duckdb/cpp/file_system.cpp new file mode 100644 index 00000000000..ba98567b754 --- /dev/null +++ b/vortex-duckdb/cpp/file_system.cpp @@ -0,0 +1,208 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#include "duckdb_vx.h" + +#include +#include +#include +#include + +#include +#include +#include +#include + +using namespace duckdb; + +struct FileHandleWrapper { + explicit FileHandleWrapper(unique_ptr handle_p) : handle(std::move(handle_p)) { + } + + unique_ptr handle; +}; + +static void SetError(duckdb_vx_error *error_out, const std::string &message) { + if (!error_out) { + return; + } + *error_out = duckdb_vx_error_create(message.data(), message.size()); +} + +static duckdb_state HandleException(std::exception_ptr ex, duckdb_vx_error *error_out) { + if (!ex) { + SetError(error_out, "Unknown error"); + return DuckDBError; + } + + try { + std::rethrow_exception(ex); + } catch (const Exception &caught) { + SetError(error_out, caught.what()); + } catch (const std::exception &caught) { + SetError(error_out, caught.what()); + } catch (...) { + SetError(error_out, "Unknown error"); + } + return DuckDBError; +} + +extern "C" duckdb_vx_file_handle duckdb_vx_fs_open(duckdb_vx_client_context ctx, const char *path, + duckdb_vx_error *error_out) { + if (!ctx || !path) { + SetError(error_out, "Invalid filesystem open arguments"); + return nullptr; + } + + try { + auto *client_context = reinterpret_cast(ctx); + auto &fs = FileSystem::GetFileSystem(*client_context); + auto handle = fs.OpenFile(path, FileFlags::FILE_FLAGS_READ); + return reinterpret_cast(new FileHandleWrapper(std::move(handle))); + } catch (...) { + HandleException(std::current_exception(), error_out); + return nullptr; + } +} + +extern "C" duckdb_vx_file_handle duckdb_vx_fs_create(duckdb_vx_client_context ctx, const char *path, + duckdb_vx_error *error_out) { + if (!ctx || !path) { + SetError(error_out, "Invalid filesystem create arguments"); + return nullptr; + } + + try { + auto *client_context = reinterpret_cast(ctx); + auto &fs = FileSystem::GetFileSystem(*client_context); + auto handle = fs.OpenFile(path, FileFlags::FILE_FLAGS_WRITE | FileFlags::FILE_FLAGS_FILE_CREATE); + handle->Truncate(0); + return reinterpret_cast(new FileHandleWrapper(std::move(handle))); + } catch (...) { + HandleException(std::current_exception(), error_out); + return nullptr; + } +} + +extern "C" void duckdb_vx_fs_close(duckdb_vx_file_handle *handle) { + if (!handle || !*handle) { + return; + } + auto wrapper = reinterpret_cast(*handle); + delete wrapper; + *handle = nullptr; +} + +extern "C" duckdb_state duckdb_vx_fs_get_size(duckdb_vx_file_handle handle, idx_t *size_out, + duckdb_vx_error *error_out) { + if (!handle || !size_out) { + SetError(error_out, "Invalid arguments to fs_get_size"); + return DuckDBError; + } + + try { + auto *wrapper = reinterpret_cast(handle); + *size_out = wrapper->handle->GetFileSize(); + return DuckDBSuccess; + } catch (...) { + return HandleException(std::current_exception(), error_out); + } +} + +extern "C" duckdb_state duckdb_vx_fs_read(duckdb_vx_file_handle handle, idx_t offset, idx_t len, uint8_t *buffer, + idx_t *out_len, duckdb_vx_error *error_out) { + if (!handle || !buffer || !out_len) { + SetError(error_out, "Invalid arguments to fs_read"); + return DuckDBError; + } + + try { + auto *wrapper = reinterpret_cast(handle); + wrapper->handle->Read(buffer, len, offset); + *out_len = len; + return DuckDBSuccess; + } catch (...) { + return HandleException(std::current_exception(), error_out); + } +} + +extern "C" duckdb_state duckdb_vx_fs_write(duckdb_vx_file_handle handle, idx_t offset, idx_t len, + const uint8_t *buffer, idx_t *out_len, + duckdb_vx_error *error_out) { + if (!handle || !buffer || !out_len) { + SetError(error_out, "Invalid arguments to fs_write"); + return DuckDBError; + } + + try { + auto *wrapper = reinterpret_cast(handle); + wrapper->handle->Seek(offset); + wrapper->handle->Write(const_cast(buffer), len); + *out_len = len; + return DuckDBSuccess; + } catch (...) { + return HandleException(std::current_exception(), error_out); + } +} + +extern "C" duckdb_vx_string_list duckdb_vx_fs_glob(duckdb_vx_client_context ctx, const char *pattern, + duckdb_vx_error *error_out) { + duckdb_vx_string_list result{nullptr, 0}; + + if (!ctx || !pattern) { + SetError(error_out, "Invalid arguments to fs_glob"); + return result; + } + + try { + auto *client_context = reinterpret_cast(ctx); + auto &fs = FileSystem::GetFileSystem(*client_context); + auto matches = fs.Glob(pattern); + + if (matches.empty()) { + return result; + } + + result.count = matches.size(); + result.entries = static_cast(duckdb_malloc(sizeof(char *) * matches.size())); + for (size_t i = 0; i < matches.size(); i++) { + const auto &entry = matches[i].path; + auto *owned = static_cast(duckdb_malloc(entry.size() + 1)); + std::memcpy(owned, entry.data(), entry.size()); + owned[entry.size()] = '\0'; + result.entries[i] = owned; + } + + return result; + } catch (...) { + HandleException(std::current_exception(), error_out); + return result; + } +} + +extern "C" void duckdb_vx_string_list_free(duckdb_vx_string_list *list) { + if (!list || !list->entries) { + return; + } + for (size_t i = 0; i < list->count; i++) { + duckdb_free(const_cast(list->entries[i])); + } + duckdb_free(list->entries); + list->entries = nullptr; + list->count = 0; +} + +extern "C" duckdb_state duckdb_vx_fs_sync(duckdb_vx_file_handle handle, duckdb_vx_error *error_out) { + if (!handle) { + SetError(error_out, "Invalid arguments to fs_sync"); + return DuckDBError; + } + + try { + auto *wrapper = reinterpret_cast(handle); + wrapper->handle->Sync(); + return DuckDBSuccess; + } catch (...) { + return HandleException(std::current_exception(), error_out); + } +} diff --git a/vortex-duckdb/cpp/include/duckdb_vx.h b/vortex-duckdb/cpp/include/duckdb_vx.h index b18eb79c522..9882afbb310 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx.h +++ b/vortex-duckdb/cpp/include/duckdb_vx.h @@ -10,6 +10,7 @@ #include "duckdb_vx/data_chunk.h" #include "duckdb_vx/error.h" #include "duckdb_vx/expr.h" +#include "duckdb_vx/file_system.h" #include "duckdb_vx/logical_type.h" #include "duckdb_vx/object_cache.h" #include "duckdb_vx/replacement_scan.h" diff --git a/vortex-duckdb/cpp/include/duckdb_vx/copy_function.h b/vortex-duckdb/cpp/include/duckdb_vx/copy_function.h index 168921fb3a4..a5cd89fdf63 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx/copy_function.h +++ b/vortex-duckdb/cpp/include/duckdb_vx/copy_function.h @@ -45,7 +45,8 @@ typedef struct { unsigned long column_name_count, const duckdb_logical_type *column_types, unsigned long column_type_count, duckdb_vx_error *error_out); - duckdb_vx_data (*init_global)(const void *bind_data, const char *file_path, duckdb_vx_error *error_out); + duckdb_vx_data (*init_global)(duckdb_vx_client_context ctx, const void *bind_data, const char *file_path, + duckdb_vx_error *error_out); duckdb_vx_data (*init_local)(const void *bind_data, duckdb_vx_error *error_out); diff --git a/vortex-duckdb/cpp/include/duckdb_vx/file_system.h b/vortex-duckdb/cpp/include/duckdb_vx/file_system.h new file mode 100644 index 00000000000..59caff68d64 --- /dev/null +++ b/vortex-duckdb/cpp/include/duckdb_vx/file_system.h @@ -0,0 +1,56 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#pragma once + +#include "duckdb.h" +#include "duckdb_vx/client_context.h" +#include "duckdb_vx/error.h" + +#ifdef __cplusplus /* If compiled as C++, use C ABI */ +extern "C" { +#endif + +typedef struct duckdb_vx_file_handle_ *duckdb_vx_file_handle; + +typedef struct { + const char **entries; + size_t count; +} duckdb_vx_string_list; + +// Open a file using DuckDB's filesystem (supports httpfs, s3, etc.). +duckdb_vx_file_handle duckdb_vx_fs_open(duckdb_vx_client_context ctx, const char *path, + duckdb_vx_error *error_out); + +// Close a previously opened file handle. +void duckdb_vx_fs_close(duckdb_vx_file_handle *handle); + +// Get the size of an opened file. +duckdb_state duckdb_vx_fs_get_size(duckdb_vx_file_handle handle, idx_t *size_out, + duckdb_vx_error *error_out); + +// Read up to len bytes at the given offset into buffer. Returns bytes read via out_len. +duckdb_state duckdb_vx_fs_read(duckdb_vx_file_handle handle, idx_t offset, idx_t len, uint8_t *buffer, + idx_t *out_len, duckdb_vx_error *error_out); + +// Expand a glob using DuckDB's filesystem. +duckdb_vx_string_list duckdb_vx_fs_glob(duckdb_vx_client_context ctx, const char *pattern, + duckdb_vx_error *error_out); + +// Free a string list allocated by duckdb_vx_fs_glob. +void duckdb_vx_string_list_free(duckdb_vx_string_list *list); + +// Create/truncate a file for writing using DuckDB's filesystem. +duckdb_vx_file_handle duckdb_vx_fs_create(duckdb_vx_client_context ctx, const char *path, + duckdb_vx_error *error_out); + +// Write len bytes at the given offset from buffer. +duckdb_state duckdb_vx_fs_write(duckdb_vx_file_handle handle, idx_t offset, idx_t len, const uint8_t *buffer, + idx_t *out_len, duckdb_vx_error *error_out); + +// Flush pending writes to storage. +duckdb_state duckdb_vx_fs_sync(duckdb_vx_file_handle handle, duckdb_vx_error *error_out); + +#ifdef __cplusplus /* End C ABI */ +} +#endif diff --git a/vortex-duckdb/src/copy.rs b/vortex-duckdb/src/copy.rs index 40a0c72ea58..725c8a75132 100644 --- a/vortex-duckdb/src/copy.rs +++ b/vortex-duckdb/src/copy.rs @@ -3,6 +3,7 @@ use std::fmt::Debug; use std::iter; +use std::ptr::NonNull; use futures::SinkExt; use futures::TryStreamExt; @@ -29,13 +30,26 @@ use crate::RUNTIME; use crate::SESSION; use crate::convert::data_chunk_to_vortex; use crate::convert::from_duckdb_table; +use crate::cpp; +use crate::duckdb::ClientContext; use crate::duckdb::CopyFunction; use crate::duckdb::DataChunk; use crate::duckdb::LogicalType; +use crate::duckdb::duckdb_fs_create_writer; #[derive(Debug)] pub struct VortexCopyFunction; +#[derive(Clone, Copy)] +struct SendableClientCtx(NonNull); +unsafe impl Send for SendableClientCtx {} + +impl SendableClientCtx { + fn as_ptr(self) -> cpp::duckdb_vx_client_context { + self.0.as_ptr() + } +} + pub struct BindData { dtype: DType, fields: StructFields, @@ -118,6 +132,7 @@ impl CopyFunction for VortexCopyFunction { } fn init_global( + client_context: ClientContext, bind_data: &Self::BindData, file_path: String, ) -> VortexResult { @@ -126,9 +141,17 @@ impl CopyFunction for VortexCopyFunction { let array_stream = ArrayStreamAdapter::new(bind_data.dtype.clone(), rx.into_stream()); let handle = SESSION.handle(); + let ctx_ptr = SendableClientCtx( + NonNull::new(client_context.as_ptr()) + .vortex_expect("Client context pointer should not be null"), + ); let write_task = handle.spawn(async move { - let mut file = async_fs::File::create(file_path).await?; - SESSION.write_options().write(&mut file, array_stream).await + // Use DuckDB FS exclusively to match the DuckDB client context configuration. + let writer = + unsafe { duckdb_fs_create_writer(ctx_ptr.as_ptr(), &file_path) }.map_err(|e| { + vortex_err!("Failed to create DuckDB FS writer for {file_path}: {e}") + })?; + SESSION.write_options().write(writer, array_stream).await }); let worker_pool = RUNTIME.new_pool(); diff --git a/vortex-duckdb/src/duckdb/copy_function/callback.rs b/vortex-duckdb/src/duckdb/copy_function/callback.rs index 05982003544..783bd9f2a2e 100644 --- a/vortex-duckdb/src/duckdb/copy_function/callback.rs +++ b/vortex-duckdb/src/duckdb/copy_function/callback.rs @@ -15,6 +15,7 @@ use crate::cpp::duckdb_data_chunk; use crate::cpp::duckdb_logical_type; use crate::cpp::duckdb_vx_copy_func_bind_input; use crate::cpp::duckdb_vx_error; +use crate::duckdb::ClientContext; use crate::duckdb::CopyFunction; use crate::duckdb::Data; use crate::duckdb::DataChunk; @@ -52,6 +53,7 @@ pub(crate) unsafe extern "C-unwind" fn bind_callback( } pub(crate) unsafe extern "C-unwind" fn global_callback( + client_context: cpp::duckdb_vx_client_context, bind_data: *const c_void, file_path: *const c_char, error_out: *mut duckdb_vx_error, @@ -62,7 +64,8 @@ pub(crate) unsafe extern "C-unwind" fn global_callback( let bind_data = unsafe { bind_data.cast::().as_ref() } .vortex_expect("global_init_data null pointer"); try_or_null(error_out, || { - let bind_data = T::init_global(bind_data, file_path)?; + let ctx = unsafe { ClientContext::borrow(client_context) }; + let bind_data = T::init_global(ctx, bind_data, file_path)?; Ok(Data::from(Box::new(bind_data)).as_ptr()) }) } diff --git a/vortex-duckdb/src/duckdb/copy_function/mod.rs b/vortex-duckdb/src/duckdb/copy_function/mod.rs index fc8a51d0fd9..0c4a6f903b8 100644 --- a/vortex-duckdb/src/duckdb/copy_function/mod.rs +++ b/vortex-duckdb/src/duckdb/copy_function/mod.rs @@ -11,6 +11,7 @@ use vortex::error::VortexResult; use crate::Connection; use crate::cpp; +use crate::duckdb::ClientContext; use crate::duckdb::DataChunk; use crate::duckdb::LogicalType; use crate::duckdb::copy_function::callback::bind_callback; @@ -49,6 +50,7 @@ pub trait CopyFunction: Sized + Debug { /// The global operator state is used to keep track of the progress in the copy function and /// is shared between all threads working on the copy function. fn init_global( + client_context: ClientContext, bind_data: &Self::BindData, file_path: String, ) -> VortexResult; diff --git a/vortex-duckdb/src/duckdb/file_system.rs b/vortex-duckdb/src/duckdb/file_system.rs new file mode 100644 index 00000000000..8f70b627d07 --- /dev/null +++ b/vortex-duckdb/src/duckdb/file_system.rs @@ -0,0 +1,297 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::ffi::CStr; +use std::ffi::CString; +use std::ptr; +use std::sync::Arc; +use std::sync::OnceLock; + +use futures::FutureExt; +use futures::future::BoxFuture; +use vortex::array::buffer::BufferHandle; +use vortex::buffer::Alignment; +use vortex::buffer::ByteBufferMut; +use vortex::error::VortexError; +use vortex::error::VortexResult; +use vortex::error::vortex_err; +use vortex::io::CoalesceConfig; +use vortex::io::IoBuf; +use vortex::io::VortexReadAt; +use vortex::io::VortexWrite; +use vortex::io::runtime::BlockingRuntime; + +use crate::RUNTIME; +use crate::cpp; +use crate::duckdb::ClientContext; +use crate::lifetime_wrapper; + +const DEFAULT_COALESCE: CoalesceConfig = CoalesceConfig { + distance: 1024 * 1024, // 1 MB + max_size: 8 * 1024 * 1024, // 8 MB +}; + +// Local cap to keep remote reads parallel without overwhelming typical per-host connection limits. +// The DuckDB httpfs extension does not expose a fixed default concurrency in the vendored sources; +// 64 is a conservative ceiling that stays well below common cloud limits while keeping range reads +// busy. +const DEFAULT_CONCURRENCY: usize = 64; + +lifetime_wrapper!(FsFileHandle, cpp::duckdb_vx_file_handle, cpp::duckdb_vx_fs_close, [owned, ref]); +unsafe impl Send for FsFileHandle {} +unsafe impl Sync for FsFileHandle {} +unsafe impl Send for cpp::duckdb_vx_client_context_ {} +unsafe impl Sync for cpp::duckdb_vx_client_context_ {} + +fn fs_error(err: cpp::duckdb_vx_error) -> VortexError { + if err.is_null() { + return vortex_err!("DuckDB filesystem error (unknown)"); + } + let message = unsafe { CStr::from_ptr(cpp::duckdb_vx_error_value(err)) } + .to_string_lossy() + .to_string(); + unsafe { cpp::duckdb_vx_error_free(err) }; + vortex_err!("{message}") +} + +pub fn duckdb_fs_glob(ctx: &ClientContext, pattern: &str) -> VortexResult> { + let c_pattern = CString::new(pattern).map_err(|e| vortex_err!("Invalid glob pattern: {e}"))?; + let mut err: cpp::duckdb_vx_error = ptr::null_mut(); + let mut list = + unsafe { cpp::duckdb_vx_fs_glob(ctx.as_ptr(), c_pattern.as_ptr(), &raw mut err) }; + if !err.is_null() { + return Err(fs_error(err)); + } + + let mut urls = Vec::with_capacity(list.count); + for idx in 0..list.count { + let entry = unsafe { CStr::from_ptr(*list.entries.add(idx)) }; + let entry_str = entry.to_string_lossy(); + let url = url::Url::parse(&entry_str) + .map_err(|e| vortex_err!("Invalid URL returned by DuckDB glob {entry_str}: {e}"))?; + urls.push(url); + } + + unsafe { cpp::duckdb_vx_string_list_free(&raw mut list) }; + + Ok(urls) +} + +pub unsafe fn duckdb_fs_create_writer( + ctx: cpp::duckdb_vx_client_context, + path: &str, +) -> VortexResult { + unsafe { DuckDbFsWriter::create(ctx, path) } +} + +/// A VortexReadAt implementation backed by DuckDB's filesystem (e.g., httpfs/s3). +pub struct DuckDbFsReadAt { + handle: Arc, + uri: Arc, + size: Arc>, +} + +impl DuckDbFsReadAt { + pub unsafe fn open_url( + ctx: cpp::duckdb_vx_client_context, + url: &url::Url, + ) -> VortexResult { + let c_path = CString::new(url.as_str()).map_err(|e| vortex_err!("Invalid URL: {e}"))?; + let mut err: cpp::duckdb_vx_error = ptr::null_mut(); + let handle = unsafe { cpp::duckdb_vx_fs_open(ctx, c_path.as_ptr(), &raw mut err) }; + if handle.is_null() { + return Err(fs_error(err)); + } + + Ok(Self { + handle: Arc::new(unsafe { FsFileHandle::own(handle) }), + uri: Arc::from(url.as_str()), + size: Arc::new(OnceLock::new()), + }) + } +} + +impl VortexReadAt for DuckDbFsReadAt { + fn uri(&self) -> Option<&Arc> { + Some(&self.uri) + } + + fn coalesce_config(&self) -> Option { + Some(DEFAULT_COALESCE) + } + + fn concurrency(&self) -> usize { + DEFAULT_CONCURRENCY + } + + fn size(&self) -> BoxFuture<'static, VortexResult> { + let handle = self.handle.clone(); + let size_cell = self.size.clone(); + + async move { + if let Some(size) = size_cell.get() { + return Ok(*size); + } + + let runtime = RUNTIME.handle(); + let size = runtime + .spawn_blocking(move || { + let mut err: cpp::duckdb_vx_error = ptr::null_mut(); + let mut size_out: cpp::idx_t = 0; + let status = unsafe { + cpp::duckdb_vx_fs_get_size(handle.as_ptr(), &raw mut size_out, &raw mut err) + }; + if status != cpp::duckdb_state::DuckDBSuccess { + return Err(fs_error(err)); + } + Ok::<_, VortexError>(size_out as u64) + }) + .await?; + + let _ = size_cell.set(size); + Ok(size) + } + .boxed() + } + + fn read_at( + &self, + offset: u64, + length: usize, + alignment: Alignment, + ) -> BoxFuture<'static, VortexResult> { + let handle = self.handle.clone(); + + async move { + let runtime = RUNTIME.handle(); + let result: VortexResult = runtime + .spawn_blocking(move || -> VortexResult { + let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment); + unsafe { buffer.set_len(length) }; + + let mut err: cpp::duckdb_vx_error = ptr::null_mut(); + let mut out_len: cpp::idx_t = 0; + let status = unsafe { + cpp::duckdb_vx_fs_read( + handle.as_ptr(), + offset as cpp::idx_t, + length as cpp::idx_t, + buffer.as_mut_slice().as_mut_ptr(), + &raw mut out_len, + &raw mut err, + ) + }; + + if status != cpp::duckdb_state::DuckDBSuccess { + return Err(fs_error(err)); + } + + let used = usize::try_from(out_len) + .map_err(|e| vortex_err!("Invalid read len: {e}"))?; + unsafe { buffer.set_len(used) }; + + let frozen = buffer.freeze(); + Ok::<_, VortexError>(BufferHandle::new_host(frozen)) + }) + .await; + result + } + .boxed() + } +} + +// SAFETY: DuckDB file handles are thread-safe for reads; writes are done via DuckDbFsWriter. +unsafe impl Send for DuckDbFsReadAt {} +unsafe impl Sync for DuckDbFsReadAt {} + +pub struct DuckDbFsWriter { + handle: FsFileHandle, + pos: u64, +} + +impl DuckDbFsWriter { + pub unsafe fn create(ctx: cpp::duckdb_vx_client_context, path: &str) -> VortexResult { + let c_path = CString::new(path).map_err(|e| vortex_err!("Invalid path: {e}"))?; + let mut err: cpp::duckdb_vx_error = ptr::null_mut(); + let handle = unsafe { cpp::duckdb_vx_fs_create(ctx, c_path.as_ptr(), &raw mut err) }; + if handle.is_null() { + return Err(fs_error(err)); + } + + Ok(Self { + handle: unsafe { FsFileHandle::own(handle) }, + pos: 0, + }) + } +} + +impl VortexWrite for DuckDbFsWriter { + async fn write_all(&mut self, buffer: B) -> std::io::Result { + let len = buffer.bytes_init(); + let mut err: cpp::duckdb_vx_error = ptr::null_mut(); + let mut out_len: cpp::idx_t = 0; + + let status = unsafe { + cpp::duckdb_vx_fs_write( + self.handle.as_ptr(), + self.pos as cpp::idx_t, + len as cpp::idx_t, + buffer.read_ptr(), + &raw mut out_len, + &raw mut err, + ) + }; + + if status != cpp::duckdb_state::DuckDBSuccess { + return Err(std::io::Error::other(fs_error(err).to_string())); + } + + self.pos += len as u64; + Ok(buffer) + } + + async fn flush(&mut self) -> std::io::Result<()> { + let mut err: cpp::duckdb_vx_error = ptr::null_mut(); + let status = unsafe { cpp::duckdb_vx_fs_sync(self.handle.as_ptr(), &raw mut err) }; + if status != cpp::duckdb_state::DuckDBSuccess { + return Err(std::io::Error::other(fs_error(err).to_string())); + } + Ok(()) + } + + async fn shutdown(&mut self) -> std::io::Result<()> { + self.flush().await + } +} + +#[cfg(test)] +mod tests { + use std::fs; + use std::path::PathBuf; + + use super::*; + use crate::duckdb::Database; + + #[test] + fn test_writer_roundtrip_local() { + let db = Database::open_in_memory().unwrap(); + let conn = db.connect().unwrap(); + let ctx = conn.client_context().unwrap(); + + let dir = tempfile::tempdir().unwrap(); + let path: PathBuf = dir.path().join("writer_local.vortex"); + let path_str = path.to_string_lossy(); + + let mut writer = unsafe { duckdb_fs_create_writer(ctx.as_ptr(), &path_str) }.unwrap(); + + futures::executor::block_on(async { + VortexWrite::write_all(&mut writer, vec![1_u8, 2, 3]) + .await + .unwrap(); + VortexWrite::flush(&mut writer).await.unwrap(); + }); + + let data = fs::read(path).unwrap(); + assert_eq!(data, vec![1, 2, 3]); + } +} diff --git a/vortex-duckdb/src/duckdb/mod.rs b/vortex-duckdb/src/duckdb/mod.rs index 1960eb71ded..a88610a63e0 100644 --- a/vortex-duckdb/src/duckdb/mod.rs +++ b/vortex-duckdb/src/duckdb/mod.rs @@ -9,6 +9,7 @@ mod data; mod data_chunk; mod database; mod expr; +mod file_system; pub mod footer_cache; mod logical_type; mod macro_; @@ -34,6 +35,7 @@ pub use data::*; pub use data_chunk::*; pub use database::*; pub use expr::*; +pub use file_system::*; pub use logical_type::*; pub use object_cache::*; pub use query_result::*; diff --git a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs index 9812dcd2728..c271985eb47 100644 --- a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs +++ b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs @@ -4,6 +4,8 @@ //! This module contains tests for the `vortex_scan` table function. use std::ffi::CStr; +use std::io::Write; +use std::net::TcpListener; use std::path::Path; use std::slice; use std::str::FromStr; @@ -347,6 +349,51 @@ fn test_vortex_scan_multiple_files() { assert_eq!(total_sum, 21); } +#[test] +fn test_vortex_scan_over_http() { + let file = RUNTIME.block_on(async { + let strings = VarBinArray::from(vec!["a", "b", "c"]); + write_single_column_vortex_file("strings", strings).await + }); + + let file_bytes = std::fs::read(file.path()).unwrap(); + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = listener.local_addr().unwrap(); + + std::thread::spawn(move || { + for _ in 0..2 { + if let Ok((mut stream, _)) = listener.accept() { + let response = format!( + "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", + file_bytes.len() + ); + stream.write_all(response.as_bytes()).unwrap(); + stream.write_all(&file_bytes).unwrap(); + } + } + }); + + let conn = database_connection(); + conn.query("INSTALL httpfs;").unwrap(); + conn.query("LOAD httpfs;").unwrap(); + + let url = format!( + "http://{}/{}", + addr, + file.path().file_name().unwrap().to_string_lossy() + ); + + let result = conn + .query(&format!("SELECT COUNT(*) FROM read_vortex('{url}')")) + .unwrap(); + let chunk = result.into_iter().next().unwrap(); + let count = chunk + .get_vector(0) + .as_slice_with_len::(chunk.len().as_())[0]; + + assert_eq!(count, 3); +} + #[test] fn test_write_file() { let conn = database_connection(); diff --git a/vortex-duckdb/src/scan.rs b/vortex-duckdb/src/scan.rs index f48ce764fc5..c1d179036b3 100644 --- a/vortex-duckdb/src/scan.rs +++ b/vortex-duckdb/src/scan.rs @@ -7,6 +7,7 @@ use std::fmt; use std::fmt::Debug; use std::fmt::Formatter; use std::pin::Pin; +use std::ptr::NonNull; use std::sync::Arc; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; @@ -54,12 +55,14 @@ use crate::RUNTIME; use crate::SESSION; use crate::convert::try_from_bound_expression; use crate::convert::try_from_table_filter; +use crate::cpp; use crate::duckdb; use crate::duckdb::BindInput; use crate::duckdb::BindResult; use crate::duckdb::Cardinality; use crate::duckdb::ClientContext; use crate::duckdb::DataChunk; +use crate::duckdb::DuckDbFsReadAt; use crate::duckdb::ExtractedValue; use crate::duckdb::LogicalType; use crate::duckdb::TableFunction; @@ -69,7 +72,6 @@ use crate::duckdb::footer_cache::FooterCache; use crate::exporter::ArrayExporter; use crate::exporter::ConversionCache; use crate::utils::glob::expand_glob; -use crate::utils::object_store::s3_store; pub struct VortexBindData { first_file: VortexFile, @@ -203,26 +205,33 @@ fn extract_table_filter_expr( Ok(and_collect(table_filter_exprs.into_iter().collect_vec())) } -/// Helper function to open a Vortex file from either a local or S3 URL -async fn open_file(url: Url, options: VortexOpenOptions) -> VortexResult { - if url.scheme() == "s3" { - assert_eq!(url.scheme(), "s3"); - let bucket = url - .host_str() - .ok_or_else(|| vortex_err!("Failed to extract bucket name from URL: {url}"))?; +/// Helper function to open a Vortex file from either a local path or a URL supported by DuckDB's filesystem. +#[derive(Clone, Copy)] +struct SendableClientCtx(NonNull); +unsafe impl Send for SendableClientCtx {} +unsafe impl Sync for SendableClientCtx {} - let path = url - .path() - .strip_prefix("/") - .ok_or_else(|| vortex_err!("Invalid S3 path: {url}"))?; +fn open_duckdb_reader(client_ctx: SendableClientCtx, url: &Url) -> VortexResult { + unsafe { DuckDbFsReadAt::open_url(client_ctx.0.as_ptr(), url) } +} - options.open_object_store(&s3_store(bucket)?, path).await - } else { - let path = url - .to_file_path() - .map_err(|_| vortex_err!("Invalid file URL: {url}"))?; +async fn open_file( + client_ctx: SendableClientCtx, + url: Url, + options: VortexOpenOptions, +) -> VortexResult { + match url.scheme() { + "http" | "https" | "s3" => { + let reader = Arc::new(open_duckdb_reader(client_ctx, &url)?); + options.open(reader).await + } + _ => { + let path = url + .to_file_path() + .map_err(|_| vortex_err!("Invalid file URL: {url}"))?; - options.open_path(path).await + options.open_path(path).await + } } } @@ -277,6 +286,7 @@ impl TableFunction for VortexTableFunction { tracing::trace!("running scan with max_threads {max_threads}"); let (file_urls, _metadata) = RUNTIME.block_on(Compat::new(expand_glob( + ctx, file_glob_string.as_ref().as_string(), )))?; @@ -286,10 +296,13 @@ impl TableFunction for VortexTableFunction { }; let footer_cache = FooterCache::new(ctx.object_cache()); + let client_ctx = SendableClientCtx( + NonNull::new(ctx.as_ptr()).vortex_expect("Client context pointer should not be null"), + ); let entry = footer_cache.entry(first_file_url.as_ref()); let first_file = RUNTIME.block_on(async move { let options = entry.apply_to_file(SESSION.open_options()); - let file = open_file(first_file_url.clone(), options).await?; + let file = open_file(client_ctx, first_file_url.clone(), options).await?; entry.put_if_absent(|| file.footer().clone()); VortexResult::Ok(file) })?; @@ -392,6 +405,10 @@ impl TableFunction for VortexTableFunction { let num_workers = bind_data.max_threads as usize; let client_context = init_input.client_context()?; + let client_ctx_ptr = SendableClientCtx( + NonNull::new(client_context.as_ptr()) + .vortex_expect("Client context pointer should not be null"), + ); let object_cache = client_context.object_cache(); let handle = RUNTIME.handle(); @@ -415,7 +432,7 @@ impl TableFunction for VortexTableFunction { let cache = FooterCache::new(object_cache); let entry = cache.entry(url.as_ref()); let options = entry.apply_to_file(SESSION.open_options()); - let file = open_file(url.clone(), options).await?; + let file = open_file(client_ctx_ptr, url.clone(), options).await?; entry.put_if_absent(|| file.footer().clone()); VortexResult::Ok(file) }?; diff --git a/vortex-duckdb/src/utils/glob.rs b/vortex-duckdb/src/utils/glob.rs index 0c21c4f3034..01db505b03b 100644 --- a/vortex-duckdb/src/utils/glob.rs +++ b/vortex-duckdb/src/utils/glob.rs @@ -1,141 +1,48 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use futures::StreamExt; +#[cfg(test)] +use std::sync::OnceLock; + use object_store::ObjectMeta; +#[cfg(test)] +use parking_lot::Mutex; use url::Url; use vortex::error::VortexResult; use vortex::error::vortex_bail; use vortex::error::vortex_err; -use super::object_store::s3_store; +use crate::duckdb::ClientContext; +use crate::duckdb::duckdb_fs_glob; + +#[cfg(test)] +type DuckdbGlobHook = Box VortexResult> + Send + Sync>; + +#[cfg(test)] +static DUCKDB_GLOB_HOOK: OnceLock>> = OnceLock::new(); + +#[cfg(test)] +static GLOB_TEST_GUARD: OnceLock> = OnceLock::new(); /// Expand a glob pattern into a list of URLs. /// /// Example: s3://bucket/files/*.vortex -> (urls, Some(object_store_metadata)) pub async fn expand_glob>( + client_context: &ClientContext, url_glob: T, ) -> VortexResult<(Vec, Option>)> { let url_str = url_glob.as_ref(); // We prefer using string prefix matching here over extracting a URL scheme // as local files with an absolute path but without the file:// prefix can't // be parsed into a URL. - match &url_str[..url_str.len().min(5)] { - "s3://" => s3::expand_glob(&url_glob).await, - "gs://" => vortex_bail!("GCS glob expansion not yet implemented"), - _ => local_filesystem::expand_glob(url_str), - } -} - -mod s3 { - use object_store::ObjectMeta; - - use super::*; - - /// Expand a glob pattern into a list of S3 URLs. - /// - /// Makes a single request based on the position of the first glob character - /// and filters the results on the client side. In case no glob characters - /// are provided, the last directory in the path is used as the list prefix. - pub(super) async fn expand_glob>( - url_glob: T, - ) -> VortexResult<(Vec, Option>)> { - validate_glob(&url_glob)?; - assert_eq!("s3://", &url_glob.as_ref()[..5]); - let url = Url::parse(url_glob.as_ref())?; - - let bucket = url - .host_str() - .ok_or_else(|| vortex_err!("Failed to extract bucket name from URL: {url}"))?; - - let list_prefix = list_prefix(url_path(&url)?); - let object_store = s3_store(bucket)?; - - // The AWS S3 `ListObjectsV2` API returns multiple objects per HTTP request, up to 1000 by default. - let stream = object_store.list(Some(&object_store::path::Path::from(list_prefix))); - - let glob_pattern = glob::Pattern::new(url_glob.as_ref()) - .map_err(|e| vortex_err!("Invalid glob pattern: {}", e))?; - - let matching_paths = process_object_store_stream(stream, &glob_pattern, bucket).await?; - - let (urls, metadata) = matching_paths.into_iter().unzip(); - Ok((urls, Some(metadata))) - } - - /// Validates that a glob pattern does not contain escaped glob characters. - /// Returns an error if backslash-escaped characters like \*, \?, etc. are found. - pub(super) fn validate_glob>(pattern: T) -> VortexResult<()> { - let pattern_str = pattern.as_ref(); - - // Check for backslash escape sequences. - for escape_pattern in ["\\*", "\\?", "\\["] { - if pattern_str.contains(escape_pattern) { - vortex_bail!( - "Escaped glob characters are not allowed in patterns. Found '{}' in: {}", - escape_pattern, - pattern_str - ); - } + match url_str { + s if s.starts_with("s3://") || s.starts_with("https://") || s.starts_with("http://") => { + duckdb_fs_glob(client_context, url_str).map(|urls| (urls, None)) } - - Ok(()) - } - - /// Returns the path from an S3 URL. - /// - /// Example: "s3://bucket/path/to/file.txt" -> "path/to/file.txt" - pub(super) fn url_path(url: &Url) -> VortexResult<&str> { - url.path() - .strip_prefix("/") - .ok_or_else(|| vortex_err!("Invalid URL: {url}")) - } - - /// Returns the list prefix for a URL path which can contain glob characters. - /// - /// Unlike `aws s3 ls`, the object store crate does not support support - /// incomplete file names as a prefix. Therefore, the prefix is the - /// directory path up to the first glob character. - /// - /// Example: "path/to/file_*.txt" -> "path/to/" - pub(super) fn list_prefix>(url_path: T) -> String { - let url_path = url_path.as_ref(); - // Find first glob character index. - let special_char_index = url_path - .find(|c| ['*', '?', '['].contains(&c)) - .unwrap_or(url_path.len()); - - match &url_path[..special_char_index].rsplit_once('/') { - Some((dir_path, _)) => format!("{dir_path}/"), - None => url_path[..special_char_index].to_string(), + s if s.starts_with("gs://") => { + vortex_bail!("GCS glob expansion not yet implemented") } - } - - /// Process an object store stream and filter the results on the client - /// based on the glob pattern. - async fn process_object_store_stream( - stream: impl futures::Stream>, - glob_pattern: &glob::Pattern, - bucket: &str, - ) -> VortexResult> { - let matching_paths: Vec<(Url, ObjectMeta)> = stream - .map(|object_meta| async move { - if let Ok(object_meta) = object_meta { - let url_string = format!("s3://{}/{}", bucket, object_meta.location); - if glob_pattern.matches(&url_string) - && let Ok(parsed_url) = Url::parse(&url_string) - { - return Some((parsed_url, object_meta)); - } - } - None - }) - .buffer_unordered(16) - .filter_map(|result| async { result }) - .collect() - .await; - - Ok(matching_paths) + _ => local_filesystem::expand_glob(url_str), } } @@ -177,6 +84,20 @@ mod tests { use tempfile::TempDir; use super::*; + use crate::duckdb::Database; + + fn ctx_bundle() -> (Database, crate::duckdb::Connection, ClientContext) { + let db = Database::open_in_memory().unwrap(); + let conn = db.connect().unwrap(); + let ctx = conn.client_context().unwrap(); + (db, conn, ctx) + } + + fn set_glob_hook(hook: Option) { + let cell = DUCKDB_GLOB_HOOK.get_or_init(|| Mutex::new(None)); + let mut guard = cell.lock(); + *guard = hook; + } #[test] fn test_expand_local_disk_glob_relative_path() { @@ -198,6 +119,25 @@ mod tests { env::set_current_dir(&original_dir).unwrap(); } + #[test] + fn test_duckdb_glob_http_hook() { + let _guard = GLOB_TEST_GUARD.get_or_init(|| Mutex::new(())).lock(); + let url = Url::parse("http://example.com/data.vortex").unwrap(); + let expected = url.clone(); + set_glob_hook(Some(Box::new(move |_, _| Ok(vec![url.clone()])))); + + let (_db, _conn, ctx) = ctx_bundle(); + + let (urls, meta) = + futures::executor::block_on(expand_glob(&ctx, "http://example.com/data.vortex")) + .unwrap(); + assert_eq!(meta, None); + assert_eq!(urls.len(), 1); + assert_eq!(urls[0], expected); + + set_glob_hook(None); + } + #[test] fn test_expand_local_disk_glob_single_file() { let temp_dir = TempDir::new().unwrap(); @@ -268,129 +208,4 @@ mod tests { assert_eq!(result.0.len(), 2); } - - #[test] - fn test_extract_s3_url_path() { - // Test valid S3 URL - let url = Url::parse("s3://bucket/path/to/file.txt").unwrap(); - let result = s3::url_path(&url).unwrap(); - assert_eq!(result, "path/to/file.txt"); - - // Test URL with nested path - let url = Url::parse("s3://my-bucket/folder/subfolder/data.parquet").unwrap(); - let result = s3::url_path(&url).unwrap(); - assert_eq!(result, "folder/subfolder/data.parquet"); - - // Test URL with root path - let url = Url::parse("s3://bucket/file.txt").unwrap(); - let result = s3::url_path(&url).unwrap(); - assert_eq!(result, "file.txt"); - - // Test URL without leading slash should fail - let url = Url::parse("s3://bucket").unwrap(); - let result = s3::url_path(&url); - assert!(result.is_err()); - } - - #[test] - fn test_calculate_list_prefix() { - // Test with wildcard in filename - let result = s3::list_prefix("folder/file*.txt"); - assert_eq!(result, "folder/"); - - // Test with wildcard in directory - let result = s3::list_prefix("folder/*/file.txt"); - assert_eq!(result, "folder/"); - - // Test with nested directories and wildcard - let result = s3::list_prefix("data/2023/*/logs/*.log"); - assert_eq!(result, "data/2023/"); - - // Test with wildcard at root level - let result = s3::list_prefix("*.txt"); - assert_eq!(result, ""); - - // Test with no wildcards - let result = s3::list_prefix("folder/subfolder/file.txt"); - assert_eq!(result, "folder/subfolder/"); - - // Test with question mark wildcard - let result = s3::list_prefix("folder/file?.txt"); - assert_eq!(result, "folder/"); - - // Test with bracket wildcards - let result = s3::list_prefix("folder/file[abc].txt"); - assert_eq!(result, "folder/"); - - // Test empty path - let result = s3::list_prefix(""); - assert_eq!(result, ""); - } - - #[test] - fn test_s3_url_parsing_integration() { - // Test complete S3 URL parsing workflow - let url = Url::parse("s3://my-bucket/data/year=2023/month=*/day=*/events.parquet").unwrap(); - - let url_path = s3::url_path(&url).unwrap(); - assert_eq!(url_path, "data/year=2023/month=*/day=*/events.parquet"); - - let list_prefix = s3::list_prefix(url_path); - assert_eq!(list_prefix, "data/year=2023/"); - } - - #[test] - fn test_s3_url_parsing_edge_cases() { - // Test URL with multiple consecutive wildcards - let url = Url::parse("s3://bucket/logs/**/*.log").unwrap(); - let url_path = s3::url_path(&url).unwrap(); - let list_prefix = s3::list_prefix(url_path); - assert_eq!(list_prefix, "logs/"); - - // Test URL with wildcard at the beginning - let url = Url::parse("s3://bucket/*.txt").unwrap(); - let url_path = s3::url_path(&url).unwrap(); - let list_prefix = s3::list_prefix(url_path); - assert_eq!(list_prefix, ""); - - // Test deeply nested path with wildcard - let url = Url::parse("s3://bucket/a/b/c/d/e/f/g/*.json").unwrap(); - let url_path = s3::url_path(&url).unwrap(); - let list_prefix = s3::list_prefix(url_path); - assert_eq!(list_prefix, "a/b/c/d/e/f/g/"); - } - - #[test] - fn test_s3_url_parsing_no_wildcards() { - let url = Url::parse("s3://bucket/path/to/specific/file.txt").unwrap(); - let url_path = s3::url_path(&url).unwrap(); - let list_prefix = s3::list_prefix(url_path); - assert_eq!(list_prefix, "path/to/specific/"); - } - - #[test] - fn test_validate_glob_valid_pattern() { - assert!(s3::validate_glob("s3://bucket/path/*.txt").is_ok()); - } - - #[test] - fn test_validate_glob_escaped_asterisk() { - let result = s3::validate_glob("s3://bucket/path\\*.txt"); - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("\\*")); - } - - #[test] - fn test_validate_glob_escaped_question_mark() { - let result = s3::validate_glob("s3://bucket/path\\?.txt"); - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("\\?")); - } - - #[test] - fn test_validate_glob_escaped_bracket() { - let result = s3::validate_glob("s3://bucket/path\\[test].txt"); - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("\\[")); - } } diff --git a/vortex-duckdb/src/utils/mod.rs b/vortex-duckdb/src/utils/mod.rs index ecb071cbc82..b5090ad8fd4 100644 --- a/vortex-duckdb/src/utils/mod.rs +++ b/vortex-duckdb/src/utils/mod.rs @@ -2,4 +2,3 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors pub mod glob; -pub mod object_store; diff --git a/vortex-duckdb/src/utils/object_store.rs b/vortex-duckdb/src/utils/object_store.rs deleted file mode 100644 index d9be329177f..00000000000 --- a/vortex-duckdb/src/utils/object_store.rs +++ /dev/null @@ -1,37 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::sync::Arc; -use std::sync::OnceLock; - -use object_store::ObjectStore; -use object_store::aws::AmazonS3Builder; -use vortex::error::VortexResult; -use vortex::error::vortex_err; -use vortex_utils::aliases::dash_map::DashMap; - -// Global S3 object store cache. -pub fn s3_store(bucket: &str) -> VortexResult> { - static S3_STORES: OnceLock>> = OnceLock::new(); - let stores = S3_STORES.get_or_init(|| DashMap::with_hasher(Default::default())); - - fn create_s3_object_store(bucket: &str) -> VortexResult> { - Ok(Arc::new( - AmazonS3Builder::from_env() - .with_bucket_name(bucket) - .build() - .map_err(|e| vortex_err!("Failed to create S3 store: {}", e))?, - ) as Arc) - } - - let object_store = match stores.get(bucket) { - Some(store) => store.clone(), - None => { - let store = create_s3_object_store(bucket)?; - stores.insert(bucket.to_string(), store.clone()); - store - } - }; - - Ok(object_store) -}