From 238c6a0bd284e7090b1de06f34c73fee4ce91496 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 14 Jan 2026 11:39:35 +0000 Subject: [PATCH 1/5] Scan API Signed-off-by: Nicholas Gates --- vortex-scan/Cargo.toml | 1 + vortex-scan/src/lib.rs | 1 + vortex-scan/src/v2/mod.rs | 5 ++ vortex-scan/src/v2/reader.rs | 49 ++++++++++++++++++ vortex-scan/src/v2/source.rs | 98 ++++++++++++++++++++++++++++++++++++ 5 files changed, 154 insertions(+) create mode 100644 vortex-scan/src/v2/mod.rs create mode 100644 vortex-scan/src/v2/reader.rs create mode 100644 vortex-scan/src/v2/source.rs diff --git a/vortex-scan/Cargo.toml b/vortex-scan/Cargo.toml index d78e92fa147..41961760041 100644 --- a/vortex-scan/Cargo.toml +++ b/vortex-scan/Cargo.toml @@ -29,6 +29,7 @@ vortex-mask = { workspace = true } vortex-metrics = { workspace = true } vortex-session = { workspace = true } +async-trait = { workspace = true } bit-vec = { workspace = true } futures = { workspace = true } itertools = { workspace = true } diff --git a/vortex-scan/src/lib.rs b/vortex-scan/src/lib.rs index 52eb0dd47bb..ed62775f7b0 100644 --- a/vortex-scan/src/lib.rs +++ b/vortex-scan/src/lib.rs @@ -26,5 +26,6 @@ pub mod gpu; mod repeated_scan; #[cfg(test)] mod test; +pub mod v2; pub use repeated_scan::RepeatedScan; diff --git a/vortex-scan/src/v2/mod.rs b/vortex-scan/src/v2/mod.rs new file mode 100644 index 00000000000..af92f452a33 --- /dev/null +++ b/vortex-scan/src/v2/mod.rs @@ -0,0 +1,5 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +pub mod reader; +pub mod source; diff --git a/vortex-scan/src/v2/reader.rs b/vortex-scan/src/v2/reader.rs new file mode 100644 index 00000000000..6292933645e --- /dev/null +++ b/vortex-scan/src/v2/reader.rs @@ -0,0 +1,49 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::ops::Range; + +use async_trait::async_trait; +use vortex_array::ArrayRef; +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_mask::Mask; + +/// A reader provides an interface for loading data from row-indexed layouts. +/// +/// Unlike a [`super::source::Source`], readers have a concrete row count allowing fixed +/// partitions over a known set of rows. Readers are driven by providing an input stream of +/// array data that can be used to provide arguments to parameterized filter and projection +/// expressions. +#[async_trait] +pub trait Reader: 'static + Send + Sync { + /// Get the data type of the layout being read. + fn dtype(&self) -> &DType; + + /// Returns the number of rows in the reader. + fn row_count(&self) -> u64; + + /// Creates a scan over the given row range of the reader. + async fn scan(&self, row_range: Range) -> VortexResult; +} + +pub type ReaderScanRef = Box; + +/// A scan over a reader, producing output arrays given an input array to parameterize the filter +/// and projection expressions. +#[async_trait] +pub trait ReaderScan { + /// The data type of the returned data. + fn dtype(&self) -> &DType; + + /// The preferred maximum row count for the next batch. + /// + /// Returns [`None`] if there are no more batches. + fn next_batch_size(&mut self) -> Option; + + /// Returns the next batch of data given an input array. + /// + /// The returned batch must have the same number of rows as the [`Mask::true_count`]. + /// The provided mask will have at most [`next_batch_size`] rows. + async fn next_batch(&mut self, mask: Mask) -> VortexResult; +} diff --git a/vortex-scan/src/v2/source.rs b/vortex-scan/src/v2/source.rs new file mode 100644 index 00000000000..bfd7d6ee36e --- /dev/null +++ b/vortex-scan/src/v2/source.rs @@ -0,0 +1,98 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::any::Any; +use std::sync::Arc; + +use async_trait::async_trait; +use futures::stream::BoxStream; +use vortex_array::expr::Expression; +use vortex_array::stream::SendableArrayStream; +use vortex_dtype::DType; +use vortex_error::VortexResult; + +/// Create a Vortex source from serialized configuration. +/// +/// Providers can be registered with Vortex under a specific +#[async_trait(?Send)] +pub trait SourceProvider: 'static { + /// URI schemes handled by this source provider. + /// + /// TODO(ngates): this might not be the right way to plugin sources. + fn schemes(&self) -> &[&str]; + + /// Initialize a new source. + async fn init_source(&self, uri: String) -> VortexResult; + + /// Serialize a source split to bytes. + async fn serialize_split(&self, split: &dyn Split) -> VortexResult>; + + /// Deserialize a source split from bytes. + async fn deserialize_split(&self, data: &[u8]) -> VortexResult; +} + +/// A reference-counted source. +pub type SourceRef = Arc; + +/// A source represents a streamable dataset that can be scanned with projection and filter +/// expressions. Each scan produces splits that can be executed in parallel to read data. +/// Each split can be serialized for remote execution. +#[async_trait] +pub trait Source: 'static + Send + Sync { + /// Returns the dtype of the source. + fn dtype(&self) -> &DType; + + /// Returns an estimate of the row count of the source. + fn row_count_estimate(&self) -> Estimate; + + /// Returns a scan over the source. + async fn scan(&self, scan_request: ScanRequest) -> VortexResult; +} + +#[derive(Debug, Clone, Default)] +pub struct ScanRequest { + pub projection: Option, + pub filter: Option, + pub limit: Option, +} + +pub type SourceScanRef = Box; + +#[async_trait] +pub trait SourceScan: 'static + Send + Sync { + /// The returned dtype of the scan. + fn dtype(&self) -> &DType; + + /// An estimate of the remaining splits. + fn remaining_splits_estimate(&self) -> Estimate; + + /// Returns the next batch of splits to be processed. + /// + /// This should not return _more_ than the max_batch_size splits, but may return fewer. + async fn next_splits(&mut self, max_splits: usize) -> VortexResult>; +} + +pub type SplitStream = BoxStream<'static, VortexResult>; +pub type SplitRef = Arc; + +pub trait Split: 'static + Send + Sync { + /// Downcast the split to a concrete type. + fn as_any(&self) -> &dyn Any; + + /// Executes the split. + fn execute(&self) -> VortexResult; + + /// Returns an estimate of the row count for this split. + fn row_count_estimate(&self) -> Estimate; + + /// Returns an estimate of the byte size for this split. + fn byte_size_estimate(&self) -> Estimate; +} + +#[derive(Default)] +pub enum Estimate { + Exact(T), + UpperBound(T), + #[default] + Unknown, +} From 36f521d5c86e97517a6a444a3108e14686061023 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 14 Jan 2026 13:30:47 +0000 Subject: [PATCH 2/5] Scan API Signed-off-by: Nicholas Gates --- Cargo.lock | 1 + vortex-scan/src/v2/reader.rs | 23 ++++++++++++++++++++--- vortex-scan/src/v2/source.rs | 14 +++++++------- 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 11b61602873..c9462d2acdf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10683,6 +10683,7 @@ version = "0.1.0" dependencies = [ "arrow-array 57.2.0", "arrow-schema 57.2.0", + "async-trait", "bit-vec 0.8.0", "futures", "itertools 0.14.0", diff --git a/vortex-scan/src/v2/reader.rs b/vortex-scan/src/v2/reader.rs index 6292933645e..d78d5d5e0c5 100644 --- a/vortex-scan/src/v2/reader.rs +++ b/vortex-scan/src/v2/reader.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::ops::Range; +use std::sync::Arc; use async_trait::async_trait; use vortex_array::ArrayRef; @@ -9,13 +10,14 @@ use vortex_dtype::DType; use vortex_error::VortexResult; use vortex_mask::Mask; +pub type ReaderRef = Arc; + /// A reader provides an interface for loading data from row-indexed layouts. /// -/// Unlike a [`super::source::Source`], readers have a concrete row count allowing fixed +/// Unlike a [`super::source::DataSource`], readers have a concrete row count allowing fixed /// partitions over a known set of rows. Readers are driven by providing an input stream of /// array data that can be used to provide arguments to parameterized filter and projection /// expressions. -#[async_trait] pub trait Reader: 'static + Send + Sync { /// Get the data type of the layout being read. fn dtype(&self) -> &DType; @@ -23,8 +25,23 @@ pub trait Reader: 'static + Send + Sync { /// Returns the number of rows in the reader. fn row_count(&self) -> u64; + /// Reduces the reader, simplifying its internal structure if possible. + fn reduce(&self) -> VortexResult> { + Ok(None) + } + + /// Reduce the parent reader if possible, returning a new reader if successful. + fn reduce_parent( + &self, + parent: &ReaderRef, + child_idx: usize, + ) -> VortexResult> { + let _ = (parent, child_idx); + Ok(None) + } + /// Creates a scan over the given row range of the reader. - async fn scan(&self, row_range: Range) -> VortexResult; + fn scan(&self, row_range: Range) -> VortexResult; } pub type ReaderScanRef = Box; diff --git a/vortex-scan/src/v2/source.rs b/vortex-scan/src/v2/source.rs index bfd7d6ee36e..5a4a770ba9d 100644 --- a/vortex-scan/src/v2/source.rs +++ b/vortex-scan/src/v2/source.rs @@ -15,14 +15,14 @@ use vortex_error::VortexResult; /// /// Providers can be registered with Vortex under a specific #[async_trait(?Send)] -pub trait SourceProvider: 'static { +pub trait DataSourceProvider: 'static { /// URI schemes handled by this source provider. /// /// TODO(ngates): this might not be the right way to plugin sources. fn schemes(&self) -> &[&str]; /// Initialize a new source. - async fn init_source(&self, uri: String) -> VortexResult; + async fn init_source(&self, uri: String) -> VortexResult; /// Serialize a source split to bytes. async fn serialize_split(&self, split: &dyn Split) -> VortexResult>; @@ -32,13 +32,13 @@ pub trait SourceProvider: 'static { } /// A reference-counted source. -pub type SourceRef = Arc; +pub type DataSourceRef = Arc; /// A source represents a streamable dataset that can be scanned with projection and filter /// expressions. Each scan produces splits that can be executed in parallel to read data. /// Each split can be serialized for remote execution. #[async_trait] -pub trait Source: 'static + Send + Sync { +pub trait DataSource: 'static + Send + Sync { /// Returns the dtype of the source. fn dtype(&self) -> &DType; @@ -46,7 +46,7 @@ pub trait Source: 'static + Send + Sync { fn row_count_estimate(&self) -> Estimate; /// Returns a scan over the source. - async fn scan(&self, scan_request: ScanRequest) -> VortexResult; + async fn scan(&self, scan_request: ScanRequest) -> VortexResult; } #[derive(Debug, Clone, Default)] @@ -56,10 +56,10 @@ pub struct ScanRequest { pub limit: Option, } -pub type SourceScanRef = Box; +pub type DataSourceScanRef = Box; #[async_trait] -pub trait SourceScan: 'static + Send + Sync { +pub trait DataSourceScan: 'static + Send + Sync { /// The returned dtype of the scan. fn dtype(&self) -> &DType; From fad409533578dd8134537dc77174aa528acc54b8 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 14 Jan 2026 14:03:57 +0000 Subject: [PATCH 3/5] Scan API Signed-off-by: Nicholas Gates --- Cargo.lock | 12 +++++ Cargo.toml | 2 + vortex-dataset/Cargo.toml | 28 +++++++++++ vortex-dataset/README.md | 67 ++++++++++++++++++++++++ vortex-dataset/src/lib.rs | 4 ++ vortex-dataset/src/source.rs | 98 ++++++++++++++++++++++++++++++++++++ vortex/Cargo.toml | 1 + vortex/src/lib.rs | 5 ++ 8 files changed, 217 insertions(+) create mode 100644 vortex-dataset/Cargo.toml create mode 100644 vortex-dataset/README.md create mode 100644 vortex-dataset/src/lib.rs create mode 100644 vortex-dataset/src/source.rs diff --git a/Cargo.lock b/Cargo.lock index c9462d2acdf..8b671598a44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9907,6 +9907,7 @@ dependencies = [ "vortex-buffer", "vortex-bytebool", "vortex-compute", + "vortex-dataset", "vortex-datetime-parts", "vortex-decimal-byte-parts", "vortex-dtype", @@ -10210,6 +10211,17 @@ dependencies = [ "walkdir", ] +[[package]] +name = "vortex-dataset" +version = "0.1.0" +dependencies = [ + "async-trait", + "futures", + "vortex-array", + "vortex-dtype", + "vortex-error", +] + [[package]] name = "vortex-datetime-parts" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 8f09ef253a7..6029fb9df48 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "vortex-compute", "vortex-cxx", "vortex-datafusion", + "vortex-dataset", "vortex-dtype", "vortex-duckdb", "vortex-error", @@ -221,6 +222,7 @@ vortex-buffer = { version = "0.1.0", path = "./vortex-buffer", default-features vortex-bytebool = { version = "0.1.0", path = "./encodings/bytebool", default-features = false } vortex-compute = { version = "0.1.0", path = "./vortex-compute", default-features = false } vortex-datafusion = { version = "0.1.0", path = "./vortex-datafusion", default-features = false } +vortex-dataset = { version = "0.1.0", path = "./vortex-dataset", default-features = false } vortex-datetime-parts = { version = "0.1.0", path = "./encodings/datetime-parts", default-features = false } vortex-decimal-byte-parts = { version = "0.1.0", path = "encodings/decimal-byte-parts", default-features = false } vortex-dtype = { version = "0.1.0", path = "./vortex-dtype", default-features = false } diff --git a/vortex-dataset/Cargo.toml b/vortex-dataset/Cargo.toml new file mode 100644 index 00000000000..6958e63be54 --- /dev/null +++ b/vortex-dataset/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "vortex-dataset" +authors.workspace = true +description = "Dataset API for Vortex" +edition = { workspace = true } +homepage = { workspace = true } +categories = { workspace = true } +include = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +readme = "README.md" +repository = { workspace = true } +rust-version = { workspace = true } +version = { workspace = true } + +[package.metadata.docs.rs] +all-features = true + +[dependencies] +vortex-array = { workspace = true } +vortex-dtype = { workspace = true } +vortex-error = { workspace = true } + +async-trait = { workspace = true } +futures = { workspace = true } + +[lints] +workspace = true diff --git a/vortex-dataset/README.md b/vortex-dataset/README.md new file mode 100644 index 00000000000..7e9d08c1c25 --- /dev/null +++ b/vortex-dataset/README.md @@ -0,0 +1,67 @@ +# Vortex Dataset API + +The Vortex Dataset API exposes an interoperable interface for scanning data. It is designed to solve the `NxM` problem +of integration query engines with data formats. + +It is defined as a C API with language bindings for Rust, Java, and Python. This allows both data sources and query +engines to be implemented in any language that can interoperate with C. + +The API passes data in the form of Vortex ArrayStreams, which are zero-copy, columnar, compressed, and even support +passing device buffers. This allows query engines to efficiently scan data into their internal execution formats with +minimal overhead and (very) late materialization. + +Known implementations of the Vortex Dataset API are: + +* `vortex-iceberg` - Expose Iceberg tables as a Vortex Dataset +* `vortex-python` - Expose PyArrow Datasets as a Vortex Dataset +* `vortex-layout` - Expose a Vortex Layout as a Vortex Dataset + +Known consumers of the Vortex Dataset API are: + +* `vortex-datafusion` - Scan Vortex Datasets in DataFusion +* `vortex-duckdb` - Scan Vortex Datasets in DuckDB +* `vortex-spark` - Scan Vortex Datasets in Spark +* `vortex-trino` - Scan Vortex Datasets in Trino +* `vortex-polars` - Scan Vortex Datasets in Polars +* `vortex-python` - Wrap Vortex Datasets as PyArrow Datasets + +╔═══════════════════════════════════════════════════════════════════════════════╗ +║ WITH VORTEX DATASET API (N+M integrations = 9 connections) ║ +╠═══════════════════════════════════════════════════════════════════════════════╣ +║ ║ +║ DATA SOURCES VORTEX API QUERY ENGINES ║ +║ ║ +║ ┌──────────────┐ ╔═══════════════╗ ┌────────────────┐ ║ +║ │ Iceberg │─────▶║ ║──────▶│ DataFusion │ ║ +║ │ (vortex- │ ║ Vortex ║ │(vortex- │ ║ +║ │ iceberg) │ ║ Dataset ║ │ datafusion) │ ║ +║ └──────────────┘ ║ API ║ └────────────────┘ ║ +║ ║ ║ ║ +║ ┌──────────────┐ ║ ┌───────────┐ ║ ┌────────────────┐ ║ +║ │ PyArrow │─────▶║ │ • C ABI │ ║──────▶│ DuckDB │ ║ +║ │ Datasets │ ║ │ • Zero- │ ║ │ (vortex-duckdb)│ ║ +║ │ (vortex- │ ║ │ copy │ ║ └────────────────┘ ║ +║ │ python) │ ║ │ • Columnar│ ║ ║ +║ └──────────────┘ ║ │ • Compress│ ║ ┌────────────────┐ ║ +║ ║ │ • Device │ ║──────▶│ Spark │ ║ +║ ┌──────────────┐ ║ │ buffers │ ║ │ (vortex-spark) │ ║ +║ │ Vortex │─────▶║ └───────────┘ ║ └────────────────┘ ║ +║ │ Layout │ ║ ║ ║ +║ │ (vortex- │ ║ Language ║ ┌────────────────┐ ║ +║ │ layout) │ ║ Bindings: ║──────▶│ Trino │ ║ +║ └──────────────┘ ║ Rust│Java│Py ║ │ (vortex-trino) │ ║ +║ ║ ║ └────────────────┘ ║ +║ ║ ║ ║ +║ ║ ║ ┌────────────────┐ ║ +║ ║ ║──────▶│ Polars │ ║ +║ ║ ║ │(vortex-polars) │ ║ +║ ║ ║ └────────────────┘ ║ +║ ║ ║ ║ +║ ║ ║ ┌────────────────┐ ║ +║ ║ ║──────▶│ Python │ ║ +║ ╚═══════════════╝ │(vortex-python) │ ║ +║ └────────────────┘ ║ +║ ║ +║ ✅ Add a new source? All engines get it. Add a new engine? All sources work! ║ +║ ║ +╚═══════════════════════════════════════════════════════════════════════════════╝ diff --git a/vortex-dataset/src/lib.rs b/vortex-dataset/src/lib.rs new file mode 100644 index 00000000000..dc91c5f6216 --- /dev/null +++ b/vortex-dataset/src/lib.rs @@ -0,0 +1,4 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +pub mod source; diff --git a/vortex-dataset/src/source.rs b/vortex-dataset/src/source.rs new file mode 100644 index 00000000000..5a4a770ba9d --- /dev/null +++ b/vortex-dataset/src/source.rs @@ -0,0 +1,98 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::any::Any; +use std::sync::Arc; + +use async_trait::async_trait; +use futures::stream::BoxStream; +use vortex_array::expr::Expression; +use vortex_array::stream::SendableArrayStream; +use vortex_dtype::DType; +use vortex_error::VortexResult; + +/// Create a Vortex source from serialized configuration. +/// +/// Providers can be registered with Vortex under a specific +#[async_trait(?Send)] +pub trait DataSourceProvider: 'static { + /// URI schemes handled by this source provider. + /// + /// TODO(ngates): this might not be the right way to plugin sources. + fn schemes(&self) -> &[&str]; + + /// Initialize a new source. + async fn init_source(&self, uri: String) -> VortexResult; + + /// Serialize a source split to bytes. + async fn serialize_split(&self, split: &dyn Split) -> VortexResult>; + + /// Deserialize a source split from bytes. + async fn deserialize_split(&self, data: &[u8]) -> VortexResult; +} + +/// A reference-counted source. +pub type DataSourceRef = Arc; + +/// A source represents a streamable dataset that can be scanned with projection and filter +/// expressions. Each scan produces splits that can be executed in parallel to read data. +/// Each split can be serialized for remote execution. +#[async_trait] +pub trait DataSource: 'static + Send + Sync { + /// Returns the dtype of the source. + fn dtype(&self) -> &DType; + + /// Returns an estimate of the row count of the source. + fn row_count_estimate(&self) -> Estimate; + + /// Returns a scan over the source. + async fn scan(&self, scan_request: ScanRequest) -> VortexResult; +} + +#[derive(Debug, Clone, Default)] +pub struct ScanRequest { + pub projection: Option, + pub filter: Option, + pub limit: Option, +} + +pub type DataSourceScanRef = Box; + +#[async_trait] +pub trait DataSourceScan: 'static + Send + Sync { + /// The returned dtype of the scan. + fn dtype(&self) -> &DType; + + /// An estimate of the remaining splits. + fn remaining_splits_estimate(&self) -> Estimate; + + /// Returns the next batch of splits to be processed. + /// + /// This should not return _more_ than the max_batch_size splits, but may return fewer. + async fn next_splits(&mut self, max_splits: usize) -> VortexResult>; +} + +pub type SplitStream = BoxStream<'static, VortexResult>; +pub type SplitRef = Arc; + +pub trait Split: 'static + Send + Sync { + /// Downcast the split to a concrete type. + fn as_any(&self) -> &dyn Any; + + /// Executes the split. + fn execute(&self) -> VortexResult; + + /// Returns an estimate of the row count for this split. + fn row_count_estimate(&self) -> Estimate; + + /// Returns an estimate of the byte size for this split. + fn byte_size_estimate(&self) -> Estimate; +} + +#[derive(Default)] +pub enum Estimate { + Exact(T), + UpperBound(T), + #[default] + Unknown, +} diff --git a/vortex/Cargo.toml b/vortex/Cargo.toml index 31f6daf0932..80ce0a5cd64 100644 --- a/vortex/Cargo.toml +++ b/vortex/Cargo.toml @@ -26,6 +26,7 @@ vortex-btrblocks = { workspace = true } vortex-buffer = { workspace = true } vortex-bytebool = { workspace = true } vortex-compute = { workspace = true } +vortex-dataset = { workspace = true } vortex-datetime-parts = { workspace = true } vortex-decimal-byte-parts = { workspace = true } vortex-dtype = { workspace = true, default-features = true } diff --git a/vortex/src/lib.rs b/vortex/src/lib.rs index 1fa11ac14da..3f9ceed8514 100644 --- a/vortex/src/lib.rs +++ b/vortex/src/lib.rs @@ -36,9 +36,14 @@ pub mod compressor { pub use vortex_layout::layouts::compact::CompactCompressor; } +pub mod dataset { + pub use vortex_dataset::*; +} + pub mod dtype { pub use vortex_dtype::*; } + pub mod error { pub use vortex_error::*; } From 67f2af6b23d8476050c1946c733ae1cd6467ff62 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 20 Jan 2026 21:02:29 +0000 Subject: [PATCH 4/5] Merge Signed-off-by: Nicholas Gates --- Cargo.lock | 12 ---- Cargo.toml | 2 - vortex-dataset/Cargo.toml | 28 -------- vortex-dataset/README.md | 67 ------------------- vortex-dataset/src/lib.rs | 4 -- .../src/datasource.rs | 14 ++-- vortex-scan/src/lib.rs | 1 + vortex/Cargo.toml | 1 - vortex/src/lib.rs | 4 -- 9 files changed, 11 insertions(+), 122 deletions(-) delete mode 100644 vortex-dataset/Cargo.toml delete mode 100644 vortex-dataset/README.md delete mode 100644 vortex-dataset/src/lib.rs rename vortex-dataset/src/source.rs => vortex-scan/src/datasource.rs (83%) diff --git a/Cargo.lock b/Cargo.lock index 11949ab93dc..917c1544870 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10024,7 +10024,6 @@ dependencies = [ "vortex-buffer", "vortex-bytebool", "vortex-compute", - "vortex-dataset", "vortex-datetime-parts", "vortex-decimal-byte-parts", "vortex-dtype", @@ -10331,17 +10330,6 @@ dependencies = [ "walkdir", ] -[[package]] -name = "vortex-dataset" -version = "0.1.0" -dependencies = [ - "async-trait", - "futures", - "vortex-array", - "vortex-dtype", - "vortex-error", -] - [[package]] name = "vortex-datetime-parts" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index fdbd19acf78..6a60714b37d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,6 @@ members = [ "vortex-compute", "vortex-cxx", "vortex-datafusion", - "vortex-dataset", "vortex-dtype", "vortex-duckdb", "vortex-error", @@ -223,7 +222,6 @@ vortex-buffer = { version = "0.1.0", path = "./vortex-buffer", default-features vortex-bytebool = { version = "0.1.0", path = "./encodings/bytebool", default-features = false } vortex-compute = { version = "0.1.0", path = "./vortex-compute", default-features = false } vortex-datafusion = { version = "0.1.0", path = "./vortex-datafusion", default-features = false } -vortex-dataset = { version = "0.1.0", path = "./vortex-dataset", default-features = false } vortex-datetime-parts = { version = "0.1.0", path = "./encodings/datetime-parts", default-features = false } vortex-decimal-byte-parts = { version = "0.1.0", path = "encodings/decimal-byte-parts", default-features = false } vortex-dtype = { version = "0.1.0", path = "./vortex-dtype", default-features = false } diff --git a/vortex-dataset/Cargo.toml b/vortex-dataset/Cargo.toml deleted file mode 100644 index 6958e63be54..00000000000 --- a/vortex-dataset/Cargo.toml +++ /dev/null @@ -1,28 +0,0 @@ -[package] -name = "vortex-dataset" -authors.workspace = true -description = "Dataset API for Vortex" -edition = { workspace = true } -homepage = { workspace = true } -categories = { workspace = true } -include = { workspace = true } -keywords = { workspace = true } -license = { workspace = true } -readme = "README.md" -repository = { workspace = true } -rust-version = { workspace = true } -version = { workspace = true } - -[package.metadata.docs.rs] -all-features = true - -[dependencies] -vortex-array = { workspace = true } -vortex-dtype = { workspace = true } -vortex-error = { workspace = true } - -async-trait = { workspace = true } -futures = { workspace = true } - -[lints] -workspace = true diff --git a/vortex-dataset/README.md b/vortex-dataset/README.md deleted file mode 100644 index 7e9d08c1c25..00000000000 --- a/vortex-dataset/README.md +++ /dev/null @@ -1,67 +0,0 @@ -# Vortex Dataset API - -The Vortex Dataset API exposes an interoperable interface for scanning data. It is designed to solve the `NxM` problem -of integration query engines with data formats. - -It is defined as a C API with language bindings for Rust, Java, and Python. This allows both data sources and query -engines to be implemented in any language that can interoperate with C. - -The API passes data in the form of Vortex ArrayStreams, which are zero-copy, columnar, compressed, and even support -passing device buffers. This allows query engines to efficiently scan data into their internal execution formats with -minimal overhead and (very) late materialization. - -Known implementations of the Vortex Dataset API are: - -* `vortex-iceberg` - Expose Iceberg tables as a Vortex Dataset -* `vortex-python` - Expose PyArrow Datasets as a Vortex Dataset -* `vortex-layout` - Expose a Vortex Layout as a Vortex Dataset - -Known consumers of the Vortex Dataset API are: - -* `vortex-datafusion` - Scan Vortex Datasets in DataFusion -* `vortex-duckdb` - Scan Vortex Datasets in DuckDB -* `vortex-spark` - Scan Vortex Datasets in Spark -* `vortex-trino` - Scan Vortex Datasets in Trino -* `vortex-polars` - Scan Vortex Datasets in Polars -* `vortex-python` - Wrap Vortex Datasets as PyArrow Datasets - -╔═══════════════════════════════════════════════════════════════════════════════╗ -║ WITH VORTEX DATASET API (N+M integrations = 9 connections) ║ -╠═══════════════════════════════════════════════════════════════════════════════╣ -║ ║ -║ DATA SOURCES VORTEX API QUERY ENGINES ║ -║ ║ -║ ┌──────────────┐ ╔═══════════════╗ ┌────────────────┐ ║ -║ │ Iceberg │─────▶║ ║──────▶│ DataFusion │ ║ -║ │ (vortex- │ ║ Vortex ║ │(vortex- │ ║ -║ │ iceberg) │ ║ Dataset ║ │ datafusion) │ ║ -║ └──────────────┘ ║ API ║ └────────────────┘ ║ -║ ║ ║ ║ -║ ┌──────────────┐ ║ ┌───────────┐ ║ ┌────────────────┐ ║ -║ │ PyArrow │─────▶║ │ • C ABI │ ║──────▶│ DuckDB │ ║ -║ │ Datasets │ ║ │ • Zero- │ ║ │ (vortex-duckdb)│ ║ -║ │ (vortex- │ ║ │ copy │ ║ └────────────────┘ ║ -║ │ python) │ ║ │ • Columnar│ ║ ║ -║ └──────────────┘ ║ │ • Compress│ ║ ┌────────────────┐ ║ -║ ║ │ • Device │ ║──────▶│ Spark │ ║ -║ ┌──────────────┐ ║ │ buffers │ ║ │ (vortex-spark) │ ║ -║ │ Vortex │─────▶║ └───────────┘ ║ └────────────────┘ ║ -║ │ Layout │ ║ ║ ║ -║ │ (vortex- │ ║ Language ║ ┌────────────────┐ ║ -║ │ layout) │ ║ Bindings: ║──────▶│ Trino │ ║ -║ └──────────────┘ ║ Rust│Java│Py ║ │ (vortex-trino) │ ║ -║ ║ ║ └────────────────┘ ║ -║ ║ ║ ║ -║ ║ ║ ┌────────────────┐ ║ -║ ║ ║──────▶│ Polars │ ║ -║ ║ ║ │(vortex-polars) │ ║ -║ ║ ║ └────────────────┘ ║ -║ ║ ║ ║ -║ ║ ║ ┌────────────────┐ ║ -║ ║ ║──────▶│ Python │ ║ -║ ╚═══════════════╝ │(vortex-python) │ ║ -║ └────────────────┘ ║ -║ ║ -║ ✅ Add a new source? All engines get it. Add a new engine? All sources work! ║ -║ ║ -╚═══════════════════════════════════════════════════════════════════════════════╝ diff --git a/vortex-dataset/src/lib.rs b/vortex-dataset/src/lib.rs deleted file mode 100644 index dc91c5f6216..00000000000 --- a/vortex-dataset/src/lib.rs +++ /dev/null @@ -1,4 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -pub mod source; diff --git a/vortex-dataset/src/source.rs b/vortex-scan/src/datasource.rs similarity index 83% rename from vortex-dataset/src/source.rs rename to vortex-scan/src/datasource.rs index 5a4a770ba9d..62a0c8d280c 100644 --- a/vortex-dataset/src/source.rs +++ b/vortex-scan/src/datasource.rs @@ -31,12 +31,12 @@ pub trait DataSourceProvider: 'static { async fn deserialize_split(&self, data: &[u8]) -> VortexResult; } -/// A reference-counted source. +/// A reference-counted data source. pub type DataSourceRef = Arc; -/// A source represents a streamable dataset that can be scanned with projection and filter -/// expressions. Each scan produces splits that can be executed in parallel to read data. -/// Each split can be serialized for remote execution. +/// A data source represents a streamable dataset that can be scanned with projection and filter +/// expressions. Each scan produces splits that can be executed (potentially in parallel) to read +/// data. Each split can be serialized for remote execution. #[async_trait] pub trait DataSource: 'static + Send + Sync { /// Returns the dtype of the source. @@ -51,11 +51,15 @@ pub trait DataSource: 'static + Send + Sync { #[derive(Debug, Clone, Default)] pub struct ScanRequest { + /// Projection expression, `None` implies `root()`. pub projection: Option, + /// Filter expression, `None` implies no filter. pub filter: Option, + /// Optional limit on the number of rows to scan. pub limit: Option, } +/// A boxed data source scan. pub type DataSourceScanRef = Box; #[async_trait] @@ -75,6 +79,7 @@ pub trait DataSourceScan: 'static + Send + Sync { pub type SplitStream = BoxStream<'static, VortexResult>; pub type SplitRef = Arc; +/// A split represents a unit of work that can be executed to produce a stream of arrays. pub trait Split: 'static + Send + Sync { /// Downcast the split to a concrete type. fn as_any(&self) -> &dyn Any; @@ -89,6 +94,7 @@ pub trait Split: 'static + Send + Sync { fn byte_size_estimate(&self) -> Estimate; } +/// An estimate that can be exact, an upper bound, or unknown. #[derive(Default)] pub enum Estimate { Exact(T), diff --git a/vortex-scan/src/lib.rs b/vortex-scan/src/lib.rs index ed62775f7b0..d062ff33ce7 100644 --- a/vortex-scan/src/lib.rs +++ b/vortex-scan/src/lib.rs @@ -7,6 +7,7 @@ const IDEAL_SPLIT_SIZE: u64 = 100_000; pub mod arrow; +pub mod datasource; mod filter; pub mod row_mask; mod splits; diff --git a/vortex/Cargo.toml b/vortex/Cargo.toml index 80ce0a5cd64..31f6daf0932 100644 --- a/vortex/Cargo.toml +++ b/vortex/Cargo.toml @@ -26,7 +26,6 @@ vortex-btrblocks = { workspace = true } vortex-buffer = { workspace = true } vortex-bytebool = { workspace = true } vortex-compute = { workspace = true } -vortex-dataset = { workspace = true } vortex-datetime-parts = { workspace = true } vortex-decimal-byte-parts = { workspace = true } vortex-dtype = { workspace = true, default-features = true } diff --git a/vortex/src/lib.rs b/vortex/src/lib.rs index c0f6ccdd5a7..dc2c1231277 100644 --- a/vortex/src/lib.rs +++ b/vortex/src/lib.rs @@ -36,10 +36,6 @@ pub mod compressor { pub use vortex_layout::layouts::compact::CompactCompressor; } -pub mod dataset { - pub use vortex_dataset::*; -} - pub mod dtype { pub use vortex_dtype::*; } From 18663ca122b2fdafc1ff7d7ae3781dbb6eb76a48 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 20 Jan 2026 21:11:28 +0000 Subject: [PATCH 5/5] Merge Signed-off-by: Nicholas Gates --- vortex-scan/src/datasource.rs | 2 +- vortex-scan/src/lib.rs | 1 - vortex-scan/src/v2/mod.rs | 5 -- vortex-scan/src/v2/reader.rs | 66 ----------------------- vortex-scan/src/v2/source.rs | 98 ----------------------------------- 5 files changed, 1 insertion(+), 171 deletions(-) delete mode 100644 vortex-scan/src/v2/mod.rs delete mode 100644 vortex-scan/src/v2/reader.rs delete mode 100644 vortex-scan/src/v2/source.rs diff --git a/vortex-scan/src/datasource.rs b/vortex-scan/src/datasource.rs index 62a0c8d280c..c6ea110ee44 100644 --- a/vortex-scan/src/datasource.rs +++ b/vortex-scan/src/datasource.rs @@ -72,7 +72,7 @@ pub trait DataSourceScan: 'static + Send + Sync { /// Returns the next batch of splits to be processed. /// - /// This should not return _more_ than the max_batch_size splits, but may return fewer. + /// This should not return _more_ than `max_splits` splits, but may return fewer. async fn next_splits(&mut self, max_splits: usize) -> VortexResult>; } diff --git a/vortex-scan/src/lib.rs b/vortex-scan/src/lib.rs index d062ff33ce7..c7a94d24b29 100644 --- a/vortex-scan/src/lib.rs +++ b/vortex-scan/src/lib.rs @@ -27,6 +27,5 @@ pub mod gpu; mod repeated_scan; #[cfg(test)] mod test; -pub mod v2; pub use repeated_scan::RepeatedScan; diff --git a/vortex-scan/src/v2/mod.rs b/vortex-scan/src/v2/mod.rs deleted file mode 100644 index af92f452a33..00000000000 --- a/vortex-scan/src/v2/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -pub mod reader; -pub mod source; diff --git a/vortex-scan/src/v2/reader.rs b/vortex-scan/src/v2/reader.rs deleted file mode 100644 index d78d5d5e0c5..00000000000 --- a/vortex-scan/src/v2/reader.rs +++ /dev/null @@ -1,66 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::ops::Range; -use std::sync::Arc; - -use async_trait::async_trait; -use vortex_array::ArrayRef; -use vortex_dtype::DType; -use vortex_error::VortexResult; -use vortex_mask::Mask; - -pub type ReaderRef = Arc; - -/// A reader provides an interface for loading data from row-indexed layouts. -/// -/// Unlike a [`super::source::DataSource`], readers have a concrete row count allowing fixed -/// partitions over a known set of rows. Readers are driven by providing an input stream of -/// array data that can be used to provide arguments to parameterized filter and projection -/// expressions. -pub trait Reader: 'static + Send + Sync { - /// Get the data type of the layout being read. - fn dtype(&self) -> &DType; - - /// Returns the number of rows in the reader. - fn row_count(&self) -> u64; - - /// Reduces the reader, simplifying its internal structure if possible. - fn reduce(&self) -> VortexResult> { - Ok(None) - } - - /// Reduce the parent reader if possible, returning a new reader if successful. - fn reduce_parent( - &self, - parent: &ReaderRef, - child_idx: usize, - ) -> VortexResult> { - let _ = (parent, child_idx); - Ok(None) - } - - /// Creates a scan over the given row range of the reader. - fn scan(&self, row_range: Range) -> VortexResult; -} - -pub type ReaderScanRef = Box; - -/// A scan over a reader, producing output arrays given an input array to parameterize the filter -/// and projection expressions. -#[async_trait] -pub trait ReaderScan { - /// The data type of the returned data. - fn dtype(&self) -> &DType; - - /// The preferred maximum row count for the next batch. - /// - /// Returns [`None`] if there are no more batches. - fn next_batch_size(&mut self) -> Option; - - /// Returns the next batch of data given an input array. - /// - /// The returned batch must have the same number of rows as the [`Mask::true_count`]. - /// The provided mask will have at most [`next_batch_size`] rows. - async fn next_batch(&mut self, mask: Mask) -> VortexResult; -} diff --git a/vortex-scan/src/v2/source.rs b/vortex-scan/src/v2/source.rs deleted file mode 100644 index 5a4a770ba9d..00000000000 --- a/vortex-scan/src/v2/source.rs +++ /dev/null @@ -1,98 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::any::Any; -use std::sync::Arc; - -use async_trait::async_trait; -use futures::stream::BoxStream; -use vortex_array::expr::Expression; -use vortex_array::stream::SendableArrayStream; -use vortex_dtype::DType; -use vortex_error::VortexResult; - -/// Create a Vortex source from serialized configuration. -/// -/// Providers can be registered with Vortex under a specific -#[async_trait(?Send)] -pub trait DataSourceProvider: 'static { - /// URI schemes handled by this source provider. - /// - /// TODO(ngates): this might not be the right way to plugin sources. - fn schemes(&self) -> &[&str]; - - /// Initialize a new source. - async fn init_source(&self, uri: String) -> VortexResult; - - /// Serialize a source split to bytes. - async fn serialize_split(&self, split: &dyn Split) -> VortexResult>; - - /// Deserialize a source split from bytes. - async fn deserialize_split(&self, data: &[u8]) -> VortexResult; -} - -/// A reference-counted source. -pub type DataSourceRef = Arc; - -/// A source represents a streamable dataset that can be scanned with projection and filter -/// expressions. Each scan produces splits that can be executed in parallel to read data. -/// Each split can be serialized for remote execution. -#[async_trait] -pub trait DataSource: 'static + Send + Sync { - /// Returns the dtype of the source. - fn dtype(&self) -> &DType; - - /// Returns an estimate of the row count of the source. - fn row_count_estimate(&self) -> Estimate; - - /// Returns a scan over the source. - async fn scan(&self, scan_request: ScanRequest) -> VortexResult; -} - -#[derive(Debug, Clone, Default)] -pub struct ScanRequest { - pub projection: Option, - pub filter: Option, - pub limit: Option, -} - -pub type DataSourceScanRef = Box; - -#[async_trait] -pub trait DataSourceScan: 'static + Send + Sync { - /// The returned dtype of the scan. - fn dtype(&self) -> &DType; - - /// An estimate of the remaining splits. - fn remaining_splits_estimate(&self) -> Estimate; - - /// Returns the next batch of splits to be processed. - /// - /// This should not return _more_ than the max_batch_size splits, but may return fewer. - async fn next_splits(&mut self, max_splits: usize) -> VortexResult>; -} - -pub type SplitStream = BoxStream<'static, VortexResult>; -pub type SplitRef = Arc; - -pub trait Split: 'static + Send + Sync { - /// Downcast the split to a concrete type. - fn as_any(&self) -> &dyn Any; - - /// Executes the split. - fn execute(&self) -> VortexResult; - - /// Returns an estimate of the row count for this split. - fn row_count_estimate(&self) -> Estimate; - - /// Returns an estimate of the byte size for this split. - fn byte_size_estimate(&self) -> Estimate; -} - -#[derive(Default)] -pub enum Estimate { - Exact(T), - UpperBound(T), - #[default] - Unknown, -}