diff --git a/vortex-array/src/expr/analysis/immediate_access.rs b/vortex-array/src/expr/analysis/immediate_access.rs index 6dc26a7c950..4fd3efc6bdc 100644 --- a/vortex-array/src/expr/analysis/immediate_access.rs +++ b/vortex-array/src/expr/analysis/immediate_access.rs @@ -11,6 +11,7 @@ use crate::expr::analysis::AnnotationFn; use crate::expr::analysis::Annotations; use crate::expr::descendent_annotations; use crate::expr::exprs::get_item::GetItem; +use crate::expr::exprs::get_item_list::GetItemList; use crate::expr::exprs::root::Root; use crate::expr::exprs::select::Select; @@ -24,11 +25,20 @@ pub fn annotate_scope_access(scope: &StructFields) -> impl AnnotationFn() { - if expr.child(0).is::() { - return vec![field_name.clone()]; - } - } else if expr.is::() { + if let Some(field_name) = expr.as_opt::() + && expr.child(0).is::() + { + return vec![field_name.clone()]; + } + + if expr.is::() + && let Some(field_name) = expr.child(0).as_opt::() + && expr.child(0).child(0).is::() + { + return vec![field_name.clone()]; + } + + if expr.is::() { return scope.names().iter().cloned().collect(); } diff --git a/vortex-array/src/expr/exprs/get_item.rs b/vortex-array/src/expr/exprs/get_item.rs index e72b8036a0c..ecdd67cf46e 100644 --- a/vortex-array/src/expr/exprs/get_item.rs +++ b/vortex-array/src/expr/exprs/get_item.rs @@ -30,9 +30,11 @@ use crate::expr::Pack; use crate::expr::ReduceCtx; use crate::expr::ReduceNode; use crate::expr::ReduceNodeRef; +use crate::expr::SimplifyCtx; use crate::expr::StatsCatalog; use crate::expr::VTable; use crate::expr::VTableExt; +use crate::expr::exprs::get_item_list::get_item_list; use crate::expr::exprs::root::root; use crate::expr::lit; use crate::expr::stats::Stat; @@ -90,15 +92,7 @@ impl VTable for GetItem { vortex_err!("Couldn't find the {} field in the input scope", field_name) })?; - // Match here to avoid cloning the dtype if nullability doesn't need to change - if matches!( - (struct_dtype.nullability(), field_dtype.nullability()), - (Nullability::Nullable, Nullability::NonNullable) - ) { - return Ok(field_dtype.with_nullability(Nullability::Nullable)); - } - - Ok(field_dtype) + Ok(field_dtype.union_nullability(struct_dtype.nullability())) } fn execute( @@ -120,6 +114,30 @@ impl VTable for GetItem { .execute(args.ctx) } + fn simplify( + &self, + field_name: &FieldName, + expr: &Expression, + ctx: &dyn SimplifyCtx, + ) -> VortexResult> { + let child = expr.child(0); + let child_dtype = ctx.return_dtype(child)?; + + let element_dtype = match child_dtype { + DType::List(element_dtype, _) => Some(element_dtype), + DType::FixedSizeList(element_dtype, ..) 
=> Some(element_dtype), + _ => None, + }; + + if let Some(element_dtype) = element_dtype + && element_dtype.as_struct_fields_opt().is_some() + { + Ok(Some(get_item_list(field_name.clone(), child.clone()))) + } else { + Ok(None) + } + } + fn reduce( &self, field_name: &FieldName, @@ -240,6 +258,8 @@ pub fn get_item(field: impl Into, child: Expression) -> Expression { #[cfg(test)] mod tests { + use std::sync::Arc; + use vortex_buffer::buffer; use vortex_dtype::DType; use vortex_dtype::FieldNames; @@ -251,9 +271,12 @@ mod tests { use crate::Array; use crate::IntoArray; + use crate::arrays::FixedSizeListArray; + use crate::arrays::ListArray; use crate::arrays::StructArray; use crate::expr::exprs::binary::checked_add; use crate::expr::exprs::get_item::get_item; + use crate::expr::exprs::get_item_list::get_item_list; use crate::expr::exprs::literal::lit; use crate::expr::exprs::pack::pack; use crate::expr::exprs::root::root; @@ -301,6 +324,182 @@ mod tests { ); } + #[test] + fn get_item_list_of_struct() { + let element_dtype = Arc::new(DType::Struct( + [ + ("a", DType::Primitive(PType::I32, NonNullable)), + ("b", DType::Utf8(NonNullable)), + ] + .into_iter() + .collect(), + NonNullable, + )); + + let row_count = 4; + let items = ListArray::from_iter_opt_slow::( + [ + Some(vec![ + Scalar::struct_( + (*element_dtype).clone(), + vec![ + Scalar::primitive(1i32, NonNullable), + Scalar::utf8("x", NonNullable), + ], + ), + Scalar::struct_( + (*element_dtype).clone(), + vec![ + Scalar::primitive(2i32, NonNullable), + Scalar::utf8("y", NonNullable), + ], + ), + ]), + Some(Vec::new()), + None, + Some(vec![Scalar::struct_( + (*element_dtype).clone(), + vec![ + Scalar::primitive(3i32, NonNullable), + Scalar::utf8("z", NonNullable), + ], + )]), + ], + element_dtype, + ) + .unwrap(); + + let ids = buffer![0i32, 1, 2, 3].into_array(); + + let data = StructArray::new( + FieldNames::from(["id", "items"]), + vec![ids, items], + row_count, + Validity::NonNullable, + ) + .into_array(); + + // Regression for nested field projection on list-of-struct: `items.a`. 
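+        // In user code this is written as `get_item("a", get_item("items", root()))`; the
+        // `GetItem` simplify rule above rewrites that form into `get_item_list` when the child is
+        // a list (or fixed-size list) of structs, so the test drives the rewritten form directly.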
+ let projection = get_item_list("a", get_item("items", root())); + let out = data.apply(&projection).expect("apply"); + + assert_eq!( + out.dtype(), + &DType::List( + Arc::new(DType::Primitive(PType::I32, NonNullable)), + Nullability::Nullable + ) + ); + + assert_eq!( + out.scalar_at(0) + .unwrap() + .as_list() + .elements() + .unwrap() + .to_vec(), + vec![ + Scalar::primitive(1i32, NonNullable), + Scalar::primitive(2i32, NonNullable), + ] + ); + assert!( + out.scalar_at(1) + .unwrap() + .as_list() + .elements() + .unwrap() + .is_empty() + ); + assert!(out.scalar_at(2).unwrap().is_null()); + assert_eq!( + out.scalar_at(3) + .unwrap() + .as_list() + .elements() + .unwrap() + .to_vec(), + vec![Scalar::primitive(3i32, NonNullable)] + ); + } + + #[test] + fn get_item_fixed_size_list_of_struct() { + let n_lists: usize = 3; + let list_size: u32 = 2; + let n_elements = n_lists * list_size as usize; + + let struct_elems = StructArray::try_new( + FieldNames::from(["a", "b"]), + vec![ + buffer![1i32, 2, 3, 4, 5, 6].into_array(), + buffer![10i64, 20, 30, 40, 50, 60].into_array(), + ], + n_elements, + Validity::from_iter([true, false, true, true, false, true]), + ) + .unwrap() + .into_array(); + + let items = FixedSizeListArray::try_new( + struct_elems, + list_size, + Validity::from_iter([true, false, true]), + n_lists, + ) + .unwrap() + .into_array(); + + let ids = buffer![0i32, 1, 2].into_array(); + + let data = StructArray::new( + FieldNames::from(["id", "items"]), + vec![ids, items], + n_lists, + Validity::NonNullable, + ) + .into_array(); + + // FixedSizeList-of-struct projection: `items.a`, including struct-level nulls inside the list. + let projection = get_item_list("a", get_item("items", root())); + let out = data.apply(&projection).expect("apply"); + + assert_eq!( + out.dtype(), + &DType::FixedSizeList( + Arc::new(DType::Primitive(PType::I32, Nullability::Nullable)), + list_size, + Nullability::Nullable + ) + ); + + assert_eq!( + out.scalar_at(0) + .unwrap() + .as_list() + .elements() + .unwrap() + .to_vec(), + vec![ + Scalar::primitive(1i32, Nullability::Nullable), + Scalar::null(DType::Primitive(PType::I32, Nullability::Nullable)), + ] + ); + assert!(out.scalar_at(1).unwrap().is_null()); + assert_eq!( + out.scalar_at(2) + .unwrap() + .as_list() + .elements() + .unwrap() + .to_vec(), + vec![ + Scalar::null(DType::Primitive(PType::I32, Nullability::Nullable)), + Scalar::primitive(6i32, Nullability::Nullable), + ] + ); + } + #[test] fn test_pack_get_item_rule() { // Create: pack(a: lit(1), b: lit(2)).get_item("b") diff --git a/vortex-array/src/expr/exprs/get_item_list.rs b/vortex-array/src/expr/exprs/get_item_list.rs new file mode 100644 index 00000000000..23f842cde52 --- /dev/null +++ b/vortex-array/src/expr/exprs/get_item_list.rs @@ -0,0 +1,203 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::fmt::Formatter; +use std::ops::Not; +use std::sync::Arc; + +use vortex_dtype::DType; +use vortex_dtype::FieldName; +use vortex_dtype::Nullability; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_err; + +use crate::ArrayRef; +use crate::IntoArray; +use crate::arrays::FixedSizeListArray; +use crate::arrays::ListViewArray; +use crate::arrays::StructArray; +use crate::compute::mask; +use crate::expr::Arity; +use crate::expr::ChildName; +use crate::expr::ExecutionArgs; +use crate::expr::ExecutionResult; +use crate::expr::ExprId; +use crate::expr::Expression; +use 
crate::expr::VTable; +use crate::expr::VTableExt; +use crate::vtable::ValidityHelper; + +/// UNSTABLE: project a struct field from each element of a list. +/// +/// Semantics: +/// `get_item_list(field, list) == map(lambda x: get_item(field, x), list)`. +/// +/// This is a temporary internal expression used to support nested projections like `items.a` on +/// `list` and `fixed_size_list` without a general `map` expression. +/// +/// Do not serialize or persist this expression. It is not a stable part of the expression wire +/// format and may be removed or replaced by a proper `map`. +pub struct GetItemList; + +fn mask_struct_field(field: ArrayRef, struct_elems: &StructArray) -> VortexResult { + match struct_elems.dtype().nullability() { + Nullability::NonNullable => Ok(field), + Nullability::Nullable => { + let invalid = struct_elems.validity_mask()?.not(); + mask(field.as_ref(), &invalid) + } + } +} + +impl VTable for GetItemList { + type Options = FieldName; + + fn id(&self) -> ExprId { + ExprId::from("vortex.get_item_list") + } + + fn serialize(&self, _field_name: &FieldName) -> VortexResult>> { + vortex_bail!("UNSTABLE expression {} must not be serialized", self.id()) + } + + fn deserialize(&self, metadata: &[u8]) -> VortexResult { + _ = metadata; + vortex_bail!("UNSTABLE expression {} must not be deserialized", self.id()) + } + + fn arity(&self, _field_name: &FieldName) -> Arity { + Arity::Exact(1) + } + + fn child_name(&self, _field_name: &FieldName, child_idx: usize) -> ChildName { + match child_idx { + 0 => ChildName::from("list"), + _ => unreachable!( + "Invalid child index {} for GetItemList expression", + child_idx + ), + } + } + + fn fmt_sql( + &self, + field_name: &FieldName, + expr: &Expression, + f: &mut Formatter<'_>, + ) -> std::fmt::Result { + expr.child(0).fmt_sql(f)?; + write!(f, ".{}", field_name) + } + + fn return_dtype(&self, field_name: &FieldName, arg_dtypes: &[DType]) -> VortexResult { + let list_dtype = &arg_dtypes[0]; + + let (element_dtype, list_nullability, list_size) = match list_dtype { + DType::List(element_dtype, list_nullability) => { + (element_dtype.as_ref(), *list_nullability, None) + } + DType::FixedSizeList(element_dtype, list_size, list_nullability) => { + (element_dtype.as_ref(), *list_nullability, Some(*list_size)) + } + _ => { + return Err(vortex_err!( + "Expected list dtype for child of GetItemList expression, got {}", + list_dtype + )); + } + }; + + let struct_fields = element_dtype.as_struct_fields_opt().ok_or_else(|| { + vortex_err!( + "Expected list element struct dtype for GetItemList, got {}", + element_dtype + ) + })?; + + let field_dtype = struct_fields.field(field_name).ok_or_else(|| { + vortex_err!( + "Couldn't find the {} field in the list element struct dtype", + field_name + ) + })?; + + let projected = field_dtype.union_nullability(element_dtype.nullability()); + + Ok(match list_size { + Some(list_size) => { + DType::FixedSizeList(Arc::new(projected), list_size, list_nullability) + } + None => DType::List(Arc::new(projected), list_nullability), + }) + } + + fn execute( + &self, + field_name: &FieldName, + mut args: ExecutionArgs, + ) -> VortexResult { + let input = args + .inputs + .pop() + .vortex_expect("missing list for GetItemList expression"); + + match input.dtype() { + DType::List(..) 
=> { + let list = input.execute::(args.ctx)?; + let struct_elems = list.elements().clone().execute::(args.ctx)?; + + let field = mask_struct_field( + struct_elems.unmasked_field_by_name(field_name)?.clone(), + &struct_elems, + )?; + + ListViewArray::try_new( + field, + list.offsets().clone(), + list.sizes().clone(), + list.validity().clone(), + )? + .into_array() + .execute(args.ctx) + } + DType::FixedSizeList(..) => { + let list = input.execute::(args.ctx)?; + let struct_elems = list.elements().clone().execute::(args.ctx)?; + + let field = mask_struct_field( + struct_elems.unmasked_field_by_name(field_name)?.clone(), + &struct_elems, + )?; + + FixedSizeListArray::try_new( + field, + list.list_size(), + list.validity().clone(), + list.len(), + )? + .into_array() + .execute(args.ctx) + } + _ => Err(vortex_err!( + "Expected list scope for GetItemList execution, got {}", + input.dtype() + )), + } + } + + fn is_null_sensitive(&self, _field_name: &FieldName) -> bool { + true + } + + fn is_fallible(&self, _field_name: &FieldName) -> bool { + false + } +} + +/// Creates an expression that projects a struct field from each element of a list. +#[doc(hidden)] +pub fn get_item_list(field: impl Into, list: Expression) -> Expression { + GetItemList.new_expr(field.into(), vec![list]) +} diff --git a/vortex-array/src/expr/exprs/mod.rs b/vortex-array/src/expr/exprs/mod.rs index c606b53f5a0..748d439dd93 100644 --- a/vortex-array/src/expr/exprs/mod.rs +++ b/vortex-array/src/expr/exprs/mod.rs @@ -6,6 +6,7 @@ pub(crate) mod binary; pub(crate) mod cast; pub(crate) mod dynamic; pub(crate) mod get_item; +pub(crate) mod get_item_list; pub(crate) mod is_null; pub(crate) mod like; pub(crate) mod list_contains; @@ -23,6 +24,7 @@ pub use binary::*; pub use cast::*; pub use dynamic::*; pub use get_item::*; +pub use get_item_list::*; pub use is_null::*; pub use like::*; pub use list_contains::*; diff --git a/vortex-array/src/expr/session.rs b/vortex-array/src/expr/session.rs index 13106c354c4..19ac2c7a24e 100644 --- a/vortex-array/src/expr/session.rs +++ b/vortex-array/src/expr/session.rs @@ -10,6 +10,7 @@ use crate::expr::exprs::between::Between; use crate::expr::exprs::binary::Binary; use crate::expr::exprs::cast::Cast; use crate::expr::exprs::get_item::GetItem; +use crate::expr::exprs::get_item_list::GetItemList; use crate::expr::exprs::is_null::IsNull; use crate::expr::exprs::like::Like; use crate::expr::exprs::list_contains::ListContains; @@ -57,6 +58,7 @@ impl Default for ExprSession { ExprVTable::new_static(&Binary), ExprVTable::new_static(&Cast), ExprVTable::new_static(&GetItem), + ExprVTable::new_static(&GetItemList), ExprVTable::new_static(&IsNull), ExprVTable::new_static(&Like), ExprVTable::new_static(&ListContains), diff --git a/vortex-array/src/expr/vtable.rs b/vortex-array/src/expr/vtable.rs index fd5c2fe7f79..e29145b8a7a 100644 --- a/vortex-array/src/expr/vtable.rs +++ b/vortex-array/src/expr/vtable.rs @@ -732,6 +732,7 @@ mod tests { use crate::expr::exprs::cast::cast; use crate::expr::exprs::get_item::col; use crate::expr::exprs::get_item::get_item; + use crate::expr::exprs::get_item_list::get_item_list; use crate::expr::exprs::is_null::is_null; use crate::expr::exprs::list_contains::list_contains; use crate::expr::exprs::literal::lit; @@ -814,4 +815,16 @@ mod tests { Ok(()) } + + #[test] + fn get_item_list_is_not_serializable() { + let expr = get_item_list("field", root()); + let err = expr + .serialize_proto() + .expect_err("get_item_list must not be serializable"); + assert!( + 
err.to_string().contains("must not be serialized"), + "unexpected error: {err}" + ); + } } diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index a7219441363..bacf800543e 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -204,8 +204,8 @@ impl FileOpener for VortexOpener { // The schema of the stream returned from the vortex scan. // We use the physical_file_schema as reference for types that don't roundtrip. - let scan_dtype = scan_projection.return_dtype(vxf.dtype()).map_err(|_e| { - exec_datafusion_err!("Couldn't get the dtype for the underlying Vortex scan") + let scan_dtype = scan_projection.return_dtype(vxf.dtype()).map_err(|e| { + exec_datafusion_err!("Couldn't get the dtype for the underlying Vortex scan: {e}") })?; let stream_schema = calculate_physical_schema(&scan_dtype, &projected_physical_schema)?; diff --git a/vortex-layout/src/layouts/list/mod.rs b/vortex-layout/src/layouts/list/mod.rs new file mode 100644 index 00000000000..cd1ccc0b0ad --- /dev/null +++ b/vortex-layout/src/layouts/list/mod.rs @@ -0,0 +1,226 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +mod reader; +pub mod writer; + +use std::sync::Arc; + +use reader::ListReader; +use vortex_array::ArrayContext; +use vortex_array::DeserializeMetadata; +use vortex_array::EmptyMetadata; +use vortex_dtype::DType; +use vortex_dtype::Nullability; +use vortex_dtype::PType; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_error::vortex_err; +use vortex_session::SessionExt; +use vortex_session::VortexSession; + +use crate::LayoutChildType; +use crate::LayoutEncodingRef; +use crate::LayoutId; +use crate::LayoutReaderRef; +use crate::LayoutRef; +use crate::VTable; +use crate::children::LayoutChildren; +use crate::children::OwnedLayoutChildren; +use crate::segments::SegmentId; +use crate::segments::SegmentSource; +use crate::vtable; + +vtable!(List); + +impl VTable for ListVTable { + type Layout = ListLayout; + type Encoding = ListLayoutEncoding; + type Metadata = EmptyMetadata; + + fn id(_encoding: &Self::Encoding) -> LayoutId { + LayoutId::new_ref("vortex.list") + } + + fn encoding(_layout: &Self::Layout) -> LayoutEncodingRef { + LayoutEncodingRef::new_ref(ListLayoutEncoding.as_ref()) + } + + fn row_count(layout: &Self::Layout) -> u64 { + layout.row_count + } + + fn dtype(layout: &Self::Layout) -> &DType { + &layout.dtype + } + + fn metadata(_layout: &Self::Layout) -> Self::Metadata { + EmptyMetadata + } + + fn segment_ids(_layout: &Self::Layout) -> Vec { + vec![] + } + + fn nchildren(layout: &Self::Layout) -> usize { + let validity_children = layout.dtype.is_nullable() as usize; + match &layout.dtype { + DType::List(..) => 2 + validity_children, // offsets + elements + DType::FixedSizeList(..) 
=> 1 + validity_children, // elements + _ => unreachable!( + "ListLayout only supports List and FixedSizeList dtypes, got {}", + layout.dtype + ), + } + } + + fn child(layout: &Self::Layout, index: usize) -> VortexResult { + let is_nullable = layout.dtype.is_nullable(); + let offsets_dtype = DType::Primitive(PType::U64, Nullability::NonNullable); + + let child_dtype = match (&layout.dtype, is_nullable, index) { + // validity + (_, true, 0) => DType::Bool(Nullability::NonNullable), + + // variable-size list + (DType::List(..), false, 0) => offsets_dtype, + (DType::List(element_dtype, _), false, 1) => (*element_dtype.as_ref()).clone(), + (DType::List(..), true, 1) => offsets_dtype, + (DType::List(element_dtype, _), true, 2) => (*element_dtype.as_ref()).clone(), + + // fixed-size list + (DType::FixedSizeList(element_dtype, ..), false, 0) => { + (*element_dtype.as_ref()).clone() + } + (DType::FixedSizeList(element_dtype, ..), true, 1) => (*element_dtype.as_ref()).clone(), + + _ => return Err(vortex_err!("Invalid child index {index} for list layout")), + }; + + layout.children.child(index, &child_dtype) + } + + fn child_type(layout: &Self::Layout, idx: usize) -> LayoutChildType { + let is_nullable = layout.dtype.is_nullable(); + + if is_nullable && idx == 0 { + return LayoutChildType::Auxiliary("validity".into()); + } + + match &layout.dtype { + DType::List(..) => { + let offsets_idx = if is_nullable { 1 } else { 0 }; + if idx == offsets_idx { + LayoutChildType::Auxiliary("offsets".into()) + } else { + LayoutChildType::Auxiliary("elements".into()) + } + } + DType::FixedSizeList(..) => LayoutChildType::Auxiliary("elements".into()), + _ => unreachable!( + "ListLayout only supports List and FixedSizeList dtypes, got {}", + layout.dtype() + ), + } + } + + fn new_reader( + layout: &Self::Layout, + name: Arc, + segment_source: Arc, + session: &VortexSession, + ) -> VortexResult { + Ok(Arc::new(ListReader::try_new( + layout.clone(), + name, + segment_source, + session.session(), + )?)) + } + + fn build( + _encoding: &Self::Encoding, + dtype: &DType, + row_count: u64, + _metadata: &::Output, + _segment_ids: Vec, + children: &dyn LayoutChildren, + _ctx: &ArrayContext, + ) -> VortexResult { + vortex_ensure!( + matches!(dtype, DType::List(..) | DType::FixedSizeList(..)), + "Expected list dtype, got {}", + dtype + ); + + let expected_children = match dtype { + DType::List(..) => 2 + (dtype.is_nullable() as usize), + DType::FixedSizeList(..) => 1 + (dtype.is_nullable() as usize), + _ => unreachable!(), + }; + vortex_ensure!( + children.nchildren() == expected_children, + "List layout has {} children, expected {}", + children.nchildren(), + expected_children + ); + + Ok(ListLayout { + row_count, + dtype: dtype.clone(), + children: children.to_arc(), + }) + } + + fn with_children(layout: &mut Self::Layout, children: Vec) -> VortexResult<()> { + let expected_children = match layout.dtype { + DType::List(..) => 2 + (layout.dtype.is_nullable() as usize), + DType::FixedSizeList(..) 
=> 1 + (layout.dtype.is_nullable() as usize), + _ => vortex_bail!("Expected list dtype, got {}", layout.dtype), + }; + vortex_ensure!( + children.len() == expected_children, + "ListLayout expects {} children, got {}", + expected_children, + children.len() + ); + layout.children = OwnedLayoutChildren::layout_children(children); + Ok(()) + } +} + +#[derive(Debug)] +pub struct ListLayoutEncoding; + +#[derive(Clone, Debug)] +pub struct ListLayout { + row_count: u64, + dtype: DType, + children: Arc, +} + +impl ListLayout { + pub fn new(row_count: u64, dtype: DType, children: Vec) -> Self { + Self { + row_count, + dtype, + children: OwnedLayoutChildren::layout_children(children), + } + } + + #[inline] + pub fn row_count(&self) -> u64 { + self.row_count + } + + #[inline] + pub fn dtype(&self) -> &DType { + &self.dtype + } + + #[inline] + pub fn children(&self) -> &Arc { + &self.children + } +} diff --git a/vortex-layout/src/layouts/list/reader.rs b/vortex-layout/src/layouts/list/reader.rs new file mode 100644 index 00000000000..c40aef682c5 --- /dev/null +++ b/vortex-layout/src/layouts/list/reader.rs @@ -0,0 +1,648 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::collections::BTreeSet; +use std::ops::BitAnd; +use std::ops::Range; +use std::sync::Arc; + +use futures::try_join; +use vortex_array::Array; +use vortex_array::IntoArray; +use vortex_array::MaskFuture; +use vortex_array::ToCanonical; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::FixedSizeListArray; +use vortex_array::arrays::ListArray; +use vortex_array::expr::Expression; +use vortex_array::expr::get_item; +use vortex_array::expr::root; +use vortex_dtype::DType; +use vortex_dtype::FieldMask; +use vortex_dtype::FieldName; +use vortex_dtype::Nullability; +use vortex_dtype::PType; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_err; +use vortex_mask::Mask; +use vortex_session::VortexSession; + +use crate::ArrayFuture; +use crate::LayoutReader; +use crate::LayoutReaderRef; +use crate::LazyReaderChildren; +use crate::layouts::list::ListLayout; +use crate::segments::SegmentSource; + +pub struct ListReader { + layout: ListLayout, + name: Arc, + lazy_children: LazyReaderChildren, + session: VortexSession, +} + +impl ListReader { + pub(super) fn try_new( + layout: ListLayout, + name: Arc, + segment_source: Arc, + session: VortexSession, + ) -> VortexResult { + let mut dtypes: Vec = Vec::new(); + let mut names: Vec> = Vec::new(); + + if layout.dtype().is_nullable() { + dtypes.push(DType::Bool(Nullability::NonNullable)); + names.push(Arc::from("validity")); + } + + match layout.dtype() { + DType::List(element_dtype, _) => { + dtypes.push(DType::Primitive(PType::U64, Nullability::NonNullable)); + names.push(Arc::from("offsets")); + dtypes.push((**element_dtype).clone()); + names.push(Arc::from("elements")); + } + DType::FixedSizeList(element_dtype, ..) 
=> { + dtypes.push((**element_dtype).clone()); + names.push(Arc::from("elements")); + } + _ => vortex_bail!("Expected list dtype, got {}", layout.dtype()), + } + + let lazy_children = LazyReaderChildren::new( + layout.children().clone(), + dtypes, + names, + segment_source, + session.clone(), + ); + + Ok(Self { + layout, + name, + lazy_children, + session, + }) + } + + fn validity(&self) -> VortexResult> { + self.dtype() + .is_nullable() + .then(|| self.lazy_children.get(0)) + .transpose() + } + + fn offsets(&self) -> VortexResult<&LayoutReaderRef> { + let idx = if self.dtype().is_nullable() { 1 } else { 0 }; + self.lazy_children.get(idx) + } + + fn elements(&self) -> VortexResult<&LayoutReaderRef> { + let dtype = self.dtype(); + let idx = match dtype { + DType::List(..) => 1 + dtype.is_nullable() as usize, + DType::FixedSizeList(..) => dtype.is_nullable() as usize, + _ => return Err(vortex_err!("Expected list dtype, got {}", self.dtype())), + }; + self.lazy_children.get(idx) + } + + /// Creates a future that will produce a slice of this list array. + /// + /// The produced slice may have a projection applied to its elements. + fn list_slice_future( + &self, + row_range: Range, + element_expr: &Expression, + ) -> VortexResult { + let dtype = self.dtype().clone(); + let validity_fut = self + .validity()? + .map(|reader| { + let len = usize::try_from(row_range.end - row_range.start) + .vortex_expect("row range must fit in usize"); + reader.projection_evaluation(&row_range, &root(), MaskFuture::new_true(len)) + }) + .transpose()?; + + match dtype { + DType::List(_, list_nullability) => { + let offsets_reader = self.offsets()?.clone(); + let elements_reader = self.elements()?.clone(); + let row_range_clone = row_range.clone(); + let element_expr = element_expr.clone(); + + Ok(Box::pin(async move { + let row_len = usize::try_from(row_range_clone.end - row_range_clone.start) + .vortex_expect("row range must fit in usize"); + + let offsets_row_range = row_range_clone.start..row_range_clone.end + 1; + let offsets_len = row_len + 1; + let offsets_fut = offsets_reader.projection_evaluation( + &offsets_row_range, + &root(), + MaskFuture::new_true(offsets_len), + )?; + + let (offsets, validity) = try_join!(offsets_fut, async move { + match validity_fut { + Some(v) => v.await.map(Some), + None => Ok(None), + } + })?; + + let offsets = offsets.to_primitive(); + let offsets_slice = offsets.as_slice::(); + let base = *offsets_slice.first().unwrap_or(&0u64); + let end = *offsets_slice.last().unwrap_or(&base); + + let elements_row_range = base..end; + let elements_len = usize::try_from(end - base) + .vortex_expect("element range must fit in usize"); + let elements = elements_reader.projection_evaluation( + &elements_row_range, + &element_expr, + MaskFuture::new_true(elements_len), + )?; + + let elements = elements.await?; + + let normalized_offsets = vortex_array::arrays::PrimitiveArray::from_iter( + offsets_slice.iter().map(|v| *v - base), + ) + .into_array(); + + let validity = match (list_nullability, validity) { + (Nullability::NonNullable, _) => { + vortex_array::validity::Validity::NonNullable + } + (Nullability::Nullable, Some(v)) => { + vortex_array::validity::Validity::Array(v) + } + (Nullability::Nullable, None) => vortex_array::validity::Validity::AllValid, + }; + + Ok(ListArray::try_new(elements, normalized_offsets, validity)?.into_array()) + })) + } + DType::FixedSizeList(_, list_size, list_nullability) => { + let elements_reader = self.elements()?.clone(); + let row_range_clone = 
row_range.clone(); + let element_expr = element_expr.clone(); + + Ok(Box::pin(async move { + let row_len_u64 = row_range_clone.end - row_range_clone.start; + let row_len = + usize::try_from(row_len_u64).vortex_expect("row range must fit in usize"); + + let list_size_u64 = u64::from(list_size); + let element_start = row_range_clone + .start + .checked_mul(list_size_u64) + .ok_or_else(|| vortex_err!("FixedSizeList element start overflow"))?; + let element_end = row_range_clone + .end + .checked_mul(list_size_u64) + .ok_or_else(|| vortex_err!("FixedSizeList element end overflow"))?; + + let elements_row_range = element_start..element_end; + let elements_len = usize::try_from(element_end - element_start) + .vortex_expect("element range must fit in usize"); + let elements_fut = elements_reader.projection_evaluation( + &elements_row_range, + &element_expr, + MaskFuture::new_true(elements_len), + )?; + + let (elements, validity) = try_join!(elements_fut, async move { + match validity_fut { + Some(v) => v.await.map(Some), + None => Ok(None), + } + })?; + + let validity = match (list_nullability, validity) { + (Nullability::NonNullable, _) => { + vortex_array::validity::Validity::NonNullable + } + (Nullability::Nullable, Some(v)) => { + vortex_array::validity::Validity::Array(v) + } + (Nullability::Nullable, None) => vortex_array::validity::Validity::AllValid, + }; + + Ok( + FixedSizeListArray::try_new(elements, list_size, validity, row_len)? + .into_array(), + ) + })) + } + _ => Err(vortex_err!("Expected list dtype, got {}", dtype)), + } + } +} + +impl LayoutReader for ListReader { + fn name(&self) -> &Arc { + &self.name + } + + fn dtype(&self) -> &DType { + self.layout.dtype() + } + + fn row_count(&self) -> u64 { + self.layout.row_count() + } + + fn register_splits( + &self, + field_mask: &[FieldMask], + row_range: &Range, + splits: &mut BTreeSet, + ) -> VortexResult<()> { + splits.insert(row_range.end); + + match self.dtype() { + DType::FixedSizeList(_, list_size, _) => { + let list_size_u64 = u64::from(*list_size); + + let element_start = row_range + .start + .checked_mul(list_size_u64) + .ok_or_else(|| vortex_err!("FixedSizeList element start overflow"))?; + let element_end = row_range + .end + .checked_mul(list_size_u64) + .ok_or_else(|| vortex_err!("FixedSizeList element end overflow"))?; + + let element_range = element_start..element_end; + let mut element_splits = BTreeSet::new(); + self.elements()?.register_splits( + field_mask, + &element_range, + &mut element_splits, + )?; + + // Convert element splits back to row splits, but only when the element split + // is aligned to a row boundary. + for element_split in element_splits { + if element_split % list_size_u64 != 0 { + continue; + } + + let row_split = element_split / list_size_u64; + if row_split > row_range.start && row_split < row_range.end { + splits.insert(row_split); + } + } + } + DType::List(..) => { + // Variable-size lists are split based on the offsets child. + // + // NOTE: The offsets child has one extra entry (len = rows + 1), so a split at an + // offsets index `i` corresponds to a list row split at `i - 1`. 
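+                // For example, with 4 rows the offsets child holds 5 entries, so a chunk boundary
+                // at offsets index 3 maps to the list row split 2 (= 3 - 1).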
+ let offsets_end = row_range + .end + .checked_add(1) + .ok_or_else(|| vortex_err!("List offsets row_range end overflow"))?; + let offsets_row_range = row_range.start..offsets_end; + let mut offsets_splits = BTreeSet::new(); + self.offsets()?.register_splits( + field_mask, + &offsets_row_range, + &mut offsets_splits, + )?; + + for offsets_split in offsets_splits { + let row_split = offsets_split.saturating_sub(1); + if row_split > row_range.start && row_split < row_range.end { + splits.insert(row_split); + } + } + } + _ => {} + } + + Ok(()) + } + + fn pruning_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + mask: Mask, + ) -> VortexResult { + Ok(MaskFuture::ready(mask)) + } + + fn filter_evaluation( + &self, + row_range: &Range, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + let row_range = row_range.clone(); + let expr = expr.clone(); + let session = self.session.clone(); + + let list_fut = self.list_slice_future(row_range.clone(), &root())?; + + Ok(MaskFuture::new( + usize::try_from(row_range.end - row_range.start) + .vortex_expect("row range must fit in usize"), + async move { + let (array, mask) = try_join!(list_fut, mask)?; + if mask.all_false() { + return Ok(mask); + } + + let array = array.apply(&expr)?; + let mut ctx = session.create_execution_ctx(); + let array_mask = array.execute::(&mut ctx)?; + + Ok(mask.bitand(&array_mask)) + }, + )) + } + + fn projection_evaluation( + &self, + row_range: &Range, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + // If the expression is a simple element projection, we can push it down to the elements. + // + // NOTE: `vortex.get_item_list` is a temporary list-of-struct projection expression; + // when pushing down we construct the element projection and pass it into the elements reader. + let (is_pushdown, element_expr) = if expr.id().as_ref() == "vortex.get_item_list" + && expr.child(0).id().as_ref() == "vortex.root" + { + let field_name = expr + .options() + .as_any() + .downcast_ref::() + .vortex_expect("vortex.get_item_list options must be a FieldName"); + (true, get_item(field_name.clone(), root())) + } else if expr.id().as_ref() == "vortex.select" { + (true, expr.clone()) + } else { + (false, root()) + }; + + let row_range = row_range.clone(); + let expr = expr.clone(); + let list_fut = self.list_slice_future(row_range, &element_expr)?; + + Ok(Box::pin(async move { + let (mut array, mask) = try_join!(list_fut, mask)?; + + // Apply the selection mask before applying the expression, matching `FlatReader`. 
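+            // Filtering first means the projection expression only ever sees the selected rows
+            // rather than the full row range.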
+ if !mask.all_true() { + array = array.filter(mask)?; + } + + if is_pushdown { + Ok(array) + } else { + array.apply(&expr) + } + })) + } +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeSet; + use std::sync::Arc; + + use futures::stream; + use vortex_array::Array; + use vortex_array::ArrayContext; + use vortex_array::IntoArray; + use vortex_array::MaskFuture; + use vortex_array::ToCanonical; + use vortex_array::arrays::FixedSizeListArray; + use vortex_array::arrays::ListArray; + use vortex_array::arrays::StructArray; + use vortex_array::expr::get_item_list; + use vortex_array::expr::root; + use vortex_array::validity::Validity; + use vortex_buffer::buffer; + use vortex_dtype::FieldNames; + use vortex_dtype::Nullability::NonNullable; + use vortex_dtype::PType; + use vortex_io::runtime::single::block_on; + + use crate::LayoutStrategy; + use crate::layouts::chunked::writer::ChunkedLayoutStrategy; + use crate::layouts::flat::writer::FlatLayoutStrategy; + use crate::layouts::list::writer::ListStrategy; + use crate::segments::TestSegments; + use crate::sequence::SequenceId; + use crate::sequence::SequentialStreamAdapter; + use crate::sequence::SequentialStreamExt as _; + use crate::test::SESSION; + + #[test] + fn register_splits_fixed_size_list_maps_element_splits_to_rows() { + let ctx = ArrayContext::empty(); + + let segments = Arc::new(TestSegments::default()); + + let list_size: u32 = 2; + + let chunk1_elements = buffer![1i32, 2, 3, 4].into_array(); + let chunk1 = + FixedSizeListArray::try_new(chunk1_elements, list_size, Validity::NonNullable, 2) + .unwrap() + .into_array(); + + let chunk2_elements = buffer![5i32, 6, 7, 8].into_array(); + let chunk2 = + FixedSizeListArray::try_new(chunk2_elements, list_size, Validity::NonNullable, 2) + .unwrap() + .into_array(); + + let list_dtype = chunk1.dtype().clone(); + + let elements_strategy = Arc::new(ChunkedLayoutStrategy::new(FlatLayoutStrategy::default())); + let strategy = ListStrategy::new( + Arc::new(FlatLayoutStrategy::default()), + Arc::new(FlatLayoutStrategy::default()), + elements_strategy, + ); + + let (mut sequence_id, eof) = SequenceId::root().split(); + let layout = block_on(|handle| { + strategy.write_stream( + ctx, + segments.clone(), + SequentialStreamAdapter::new( + vortex_dtype::DType::FixedSizeList( + Arc::new(vortex_dtype::DType::Primitive(PType::I32, NonNullable)), + list_size, + NonNullable, + ), + stream::iter([ + Ok((sequence_id.advance(), chunk1)), + Ok((sequence_id.advance(), chunk2)), + ]), + ) + .sendable(), + eof, + handle, + ) + }) + .unwrap(); + + // Sanity check we produced the expected fixed-size list shape. + assert_eq!(layout.row_count(), 4); + assert_eq!(layout.dtype(), &list_dtype); + + // The elements child is chunked with a split at element index 4, which should map to row 2. 
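+        // (Each chunk holds two lists of size 2, so the chunk boundary falls at element index 4,
+        // which is row-aligned: 4 / list_size = row 2.)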
+ let reader = layout.new_reader("".into(), segments, &SESSION).unwrap(); + let mut splits = BTreeSet::new(); + reader + .register_splits(&[], &(0..layout.row_count()), &mut splits) + .unwrap(); + + assert!(splits.contains(&2), "splits = {splits:?}"); + assert!(splits.contains(&layout.row_count())); + } + + #[test] + fn register_splits_variable_size_list_maps_offset_splits_to_rows() { + let ctx = ArrayContext::empty(); + let segments = Arc::new(TestSegments::default()); + + let chunk1_elements = StructArray::new( + FieldNames::from(["a", "b"]), + vec![ + buffer![10i32, 11].into_array(), + buffer![110i32, 111].into_array(), + ], + 2, + Validity::NonNullable, + ) + .into_array(); + let chunk1_offsets = buffer![0u64, 2, 2].into_array(); + let chunk1 = ListArray::try_new(chunk1_elements, chunk1_offsets, Validity::NonNullable) + .unwrap() + .into_array(); + + let chunk2_elements = StructArray::new( + FieldNames::from(["a", "b"]), + vec![ + buffer![20i32, 21, 22, 23].into_array(), + buffer![120i32, 121, 122, 123].into_array(), + ], + 4, + Validity::NonNullable, + ) + .into_array(); + let chunk2_offsets = buffer![0u64, 1, 4].into_array(); + let chunk2 = ListArray::try_new(chunk2_elements, chunk2_offsets, Validity::NonNullable) + .unwrap() + .into_array(); + + let list_dtype = chunk1.dtype().clone(); + + let chunked_flat = Arc::new(ChunkedLayoutStrategy::new(FlatLayoutStrategy::default())); + let strategy = ListStrategy::new( + Arc::new(FlatLayoutStrategy::default()), + chunked_flat.clone(), + chunked_flat, + ); + + let (mut sequence_id, eof) = SequenceId::root().split(); + let layout = block_on(|handle| { + strategy.write_stream( + ctx, + segments.clone(), + SequentialStreamAdapter::new( + list_dtype.clone(), + stream::iter([ + Ok((sequence_id.advance(), chunk1)), + Ok((sequence_id.advance(), chunk2)), + ]), + ) + .sendable(), + eof, + handle, + ) + }) + .unwrap(); + + assert_eq!(layout.row_count(), 4); + assert_eq!(layout.dtype(), &list_dtype); + + let reader = layout.new_reader("".into(), segments, &SESSION).unwrap(); + let mut splits = BTreeSet::new(); + splits.insert(0); + reader + .register_splits(&[], &(0..layout.row_count()), &mut splits) + .unwrap(); + + // The offsets child is chunked at offset-index 3, which corresponds to row 2. + assert!(splits.contains(&2), "splits = {splits:?}"); + assert!(splits.contains(&layout.row_count())); + + // Read each split with a list-of-struct projection pushdown and ensure we match the unsplit + // read. + let expr = get_item_list("a", root()); + let row_count = + usize::try_from(layout.row_count()).expect("layout.row_count must fit in usize"); + let full = block_on(|_| async { + reader + .projection_evaluation( + &(0..layout.row_count()), + &expr, + MaskFuture::new_true(row_count), + )? + .await + }) + .unwrap(); + + for (start, end) in splits + .iter() + .copied() + .collect::>() + .windows(2) + .map(|w| { + let start = w[0]; + let end = w[1]; + (start, end) + }) + { + let range = start..end; + let len = usize::try_from(end - start).unwrap(); + let actual = block_on(|_| async { + reader + .projection_evaluation(&range, &expr, MaskFuture::new_true(len))? 
+ .await + }) + .unwrap(); + + let start_usize = usize::try_from(start).unwrap(); + let end_usize = usize::try_from(end).unwrap(); + let expected = full.slice(start_usize..end_usize).unwrap(); + + let actual_list = actual.to_listview(); + let expected_list = expected.to_listview(); + assert_eq!(actual_list.len(), expected_list.len()); + for i in 0..actual_list.len() { + let actual_elems = actual_list.list_elements_at(i).unwrap().to_primitive(); + let expected_elems = expected_list.list_elements_at(i).unwrap().to_primitive(); + assert_eq!( + actual_elems.as_slice::(), + expected_elems.as_slice::() + ); + } + } + } +} diff --git a/vortex-layout/src/layouts/list/writer.rs b/vortex-layout/src/layouts/list/writer.rs new file mode 100644 index 00000000000..5a643966f3e --- /dev/null +++ b/vortex-layout/src/layouts/list/writer.rs @@ -0,0 +1,357 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; + +use async_trait::async_trait; +use futures::StreamExt; +use futures::future::try_join_all; +use futures::pin_mut; +use itertools::Itertools; +use vortex_array::Array; +use vortex_array::ArrayContext; +use vortex_array::IntoArray; +use vortex_array::ToCanonical; +use vortex_array::arrays::list_from_list_view; +use vortex_dtype::DType; +use vortex_dtype::Nullability; +use vortex_dtype::PType; +use vortex_error::VortexError; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_error::vortex_err; +use vortex_io::kanal_ext::KanalExt; +use vortex_io::runtime::Handle; + +use crate::IntoLayout as _; +use crate::LayoutRef; +use crate::LayoutStrategy; +use crate::layouts::list::ListLayout; +use crate::segments::SegmentSinkRef; +use crate::sequence::SendableSequentialStream; +use crate::sequence::SequencePointer; +use crate::sequence::SequentialStreamAdapter; +use crate::sequence::SequentialStreamExt; + +trait ToU64 { + fn to_u64(self) -> u64; +} + +impl ToU64 for u8 { + fn to_u64(self) -> u64 { + u64::from(self) + } +} + +impl ToU64 for u16 { + fn to_u64(self) -> u64 { + u64::from(self) + } +} + +impl ToU64 for u32 { + fn to_u64(self) -> u64 { + u64::from(self) + } +} + +impl ToU64 for u64 { + fn to_u64(self) -> u64 { + self + } +} + +fn offsets_to_u64(offsets: &vortex_array::arrays::PrimitiveArray) -> VortexResult> { + match offsets.ptype() { + ptype if ptype.is_unsigned_int() => { + vortex_dtype::match_each_unsigned_integer_ptype!(ptype, |T| { + Ok(offsets + .as_slice::() + .iter() + .map(|&v| v.to_u64()) + .collect()) + }) + } + ptype if ptype.is_signed_int() => { + vortex_dtype::match_each_signed_integer_ptype!(ptype, |T| { + offsets + .as_slice::() + .iter() + .map(|&v| { + u64::try_from(v) + .map_err(|_| vortex_err!("List offsets must be convertible to u64")) + }) + .collect() + }) + } + other => Err(vortex_err!( + "List offsets must be an integer type, got {other}" + )), + } +} + +/// A write strategy that performs component shredding for list types. 
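+/// Rather than writing each list chunk as a single leaf, the list is decomposed into its
+/// component streams (validity, offsets, elements), each written with its own child
+/// [`LayoutStrategy`].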
+/// +/// - Variable-size lists are written as: +/// - optional validity (is_valid: bool) +/// - offsets (u64, length = rows + 1) +/// - elements (concatenated) +/// - Fixed-size lists are written as: +/// - optional validity (is_valid: bool) +/// - elements (concatenated) +#[derive(Clone)] +pub struct ListStrategy { + validity: Arc, + offsets: Arc, + elements: Arc, +} + +impl ListStrategy { + pub fn new( + validity: Arc, + offsets: Arc, + elements: Arc, + ) -> Self { + Self { + validity, + offsets, + elements, + } + } +} + +#[async_trait] +impl LayoutStrategy for ListStrategy { + async fn write_stream( + &self, + ctx: ArrayContext, + segment_sink: SegmentSinkRef, + stream: SendableSequentialStream, + mut eof: SequencePointer, + handle: Handle, + ) -> VortexResult { + let dtype = stream.dtype().clone(); + + let is_nullable = dtype.is_nullable(); + let offsets_dtype = DType::Primitive(PType::U64, Nullability::NonNullable); + + let (stream_count, column_dtypes): (usize, Vec) = match &dtype { + DType::List(element_dtype, _) => { + let mut dtypes = Vec::new(); + if is_nullable { + dtypes.push(DType::Bool(Nullability::NonNullable)); + } + dtypes.push(offsets_dtype.clone()); + dtypes.push((**element_dtype).clone()); + (dtypes.len(), dtypes) + } + DType::FixedSizeList(element_dtype, ..) => { + let mut dtypes = Vec::new(); + if is_nullable { + dtypes.push(DType::Bool(Nullability::NonNullable)); + } + dtypes.push((**element_dtype).clone()); + (dtypes.len(), dtypes) + } + _ => { + vortex_bail!("ListStrategy expected list dtype, got {}", dtype); + } + }; + + let (column_streams_tx, column_streams_rx): (Vec<_>, Vec<_>) = + (0..stream_count).map(|_| kanal::bounded_async(1)).unzip(); + + let total_rows = Arc::new(AtomicU64::new(0)); + + // Spawn a task to fan out chunk components to their respective transposed streams. + { + let total_rows = total_rows.clone(); + let dtype = dtype.clone(); + handle + .spawn(async move { + let mut base_elements: u64 = 0; + let mut first_offsets = true; + + pin_mut!(stream); + while let Some(result) = stream.next().await { + match result { + Ok((sequence_id, chunk)) => { + total_rows.fetch_add(chunk.len() as u64, Ordering::SeqCst); + + let mut sequence_pointer = sequence_id.descend(); + + // validity (optional) + if is_nullable { + let validity = match chunk.validity_mask() { + Ok(validity) => validity.into_array(), + Err(e) => { + let e: Arc = Arc::new(e); + for tx in column_streams_tx.iter() { + let _ = tx + .send(Err(VortexError::from(e.clone()))) + .await; + } + break; + } + }; + let _ = column_streams_tx[0] + .send(Ok((sequence_pointer.advance(), validity))) + .await; + } + + match &dtype { + DType::List(..) => { + let list_view = chunk.to_listview(); + let list = match list_from_list_view(list_view) { + Ok(list) => list, + Err(e) => { + let e: Arc = Arc::new(e); + for tx in column_streams_tx.iter() { + let _ = tx + .send(Err(VortexError::from(e.clone()))) + .await; + } + break; + } + }; + + // Build global u64 offsets, dropping the leading 0 for all but the first chunk. 
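+                                        // e.g. chunk offsets [0, 2, 2] followed by [0, 1, 4] are
+                                        // written out as the single stream [0, 2, 2, 3, 6]: later
+                                        // chunks are shifted by the running element count and
+                                        // their leading 0 is dropped.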
+ let offsets = list.offsets().to_primitive(); + let offsets_slice_u64 = match offsets_to_u64(&offsets) { + Ok(v) => v, + Err(e) => { + let e: Arc = Arc::new(e); + for tx in column_streams_tx.iter() { + let _ = tx + .send(Err(VortexError::from(e.clone()))) + .await; + } + break; + } + }; + + let mut adjusted: Vec = Vec::with_capacity( + offsets_slice_u64 + .len() + .saturating_sub((!first_offsets) as usize), + ); + for (i, v) in offsets_slice_u64.into_iter().enumerate() { + if !first_offsets && i == 0 { + continue; + } + adjusted.push(v + base_elements); + } + + let offsets_arr = + vortex_array::arrays::PrimitiveArray::from_iter( + adjusted, + ) + .into_array(); + + // offsets index depends on nullable validity child + let offsets_idx = if is_nullable { 1 } else { 0 }; + let elements_idx = offsets_idx + 1; + + let _ = column_streams_tx[offsets_idx] + .send(Ok((sequence_pointer.advance(), offsets_arr))) + .await; + let _ = column_streams_tx[elements_idx] + .send(Ok(( + sequence_pointer.advance(), + list.elements().clone(), + ))) + .await; + + base_elements += list.elements().len() as u64; + first_offsets = false; + } + DType::FixedSizeList(..) => { + let list = chunk.to_fixed_size_list(); + + let elements_idx = if is_nullable { 1 } else { 0 }; + let _ = column_streams_tx[elements_idx] + .send(Ok(( + sequence_pointer.advance(), + list.elements().clone(), + ))) + .await; + } + _ => unreachable!(), + } + } + Err(e) => { + let e: Arc = Arc::new(e); + for tx in column_streams_tx.iter() { + let _ = tx.send(Err(VortexError::from(e.clone()))).await; + } + break; + } + } + } + }) + .detach(); + } + + let layout_futures: Vec<_> = column_dtypes + .into_iter() + .zip_eq(column_streams_rx) + .enumerate() + .map(|(index, (dtype, recv))| { + let column_stream = + SequentialStreamAdapter::new(dtype.clone(), recv.into_stream().boxed()) + .sendable(); + let child_eof = eof.split_off(); + handle.spawn_nested(|h| { + let validity = self.validity.clone(); + let offsets = self.offsets.clone(); + let elements = self.elements.clone(); + let ctx = ctx.clone(); + let segment_sink = segment_sink.clone(); + async move { + if is_nullable && index == 0 { + validity + .write_stream(ctx, segment_sink, column_stream, child_eof, h) + .await + } else if matches!(dtype, DType::Primitive(PType::U64, _)) { + offsets + .write_stream(ctx, segment_sink, column_stream, child_eof, h) + .await + } else { + elements + .write_stream(ctx, segment_sink, column_stream, child_eof, h) + .await + } + } + }) + }) + .collect(); + + let children = try_join_all(layout_futures).await?; + + let row_count = total_rows.load(Ordering::SeqCst); + + // Basic invariant: for variable-size lists, offsets must have row_count + 1 entries. 
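+        // (Each chunk contributes `len + 1` offsets and every chunk after the first drops its
+        // leading 0, so the concatenated offsets child should hold exactly one more row than the
+        // list itself.)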
+ if matches!(dtype, DType::List(..)) { + let offsets_layout = if is_nullable { + &children[1] + } else { + &children[0] + }; + vortex_ensure!( + offsets_layout.row_count() == row_count + 1, + "ListLayout offsets row_count {} does not match list row_count + 1 ({})", + offsets_layout.row_count(), + row_count + 1 + ); + } + + Ok(ListLayout::new(row_count, dtype, children).into_layout()) + } + + fn buffered_bytes(&self) -> u64 { + self.elements.buffered_bytes() + } +} diff --git a/vortex-layout/src/layouts/mod.rs b/vortex-layout/src/layouts/mod.rs index 61193c54b6e..e33d33cf7e8 100644 --- a/vortex-layout/src/layouts/mod.rs +++ b/vortex-layout/src/layouts/mod.rs @@ -17,6 +17,7 @@ pub mod compressed; pub mod dict; pub mod file_stats; pub mod flat; +pub mod list; pub(crate) mod partitioned; pub mod repartition; pub mod row_idx; diff --git a/vortex-layout/src/layouts/table.rs b/vortex-layout/src/layouts/table.rs index 13ad7f8f51d..b6b4fe7c60b 100644 --- a/vortex-layout/src/layouts/table.rs +++ b/vortex-layout/src/layouts/table.rs @@ -34,6 +34,7 @@ use crate::IntoLayout; use crate::LayoutRef; use crate::LayoutStrategy; use crate::layouts::flat::writer::FlatLayoutStrategy; +use crate::layouts::list::writer::ListStrategy; use crate::layouts::struct_::StructLayout; use crate::segments::SegmentSinkRef; use crate::sequence::SendableSequentialStream; @@ -97,8 +98,7 @@ impl TableStrategy { /// /// ```no_run /// # use std::sync::Arc; - /// # use vortex_dtype::{field_path, Field, FieldPath}; - /// # use vortex_layout::layouts::compact::CompactCompressor; + /// # use vortex_dtype::field_path; /// # use vortex_layout::layouts::compressed::CompressingStrategy; /// # use vortex_layout::layouts::flat::writer::FlatLayoutStrategy; /// # use vortex_layout::layouts::table::TableStrategy; @@ -106,19 +106,15 @@ impl TableStrategy { /// // A strategy for compressing data using the balanced BtrBlocks compressor. /// let compress_btrblocks = CompressingStrategy::new_btrblocks(FlatLayoutStrategy::default(), true); /// - /// // A strategy that compresses data using ZSTD - /// let compress_compact = CompressingStrategy::new_compact(FlatLayoutStrategy::default(), CompactCompressor::default()); - /// /// // Our combined strategy uses no compression for validity buffers, BtrBlocks compression - /// // for most columns, and will use ZSTD compression for a nested binary column that we know - /// // is never filtered in. + /// // for most columns, and will leave a nested binary column uncompressed. /// let strategy = TableStrategy::new( /// Arc::new(FlatLayoutStrategy::default()), /// Arc::new(compress_btrblocks), /// ) /// .with_field_writer( /// field_path!(request.body.bytes), - /// Arc::new(compress_compact), + /// Arc::new(FlatLayoutStrategy::default()), /// ); /// ``` pub fn with_field_writer( @@ -336,6 +332,15 @@ impl LayoutStrategy for TableStrategy { if dtype.is_struct() { // Step into the field path for struct columns Arc::new(self.descend(&field)) + } else if matches!(dtype, DType::List(..) | DType::FixedSizeList(..)) { + // Component shredding for lists: descend into the element type. 
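+                // Validity reuses the table's validity strategy and offsets use the table's
+                // fallback strategy; only the elements child descends into the list's
+                // `Field::ElementType`.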
+ let elements = + Arc::new(self.descend(&field).descend(&Field::ElementType)); + Arc::new(ListStrategy::new( + self.validity.clone(), + self.fallback.clone(), + elements, + )) } else { // Use fallback for leaf columns self.fallback.clone() diff --git a/vortex-layout/src/session.rs b/vortex-layout/src/session.rs index 131115950e4..f7cfcefcc88 100644 --- a/vortex-layout/src/session.rs +++ b/vortex-layout/src/session.rs @@ -9,6 +9,7 @@ use crate::LayoutEncodingRef; use crate::layouts::chunked::ChunkedLayoutEncoding; use crate::layouts::dict::DictLayoutEncoding; use crate::layouts::flat::FlatLayoutEncoding; +use crate::layouts::list::ListLayoutEncoding; use crate::layouts::struct_::StructLayoutEncoding; use crate::layouts::zoned::ZonedLayoutEncoding; @@ -46,6 +47,7 @@ impl Default for LayoutSession { // Register the built-in layout encodings. layouts.register(ChunkedLayoutEncoding.id(), ChunkedLayoutEncoding.as_ref()); layouts.register(FlatLayoutEncoding.id(), FlatLayoutEncoding.as_ref()); + layouts.register(ListLayoutEncoding.id(), ListLayoutEncoding.as_ref()); layouts.register(StructLayoutEncoding.id(), StructLayoutEncoding.as_ref()); layouts.register(ZonedLayoutEncoding.id(), ZonedLayoutEncoding.as_ref()); layouts.register(DictLayoutEncoding.id(), DictLayoutEncoding.as_ref()); diff --git a/vortex-scan/src/gpu/gpubuilder.rs b/vortex-scan/src/gpu/gpubuilder.rs index 58859bfe036..6a8281ac479 100644 --- a/vortex-scan/src/gpu/gpubuilder.rs +++ b/vortex-scan/src/gpu/gpubuilder.rs @@ -64,7 +64,8 @@ impl GpuScanBuilder { /// The [`DType`] returned by the scan, after applying the projection. pub fn dtype(&self) -> VortexResult { - self.projection.return_dtype(self.layout_reader.dtype()) + let projection = simplify_typed(self.projection.clone(), self.layout_reader.dtype())?; + projection.return_dtype(self.layout_reader.dtype()) } /// Map each split of the scan. The function will be run on the spawned task. @@ -82,7 +83,6 @@ impl GpuScanBuilder { } pub fn prepare(self) -> VortexResult> { - let dtype = self.dtype()?; let handle = self.session.handle(); // Spin up the root layout reader, and wrap it in a FilterLayoutReader to perform @@ -91,6 +91,7 @@ impl GpuScanBuilder { // Normalize and simplify the expressions. let projection = simplify_typed(self.projection, layout_reader.dtype())?; + let dtype = projection.return_dtype(layout_reader.dtype())?; // Construct field masks and compute the row splits of the scan. let (filter_mask, projection_mask) = diff --git a/vortex-scan/src/scan_builder.rs b/vortex-scan/src/scan_builder.rs index a8b562d134b..c36fdd6d44e 100644 --- a/vortex-scan/src/scan_builder.rs +++ b/vortex-scan/src/scan_builder.rs @@ -187,7 +187,19 @@ impl ScanBuilder { /// The [`DType`] returned by the scan, after applying the projection. pub fn dtype(&self) -> VortexResult { - self.projection.return_dtype(self.layout_reader.dtype()) + // NOTE: `GetItem` may simplify into `GetItemList` for list-of-struct projections. + // To avoid rejecting valid nested projections (like `items.a`) we must simplify before + // validating the return dtype. + // + // Also, `row_idx` support is provided by `RowIdxLayoutReader`, so use the same reader + // enrichment as `prepare`. + let layout_reader = Arc::new(RowIdxLayoutReader::new( + self.row_offset, + self.layout_reader.clone(), + self.session.clone(), + )); + let projection = self.projection.optimize_recursive(layout_reader.dtype())?; + projection.return_dtype(layout_reader.dtype()) } /// The session used by the scan. 
@@ -220,8 +232,6 @@ impl ScanBuilder { } pub fn prepare(self) -> VortexResult> { - let dtype = self.dtype()?; - if self.filter.is_some() && self.limit.is_some() { vortex_bail!("Vortex doesn't support scans with both a filter and a limit") } @@ -241,6 +251,7 @@ impl ScanBuilder { // Normalize and simplify the expressions. let projection = self.projection.optimize_recursive(layout_reader.dtype())?; + let dtype = projection.return_dtype(layout_reader.dtype())?; let filter = self .filter @@ -451,10 +462,14 @@ mod test { use vortex_array::ToCanonical; use vortex_array::arrays::PrimitiveArray; use vortex_array::expr::Expression; + use vortex_array::expr::get_item; + use vortex_array::expr::root; use vortex_dtype::DType; use vortex_dtype::FieldMask; + use vortex_dtype::FieldNames; use vortex_dtype::Nullability; use vortex_dtype::PType; + use vortex_dtype::StructFields; use vortex_error::VortexResult; use vortex_error::vortex_err; use vortex_io::runtime::BlockingRuntime; @@ -465,6 +480,76 @@ mod test { use super::ScanBuilder; + #[derive(Debug)] + struct DTypeOnlyLayoutReader { + name: Arc, + dtype: DType, + row_count: u64, + } + + impl DTypeOnlyLayoutReader { + fn new(dtype: DType) -> Self { + Self { + name: Arc::from("dtype-only"), + dtype, + row_count: 1, + } + } + } + + impl LayoutReader for DTypeOnlyLayoutReader { + fn name(&self) -> &Arc { + &self.name + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn row_count(&self) -> u64 { + self.row_count + } + + fn register_splits( + &self, + _field_mask: &[FieldMask], + row_range: &Range, + splits: &mut BTreeSet, + ) -> VortexResult<()> { + splits.insert(row_range.end); + Ok(()) + } + + fn pruning_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + _mask: Mask, + ) -> VortexResult { + unimplemented!("not needed for this test"); + } + + fn filter_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + _mask: MaskFuture, + ) -> VortexResult { + unimplemented!("not needed for this test"); + } + + fn projection_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + _mask: MaskFuture, + ) -> VortexResult { + Ok(Box::pin(async move { + unreachable!("scan should not be polled in this test") + })) + } + } + #[derive(Debug)] struct CountingLayoutReader { name: Arc, @@ -538,6 +623,47 @@ mod test { } } + #[test] + fn dtype_simplifies_list_of_struct_nested_projection() -> VortexResult<()> { + let element_dtype = DType::Struct( + StructFields::new( + FieldNames::from(["a", "b"]), + vec![ + DType::Primitive(PType::I32, Nullability::NonNullable), + DType::Utf8(Nullability::NonNullable), + ], + ), + Nullability::NonNullable, + ); + let dtype = DType::Struct( + StructFields::new( + FieldNames::from(["items"]), + vec![DType::List( + Arc::new(element_dtype), + Nullability::NonNullable, + )], + ), + Nullability::NonNullable, + ); + + let reader = Arc::new(DTypeOnlyLayoutReader::new(dtype)); + let session = crate::test::SCAN_SESSION.clone(); + + // users express `items.a` as `get_item("a", get_item("items", root()))`. + // the outer get_item must be simplified (typed) into a list-aware projection before `dtype` + // validation, otherwise it fails with "Couldn't find the a field in the input scope". 
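+        // After `optimize_recursive` this is effectively equivalent to
+        // `get_item_list("a", get_item("items", root()))`.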
+ let projection = get_item("a", get_item("items", root())); + let builder = ScanBuilder::new(session, reader).with_projection(projection); + + let actual = builder.dtype()?; + let expected = DType::List( + Arc::new(DType::Primitive(PType::I32, Nullability::NonNullable)), + Nullability::NonNullable, + ); + assert_eq!(actual, expected); + Ok(()) + } + #[test] fn into_stream_is_lazy() { let calls = Arc::new(AtomicUsize::new(0));