diff --git a/Cargo.lock b/Cargo.lock index aa3f9c589a9..917c1544870 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10809,6 +10809,7 @@ version = "0.1.0" dependencies = [ "arrow-array 57.2.0", "arrow-schema 57.2.0", + "async-trait", "bit-vec 0.8.0", "futures", "itertools 0.14.0", diff --git a/vortex-array/src/expr/exprs/mod.rs b/vortex-array/src/expr/exprs/mod.rs index c606b53f5a0..c8a5776a131 100644 --- a/vortex-array/src/expr/exprs/mod.rs +++ b/vortex-array/src/expr/exprs/mod.rs @@ -17,6 +17,7 @@ pub(crate) mod operators; pub(crate) mod pack; pub(crate) mod root; pub(crate) mod select; +pub(crate) mod stats; pub use between::*; pub use binary::*; @@ -34,3 +35,4 @@ pub use operators::*; pub use pack::*; pub use root::*; pub use select::*; +pub use stats::*; diff --git a/vortex-array/src/expr/exprs/stats.rs b/vortex-array/src/expr/exprs/stats.rs new file mode 100644 index 00000000000..2afefa98232 --- /dev/null +++ b/vortex-array/src/expr/exprs/stats.rs @@ -0,0 +1,84 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_error::vortex_err; +use vortex_vector::Datum; +use vortex_vector::Scalar; + +use crate::Array; +use crate::ArrayRef; +use crate::IntoArray; +use crate::arrays::ConstantArray; +use crate::expr::Arity; +use crate::expr::ChildName; +use crate::expr::ExecutionArgs; +use crate::expr::ExprId; +use crate::expr::Expression; +use crate::expr::SimplifyCtx; +use crate::expr::VTable; +use crate::expr::VTableExt; +use crate::expr::stats::Stat; + +/// Creates a new expression that returns a minimum bound of its input. +pub fn statistic(stat: Stat, child: Expression) -> Expression { + Statistic.new_expr(stat, vec![child]) +} + +pub struct Statistic; + +impl VTable for Statistic { + type Options = Stat; + + fn id(&self) -> ExprId { + ExprId::from("statistic") + } + + fn arity(&self, _options: &Self::Options) -> Arity { + Arity::Exact(1) + } + + fn child_name(&self, _options: &Self::Options, _child_idx: usize) -> ChildName { + ChildName::from("input") + } + + fn return_dtype(&self, stat: &Stat, arg_dtypes: &[DType]) -> VortexResult { + stat.dtype(&arg_dtypes[0]) + .ok_or_else(|| { + vortex_err!( + "statistic {:?} not supported for dtype {:?}", + stat, + arg_dtypes[0] + ) + }) + // We make all statistics types nullable in case there is no reduction rule to handle + // the statistic expression. + .map(|dt| dt.as_nullable()) + } + + fn evaluate( + &self, + _stat: &Stat, + expr: &Expression, + scope: &ArrayRef, + ) -> VortexResult { + let return_dtype = expr.return_dtype(scope.dtype())?; + Ok(ConstantArray::new(vortex_scalar::Scalar::null(return_dtype), scope.len()).into_array()) + } + + fn execute(&self, _stat: &Stat, args: ExecutionArgs) -> VortexResult { + Ok(Datum::Scalar(Scalar::null(&args.return_dtype))) + } + + fn simplify( + &self, + _options: &Self::Options, + _expr: &Expression, + _ctx: &dyn SimplifyCtx, + ) -> VortexResult> { + // FIXME(ngates): we really want to implement a reduction rule for all arrays? But it's an array. + // And it's a reduction rule. How do we do this without reduce_parent on everything..? 
+ Ok(None) + } +} diff --git a/vortex-array/src/expr/stats/mod.rs b/vortex-array/src/expr/stats/mod.rs index cba33e2743c..e01f807e21d 100644 --- a/vortex-array/src/expr/stats/mod.rs +++ b/vortex-array/src/expr/stats/mod.rs @@ -216,7 +216,7 @@ impl Stat { }) } - pub fn name(&self) -> &str { + pub const fn name(&self) -> &'static str { match self { Self::IsConstant => "is_constant", Self::IsSorted => "is_sorted", diff --git a/vortex-array/src/expr/vtable.rs b/vortex-array/src/expr/vtable.rs index e15d539944c..9e4fa822b51 100644 --- a/vortex-array/src/expr/vtable.rs +++ b/vortex-array/src/expr/vtable.rs @@ -73,7 +73,20 @@ pub trait VTable: 'static + Sized + Send + Sync { options: &Self::Options, expr: &Expression, f: &mut Formatter<'_>, - ) -> fmt::Result; + ) -> fmt::Result { + write!(f, "{}(", expr.id())?; + for (i, child) in expr.children().iter().enumerate() { + if i > 0 { + write!(f, ", ")?; + } + child.fmt_sql(f)?; + } + let options = format!("{}", options); + if !options.is_empty() { + write!(f, ", options={}", options)?; + } + write!(f, ")") + } /// Compute the return [`DType`] of the expression if evaluated over the given input types. fn return_dtype(&self, options: &Self::Options, arg_dtypes: &[DType]) -> VortexResult; @@ -144,6 +157,14 @@ pub trait VTable: 'static + Sized + Send + Sync { Ok(None) } + /// Falsify the expression, returning a new expression that is true whenever the original + /// expression is guaranteed to be false via stats. + fn falsify(&self, options: &Self::Options, expr: &Expression) -> Option { + _ = options; + _ = expr; + None + } + /// See [`Expression::stat_falsification`]. fn stat_falsification( &self, diff --git a/vortex-file/src/file.rs b/vortex-file/src/file.rs index b2161f042c7..10a30cdf0f2 100644 --- a/vortex-file/src/file.rs +++ b/vortex-file/src/file.rs @@ -27,6 +27,7 @@ use vortex_layout::segments::SegmentSource; use vortex_metrics::VortexMetrics; use vortex_scan::ScanBuilder; use vortex_scan::SplitBy; +use vortex_scan::v2::scan::ScanBuilder2; use vortex_session::VortexSession; use vortex_utils::aliases::hash_map::HashMap; @@ -103,6 +104,14 @@ impl VortexFile { ) } + pub fn scan2(&self) -> VortexResult { + let reader_ref = self + .footer + .layout() + .new_reader2(&self.segment_source, &self.session)?; + Ok(ScanBuilder2::new(reader_ref, self.session.clone())) + } + #[cfg(gpu_unstable)] pub fn gpu_scan( &self, diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index 2bcc0b73739..47020ac32f9 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -61,7 +61,7 @@ use vortex_io::session::RuntimeSession; use vortex_layout::session::LayoutSession; use vortex_metrics::VortexMetrics; use vortex_scalar::Scalar; -use vortex_scan::ScanBuilder; +use vortex_scan::v2::scan::ScanBuilder2; use vortex_session::VortexSession; use crate::OpenOptionsSessionExt; @@ -118,7 +118,7 @@ async fn test_read_simple() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .into_array_stream() .unwrap(); @@ -198,7 +198,7 @@ async fn test_round_trip_many_types() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .into_array_stream() .unwrap() @@ -284,7 +284,7 @@ async fn test_read_projection() { let file = SESSION.open_options().open_buffer(buf).unwrap(); let array = file - .scan() + .scan2() .unwrap() .with_projection(select(["strings"], root())) .into_array_stream() @@ -306,7 +306,7 @@ async fn test_read_projection() { assert_arrays_eq!(actual.as_ref(), expected.as_ref()); let array = file - .scan() + 
.scan2() .unwrap() .with_projection(select(["numbers"], root())) .into_array_stream() @@ -355,7 +355,7 @@ async fn unequal_batches() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .into_array_stream() .unwrap(); @@ -415,7 +415,7 @@ async fn write_chunked() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .into_array_stream() .unwrap(); @@ -445,7 +445,7 @@ async fn test_empty_varbin_array_roundtrip() { let file = SESSION.open_options().open_buffer(buf).unwrap(); let result = file - .scan() + .scan2() .unwrap() .into_array_stream() .unwrap() @@ -475,7 +475,7 @@ async fn issue_5385_filter_casted_column() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .with_filter(eq( cast( @@ -525,7 +525,7 @@ async fn filter_string() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .with_filter(eq(get_item("name", root()), lit("Joseph"))) .into_array_stream() @@ -574,7 +574,7 @@ async fn filter_or() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .with_filter(or( eq(get_item("name", root()), lit("Angela")), @@ -634,7 +634,7 @@ async fn filter_and() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .with_filter(and( gt(get_item("age", root()), lit(21)), @@ -688,7 +688,7 @@ async fn test_with_indices_simple() { // test no indices let actual_kept_array = file - .scan() + .scan2() .unwrap() .with_row_indices(Buffer::::empty()) .into_array_stream() @@ -704,7 +704,7 @@ async fn test_with_indices_simple() { let kept_indices = [0_u64, 3, 99, 100, 101, 399, 400, 401, 499]; let actual_kept_array = file - .scan() + .scan2() .unwrap() .with_row_indices(Buffer::from_iter(kept_indices)) .into_array_stream() @@ -724,7 +724,7 @@ async fn test_with_indices_simple() { // test all indices let actual_array = file - .scan() + .scan2() .unwrap() .with_row_indices((0u64..500).collect::>()) .into_array_stream() @@ -767,7 +767,7 @@ async fn test_with_indices_on_two_columns() { let kept_indices = [0_u64, 3, 7]; let array = file - .scan() + .scan2() .unwrap() .with_row_indices(Buffer::from_iter(kept_indices)) .into_array_stream() @@ -822,7 +822,7 @@ async fn test_with_indices_and_with_row_filter_simple() { let file = SESSION.open_options().open_buffer(buf).unwrap(); let actual_kept_array = file - .scan() + .scan2() .unwrap() .with_filter(gt(get_item("numbers", root()), lit(50_i16))) .with_row_indices(Buffer::empty()) @@ -839,7 +839,7 @@ async fn test_with_indices_and_with_row_filter_simple() { let kept_indices = [0u64, 3, 99, 100, 101, 399, 400, 401, 499]; let actual_kept_array = file - .scan() + .scan2() .unwrap() .with_filter(gt(get_item("numbers", root()), lit(50_i16))) .with_row_indices(Buffer::from_iter(kept_indices)) @@ -862,7 +862,7 @@ async fn test_with_indices_and_with_row_filter_simple() { // test all indices let actual_array = file - .scan() + .scan2() .unwrap() .with_filter(gt(get_item("numbers", root()), lit(50_i16))) .with_row_indices((0..500).collect::>()) @@ -925,7 +925,7 @@ async fn filter_string_chunked() { let file = SESSION.open_options().open_buffer(buf).unwrap(); let actual_array = file - .scan() + .scan2() .unwrap() .with_filter(eq(get_item("name", root()), lit("Joseph"))) .into_array_stream() @@ -1013,7 +1013,7 @@ async fn test_pruning_with_or() { let file = SESSION.open_options().open_buffer(buf).unwrap(); let actual_array = file - .scan() + .scan2() .unwrap() .with_filter(or( lt_eq(get_item("letter", root()), lit("J")), @@ -1086,7 +1086,7 @@ 
async fn test_repeated_projection() { let file = SESSION.open_options().open_buffer(buf).unwrap(); let actual = file - .scan() + .scan2() .unwrap() .with_projection(select(["strings", "strings"], root())) .into_array_stream() @@ -1126,7 +1126,7 @@ async fn chunked_file() -> VortexResult { #[tokio::test] async fn basic_file_roundtrip() -> VortexResult<()> { let vxf = chunked_file().await?; - let result = vxf.scan()?.into_array_stream()?.read_all().await?; + let result = vxf.scan2()?.into_array_stream()?.read_all().await?; let expected = buffer![0i32, 1, 2, 3, 4, 5, 6, 7, 8].into_array(); assert_arrays_eq!(result.as_ref(), expected.as_ref()); @@ -1170,7 +1170,7 @@ async fn file_excluding_dtype() -> VortexResult<()> { async fn file_take() -> VortexResult<()> { let vxf = chunked_file().await?; let result = vxf - .scan()? + .scan2()? .with_row_indices(buffer![0, 1, 8]) .into_array_stream()? .read_all() @@ -1208,7 +1208,7 @@ async fn write_nullable_top_level_struct() { async fn round_trip( array: &dyn Array, - f: impl Fn(ScanBuilder) -> VortexResult>, + f: impl Fn(ScanBuilder2) -> VortexResult, ) -> VortexResult { let mut writer = vec![]; SESSION @@ -1225,7 +1225,7 @@ async fn round_trip( assert_eq!(vxf.dtype(), array.dtype()); assert_eq!(vxf.row_count(), array.len() as u64); - f(vxf.scan()?)?.into_array_stream()?.read_all().await + f(vxf.scan2()?)?.into_array_stream()?.read_all().await } #[tokio::test] @@ -1304,7 +1304,7 @@ async fn test_into_tokio_array_stream() -> VortexResult<()> { .await?; let file = SESSION.open_options().open_buffer(buf)?; - let stream = file.scan().unwrap().into_array_stream()?; + let stream = file.scan2().unwrap().into_array_stream()?; let array = stream.read_all().await?; assert_eq!(array.len(), 8); @@ -1326,7 +1326,7 @@ async fn test_array_stream_no_double_dict_encode() -> VortexResult<()> { .write(&mut buf, array.to_array_stream()) .await?; let file = SESSION.open_options().open_buffer(buf)?; - let read_array = file.scan()?.into_array_stream()?.read_all().await?; + let read_array = file.scan2()?.into_array_stream()?.read_all().await?; let dict = read_array .as_opt::() @@ -1354,7 +1354,7 @@ async fn test_writer_basic_push() -> VortexResult<()> { assert_eq!(summary.row_count(), 4); let file = SESSION.open_options().open_buffer(buf)?; - let result = file.scan()?.into_array_stream()?.read_all().await?; + let result = file.scan2()?.into_array_stream()?.read_all().await?; assert_eq!(result.len(), 4); assert_eq!(result.dtype(), &dtype); @@ -1384,7 +1384,7 @@ async fn test_writer_multiple_pushes() -> VortexResult<()> { assert_eq!(summary.row_count(), 9); let file = SESSION.open_options().open_buffer(buf)?; - let result = file.scan()?.into_array_stream()?.read_all().await?; + let result = file.scan2()?.into_array_stream()?.read_all().await?; assert_eq!(result.len(), 9); let numbers = result.to_struct().field_by_name("numbers")?.clone(); @@ -1415,7 +1415,7 @@ async fn test_writer_push_stream() -> VortexResult<()> { assert_eq!(summary.row_count(), 6); let file = SESSION.open_options().open_buffer(buf)?; - let result = file.scan()?.into_array_stream()?.read_all().await?; + let result = file.scan2()?.into_array_stream()?.read_all().await?; assert_eq!(result.len(), 6); let numbers = result.to_struct().field_by_name("numbers")?.clone(); @@ -1476,7 +1476,7 @@ async fn test_writer_empty_chunks() -> VortexResult<()> { assert_eq!(summary.row_count(), 2); let file = SESSION.open_options().open_buffer(buf)?; - let result = file.scan()?.into_array_stream()?.read_all().await?; + let result 
= file.scan2()?.into_array_stream()?.read_all().await?; assert_eq!(result.len(), 2); let numbers = result.to_struct().field_by_name("numbers")?.clone(); @@ -1511,7 +1511,7 @@ async fn test_writer_mixed_push_and_stream() -> VortexResult<()> { assert_eq!(summary.row_count(), 6); let file = SESSION.open_options().open_buffer(buf)?; - let result = file.scan()?.into_array_stream()?.read_all().await?; + let result = file.scan2()?.into_array_stream()?.read_all().await?; assert_eq!(result.len(), 6); let numbers = result.to_struct().field_by_name("numbers")?.clone(); @@ -1548,7 +1548,7 @@ async fn test_writer_with_complex_types() -> VortexResult<()> { assert_eq!(footer.row_count(), 3); let file = SESSION.open_options().open_buffer(buf)?; - let result = file.scan()?.into_array_stream()?.read_all().await?; + let result = file.scan2()?.into_array_stream()?.read_all().await?; assert_eq!(result.len(), 3); assert_eq!(result.dtype(), &dtype); diff --git a/vortex-layout/src/layout.rs b/vortex-layout/src/layout.rs index 921f8c5c8ce..59329184554 100644 --- a/vortex-layout/src/layout.rs +++ b/vortex-layout/src/layout.rs @@ -25,6 +25,8 @@ use crate::display::DisplayLayoutTree; use crate::display::display_tree_with_segment_sizes; use crate::segments::SegmentId; use crate::segments::SegmentSource; +use crate::segments::SegmentSourceRef; +use crate::v2::reader::ReaderRef; pub type LayoutId = ArcRef; @@ -76,6 +78,12 @@ pub trait Layout: 'static + Send + Sync + Debug + private::Sealed { segment_source: Arc, session: &VortexSession, ) -> VortexResult; + + fn new_reader2( + &self, + segment_source: &SegmentSourceRef, + session: &VortexSession, + ) -> VortexResult; } pub trait IntoLayout { @@ -331,6 +339,14 @@ impl Layout for LayoutAdapter { ) -> VortexResult { V::new_reader(&self.0, name, segment_source, session) } + + fn new_reader2( + &self, + segment_source: &SegmentSourceRef, + session: &VortexSession, + ) -> VortexResult { + V::new_reader2(&self.0, segment_source, session) + } } mod private { diff --git a/vortex-layout/src/lib.rs b/vortex-layout/src/lib.rs index 86d408514c7..91ec7eba442 100644 --- a/vortex-layout/src/lib.rs +++ b/vortex-layout/src/lib.rs @@ -28,6 +28,7 @@ pub mod session; mod strategy; #[cfg(test)] mod test; +pub mod v2; pub mod vtable; pub type LayoutContext = Context; diff --git a/vortex-layout/src/segments/source.rs b/vortex-layout/src/segments/source.rs index a48a79b2889..3a5b3f84b99 100644 --- a/vortex-layout/src/segments/source.rs +++ b/vortex-layout/src/segments/source.rs @@ -1,6 +1,8 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::sync::Arc; + use futures::future::BoxFuture; use vortex_array::buffer::BufferHandle; use vortex_error::VortexResult; @@ -9,6 +11,9 @@ use crate::segments::SegmentId; /// Static future resolving to a segment byte buffer. pub type SegmentFuture = BoxFuture<'static, VortexResult>; +/// A reference-counted segment source. +pub type SegmentSourceRef = Arc; + /// A trait for providing segment data to a [`crate::LayoutReader`]. pub trait SegmentSource: 'static + Send + Sync { /// Request a segment, returning a future that will eventually resolve to the segment data. 
diff --git a/vortex-layout/src/v2/expression.rs b/vortex-layout/src/v2/expression.rs new file mode 100644 index 00000000000..76f639f987f --- /dev/null +++ b/vortex-layout/src/v2/expression.rs @@ -0,0 +1,52 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; + +use itertools::Itertools; +use vortex_array::expr::Expression; +use vortex_array::expr::Literal; +use vortex_array::expr::Root; +use vortex_error::VortexResult; + +use crate::v2::reader::Reader; +use crate::v2::reader::ReaderRef; +use crate::v2::readers::constant::ConstantReader; +use crate::v2::readers::scalar_fn::ScalarFnReader; + +impl dyn Reader + '_ { + /// Apply the expression to this reader, producing a new reader in constant time. + pub fn apply(self: Arc, expr: &Expression) -> VortexResult { + // If the expression is a root, return self. + if expr.is::() { + return Ok(self); + } + + // Manually convert literals to ConstantArray. + if let Some(scalar) = expr.as_opt::() { + return Ok(Arc::new(ConstantReader::new( + scalar.clone(), + self.row_count(), + ))); + } + + let row_count = self.row_count(); + + // Otherwise, collect the child readers. + let children: Vec<_> = expr + .children() + .iter() + .map(|e| self.clone().apply(e)) + .try_collect()?; + + // And wrap the scalar function up in an array. + let reader: ReaderRef = Arc::new(ScalarFnReader::try_new( + expr.scalar_fn().clone(), + children, + row_count, + )?); + + // Optimize the resulting reader. + reader.optimize() + } +} diff --git a/vortex-layout/src/v2/expressions/falsify.rs b/vortex-layout/src/v2/expressions/falsify.rs new file mode 100644 index 00000000000..56cfcf8a77c --- /dev/null +++ b/vortex-layout/src/v2/expressions/falsify.rs @@ -0,0 +1,95 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::fmt::Display; +use std::fmt::Formatter; + +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::arrays::ConstantArray; +use vortex_array::expr::Arity; +use vortex_array::expr::ChildName; +use vortex_array::expr::ExecutionArgs; +use vortex_array::expr::ExprId; +use vortex_array::expr::Expression; +use vortex_array::expr::VTable; +use vortex_dtype::DType; +use vortex_dtype::Nullability; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_vector::Datum; +use vortex_vector::Scalar; +use vortex_vector::bool::BoolScalar; + +/// An expression that evaluates to true when the predicate is provably false, without evaluating +/// it. +/// +/// Falsify typically reduces to operations over statistics expressions. For example, +/// the expression `falsify(col > 5)` may reduce to `col.max() <= 5`. +/// +/// If a falsify expression cannot be reduced, it evaluates to `false` for all inputs. +pub struct Falsify; + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct FalsifyOptions { + predicate: Expression, +} + +impl Display for FalsifyOptions { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "predicate={}", self.predicate) + } +} + +impl VTable for Falsify { + // FIXME(ngates): should the predicate be a child expression, or live like this in the options. + // It's a bit weird? Maybe it makes implementing the optimizer rules a little more fiddly? + // But it's weird to have a child expression that we know is never executed. 
+ type Options = FalsifyOptions; + + fn id(&self) -> ExprId { + ExprId::from("falsify") + } + + fn arity(&self, _options: &Self::Options) -> Arity { + Arity::Exact(0) + } + + fn child_name(&self, _options: &Self::Options, _child_idx: usize) -> ChildName { + ChildName::from("predicate") + } + + fn fmt_sql( + &self, + _options: &Self::Options, + expr: &Expression, + f: &mut Formatter<'_>, + ) -> std::fmt::Result { + write!(f, "falsify(")?; + expr.child(0).fmt_sql(f)?; + write!(f, ")") + } + + fn return_dtype(&self, _options: &Self::Options, arg_dtypes: &[DType]) -> VortexResult { + if !arg_dtypes[0].is_boolean() { + vortex_bail!("falsify() requires a boolean argument"); + } + Ok(DType::Bool(Nullability::NonNullable)) + } + + // NOTE(ngates): do we prefer evaluate or execute semantics??? + fn evaluate( + &self, + _options: &Self::Options, + _expr: &Expression, + scope: &ArrayRef, + ) -> VortexResult { + // Unless falsify has been reduced by another expression, we cannot prove the predicate + // is false. Therefore, we return a constant false array. + Ok(ConstantArray::new(false, scope.len()).into_array()) + } + + fn execute(&self, _options: &Self::Options, _args: ExecutionArgs) -> VortexResult { + Ok(Datum::Scalar(Scalar::Bool(BoolScalar::new(Some(false))))) + } +} diff --git a/vortex-layout/src/v2/expressions/mod.rs b/vortex-layout/src/v2/expressions/mod.rs new file mode 100644 index 00000000000..41b859483a0 --- /dev/null +++ b/vortex-layout/src/v2/expressions/mod.rs @@ -0,0 +1,4 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +mod falsify; diff --git a/vortex-layout/src/v2/matcher.rs b/vortex-layout/src/v2/matcher.rs new file mode 100644 index 00000000000..dcd68f2dc1d --- /dev/null +++ b/vortex-layout/src/v2/matcher.rs @@ -0,0 +1,16 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::expr; + +use crate::v2::reader::Reader; +use crate::v2::readers::scalar_fn::ScalarFnReader; + +impl dyn Reader + '_ { + /// If this reader is a [`ScalarFnReader`], return its scalar function options + pub fn as_scalar_fn(&self) -> Option<&V::Options> { + self.as_any() + .downcast_ref::() + .and_then(|r| r.scalar_fn().as_opt::()) + } +} diff --git a/vortex-layout/src/v2/mod.rs b/vortex-layout/src/v2/mod.rs new file mode 100644 index 00000000000..2fe71dbffb7 --- /dev/null +++ b/vortex-layout/src/v2/mod.rs @@ -0,0 +1,9 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +mod expression; +mod expressions; +mod matcher; +mod optimizer; +pub mod reader; +pub mod readers; diff --git a/vortex-layout/src/v2/optimizer.rs b/vortex-layout/src/v2/optimizer.rs new file mode 100644 index 00000000000..343179037ac --- /dev/null +++ b/vortex-layout/src/v2/optimizer.rs @@ -0,0 +1,16 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; + +use vortex_error::VortexResult; + +use crate::v2::reader::Reader; +use crate::v2::reader::ReaderRef; + +impl dyn Reader + '_ { + pub fn optimize(self: Arc) -> VortexResult { + // TODO(ngates): run the reduce rules + Ok(self) + } +} diff --git a/vortex-layout/src/v2/reader.rs b/vortex-layout/src/v2/reader.rs new file mode 100644 index 00000000000..589cbe8b322 --- /dev/null +++ b/vortex-layout/src/v2/reader.rs @@ -0,0 +1,73 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use 
std::any::Any; +use std::ops::Range; +use std::sync::Arc; + +use futures::future::BoxFuture; +use vortex_array::ArrayRef; +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_mask::Mask; + +pub type ReaderRef = Arc; + +/// A reader provides an interface for loading data from row-indexed layouts. +/// +/// Unlike a [`super::source::DataSource`], readers have a concrete row count allowing fixed +/// partitions over a known set of rows. Readers are driven by providing an input stream of +/// array data that can be used to provide arguments to parameterized filter and projection +/// expressions. +pub trait Reader: 'static + Send + Sync { + /// Downcast the reader to a concrete type. + fn as_any(&self) -> &dyn Any; + + /// Get the data type of the layout being read. + fn dtype(&self) -> &DType; + + /// Returns the number of rows in the reader. + fn row_count(&self) -> u64; + + /// Reduces the reader, simplifying its internal structure if possible. + fn try_reduce(&self) -> VortexResult> { + Ok(None) + } + + /// Reduce the parent reader if possible, returning a new reader if successful. + fn try_reduce_parent( + &self, + parent: &ReaderRef, + child_idx: usize, + ) -> VortexResult> { + let _ = (parent, child_idx); + Ok(None) + } + + /// Creates a scan over the given row range of the reader. + fn execute(&self, row_range: Range) -> VortexResult; +} + +pub type ReaderStreamRef = Box; + +pub trait ReaderStream: 'static + Send + Sync { + /// The data type of the returned data. + fn dtype(&self) -> &DType; + + /// The preferred maximum row count for the next chunk. + /// + /// Returns [`None`] if there are no more chunks. + fn next_chunk_len(&self) -> Option; + + /// Returns the next chunk of data given an input array. + /// + /// The returned chunk must have the same number of rows as the [`Mask::true_count`]. + /// The provided mask will have at most [`next_chunk_len`] rows. + /// + /// The returned future has a `'static` lifetime allowing the calling to drive the stream + /// arbitrarily far without awaiting any data. 
+ fn next_chunk( + &mut self, + mask: &Mask, + ) -> VortexResult>>; +} diff --git a/vortex-layout/src/v2/readers/chunked.rs b/vortex-layout/src/v2/readers/chunked.rs new file mode 100644 index 00000000000..7eaa90e388a --- /dev/null +++ b/vortex-layout/src/v2/readers/chunked.rs @@ -0,0 +1,161 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::any::Any; +use std::ops::Range; + +use futures::future::BoxFuture; +use futures::future::try_join_all; +use moka::future::FutureExt; +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::arrays::ChunkedArray; +use vortex_dtype::DType; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_err; +use vortex_mask::Mask; + +use crate::v2::reader::Reader; +use crate::v2::reader::ReaderRef; +use crate::v2::reader::ReaderStream; +use crate::v2::reader::ReaderStreamRef; + +pub struct ChunkedReader { + row_count: u64, + dtype: DType, + chunks: Vec, +} + +impl Reader for ChunkedReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn row_count(&self) -> u64 { + self.row_count + } + + fn execute(&self, row_range: Range) -> VortexResult { + let mut remaining_start = row_range.start; + let mut remaining_end = row_range.end; + let mut streams = Vec::new(); + + for chunk in &self.chunks { + let chunk_row_count = chunk.row_count(); + + if remaining_start >= chunk_row_count { + // This chunk is before the requested range + remaining_start -= chunk_row_count; + remaining_end -= chunk_row_count; + continue; + } + + let start_in_chunk = remaining_start; + let end_in_chunk = if remaining_end <= chunk_row_count { + remaining_end + } else { + chunk_row_count + }; + + streams.push(chunk.execute(start_in_chunk..end_in_chunk)?); + + remaining_start = 0; + if remaining_end <= chunk_row_count { + break; + } else { + remaining_end -= chunk_row_count; + } + } + + Ok(Box::new(ChunkedReaderStream { + dtype: self.dtype.clone(), + chunks: streams, + })) + } +} + +struct ChunkedReaderStream { + dtype: DType, + chunks: Vec, +} + +impl ReaderStream for ChunkedReaderStream { + fn dtype(&self) -> &DType { + &self.dtype + } + + fn next_chunk_len(&self) -> Option { + self.chunks + .iter() + .map(|s| s.next_chunk_len()) + .find(|len| len.is_some()) + .flatten() + } + + fn next_chunk( + &mut self, + selection: &Mask, + ) -> VortexResult>> { + // Remove any chunks that are already exhausted + loop { + if self.chunks.is_empty() { + vortex_bail!("Early termination of chunked layout"); + } + if self.chunks[0].next_chunk_len().is_none() { + self.chunks.remove(0); + } else { + break; + } + } + + // Get the length of the next chunk + let mut next_len = self.chunks[0] + .next_chunk_len() + .ok_or_else(|| vortex_err!("Early termination of chunked layout"))?; + + if selection.len() <= next_len { + // The selection is smaller than the next chunk length, therefore we only need one chunk + return self.chunks[0].next_chunk(selection); + } + + // Otherwise, we need to gather from multiple chunks + let mut selection = selection.clone(); + let mut futs = vec![]; + while !selection.is_empty() { + if self.chunks.is_empty() { + vortex_bail!("Early termination of chunked layout"); + } + + // Slice off the right amount of selection for this chunk + let next_sel = selection.slice(..next_len); + selection = selection.slice(next_len..); + + let fut = self.chunks[0].next_chunk(&next_sel)?; + futs.push(fut); + + 
// Remove any chunks that are already exhausted + loop { + if self.chunks.is_empty() { + vortex_bail!("Early termination of chunked layout"); + } + if self.chunks[0].next_chunk_len().is_none() { + self.chunks.remove(0); + } + next_len = self.chunks[0].next_chunk_len().vortex_expect("non-none"); + } + } + + let dtype = self.dtype.clone(); + Ok(async move { + let arrays = try_join_all(futs).await?; + Ok(ChunkedArray::try_new(arrays, dtype)?.into_array()) + } + .boxed()) + } +} diff --git a/vortex-layout/src/v2/readers/constant.rs b/vortex-layout/src/v2/readers/constant.rs new file mode 100644 index 00000000000..a15676b712b --- /dev/null +++ b/vortex-layout/src/v2/readers/constant.rs @@ -0,0 +1,79 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::any::Any; +use std::ops::Range; + +use futures::future::BoxFuture; +use moka::future::FutureExt; +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::arrays::ConstantArray; +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_mask::Mask; +use vortex_scalar::Scalar; + +use crate::v2::reader::Reader; +use crate::v2::reader::ReaderStream; +use crate::v2::reader::ReaderStreamRef; + +pub struct ConstantReader { + scalar: Scalar, + row_count: u64, +} + +impl ConstantReader { + pub fn new(scalar: Scalar, row_count: u64) -> Self { + Self { scalar, row_count } + } +} + +impl Reader for ConstantReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn dtype(&self) -> &DType { + self.scalar.dtype() + } + + fn row_count(&self) -> u64 { + self.row_count + } + + fn execute(&self, row_range: Range) -> VortexResult { + let remaining = row_range.end.saturating_sub(row_range.start); + Ok(Box::new(ConstantReaderStream { + scalar: self.scalar.clone(), + remaining, + })) + } +} + +struct ConstantReaderStream { + scalar: Scalar, + remaining: u64, +} + +impl ReaderStream for ConstantReaderStream { + fn dtype(&self) -> &DType { + self.scalar.dtype() + } + + fn next_chunk_len(&self) -> Option { + if self.remaining == 0 { + None + } else { + Some(usize::try_from(self.remaining).unwrap_or(usize::MAX)) + } + } + + fn next_chunk( + &mut self, + mask: &Mask, + ) -> VortexResult>> { + let array = ConstantArray::new(self.scalar.clone(), mask.true_count()).into_array(); + Ok(async move { Ok(array) }.boxed()) + } +} diff --git a/vortex-layout/src/v2/readers/flat.rs b/vortex-layout/src/v2/readers/flat.rs new file mode 100644 index 00000000000..fefeec326d3 --- /dev/null +++ b/vortex-layout/src/v2/readers/flat.rs @@ -0,0 +1,114 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::any::Any; +use std::ops::Range; + +use futures::future::BoxFuture; +use moka::future::FutureExt; +use vortex_array::ArrayRef; +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_err; +use vortex_mask::Mask; + +use crate::layouts::SharedArrayFuture; +use crate::v2::reader::Reader; +use crate::v2::reader::ReaderStream; +use crate::v2::reader::ReaderStreamRef; + +pub struct FlatReader { + len: usize, + dtype: DType, + array_fut: SharedArrayFuture, +} + +impl Reader for FlatReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn row_count(&self) -> u64 { + self.len as u64 + } + + fn execute(&self, row_range: Range) -> VortexResult { + // We need to share the same array future + let array_fut = self.array_fut.clone(); + + let 
start = usize::try_from(row_range.start).map_err(|_| { + vortex_err!("Row range start {} is too large for usize", row_range.start) + })?; + let end = usize::try_from(row_range.end) + .map_err(|_| vortex_err!("Row range end {} is too large for usize", row_range.end))?; + + if start > self.len || end > self.len || start > end { + vortex_bail!( + "Row range {:?} is out of bounds for array of length {}", + row_range, + self.len + ); + } + + Ok(Box::new(FlatLayoutReaderStream { + dtype: self.dtype.clone(), + array_fut, + offset: start, + remaining: end - start, + })) + } +} + +struct FlatLayoutReaderStream { + dtype: DType, + array_fut: SharedArrayFuture, + offset: usize, + remaining: usize, +} + +impl ReaderStream for FlatLayoutReaderStream { + fn dtype(&self) -> &DType { + &self.dtype + } + + fn next_chunk_len(&self) -> Option { + if self.remaining == 0 { + None + } else { + Some(self.remaining) + } + } + + fn next_chunk( + &mut self, + selection: &Mask, + ) -> VortexResult>> { + if selection.len() > self.remaining { + vortex_bail!( + "Selection mask length {} exceeds remaining rows {}", + selection.len(), + self.remaining + ); + } + + let array_fut = self.array_fut.clone(); + let offset = self.offset; + let selection = selection.clone(); + + self.offset += selection.len(); + self.remaining -= selection.len(); + + Ok(async move { + let array = array_fut.await?; + let sliced_array = array.slice(offset..offset + selection.len()); + let selected_array = sliced_array.filter(selection.clone())?; + Ok(selected_array) + } + .boxed()) + } +} diff --git a/vortex-layout/src/v2/readers/mod.rs b/vortex-layout/src/v2/readers/mod.rs new file mode 100644 index 00000000000..46d3c793376 --- /dev/null +++ b/vortex-layout/src/v2/readers/mod.rs @@ -0,0 +1,9 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +pub mod chunked; +pub mod constant; +pub mod flat; +pub mod scalar_fn; +pub mod struct_; +pub mod zoned; diff --git a/vortex-layout/src/v2/readers/scalar_fn.rs b/vortex-layout/src/v2/readers/scalar_fn.rs new file mode 100644 index 00000000000..86e2348760f --- /dev/null +++ b/vortex-layout/src/v2/readers/scalar_fn.rs @@ -0,0 +1,144 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::any::Any; +use std::ops::Range; +use std::sync::Arc; + +use futures::future::BoxFuture; +use futures::future::try_join_all; +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::arrays::ScalarFnArray; +use vortex_array::expr::Expression; +use vortex_array::expr::ScalarFn; +use vortex_array::expr::VTable; +use vortex_array::expr::VTableExt; +use vortex_array::optimizer::ArrayOptimizer; +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_mask::Mask; + +use crate::v2::reader::Reader; +use crate::v2::reader::ReaderRef; +use crate::v2::reader::ReaderStream; +use crate::v2::reader::ReaderStreamRef; + +/// A [`Reader] for applying a scalar function to another layout. 
+pub struct ScalarFnReader { + scalar_fn: ScalarFn, + dtype: DType, + row_count: u64, + children: Vec, +} + +impl ScalarFnReader { + pub fn try_new( + scalar_fn: ScalarFn, + children: Vec, + row_count: u64, + ) -> VortexResult { + let dtype = scalar_fn.return_dtype( + &children + .iter() + .map(|c| c.dtype().clone()) + .collect::>(), + )?; + + Ok(Self { + scalar_fn, + dtype, + row_count, + children, + }) + } + + pub fn scalar_fn(&self) -> &ScalarFn { + &self.scalar_fn + } +} + +impl Reader for ScalarFnReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn row_count(&self) -> u64 { + self.row_count + } + + fn execute(&self, row_range: Range) -> VortexResult { + let input_streams = self + .children + .iter() + .map(|child| child.execute(row_range.clone())) + .collect::>>()?; + + Ok(Box::new(ScalarFnArrayStream { + dtype: self.dtype.clone(), + scalar_fn: self.scalar_fn.clone(), + input_streams, + })) + } +} + +struct ScalarFnArrayStream { + dtype: DType, + scalar_fn: ScalarFn, + input_streams: Vec, +} + +impl ReaderStream for ScalarFnArrayStream { + fn dtype(&self) -> &DType { + &self.dtype + } + + fn next_chunk_len(&self) -> Option { + self.input_streams + .iter() + .map(|s| s.next_chunk_len()) + .min() + .flatten() + } + + fn next_chunk( + &mut self, + selection: &Mask, + ) -> VortexResult>> { + let scalar_fn = self.scalar_fn.clone(); + let len = selection.true_count(); + let futs = self + .input_streams + .iter_mut() + .map(|s| s.next_chunk(selection)) + .collect::>>()?; + + Ok(Box::pin(async move { + let input_arrays = try_join_all(futs).await?; + let array = ScalarFnArray::try_new(scalar_fn, input_arrays, len)?.into_array(); + let array = array.optimize()?; + Ok(array) + })) + } +} + +pub trait ScalarFnReaderExt: VTable { + /// Creates a [`ScalarFnReader`] applying this scalar function to the given children. + fn new_reader( + &'static self, + options: Self::Options, + children: Vec, + row_count: u64, + ) -> VortexResult { + Ok(Arc::new(ScalarFnReader::try_new( + self.bind(options), + children.into(), + row_count, + )?)) + } +} +impl ScalarFnReaderExt for V {} diff --git a/vortex-layout/src/v2/readers/struct_.rs b/vortex-layout/src/v2/readers/struct_.rs new file mode 100644 index 00000000000..d290065e16c --- /dev/null +++ b/vortex-layout/src/v2/readers/struct_.rs @@ -0,0 +1,97 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::any::Any; +use std::ops::Range; + +use futures::future::BoxFuture; +use futures::future::try_join_all; +use moka::future::FutureExt; +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::arrays::StructArray; +use vortex_array::validity::Validity; +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_mask::Mask; + +use crate::v2::reader::Reader; +use crate::v2::reader::ReaderRef; +use crate::v2::reader::ReaderStream; +use crate::v2::reader::ReaderStreamRef; + +pub struct StructReader { + row_count: u64, + dtype: DType, + // TODO(ngates): we should make this lazy? 
+ fields: Vec, +} + +impl Reader for StructReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn row_count(&self) -> u64 { + self.row_count + } + + fn execute(&self, row_range: Range) -> VortexResult { + let field_streams = self + .fields + .iter() + .map(|field| field.execute(row_range.clone())) + .collect::>>()?; + + Ok(Box::new(StructReaderStream { + dtype: self.dtype.clone(), + fields: field_streams, + })) + } +} + +struct StructReaderStream { + dtype: DType, + fields: Vec, +} + +impl ReaderStream for StructReaderStream { + fn dtype(&self) -> &DType { + &self.dtype + } + + fn next_chunk_len(&self) -> Option { + self.fields + .iter() + .map(|s| s.next_chunk_len()) + .min() + .flatten() + } + + fn next_chunk( + &mut self, + selection: &Mask, + ) -> VortexResult>> { + let struct_fields = self.dtype.as_struct_fields().clone(); + let validity: Validity = self.dtype.nullability().into(); + let fields = self + .fields + .iter_mut() + .map(|s| s.next_chunk(selection)) + .collect::>>()?; + let len = selection.true_count(); + + Ok(async move { + let fields = try_join_all(fields).await?; + Ok( + StructArray::try_new_with_dtype(fields, struct_fields, len, validity.clone())? + .into_array(), + ) + } + .boxed()) + } +} diff --git a/vortex-layout/src/v2/readers/zoned.rs b/vortex-layout/src/v2/readers/zoned.rs new file mode 100644 index 00000000000..5fec47880ec --- /dev/null +++ b/vortex-layout/src/v2/readers/zoned.rs @@ -0,0 +1,104 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::any::Any; +use std::ops::Range; +use std::sync::Arc; + +use vortex_array::expr::GetItem; +use vortex_array::expr::Statistic; +use vortex_array::expr::stats::Stat; +use vortex_dtype::DType; +use vortex_dtype::FieldName; +use vortex_error::VortexResult; + +use crate::v2::reader::Reader; +use crate::v2::reader::ReaderRef; +use crate::v2::reader::ReaderStreamRef; +use crate::v2::readers::scalar_fn::ScalarFnReaderExt; + +pub struct ZonedReader { + data: ReaderRef, + zone_map: ReaderRef, + zone_len: usize, + present_stats: Arc<[Stat]>, +} + +impl Reader for ZonedReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn dtype(&self) -> &DType { + self.data.dtype() + } + + fn row_count(&self) -> u64 { + self.data.row_count() + } + + fn try_reduce_parent( + &self, + parent: &ReaderRef, + _child_idx: usize, + ) -> VortexResult> { + if let Some(stat) = parent.as_scalar_fn::() { + if !self.present_stats.contains(stat) { + return Ok(None); + } + + // We know the statistic is present; so we return a new reader that pulls the value + // from the zone map. + let zoned_statistic = GetItem.new_reader( + // FIXME(ngates): construct the field name properly + FieldName::from(stat.name()), + vec![self.zone_map.clone()], + self.zone_map.row_count(), + )?; + + // We now need to explode the zoned_statistic to match the data reader's row count. + // We do this based on the zone map's zone length. + let exploded_statistic = Arc::new(ZonedExpansionReader { + zoned: zoned_statistic, + zone_len: self.zone_len, + row_count: self.data.row_count(), + }); + + return Ok(Some(exploded_statistic)); + } + + Ok(None) + } + + fn execute(&self, row_range: Range) -> VortexResult { + // By default, a zoned reader is just a pass-through. + self.data.execute(row_range) + } +} + +/// A reader that expands zoned statistics to match the data rows. +/// This repeats each row of the zone map `zone_len` times. 
+/// TODO(ngates): we could use a RunEndReader + Slice to do this? +struct ZonedExpansionReader { + zoned: ReaderRef, + zone_len: usize, + row_count: u64, +} + +impl Reader for ZonedExpansionReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn dtype(&self) -> &DType { + self.zoned.dtype() + } + + fn row_count(&self) -> u64 { + self.row_count + } + + fn execute(&self, row_range: Range) -> VortexResult { + todo!() + } +} diff --git a/vortex-layout/src/vtable.rs b/vortex-layout/src/vtable.rs index b01b86c71d1..d4f794e257d 100644 --- a/vortex-layout/src/vtable.rs +++ b/vortex-layout/src/vtable.rs @@ -24,6 +24,8 @@ use crate::LayoutRef; use crate::children::LayoutChildren; use crate::segments::SegmentId; use crate::segments::SegmentSource; +use crate::segments::SegmentSourceRef; +use crate::v2::reader::ReaderRef; pub trait VTable: 'static + Sized + Send + Sync + Debug { type Layout: 'static + Send + Sync + Clone + Debug + Deref + IntoLayout; @@ -65,6 +67,16 @@ pub trait VTable: 'static + Sized + Send + Sync + Debug { session: &VortexSession, ) -> VortexResult; + /// Create a new v2 reader for the layout. + fn new_reader2( + layout: &Self::Layout, + segment_source: &SegmentSourceRef, + session: &VortexSession, + ) -> VortexResult { + let _ = (layout, segment_source, session); + vortex_bail!("new_reader2 not implemented for this layout") + } + #[cfg(gpu_unstable)] /// Create a new reader for the layout that uses a gpu device fn new_gpu_reader( diff --git a/vortex-scan/Cargo.toml b/vortex-scan/Cargo.toml index 0f9f34b00de..a8ea3c16398 100644 --- a/vortex-scan/Cargo.toml +++ b/vortex-scan/Cargo.toml @@ -29,6 +29,7 @@ vortex-mask = { workspace = true } vortex-metrics = { workspace = true } vortex-session = { workspace = true } +async-trait = { workspace = true } bit-vec = { workspace = true } futures = { workspace = true } itertools = { workspace = true } diff --git a/vortex-scan/src/lib.rs b/vortex-scan/src/lib.rs index 52eb0dd47bb..ed62775f7b0 100644 --- a/vortex-scan/src/lib.rs +++ b/vortex-scan/src/lib.rs @@ -26,5 +26,6 @@ pub mod gpu; mod repeated_scan; #[cfg(test)] mod test; +pub mod v2; pub use repeated_scan::RepeatedScan; diff --git a/vortex-scan/src/v2/mod.rs b/vortex-scan/src/v2/mod.rs new file mode 100644 index 00000000000..4f05171404d --- /dev/null +++ b/vortex-scan/src/v2/mod.rs @@ -0,0 +1,4 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +pub mod scan; diff --git a/vortex-scan/src/v2/scan.rs b/vortex-scan/src/v2/scan.rs new file mode 100644 index 00000000000..834ce905e77 --- /dev/null +++ b/vortex-scan/src/v2/scan.rs @@ -0,0 +1,130 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::ops::Range; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +use futures::Stream; +use vortex_array::ArrayRef; +use vortex_array::expr::Expression; +use vortex_array::expr::root; +use vortex_array::stream::ArrayStream; +use vortex_array::stream::SendableArrayStream; +use vortex_buffer::Buffer; +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_layout::v2::reader::ReaderRef; +use vortex_session::VortexSession; + +use crate::Selection; + +pub struct ScanBuilder2 { + reader: ReaderRef, + projection: Expression, + filter: Option, + limit: Option, + row_range: Range, + row_selection: Selection, // NOTE: applies to the selected row range. 
+ session: VortexSession, +} + +impl ScanBuilder2 { + pub fn new(reader: ReaderRef, session: VortexSession) -> Self { + let row_range = 0..reader.row_count(); + Self { + reader, + projection: root(), + filter: None, + limit: None, + row_range, + row_selection: Selection::All, + session, + } + } + + pub fn with_filter(mut self, filter: Expression) -> Self { + self.filter = Some(filter); + self + } + + pub fn with_some_filter(mut self, filter: Option) -> Self { + self.filter = filter; + self + } + + pub fn with_projection(mut self, projection: Expression) -> Self { + self.projection = projection; + self + } + + pub fn with_limit(mut self, limit: u64) -> Self { + self.limit = Some(limit); + self + } + + pub fn with_row_range(mut self, row_range: Range) -> Self { + self.row_range = row_range; + self + } + + /// Sets the row selection to use the given selection (relative to the row range). + pub fn with_row_selection(mut self, row_selection: Selection) -> Self { + self.row_selection = row_selection; + self + } + + /// Sets the row selection to include only the given row indices (relative to the row range). + pub fn with_row_indices(mut self, row_indices: Buffer) -> Self { + self.row_selection = Selection::IncludeByIndex(row_indices); + self + } + + pub fn into_array_stream(self) -> VortexResult { + let projection = self.projection.optimize_recursive(self.reader.dtype())?; + let filter = self + .filter + .map(|f| f.optimize_recursive(self.reader.dtype())) + .transpose()?; + + let dtype = projection.return_dtype(self.reader.dtype())?; + + // So we wrap the reader for filtering. + let filter_reader = filter.as_ref().map(|f| self.reader.apply(&f)).transpose()?; + let projection_reader = self.reader.apply(&projection)?; + + // And finally, we wrap the reader for pruning. + let pruning_reader = filter + .as_ref() + .map(|f| { + // TODO(ngates): wrap filter in `falsify` expression. + let f = f.falsify()?; + self.reader.apply(&f) + }) + .transpose()?; + + let reader_stream = self.reader.execute(self.row_range)?; + + Ok(Scan { dtype }) + } +} + +struct Scan { + dtype: DType, + stream: SendableArrayStream, +} + +impl ArrayStream for Scan { + fn dtype(&self) -> &DType { + &self.dtype + } +} + +impl Stream for Scan { + type Item = VortexResult; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + todo!() + } +}