-
Notifications
You must be signed in to change notification settings - Fork 126
feat: implement DuckDB filesystem integration for Vortex file handling #6198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
c408ffa
4fffe00
94e617b
a131078
7e51cf2
c5b1e85
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,208 @@ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| // SPDX-FileCopyrightText: Copyright the Vortex contributors | ||
|
|
||
| #include "duckdb_vx.h" | ||
|
|
||
| #include <duckdb/common/exception.hpp> | ||
| #include <duckdb/common/file_system.hpp> | ||
| #include <duckdb/common/helper.hpp> | ||
| #include <duckdb/main/client_context.hpp> | ||
|
|
||
| #include <cstring> | ||
| #include <memory> | ||
| #include <string> | ||
| #include <utility> | ||
|
|
||
| using namespace duckdb; | ||
|
|
||
| struct FileHandleWrapper { | ||
| explicit FileHandleWrapper(unique_ptr<FileHandle> handle_p) : handle(std::move(handle_p)) { | ||
| } | ||
|
|
||
| unique_ptr<FileHandle> handle; | ||
| }; | ||
|
|
||
| static void SetError(duckdb_vx_error *error_out, const std::string &message) { | ||
| if (!error_out) { | ||
| return; | ||
| } | ||
| *error_out = duckdb_vx_error_create(message.data(), message.size()); | ||
| } | ||
|
|
||
| static duckdb_state HandleException(std::exception_ptr ex, duckdb_vx_error *error_out) { | ||
| if (!ex) { | ||
| SetError(error_out, "Unknown error"); | ||
| return DuckDBError; | ||
| } | ||
|
|
||
| try { | ||
| std::rethrow_exception(ex); | ||
| } catch (const Exception &caught) { | ||
| SetError(error_out, caught.what()); | ||
| } catch (const std::exception &caught) { | ||
| SetError(error_out, caught.what()); | ||
| } catch (...) { | ||
| SetError(error_out, "Unknown error"); | ||
| } | ||
| return DuckDBError; | ||
| } | ||
|
|
||
| extern "C" duckdb_vx_file_handle duckdb_vx_fs_open(duckdb_vx_client_context ctx, const char *path, | ||
| duckdb_vx_error *error_out) { | ||
| if (!ctx || !path) { | ||
| SetError(error_out, "Invalid filesystem open arguments"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For all
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. addressed |
||
| return nullptr; | ||
| } | ||
|
|
||
| try { | ||
| auto *client_context = reinterpret_cast<ClientContext *>(ctx); | ||
| auto &fs = FileSystem::GetFileSystem(*client_context); | ||
| auto handle = fs.OpenFile(path, FileFlags::FILE_FLAGS_READ); | ||
| return reinterpret_cast<duckdb_vx_file_handle>(new FileHandleWrapper(std::move(handle))); | ||
| } catch (...) { | ||
| HandleException(std::current_exception(), error_out); | ||
| return nullptr; | ||
| } | ||
| } | ||
|
|
||
| extern "C" duckdb_vx_file_handle duckdb_vx_fs_create(duckdb_vx_client_context ctx, const char *path, | ||
| duckdb_vx_error *error_out) { | ||
| if (!ctx || !path) { | ||
| SetError(error_out, "Invalid filesystem create arguments"); | ||
| return nullptr; | ||
| } | ||
|
|
||
| try { | ||
| auto *client_context = reinterpret_cast<ClientContext *>(ctx); | ||
| auto &fs = FileSystem::GetFileSystem(*client_context); | ||
| auto handle = fs.OpenFile(path, FileFlags::FILE_FLAGS_WRITE | FileFlags::FILE_FLAGS_FILE_CREATE); | ||
| handle->Truncate(0); | ||
| return reinterpret_cast<duckdb_vx_file_handle>(new FileHandleWrapper(std::move(handle))); | ||
| } catch (...) { | ||
| HandleException(std::current_exception(), error_out); | ||
| return nullptr; | ||
| } | ||
| } | ||
|
|
||
| extern "C" void duckdb_vx_fs_close(duckdb_vx_file_handle *handle) { | ||
| if (!handle || !*handle) { | ||
| return; | ||
| } | ||
| auto wrapper = reinterpret_cast<FileHandleWrapper *>(*handle); | ||
| delete wrapper; | ||
| *handle = nullptr; | ||
| } | ||
|
|
||
| extern "C" duckdb_state duckdb_vx_fs_get_size(duckdb_vx_file_handle handle, idx_t *size_out, | ||
| duckdb_vx_error *error_out) { | ||
| if (!handle || !size_out) { | ||
| SetError(error_out, "Invalid arguments to fs_get_size"); | ||
| return DuckDBError; | ||
| } | ||
|
|
||
| try { | ||
| auto *wrapper = reinterpret_cast<FileHandleWrapper *>(handle); | ||
| *size_out = wrapper->handle->GetFileSize(); | ||
| return DuckDBSuccess; | ||
| } catch (...) { | ||
| return HandleException(std::current_exception(), error_out); | ||
| } | ||
| } | ||
|
|
||
| extern "C" duckdb_state duckdb_vx_fs_read(duckdb_vx_file_handle handle, idx_t offset, idx_t len, uint8_t *buffer, | ||
| idx_t *out_len, duckdb_vx_error *error_out) { | ||
| if (!handle || !buffer || !out_len) { | ||
| SetError(error_out, "Invalid arguments to fs_read"); | ||
| return DuckDBError; | ||
| } | ||
|
|
||
| try { | ||
| auto *wrapper = reinterpret_cast<FileHandleWrapper *>(handle); | ||
| wrapper->handle->Read(buffer, len, offset); | ||
| *out_len = len; | ||
| return DuckDBSuccess; | ||
| } catch (...) { | ||
| return HandleException(std::current_exception(), error_out); | ||
| } | ||
| } | ||
|
|
||
| extern "C" duckdb_state duckdb_vx_fs_write(duckdb_vx_file_handle handle, idx_t offset, idx_t len, | ||
| const uint8_t *buffer, idx_t *out_len, | ||
| duckdb_vx_error *error_out) { | ||
| if (!handle || !buffer || !out_len) { | ||
| SetError(error_out, "Invalid arguments to fs_write"); | ||
| return DuckDBError; | ||
| } | ||
|
|
||
| try { | ||
| auto *wrapper = reinterpret_cast<FileHandleWrapper *>(handle); | ||
| wrapper->handle->Seek(offset); | ||
| wrapper->handle->Write(const_cast<uint8_t *>(buffer), len); | ||
| *out_len = len; | ||
| return DuckDBSuccess; | ||
| } catch (...) { | ||
| return HandleException(std::current_exception(), error_out); | ||
| } | ||
| } | ||
|
|
||
| extern "C" duckdb_vx_string_list duckdb_vx_fs_glob(duckdb_vx_client_context ctx, const char *pattern, | ||
| duckdb_vx_error *error_out) { | ||
| duckdb_vx_string_list result{nullptr, 0}; | ||
|
|
||
| if (!ctx || !pattern) { | ||
| SetError(error_out, "Invalid arguments to fs_glob"); | ||
| return result; | ||
| } | ||
|
|
||
| try { | ||
| auto *client_context = reinterpret_cast<ClientContext *>(ctx); | ||
| auto &fs = FileSystem::GetFileSystem(*client_context); | ||
| auto matches = fs.Glob(pattern); | ||
|
|
||
| if (matches.empty()) { | ||
| return result; | ||
| } | ||
|
|
||
| result.count = matches.size(); | ||
| result.entries = static_cast<const char **>(duckdb_malloc(sizeof(char *) * matches.size())); | ||
| for (size_t i = 0; i < matches.size(); i++) { | ||
| const auto &entry = matches[i].path; | ||
| auto *owned = static_cast<char *>(duckdb_malloc(entry.size() + 1)); | ||
| std::memcpy(owned, entry.data(), entry.size()); | ||
| owned[entry.size()] = '\0'; | ||
| result.entries[i] = owned; | ||
| } | ||
|
|
||
| return result; | ||
| } catch (...) { | ||
| HandleException(std::current_exception(), error_out); | ||
| return result; | ||
| } | ||
| } | ||
|
|
||
| extern "C" void duckdb_vx_string_list_free(duckdb_vx_string_list *list) { | ||
| if (!list || !list->entries) { | ||
| return; | ||
| } | ||
| for (size_t i = 0; i < list->count; i++) { | ||
| duckdb_free(const_cast<char *>(list->entries[i])); | ||
| } | ||
| duckdb_free(list->entries); | ||
| list->entries = nullptr; | ||
| list->count = 0; | ||
| } | ||
|
|
||
| extern "C" duckdb_state duckdb_vx_fs_sync(duckdb_vx_file_handle handle, duckdb_vx_error *error_out) { | ||
| if (!handle) { | ||
| SetError(error_out, "Invalid arguments to fs_sync"); | ||
| return DuckDBError; | ||
| } | ||
|
|
||
| try { | ||
| auto *wrapper = reinterpret_cast<FileHandleWrapper *>(handle); | ||
| wrapper->handle->Sync(); | ||
| return DuckDBSuccess; | ||
| } catch (...) { | ||
| return HandleException(std::current_exception(), error_out); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| // SPDX-FileCopyrightText: Copyright the Vortex contributors | ||
|
|
||
| #pragma once | ||
|
|
||
| #include "duckdb.h" | ||
| #include "duckdb_vx/client_context.h" | ||
| #include "duckdb_vx/error.h" | ||
|
|
||
| #ifdef __cplusplus /* If compiled as C++, use C ABI */ | ||
| extern "C" { | ||
| #endif | ||
|
|
||
| typedef struct duckdb_vx_file_handle_ *duckdb_vx_file_handle; | ||
|
|
||
| typedef struct { | ||
| const char **entries; | ||
| size_t count; | ||
| } duckdb_vx_string_list; | ||
|
|
||
| // Open a file using DuckDB's filesystem (supports httpfs, s3, etc.). | ||
| duckdb_vx_file_handle duckdb_vx_fs_open(duckdb_vx_client_context ctx, const char *path, | ||
| duckdb_vx_error *error_out); | ||
|
|
||
| // Close a previously opened file handle. | ||
| void duckdb_vx_fs_close(duckdb_vx_file_handle *handle); | ||
|
|
||
| // Get the size of an opened file. | ||
| duckdb_state duckdb_vx_fs_get_size(duckdb_vx_file_handle handle, idx_t *size_out, | ||
| duckdb_vx_error *error_out); | ||
|
|
||
| // Read up to len bytes at the given offset into buffer. Returns bytes read via out_len. | ||
| duckdb_state duckdb_vx_fs_read(duckdb_vx_file_handle handle, idx_t offset, idx_t len, uint8_t *buffer, | ||
| idx_t *out_len, duckdb_vx_error *error_out); | ||
|
|
||
| // Expand a glob using DuckDB's filesystem. | ||
| duckdb_vx_string_list duckdb_vx_fs_glob(duckdb_vx_client_context ctx, const char *pattern, | ||
| duckdb_vx_error *error_out); | ||
|
|
||
| // Free a string list allocated by duckdb_vx_fs_glob. | ||
| void duckdb_vx_string_list_free(duckdb_vx_string_list *list); | ||
|
|
||
| // Create/truncate a file for writing using DuckDB's filesystem. | ||
| duckdb_vx_file_handle duckdb_vx_fs_create(duckdb_vx_client_context ctx, const char *path, | ||
| duckdb_vx_error *error_out); | ||
|
|
||
| // Write len bytes at the given offset from buffer. | ||
| duckdb_state duckdb_vx_fs_write(duckdb_vx_file_handle handle, idx_t offset, idx_t len, const uint8_t *buffer, | ||
| idx_t *out_len, duckdb_vx_error *error_out); | ||
|
|
||
| // Flush pending writes to storage. | ||
| duckdb_state duckdb_vx_fs_sync(duckdb_vx_file_handle handle, duckdb_vx_error *error_out); | ||
|
|
||
| #ifdef __cplusplus /* End C ABI */ | ||
| } | ||
| #endif |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ | |
|
|
||
| use std::fmt::Debug; | ||
| use std::iter; | ||
| use std::ptr::NonNull; | ||
|
|
||
| use futures::SinkExt; | ||
| use futures::TryStreamExt; | ||
|
|
@@ -29,13 +30,26 @@ use crate::RUNTIME; | |
| use crate::SESSION; | ||
| use crate::convert::data_chunk_to_vortex; | ||
| use crate::convert::from_duckdb_table; | ||
| use crate::cpp; | ||
| use crate::duckdb::ClientContext; | ||
| use crate::duckdb::CopyFunction; | ||
| use crate::duckdb::DataChunk; | ||
| use crate::duckdb::LogicalType; | ||
| use crate::duckdb::duckdb_fs_create_writer; | ||
|
|
||
| #[derive(Debug)] | ||
| pub struct VortexCopyFunction; | ||
|
|
||
| #[derive(Clone, Copy)] | ||
| struct SendableClientCtx(NonNull<cpp::duckdb_vx_client_context_>); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's 2 definitions of
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When you add send + sync to |
||
| unsafe impl Send for SendableClientCtx {} | ||
|
|
||
| impl SendableClientCtx { | ||
| fn as_ptr(self) -> cpp::duckdb_vx_client_context { | ||
| self.0.as_ptr() | ||
| } | ||
| } | ||
|
|
||
| pub struct BindData { | ||
| dtype: DType, | ||
| fields: StructFields, | ||
|
|
@@ -118,6 +132,7 @@ impl CopyFunction for VortexCopyFunction { | |
| } | ||
|
|
||
| fn init_global( | ||
| client_context: ClientContext, | ||
| bind_data: &Self::BindData, | ||
| file_path: String, | ||
| ) -> VortexResult<Self::GlobalState> { | ||
|
|
@@ -126,9 +141,17 @@ impl CopyFunction for VortexCopyFunction { | |
| let array_stream = ArrayStreamAdapter::new(bind_data.dtype.clone(), rx.into_stream()); | ||
|
|
||
| let handle = SESSION.handle(); | ||
| let ctx_ptr = SendableClientCtx( | ||
| NonNull::new(client_context.as_ptr()) | ||
| .vortex_expect("Client context pointer should not be null"), | ||
| ); | ||
| let write_task = handle.spawn(async move { | ||
| let mut file = async_fs::File::create(file_path).await?; | ||
| SESSION.write_options().write(&mut file, array_stream).await | ||
| // Use DuckDB FS exclusively to match the DuckDB client context configuration. | ||
| let writer = | ||
| unsafe { duckdb_fs_create_writer(ctx_ptr.as_ptr(), &file_path) }.map_err(|e| { | ||
| vortex_err!("Failed to create DuckDB FS writer for {file_path}: {e}") | ||
| })?; | ||
| SESSION.write_options().write(writer, array_stream).await | ||
| }); | ||
|
|
||
| let worker_pool = RUNTIME.new_pool(); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where does this error get freed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
C++ allocates errors via
duckdb_vx_error_create; Rust side consistently frees withduckdb_vx_error_freeinfs_error.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd suggest moving
to
error.cppas they're not specific to file system logic.