Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vortex-scan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ vortex-mask = { workspace = true }
vortex-metrics = { workspace = true }
vortex-session = { workspace = true }

async-trait = { workspace = true }
bit-vec = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
Expand Down
104 changes: 104 additions & 0 deletions vortex-scan/src/datasource.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use futures::stream::BoxStream;
use vortex_array::expr::Expression;
use vortex_array::stream::SendableArrayStream;
use vortex_dtype::DType;
use vortex_error::VortexResult;

/// Create a Vortex source from serialized configuration.
///
/// Providers can be registered with Vortex under a specific set of URI schemes
/// (see [`Self::schemes`]), so a source can be initialized from a URI alone.
#[async_trait(?Send)]
pub trait DataSourceProvider: 'static {
    /// URI schemes handled by this source provider.
    ///
    /// TODO(ngates): this might not be the right way to plug in sources.
    fn schemes(&self) -> &[&str];

    /// Initialize a new source.
    async fn init_source(&self, uri: String) -> VortexResult<DataSourceRef>;

    /// Serialize a source split to bytes.
    ///
    /// Together with [`Self::deserialize_split`], this enables shipping a split to a remote
    /// worker for execution.
    async fn serialize_split(&self, split: &dyn Split) -> VortexResult<Vec<u8>>;

    /// Deserialize a source split from bytes.
    async fn deserialize_split(&self, data: &[u8]) -> VortexResult<SplitRef>;
}

/// A reference-counted data source.
pub type DataSourceRef = Arc<dyn DataSource>;

/// A data source represents a streamable dataset that can be scanned with projection and filter
/// expressions. Each scan produces splits that can be executed (potentially in parallel) to read
/// data. Each split can be serialized for remote execution.
#[async_trait]
pub trait DataSource: 'static + Send + Sync {
    /// Returns the dtype of the source.
    fn dtype(&self) -> &DType;

    /// Returns an estimate of the row count of the source.
    ///
    /// May be [`Estimate::Unknown`] when the source cannot cheaply determine its size.
    fn row_count_estimate(&self) -> Estimate<u64>;

    /// Returns a scan over the source.
    ///
    /// The returned scan hands out splits according to the projection, filter, and limit carried
    /// by `scan_request`.
    async fn scan(&self, scan_request: ScanRequest) -> VortexResult<DataSourceScanRef>;
}

/// Parameters for a [`DataSource::scan`] call.
///
/// All fields are optional; `Default` describes a full, unfiltered, unlimited scan.
#[derive(Debug, Clone, Default)]
pub struct ScanRequest {
    /// Projection expression, `None` implies `root()`.
    pub projection: Option<Expression>,
    /// Filter expression, `None` implies no filter.
    pub filter: Option<Expression>,
    /// Optional limit on the number of rows to scan.
    pub limit: Option<u64>,
}

/// A boxed data source scan.
pub type DataSourceScanRef = Box<dyn DataSourceScan>;

/// An in-progress scan over a [`DataSource`].
///
/// A scan is stateful (note `&mut self` on [`Self::next_splits`]): callers drive it by
/// repeatedly requesting batches of splits to execute.
#[async_trait]
pub trait DataSourceScan: 'static + Send + Sync {
    /// The returned dtype of the scan.
    fn dtype(&self) -> &DType;

    /// An estimate of the remaining splits.
    fn remaining_splits_estimate(&self) -> Estimate<usize>;

    /// Returns the next batch of splits to be processed.
    ///
    /// This should not return _more_ than `max_splits` splits, but may return fewer.
    /// NOTE(review): presumably an empty vector signals that the scan is exhausted — confirm
    /// against implementations.
    async fn next_splits(&mut self, max_splits: usize) -> VortexResult<Vec<SplitRef>>;
}

/// A boxed stream of splits.
pub type SplitStream = BoxStream<'static, VortexResult<SplitRef>>;
/// A reference-counted split.
pub type SplitRef = Arc<dyn Split>;

/// A split represents a unit of work that can be executed to produce a stream of arrays.
pub trait Split: 'static + Send + Sync {
    /// Downcast the split to a concrete type.
    ///
    /// Lets providers (e.g. [`DataSourceProvider::serialize_split`]) recover their concrete
    /// split type from a `dyn Split`.
    fn as_any(&self) -> &dyn Any;

    /// Executes the split.
    fn execute(&self) -> VortexResult<SendableArrayStream>;

    /// Returns an estimate of the row count for this split.
    fn row_count_estimate(&self) -> Estimate<u64>;

    /// Returns an estimate of the byte size for this split.
    fn byte_size_estimate(&self) -> Estimate<u64>;
}

/// An estimate that can be exact, an upper bound, or unknown.
///
/// Used by sources, scans, and splits to report row counts and byte sizes without promising
/// precision. Derives are conditional on `T` (e.g. `Copy` only when `T: Copy`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Estimate<T> {
    /// The value is known exactly.
    Exact(T),
    /// The true value is at most the contained value.
    UpperBound(T),
    /// No estimate is available.
    #[default]
    Unknown,
}
1 change: 1 addition & 0 deletions vortex-scan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
const IDEAL_SPLIT_SIZE: u64 = 100_000;

pub mod arrow;
pub mod datasource;
mod filter;
pub mod row_mask;
mod splits;
Expand Down
1 change: 1 addition & 0 deletions vortex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub mod compressor {
pub mod dtype {
pub use vortex_dtype::*;
}

pub mod error {
pub use vortex_error::*;
}
Expand Down
Loading