From cc6afa511aaf42f0df9ad4536b23982c322740ee Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 28 Jan 2026 14:09:16 -0500 Subject: [PATCH 1/4] Decimal/VarBinView filter Signed-off-by: Andrew Duffy --- Cargo.lock | 1 + encodings/sparse/src/canonical.rs | 7 +- fuzz/src/array/fill_null.rs | 8 +- fuzz/src/array/mask.rs | 4 +- vortex-array/src/arrays/filter/array.rs | 16 ++ vortex-array/src/arrays/filter/mod.rs | 1 + vortex-array/src/arrays/masked/execute.rs | 4 +- vortex-array/src/arrays/varbinview/array.rs | 100 ++++++- .../src/arrays/varbinview/compute/cast.rs | 4 +- .../src/arrays/varbinview/compute/mask.rs | 4 +- .../src/arrays/varbinview/compute/take.rs | 9 +- .../src/arrays/varbinview/compute/zip.rs | 6 +- .../src/arrays/varbinview/vtable/array.rs | 2 +- .../src/arrays/varbinview/vtable/mod.rs | 8 +- .../src/arrays/varbinview/vtable/visitor.rs | 11 +- vortex-array/src/arrow/executor/byte_view.rs | 8 +- vortex-array/src/builders/varbinview.rs | 31 ++- vortex-array/src/canonical_to_vector.rs | 11 +- vortex-cuda/cub/Cargo.toml | 1 + vortex-cuda/cub/build.rs | 2 + vortex-cuda/cub/kernels/filter.cu | 12 + vortex-cuda/cub/kernels/filter.h | 10 +- vortex-cuda/cub/src/filter.rs | 8 +- vortex-cuda/nvcomp/.gitignore | 1 + vortex-cuda/src/canonical.rs | 33 +++ vortex-cuda/src/kernel/filter.rs | 263 ------------------ vortex-cuda/src/kernel/filter/decimal.rs | 142 ++++++++++ vortex-cuda/src/kernel/filter/mod.rs | 164 +++++++++++ vortex-cuda/src/kernel/filter/primitive.rs | 136 +++++++++ vortex-cuda/src/kernel/filter/varbinview.rs | 89 ++++++ vortex-duckdb/src/exporter/varbinview.rs | 13 +- 31 files changed, 778 insertions(+), 331 deletions(-) create mode 100644 vortex-cuda/nvcomp/.gitignore delete mode 100644 vortex-cuda/src/kernel/filter.rs create mode 100644 vortex-cuda/src/kernel/filter/decimal.rs create mode 100644 vortex-cuda/src/kernel/filter/mod.rs create mode 100644 vortex-cuda/src/kernel/filter/primitive.rs create mode 100644 vortex-cuda/src/kernel/filter/varbinview.rs diff --git a/Cargo.lock b/Cargo.lock index eec44390371..811a8a8a0db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10339,6 +10339,7 @@ dependencies = [ "libloading 0.8.9", "paste", "vortex-cuda-macros", + "vortex-dtype", ] [[package]] diff --git a/encodings/sparse/src/canonical.rs b/encodings/sparse/src/canonical.rs index 2c716d7714a..665d7b94872 100644 --- a/encodings/sparse/src/canonical.rs +++ b/encodings/sparse/src/canonical.rs @@ -17,6 +17,7 @@ use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::StructArray; use vortex_array::arrays::VarBinViewArray; use vortex_array::arrays::build_views::BinaryView; +use vortex_array::buffer::BufferHandle; use vortex_array::builders::ArrayBuilder; use vortex_array::builders::DecimalBuilder; use vortex_array::builders::ListViewBuilder; @@ -480,7 +481,7 @@ fn canonicalize_varbin_inner( let mut buffers = values.buffers().to_vec(); let fill = if let Some(buffer) = &fill_value { - buffers.push(buffer.clone()); + buffers.push(BufferHandle::new_host(buffer.clone())); BinaryView::make_view( buffer.as_ref(), u32::try_from(n_patch_buffers).vortex_expect("too many buffers"), @@ -498,9 +499,11 @@ fn canonicalize_varbin_inner( views[patch_index_usize] = patch; } + let views = BufferHandle::new_host(views.freeze().into_byte_buffer()); + // SAFETY: views are constructed to maintain the invariants let array = unsafe { - VarBinViewArray::new_unchecked(views.freeze(), Arc::from(buffers), dtype, validity) + VarBinViewArray::new_handle_unchecked(views, Arc::from(buffers), dtype, validity) }; Canonical::VarBinView(array) diff --git a/fuzz/src/array/fill_null.rs b/fuzz/src/array/fill_null.rs index 8be717336a2..29c90c61c32 100644 --- a/fuzz/src/array/fill_null.rs +++ b/fuzz/src/array/fill_null.rs @@ -196,8 +196,8 @@ fn fill_varbinview_array( let string_refs: Vec<&str> = strings.iter().map(|s| s.as_str()).collect(); let result = VarBinViewArray::from_iter_str(string_refs).into_array(); if result_nullability == Nullability::Nullable { - VarBinViewArray::new( - result.to_varbinview().views().clone(), + VarBinViewArray::new_handle( + result.to_varbinview().views_handle().clone(), result.to_varbinview().buffers().clone(), result.dtype().as_nullable(), result_nullability.into(), @@ -230,8 +230,8 @@ fn fill_varbinview_array( let binary_refs: Vec<&[u8]> = binaries.iter().map(|b| b.as_slice()).collect(); let result = VarBinViewArray::from_iter_bin(binary_refs).into_array(); if result_nullability == Nullability::Nullable { - VarBinViewArray::new( - result.to_varbinview().views().clone(), + VarBinViewArray::new_handle( + result.to_varbinview().views_handle().clone(), result.to_varbinview().buffers().clone(), result.dtype().as_nullable(), result_nullability.into(), diff --git a/fuzz/src/array/mask.rs b/fuzz/src/array/mask.rs index 805a9b6f1c6..3ccb94dbb46 100644 --- a/fuzz/src/array/mask.rs +++ b/fuzz/src/array/mask.rs @@ -52,8 +52,8 @@ pub fn mask_canonical_array(canonical: Canonical, mask: &Mask) -> VortexResult { let new_validity = array.validity().mask(mask); - VarBinViewArray::new( - array.views().clone(), + VarBinViewArray::new_handle( + array.views_handle().clone(), array.buffers().clone(), array.dtype().with_nullability(new_validity.nullability()), new_validity, diff --git a/vortex-array/src/arrays/filter/array.rs b/vortex-array/src/arrays/filter/array.rs index df3754dc58c..40857487a65 100644 --- a/vortex-array/src/arrays/filter/array.rs +++ b/vortex-array/src/arrays/filter/array.rs @@ -9,6 +9,14 @@ use vortex_mask::Mask; use crate::ArrayRef; use crate::stats::ArrayStats; +/// Decomposed parts of the filter array. +pub struct FilterArrayParts { + /// Child array that is filtered by the mask + pub child: ArrayRef, + /// Mask to apply at filter time. Child elements with set indices are kept, the rest discarded. + pub mask: Mask, +} + // TODO(connor): Write docs on why we have this, and what we had in the old world so that the future // does not repeat the mistakes of the past. /// A lazy array that represents filtering a child array by a boolean [`Mask`]. @@ -56,4 +64,12 @@ impl FilterArray { pub fn filter_mask(&self) -> &Mask { &self.mask } + + /// Consume the array and return its individual components. + pub fn into_parts(self) -> FilterArrayParts { + FilterArrayParts { + child: self.child, + mask: self.mask, + } + } } diff --git a/vortex-array/src/arrays/filter/mod.rs b/vortex-array/src/arrays/filter/mod.rs index 1c72e20773b..3b28df57b44 100644 --- a/vortex-array/src/arrays/filter/mod.rs +++ b/vortex-array/src/arrays/filter/mod.rs @@ -3,6 +3,7 @@ mod array; pub use array::FilterArray; +pub use array::FilterArrayParts; mod execute; diff --git a/vortex-array/src/arrays/masked/execute.rs b/vortex-array/src/arrays/masked/execute.rs index 1ab1fe184ad..1dd9523ed4e 100644 --- a/vortex-array/src/arrays/masked/execute.rs +++ b/vortex-array/src/arrays/masked/execute.rs @@ -103,8 +103,8 @@ fn mask_validity_varbinview(array: VarBinViewArray, mask: &Mask) -> VarBinViewAr let new_validity = combine_validity(array.validity(), mask, len); // SAFETY: We're only changing validity, not the data structure unsafe { - VarBinViewArray::new_unchecked( - array.views().clone(), + VarBinViewArray::new_handle_unchecked( + array.views_handle().clone(), array.buffers().clone(), dtype, new_validity, diff --git a/vortex-array/src/arrays/varbinview/array.rs b/vortex-array/src/arrays/varbinview/array.rs index 63c8636a167..f30bf0ced19 100644 --- a/vortex-array/src/arrays/varbinview/array.rs +++ b/vortex-array/src/arrays/varbinview/array.rs @@ -3,6 +3,7 @@ use std::sync::Arc; +use vortex_buffer::Alignment; use vortex_buffer::Buffer; use vortex_buffer::ByteBuffer; use vortex_dtype::DType; @@ -15,6 +16,7 @@ use vortex_error::vortex_err; use vortex_error::vortex_panic; use vortex_vector::binaryview::BinaryView; +use crate::buffer::BufferHandle; use crate::builders::ArrayBuilder; use crate::builders::VarBinViewBuilder; use crate::stats::ArrayStats; @@ -82,16 +84,16 @@ use crate::validity::Validity; #[derive(Clone, Debug)] pub struct VarBinViewArray { pub(super) dtype: DType, - pub(super) buffers: Arc<[ByteBuffer]>, - pub(super) views: Buffer, + pub(super) buffers: Arc<[BufferHandle]>, + pub(super) views: BufferHandle, pub(super) validity: Validity, pub(super) stats_set: ArrayStats, } pub struct VarBinViewArrayParts { pub dtype: DType, - pub buffers: Arc<[ByteBuffer]>, - pub views: Buffer, + pub buffers: Arc<[BufferHandle]>, + pub views: BufferHandle, pub validity: Validity, } @@ -112,6 +114,22 @@ impl VarBinViewArray { .vortex_expect("VarBinViewArray construction failed") } + /// Creates a new [`VarBinViewArray`] with device or host memory. + /// + /// # Panics + /// + /// Panics if the provided components do not satisfy the invariants documented + /// in [`VarBinViewArray::new_unchecked`]. + pub fn new_handle( + views: BufferHandle, + buffers: Arc<[BufferHandle]>, + dtype: DType, + validity: Validity, + ) -> Self { + Self::try_new_handle(views, buffers, dtype, validity) + .vortex_expect("VarbinViewArray construction failed") + } + /// Constructs a new `VarBinViewArray`. /// /// See [`VarBinViewArray::new_unchecked`] for more information. @@ -132,6 +150,32 @@ impl VarBinViewArray { Ok(unsafe { Self::new_unchecked(views, buffers, dtype, validity) }) } + /// Constructs a new `VarBinViewArray`. + /// + /// See [`VarBinViewArray::new_unchecked`] for more information. + /// + /// # Errors + /// + /// Returns an error if the provided components do not satisfy the invariants documented in + /// [`VarBinViewArray::new_unchecked`]. + pub fn try_new_handle( + views: BufferHandle, + buffers: Arc<[BufferHandle]>, + dtype: DType, + validity: Validity, + ) -> VortexResult { + // TODO(aduffy): device validation. + if let Some(host) = views.as_host_opt() { + vortex_ensure!( + host.is_aligned(Alignment::of::()), + "Views on host must be 16 byte aligned" + ); + } + + // SAFETY: validate ensures all invariants are met. + Ok(unsafe { Self::new_handle_unchecked(views, buffers, dtype, validity) }) + } + /// Creates a new [`VarBinViewArray`] without validation from these components: /// /// * `views` is a buffer of 16-byte view entries (one per logical element). @@ -171,10 +215,38 @@ impl VarBinViewArray { Self::validate(&views, &buffers, &dtype, &validity) .vortex_expect("[Debug Assertion]: Invalid `VarBinViewArray` parameters"); + let handles: Vec = buffers + .iter() + .cloned() + .map(BufferHandle::new_host) + .collect(); + + let handles = Arc::from(handles); + Self { dtype, - buffers, + buffers: handles, + views: BufferHandle::new_host(views.into_byte_buffer()), + validity, + stats_set: Default::default(), + } + } + + /// Construct a new array from `BufferHandle`s without validation. + /// + /// # Safety + /// + /// See documentation in `new_unchecked`. + pub unsafe fn new_handle_unchecked( + views: BufferHandle, + buffers: Arc<[BufferHandle]>, + dtype: DType, + validity: Validity, + ) -> Self { + Self { views, + buffers, + dtype, validity, stats_set: Default::default(), } @@ -290,7 +362,16 @@ impl VarBinViewArray { /// contain either a pointer into one of the array's owned `buffer`s OR an inlined copy of /// the string (if the string has 12 bytes or fewer). #[inline] - pub fn views(&self) -> &Buffer { + pub fn views(&self) -> &[BinaryView] { + let host_views = self.views.as_host(); + let len = host_views.len() / size_of::(); + + // SAFETY: data alignment is checked for host buffers on construction + unsafe { std::slice::from_raw_parts(host_views.as_ptr().cast(), len) } + } + + /// Return the buffer handle backing the views. + pub fn views_handle(&self) -> &BufferHandle { &self.views } @@ -308,7 +389,8 @@ impl VarBinViewArray { .slice(view_ref.as_range()) } else { // Return access to the range of bytes around it. - views + self.views_handle() + .as_host() .clone() .into_byte_buffer() .slice_ref(view.as_inlined().value()) @@ -329,12 +411,12 @@ impl VarBinViewArray { self.nbuffers() ); } - &self.buffers[idx] + self.buffers[idx].as_host() } /// Iterate over the underlying raw data buffers, not including the views buffer. #[inline] - pub fn buffers(&self) -> &Arc<[ByteBuffer]> { + pub fn buffers(&self) -> &Arc<[BufferHandle]> { &self.buffers } diff --git a/vortex-array/src/arrays/varbinview/compute/cast.rs b/vortex-array/src/arrays/varbinview/compute/cast.rs index 19908e190bb..f04ad29851b 100644 --- a/vortex-array/src/arrays/varbinview/compute/cast.rs +++ b/vortex-array/src/arrays/varbinview/compute/cast.rs @@ -29,8 +29,8 @@ impl CastKernel for VarBinViewVTable { // SAFETY: casting just changes the DType, does not affect invariants on views/buffers. unsafe { Ok(Some( - VarBinViewArray::new_unchecked( - array.views().clone(), + VarBinViewArray::new_handle_unchecked( + array.views_handle().clone(), array.buffers().clone(), new_dtype, new_validity, diff --git a/vortex-array/src/arrays/varbinview/compute/mask.rs b/vortex-array/src/arrays/varbinview/compute/mask.rs index 884419c7578..dd32dac5c8b 100644 --- a/vortex-array/src/arrays/varbinview/compute/mask.rs +++ b/vortex-array/src/arrays/varbinview/compute/mask.rs @@ -17,8 +17,8 @@ impl MaskKernel for VarBinViewVTable { fn mask(&self, array: &VarBinViewArray, mask: &Mask) -> VortexResult { // SAFETY: masking the validity does not affect the invariants unsafe { - Ok(VarBinViewArray::new_unchecked( - array.views().clone(), + Ok(VarBinViewArray::new_handle_unchecked( + array.views_handle().clone(), array.buffers().clone(), array.dtype().as_nullable(), array.validity().mask(mask), diff --git a/vortex-array/src/arrays/varbinview/compute/take.rs b/vortex-array/src/arrays/varbinview/compute/take.rs index 90475e6698b..b937c904438 100644 --- a/vortex-array/src/arrays/varbinview/compute/take.rs +++ b/vortex-array/src/arrays/varbinview/compute/take.rs @@ -2,7 +2,6 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::iter; -use std::ops::Deref; use num_traits::AsPrimitive; use vortex_buffer::Buffer; @@ -18,6 +17,7 @@ use crate::IntoArray; use crate::ToCanonical; use crate::arrays::VarBinViewArray; use crate::arrays::VarBinViewVTable; +use crate::buffer::BufferHandle; use crate::compute::TakeKernel; use crate::compute::TakeKernelAdapter; use crate::register_kernel; @@ -37,8 +37,8 @@ impl TakeKernel for VarBinViewVTable { // SAFETY: taking all components at same indices maintains invariants unsafe { - Ok(VarBinViewArray::new_unchecked( - views_buffer, + Ok(VarBinViewArray::new_handle_unchecked( + BufferHandle::new_host(views_buffer.into_byte_buffer()), array.buffers().clone(), array .dtype() @@ -53,12 +53,11 @@ impl TakeKernel for VarBinViewVTable { register_kernel!(TakeKernelAdapter(VarBinViewVTable).lift()); fn take_views>( - views: &Buffer, + views_ref: &[BinaryView], indices: &[I], mask: &Mask, ) -> Buffer { // NOTE(ngates): this deref is not actually trivial, so we run it once. - let views_ref = views.deref(); // We do not use iter_bools directly, since the resulting dyn iterator cannot // implement TrustedLen. match mask.bit_buffer() { diff --git a/vortex-array/src/arrays/varbinview/compute/zip.rs b/vortex-array/src/arrays/varbinview/compute/zip.rs index e5674632715..9718c996c38 100644 --- a/vortex-array/src/arrays/varbinview/compute/zip.rs +++ b/vortex-array/src/arrays/varbinview/compute/zip.rs @@ -46,8 +46,10 @@ impl ZipKernel for VarBinViewVTable { // build buffer lookup tables for both arrays, these map from the original buffer idx // to the new buffer index in the result array let mut buffers = DeduplicatedBuffers::default(); - let true_lookup = buffers.extend_from_slice(if_true.buffers()); - let false_lookup = buffers.extend_from_slice(if_false.buffers()); + let true_lookup = + buffers.extend_from_iter(if_true.buffers().iter().map(|b| b.as_host().clone())); + let false_lookup = + buffers.extend_from_iter(if_false.buffers().iter().map(|b| b.as_host().clone())); let mut views_builder = BufferMut::::with_capacity(len); let mut validity_builder = LazyBitBufferBuilder::new(len); diff --git a/vortex-array/src/arrays/varbinview/vtable/array.rs b/vortex-array/src/arrays/varbinview/vtable/array.rs index 63fbf7b4d0c..ee12435fbd7 100644 --- a/vortex-array/src/arrays/varbinview/vtable/array.rs +++ b/vortex-array/src/arrays/varbinview/vtable/array.rs @@ -15,7 +15,7 @@ use crate::vtable::BaseArrayVTable; impl BaseArrayVTable for VarBinViewVTable { fn len(array: &VarBinViewArray) -> usize { - array.views.len() + array.views().len() } fn dtype(array: &VarBinViewArray) -> &DType { diff --git a/vortex-array/src/arrays/varbinview/vtable/mod.rs b/vortex-array/src/arrays/varbinview/vtable/mod.rs index e987fbeeed5..daf0746eda2 100644 --- a/vortex-array/src/arrays/varbinview/vtable/mod.rs +++ b/vortex-array/src/arrays/varbinview/vtable/mod.rs @@ -123,9 +123,11 @@ impl VTable for VarBinViewVTable { fn slice(array: &Self::Array, range: Range) -> VortexResult> { Ok(Some( - VarBinViewArray::new( - array.views().slice(range.clone()), - array.buffers().clone(), + VarBinViewArray::new_handle( + array + .views_handle() + .slice_typed::(range.clone()), + Arc::clone(array.buffers()), array.dtype().clone(), array.validity().slice(range)?, ) diff --git a/vortex-array/src/arrays/varbinview/vtable/visitor.rs b/vortex-array/src/arrays/varbinview/vtable/visitor.rs index 85306484217..a5321d4cc6f 100644 --- a/vortex-array/src/arrays/varbinview/vtable/visitor.rs +++ b/vortex-array/src/arrays/varbinview/vtable/visitor.rs @@ -5,22 +5,15 @@ use super::VarBinViewVTable; use crate::ArrayBufferVisitor; use crate::ArrayChildVisitor; use crate::arrays::VarBinViewArray; -use crate::buffer::BufferHandle; use crate::vtable::ValidityHelper; use crate::vtable::VisitorVTable; impl VisitorVTable for VarBinViewVTable { fn visit_buffers(array: &VarBinViewArray, visitor: &mut dyn ArrayBufferVisitor) { for (i, buffer) in array.buffers().iter().enumerate() { - visitor.visit_buffer_handle( - &format!("buffer_{i}"), - &BufferHandle::new_host(buffer.clone()), - ); + visitor.visit_buffer_handle(&format!("buffer_{i}"), buffer); } - visitor.visit_buffer_handle( - "views", - &BufferHandle::new_host(array.views().clone().into_byte_buffer()), - ); + visitor.visit_buffer_handle("views", array.views_handle()); } fn visit_children(array: &VarBinViewArray, visitor: &mut dyn ArrayChildVisitor) { diff --git a/vortex-array/src/arrow/executor/byte_view.rs b/vortex-array/src/arrow/executor/byte_view.rs index 2171a558c30..798b8bbd430 100644 --- a/vortex-array/src/arrow/executor/byte_view.rs +++ b/vortex-array/src/arrow/executor/byte_view.rs @@ -25,11 +25,11 @@ pub fn canonical_varbinview_to_arrow( array: &VarBinViewArray, ) -> VortexResult { let views = - ScalarBuffer::::from(array.views().clone().into_byte_buffer().into_arrow_buffer()); + ScalarBuffer::::from(array.views_handle().as_host().clone().into_arrow_buffer()); let buffers: Vec<_> = array .buffers() .iter() - .map(|buffer| buffer.clone().into_arrow_buffer()) + .map(|buffer| buffer.as_host().clone().into_arrow_buffer()) .collect(); let nulls = to_null_buffer(array.validity_mask()?); @@ -44,11 +44,11 @@ pub fn execute_varbinview_to_arrow( ctx: &mut ExecutionCtx, ) -> VortexResult { let views = - ScalarBuffer::::from(array.views().clone().into_byte_buffer().into_arrow_buffer()); + ScalarBuffer::::from(array.views_handle().as_host().clone().into_arrow_buffer()); let buffers: Vec<_> = array .buffers() .iter() - .map(|buffer| buffer.clone().into_arrow_buffer()) + .map(|buffer| buffer.as_host().clone().into_arrow_buffer()) .collect(); let nulls = to_arrow_null_buffer(array.validity().clone(), array.len(), ctx)?; diff --git a/vortex-array/src/builders/varbinview.rs b/vortex-array/src/builders/varbinview.rs index e89fc824c19..79d6297f3a5 100644 --- a/vortex-array/src/builders/varbinview.rs +++ b/vortex-array/src/builders/varbinview.rs @@ -5,6 +5,7 @@ use std::any::Any; use std::ops::Range; use std::sync::Arc; +use itertools::Itertools; use vortex_buffer::Buffer; use vortex_buffer::BufferMut; use vortex_buffer::ByteBuffer; @@ -437,7 +438,7 @@ impl CompletedBuffers { Self::Deduplicated(completed_buffers), BuffersWithOffsets::AllKept { buffers, offsets }, ) => { - let buffer_lookup = completed_buffers.extend_from_slice(&buffers); + let buffer_lookup = completed_buffers.extend_from_iter(buffers.iter().cloned()); ViewAdjustment::lookup(buffer_lookup, offsets) } ( @@ -498,11 +499,11 @@ impl DeduplicatedBuffers { .collect() } - pub(crate) fn extend_from_slice(&mut self, buffers: &[ByteBuffer]) -> Vec { - buffers - .iter() - .map(|buffer| self.push(buffer.clone())) - .collect() + pub(crate) fn extend_from_iter( + &mut self, + buffers: impl Iterator, + ) -> Vec { + buffers.map(|buffer| self.push(buffer)).collect() } pub(crate) fn finish(self) -> Arc<[ByteBuffer]> { @@ -589,7 +590,14 @@ impl BuffersWithOffsets { pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self { if compaction_threshold == 0.0 { return Self::AllKept { - buffers: array.buffers().clone(), + buffers: Arc::from( + array + .buffers() + .to_vec() + .into_iter() + .map(|b| b.unwrap_host()) + .collect_vec(), + ), offsets: None, }; } @@ -613,10 +621,11 @@ impl BuffersWithOffsets { .zip(array.buffers().iter()) .map(|(utilization, buffer)| { match compaction_strategy(utilization, compaction_threshold) { - CompactionStrategy::KeepFull => (Some(buffer.clone()), 0), - CompactionStrategy::Slice { start, end } => { - (Some(buffer.slice(start as usize..end as usize)), start) - } + CompactionStrategy::KeepFull => (Some(buffer.as_host().clone()), 0), + CompactionStrategy::Slice { start, end } => ( + Some(buffer.as_host().slice(start as usize..end as usize)), + start, + ), CompactionStrategy::Rewrite => (None, 0), } }); diff --git a/vortex-array/src/canonical_to_vector.rs b/vortex-array/src/canonical_to_vector.rs index 687bf93d023..0593fbb909d 100644 --- a/vortex-array/src/canonical_to_vector.rs +++ b/vortex-array/src/canonical_to_vector.rs @@ -16,6 +16,7 @@ use vortex_error::VortexResult; use vortex_mask::Mask; use vortex_vector::Vector; use vortex_vector::binaryview::BinaryVector; +use vortex_vector::binaryview::BinaryView; use vortex_vector::binaryview::StringVector; use vortex_vector::bool::BoolVector; use vortex_vector::decimal::DVector; @@ -98,19 +99,21 @@ impl Canonical { } Canonical::VarBinView(a) => { let validity = a.validity_mask()?; + let views = + Buffer::::from_byte_buffer(a.views_handle().as_host().clone()); match a.dtype() { DType::Utf8(_) => { - let views = a.views().clone(); // Convert Arc<[ByteBuffer]> to Arc> - let buffers: Box<[_]> = a.buffers().iter().cloned().collect(); + let buffers: Box<[_]> = + a.buffers().iter().map(|b| b.as_host()).cloned().collect(); Vector::String(unsafe { StringVector::new_unchecked(views, Arc::new(buffers), validity) }) } DType::Binary(_) => { - let views = a.views().clone(); // Convert Arc<[ByteBuffer]> to Arc> - let buffers: Box<[_]> = a.buffers().iter().cloned().collect(); + let buffers: Box<[_]> = + a.buffers().iter().map(|b| b.as_host()).cloned().collect(); Vector::Binary(unsafe { BinaryVector::new_unchecked(views, Arc::new(buffers), validity) }) diff --git a/vortex-cuda/cub/Cargo.toml b/vortex-cuda/cub/Cargo.toml index 8068e49fd6b..1c92776ddb7 100644 --- a/vortex-cuda/cub/Cargo.toml +++ b/vortex-cuda/cub/Cargo.toml @@ -23,6 +23,7 @@ workspace = true libloading = { workspace = true } paste = { workspace = true } vortex-cuda-macros = { workspace = true } +vortex-dtype = { workspace = true } [build-dependencies] bindgen = { workspace = true } diff --git a/vortex-cuda/cub/build.rs b/vortex-cuda/cub/build.rs index 3d3605e8406..d5e6ae3a786 100644 --- a/vortex-cuda/cub/build.rs +++ b/vortex-cuda/cub/build.rs @@ -105,6 +105,7 @@ fn generate_rust_bindings(kernels_dir: &Path, out_dir: &Path) { .allowlist_type("cudaError_t") // Blocklist cudaStream_t and define it manually as an opaque pointer .blocklist_type("cudaStream_t") + .blocklist_type("__int256_t") // Generate dynamic library loading wrapper .dynamic_library_name("CubLibrary") .dynamic_link_require_all(true) @@ -113,6 +114,7 @@ fn generate_rust_bindings(kernels_dir: &Path, out_dir: &Path) { .raw_line("// Functions are loaded at runtime via libloading.") .raw_line("") .raw_line("pub type cudaStream_t = *mut std::ffi::c_void;") + .raw_line("pub type __int256_t = vortex_dtype::i256;") .generate() .expect("Failed to generate CUB bindings"); diff --git a/vortex-cuda/cub/kernels/filter.cu b/vortex-cuda/cub/kernels/filter.cu index 336acfbc005..18419799fc3 100644 --- a/vortex-cuda/cub/kernels/filter.cu +++ b/vortex-cuda/cub/kernels/filter.cu @@ -9,6 +9,12 @@ #include #include +// i256 type +typedef struct { + __int128_t high; + __int128_t low; +} __int256_t; + // Bit extraction functor for TransformInputIterator struct BitExtractor { const uint8_t* packed; @@ -60,6 +66,8 @@ DEFINE_TEMP_SIZE(u64, uint64_t) DEFINE_TEMP_SIZE(i64, int64_t) DEFINE_TEMP_SIZE(f32, float) DEFINE_TEMP_SIZE(f64, double) +DEFINE_TEMP_SIZE(i128, __int128_t) +DEFINE_TEMP_SIZE(i256, __int256_t) // CUB DeviceSelect::Flagged - Execute filter with byte mask (one byte per element) template @@ -100,6 +108,8 @@ DEFINE_FILTER_BYTEMASK(u64, uint64_t) DEFINE_FILTER_BYTEMASK(i64, int64_t) DEFINE_FILTER_BYTEMASK(f32, float) DEFINE_FILTER_BYTEMASK(f64, double) +DEFINE_FILTER_BYTEMASK(i128, __int128_t) +DEFINE_FILTER_BYTEMASK(i256, __int256_t) // CUB DeviceSelect::Flagged - Execute filter with bit mask (one bit per element) // @@ -161,3 +171,5 @@ DEFINE_FILTER_BITMASK(u64, uint64_t) DEFINE_FILTER_BITMASK(i64, int64_t) DEFINE_FILTER_BITMASK(f32, float) DEFINE_FILTER_BITMASK(f64, double) +DEFINE_FILTER_BITMASK(i128, __int128_t) +DEFINE_FILTER_BITMASK(i256, __int256_t) diff --git a/vortex-cuda/cub/kernels/filter.h b/vortex-cuda/cub/kernels/filter.h index a5e0e5cc304..45458e02985 100644 --- a/vortex-cuda/cub/kernels/filter.h +++ b/vortex-cuda/cub/kernels/filter.h @@ -9,6 +9,12 @@ #include #include +// i256 type +typedef struct { + __int128_t high; + __int128_t low; +} __int256_t; + // CUDA types - defined as opaque for bindgen typedef int cudaError_t; typedef void* cudaStream_t; @@ -28,7 +34,9 @@ extern "C" { X(u64, uint64_t) \ X(i64, int64_t) \ X(f32, float) \ - X(f64, double) + X(f64, double) \ + X(i128, __int128_t) \ + X(i256, __int256_t) // Filter temp size query functions #define DECLARE_FILTER_TEMP_SIZE(suffix, c_type) \ diff --git a/vortex-cuda/cub/src/filter.rs b/vortex-cuda/cub/src/filter.rs index 9df339a8a19..4159285481a 100644 --- a/vortex-cuda/cub/src/filter.rs +++ b/vortex-cuda/cub/src/filter.rs @@ -116,7 +116,7 @@ macro_rules! impl_filter { } } - #[doc = "Get the temporary storage size required for filtering `" $ty "` elements."] + #[doc = "Get the temporary storage size required for filtering elements."] pub fn [](num_items: i64) -> Result { let lib = cub_library()?; let mut temp_bytes: usize = 0; @@ -125,7 +125,7 @@ macro_rules! impl_filter { Ok(temp_bytes) } - #[doc = "Filter `" $ty "` elements using a byte mask (one byte per element)."] + #[doc = "Filter elements using a byte mask (one byte per element)."] /// /// # Safety /// @@ -162,7 +162,7 @@ macro_rules! impl_filter { check_cuda_error(err, concat!("filter_bytemask_", stringify!($suffix))) } - #[doc = "Filter `" $ty "` elements using a bit mask (one bit per element)."] + #[doc = "Filter elements using a bit mask (one bit per element)."] /// /// This version accepts packed bits directly, avoiding the need to expand /// bits to bytes in a separate kernel. @@ -219,4 +219,6 @@ impl_filter! { i64 => i64, f32 => f32, f64 => f64, + i128 => i128, + i256 => vortex_dtype::i256, } diff --git a/vortex-cuda/nvcomp/.gitignore b/vortex-cuda/nvcomp/.gitignore new file mode 100644 index 00000000000..db872fac1e0 --- /dev/null +++ b/vortex-cuda/nvcomp/.gitignore @@ -0,0 +1 @@ +sdk/ diff --git a/vortex-cuda/src/canonical.rs b/vortex-cuda/src/canonical.rs index 9270536dca4..0061ba99851 100644 --- a/vortex-cuda/src/canonical.rs +++ b/vortex-cuda/src/canonical.rs @@ -1,15 +1,23 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::sync::Arc; + use async_trait::async_trait; +use futures::future::try_join_all; use vortex_array::Canonical; +use vortex_array::arrays::BinaryView; use vortex_array::arrays::BoolArray; use vortex_array::arrays::BoolArrayParts; use vortex_array::arrays::DecimalArray; use vortex_array::arrays::DecimalArrayParts; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::PrimitiveArrayParts; +use vortex_array::arrays::VarBinViewArray; +use vortex_array::arrays::VarBinViewArrayParts; use vortex_array::buffer::BufferHandle; +use vortex_buffer::Buffer; +use vortex_buffer::ByteBuffer; use vortex_error::VortexResult; /// Move all canonical data from to_host from device. @@ -69,6 +77,31 @@ impl CanonicalCudaExt for Canonical { ) })) } + Canonical::VarBinView(varbinview) => { + let VarBinViewArrayParts { + views, + buffers, + validity, + dtype, + } = varbinview.into_parts(); + + // Copy all device views to host + let host_views = views.try_into_host()?.await?; + let host_views = Buffer::::from_byte_buffer(host_views); + + // Copy any string data buffers back over to the host + let host_buffers = buffers + .iter() + .cloned() + .map(|b| b.try_into_host()) + .collect::>>()?; + let host_buffers = try_join_all(host_buffers).await?; + let host_buffers: Arc<[ByteBuffer]> = Arc::from(host_buffers); + + Ok(Canonical::VarBinView(unsafe { + VarBinViewArray::new_unchecked(host_views, host_buffers, dtype, validity) + })) + } _ => todo!(), } } diff --git a/vortex-cuda/src/kernel/filter.rs b/vortex-cuda/src/kernel/filter.rs deleted file mode 100644 index a43aa9c3714..00000000000 --- a/vortex-cuda/src/kernel/filter.rs +++ /dev/null @@ -1,263 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -//! GPU filter implementation using CUB DeviceSelect::Flagged. - -use std::ffi::c_void; -use std::sync::Arc; - -use async_trait::async_trait; -use cudarc::driver::CudaSlice; -use cudarc::driver::DevicePtr; -use cudarc::driver::DevicePtrMut; -use cudarc::driver::DeviceRepr; -use vortex_array::ArrayRef; -use vortex_array::Canonical; -use vortex_array::arrays::FilterVTable; -use vortex_array::arrays::PrimitiveArray; -use vortex_array::buffer::BufferHandle; -use vortex_cub::filter::CubFilterable; -use vortex_cub::filter::cudaStream_t; -use vortex_cuda_macros::cuda_tests; -use vortex_dtype::NativePType; -use vortex_dtype::match_each_native_simd_ptype; -use vortex_error::VortexResult; -use vortex_error::vortex_err; -use vortex_mask::Mask; -use vortex_mask::MaskValues; - -use crate::CudaDeviceBuffer; -use crate::executor::CudaArrayExt; -use crate::executor::CudaExecute; -use crate::executor::CudaExecutionCtx; -use crate::stream::await_stream_callback; - -/// CUDA executor for FilterArray using CUB DeviceSelect::Flagged. -#[derive(Debug)] -pub struct FilterExecutor; - -#[async_trait] -impl CudaExecute for FilterExecutor { - async fn execute( - &self, - array: ArrayRef, - ctx: &mut CudaExecutionCtx, - ) -> VortexResult { - let filter_array = array - .as_opt::() - .ok_or_else(|| vortex_err!("Expected FilterArray"))?; - - let mask = filter_array.filter_mask(); - - // Early return for trivial cases. - match mask { - Mask::AllTrue(_) => { - return filter_array.child().clone().execute_cuda(ctx).await; - } - Mask::AllFalse(_) => { - return Ok(Canonical::empty(filter_array.dtype())); - } - _ => {} - } - - let mask_values = mask - .values() - .ok_or_else(|| vortex_err!("Expected Mask::Values but got different variant"))?; - - let canonical = filter_array.child().clone().execute_cuda(ctx).await?; - - match canonical { - Canonical::Primitive(ref prim) => { - match_each_native_simd_ptype!(prim.ptype(), |T| { - filter_primitive::(prim, mask_values, mask, ctx).await - }) - } - _ => unimplemented!(), - } - } -} - -async fn filter_primitive( - array: &PrimitiveArray, - mask_values: &MaskValues, - mask: &Mask, - ctx: &mut CudaExecutionCtx, -) -> VortexResult -where - T: NativePType + DeviceRepr + CubFilterable + Send + Sync + 'static, -{ - let ptype = array.ptype(); - let num_items = array.len() as i64; - let output_len = mask_values.true_count(); - - if output_len == 0 { - return Ok(Canonical::empty(array.dtype())); - } - - let input_handle = array.buffer_handle(); - let d_input: BufferHandle = if input_handle.is_on_device() { - input_handle.clone() - } else { - ctx.move_to_device::(input_handle.clone())?.await? - }; - - // Upload packed bits to device. They are unpacked to bytes in the filter kernel. - let bit_buffer = mask_values.bit_buffer(); - let packed = bit_buffer.inner().as_ref(); - let bit_offset = bit_buffer.offset() as u64; - let d_packed_flags = ctx.copy_to_device(packed.to_vec())?.await?; - - let temp_bytes = T::get_temp_size(num_items) - .map_err(|e| vortex_err!("CUB filter_temp_size failed: {}", e))?; - - // Allocate device buffers. - let d_temp: CudaSlice = ctx.device_alloc(temp_bytes.max(1))?; - let mut d_output: CudaSlice = ctx.device_alloc(output_len)?; - let mut d_num_selected: CudaSlice = ctx.device_alloc(1)?; - - // Get raw pointers for FFI. - let stream = ctx.stream(); - let stream_ptr = stream.cu_stream() as cudaStream_t; - - // Downcast input buffer to get device pointer. - let d_input_cuda = d_input - .as_device() - .as_any() - .downcast_ref::>() - .ok_or_else(|| vortex_err!("Expected CudaDeviceBuffer for input"))?; - let d_input_ptr = d_input_cuda.as_view().device_ptr(stream).0 as *const T; - - // Downcast to get device pointer. - let d_packed_cuda = d_packed_flags - .as_device() - .as_any() - .downcast_ref::>() - .ok_or_else(|| vortex_err!("Expected CudaDeviceBuffer for packed flags"))?; - let d_packed_ptr = d_packed_cuda.as_view().device_ptr(stream).0 as *const u8; - - let d_temp_ptr = d_temp.device_ptr(stream).0 as *mut c_void; - let d_output_ptr = d_output.device_ptr_mut(stream).0 as *mut T; - let d_num_selected_ptr = d_num_selected.device_ptr_mut(stream).0 as *mut i64; - - // CUB uses TransformInputIterator internally to read bits on-the-fly. - unsafe { - T::filter_bitmask( - d_temp_ptr, - temp_bytes, - d_input_ptr, - d_packed_ptr, - bit_offset, - d_output_ptr, - d_num_selected_ptr, - num_items, - stream_ptr, - ) - .map_err(|e| vortex_err!("CUB filter_bitmask failed: {}", e))?; - } - - // Wait for completion - await_stream_callback(stream).await?; - - let filtered_validity = array.validity()?.filter(mask)?; - let output_handle = BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new(d_output))); - - Ok(Canonical::Primitive(PrimitiveArray::from_buffer_handle( - output_handle, - ptype, - filtered_validity, - ))) -} - -#[cuda_tests] -mod tests { - use rstest::rstest; - use vortex_array::IntoArray; - use vortex_array::arrays::FilterArray; - use vortex_array::assert_arrays_eq; - use vortex_error::VortexExpect; - use vortex_session::VortexSession; - - use super::*; - use crate::CanonicalCudaExt; - use crate::session::CudaSession; - - #[rstest] - #[case::i32_sparse( - PrimitiveArray::from_iter([1i32, 2, 3, 4, 5, 6, 7, 8]), - Mask::from_iter([true, false, true, false, true, false, true, false]) - )] - #[case::i32_dense( - PrimitiveArray::from_iter([10i32, 20, 30, 40, 50]), - Mask::from_iter([true, true, true, false, true]) - )] - #[case::i64_large( - PrimitiveArray::from_iter((0..1000i64).collect::>()), - Mask::from_iter((0..1000).map(|i| i % 3 == 0)) - )] - #[case::f64_values( - PrimitiveArray::from_iter([1.1f64, 2.2, 3.3, 4.4, 5.5]), - Mask::from_iter([false, true, false, true, false]) - )] - #[case::u8_all_true( - PrimitiveArray::from_iter([1u8, 2, 3, 4, 5]), - Mask::from_iter([true, true, true, true, true]) - )] - #[case::u32_all_false( - PrimitiveArray::from_iter([1u32, 2, 3, 4, 5]), - Mask::from_iter([false, false, false, false, false]) - )] - #[tokio::test] - async fn test_gpu_filter( - #[case] input: PrimitiveArray, - #[case] mask: Mask, - ) -> VortexResult<()> { - let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) - .vortex_expect("failed to create CUDA execution context"); - - let filter_array = FilterArray::try_new(input.clone().into_array(), mask.clone())?; - - let cpu_result = filter_array.to_canonical()?.into_array(); - - let gpu_result = FilterExecutor - .execute(filter_array.into_array(), &mut cuda_ctx) - .await - .vortex_expect("GPU filter failed") - .into_host() - .await? - .into_array(); - - assert_arrays_eq!(cpu_result, gpu_result); - - Ok(()) - } - - #[tokio::test] - async fn test_gpu_filter_large_array() -> VortexResult<()> { - let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) - .vortex_expect("failed to create CUDA execution context"); - - // Create a large array to test multi-block execution - let data: Vec = (0..100_000).collect(); - let input = PrimitiveArray::from_iter(data); - - // Select every 7th element - let mask = Mask::from_iter((0..100_000).map(|i| i % 7 == 0)); - - let filter_array = FilterArray::try_new(input.into_array(), mask)?; - - let cpu_result = filter_array.to_canonical()?.into_array(); - - let gpu_result = FilterExecutor - .execute(filter_array.into_array(), &mut cuda_ctx) - .await - .vortex_expect("GPU filter failed") - .into_host() - .await? - .into_array(); - - assert_eq!(cpu_result.len(), gpu_result.len()); - assert_arrays_eq!(cpu_result, gpu_result); - - Ok(()) - } -} diff --git a/vortex-cuda/src/kernel/filter/decimal.rs b/vortex-cuda/src/kernel/filter/decimal.rs new file mode 100644 index 00000000000..4cc6ab3340c --- /dev/null +++ b/vortex-cuda/src/kernel/filter/decimal.rs @@ -0,0 +1,142 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use cudarc::driver::DeviceRepr; +use vortex_array::Canonical; +use vortex_array::arrays::DecimalArray; +use vortex_array::arrays::DecimalArrayParts; +use vortex_cub::filter::CubFilterable; +use vortex_cuda_macros::cuda_tests; +use vortex_dtype::NativeDecimalType; +use vortex_error::VortexResult; +use vortex_mask::Mask; + +use crate::CudaExecutionCtx; +use crate::kernel::filter::filter_sized; + +pub(super) async fn filter_decimal( + array: DecimalArray, + mask: Mask, + ctx: &mut CudaExecutionCtx, +) -> VortexResult { + let DecimalArrayParts { + values, + validity, + decimal_dtype, + .. + } = array.into_parts(); + + let filtered_validity = validity.filter(&mask)?; + let filtered_values = filter_sized::(values, mask, ctx).await?; + + Ok(Canonical::Decimal(DecimalArray::new_handle( + filtered_values, + D::DECIMAL_TYPE, + decimal_dtype, + filtered_validity, + ))) +} + +#[cuda_tests] +mod tests { + use rstest::rstest; + use vortex_array::IntoArray; + use vortex_array::arrays::DecimalArray; + use vortex_array::arrays::FilterArray; + use vortex_array::assert_arrays_eq; + use vortex_dtype::DecimalDType; + use vortex_dtype::i256; + use vortex_error::VortexExpect; + use vortex_error::VortexResult; + use vortex_mask::Mask; + use vortex_session::VortexSession; + + use crate::CanonicalCudaExt; + use crate::FilterExecutor; + use crate::executor::CudaExecute; + use crate::session::CudaSession; + + #[rstest] + #[case::i32_sparse( + DecimalArray::from_iter([1i32, 2, 3, 4, 5, 6, 7, 8], DecimalDType::new(19, 5)), + Mask::from_iter([true, false, true, false, true, false, true, false]) + )] + #[case::i32_dense( + DecimalArray::from_iter([10i32, 20, 30, 40, 50], DecimalDType::new(19, 5)), + Mask::from_iter([true, true, true, false, true]) + )] + #[case::i64_large( + DecimalArray::from_iter(0..1000i64, DecimalDType::new(19, 5)), + Mask::from_iter((0..1000).map(|i| i % 3 == 0)) + )] + #[case::i8_all_true( + DecimalArray::from_iter([1i8, 2, 3, 4, 5], DecimalDType::new(19, 5)), + Mask::from_iter([true, true, true, true, true]) + )] + #[case::i32_all_false( + DecimalArray::from_iter([1i32, 2, 3, 4, 5], DecimalDType::new(19, 5)), + Mask::from_iter([false, false, false, false, false]) + )] + #[case::i128_values( + DecimalArray::from_iter([1i128, 2, 3, 4, 5], DecimalDType::new(19, 5)), + Mask::from_iter([false, true, false, true, false]) + )] + #[case::i256_values( + DecimalArray::from_iter([i256::from_i128(1), i256::from_i128(2), i256::from_i128(3), i256::from_i128(4), i256::from_i128(5)], DecimalDType::new(19, 5)), + Mask::from_iter([false, true, false, true, false]) + )] + #[tokio::test] + async fn test_gpu_filter_decimal( + #[case] input: DecimalArray, + #[case] mask: Mask, + ) -> VortexResult<()> { + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create CUDA execution context"); + + let filter_array = FilterArray::try_new(input.clone().into_array(), mask.clone())?; + + let cpu_result = filter_array.to_canonical()?.into_array(); + + let gpu_result = FilterExecutor + .execute(filter_array.into_array(), &mut cuda_ctx) + .await + .vortex_expect("GPU filter failed") + .into_host() + .await? + .into_array(); + + assert_arrays_eq!(cpu_result, gpu_result); + + Ok(()) + } + + #[tokio::test] + async fn test_gpu_filter_decimal_large_array() -> VortexResult<()> { + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create CUDA execution context"); + + // Create a large array to test multi-block execution + let data: Vec = (0..100_000).collect(); + let input = DecimalArray::from_iter(data, DecimalDType::new(19, 5)); + + // Select every 7th element + let mask = Mask::from_iter((0..100_000).map(|i| i % 7 == 0)); + + let filter_array = FilterArray::try_new(input.into_array(), mask)?; + + let cpu_result = filter_array.to_canonical()?.into_array(); + + let gpu_result = FilterExecutor + .execute(filter_array.into_array(), &mut cuda_ctx) + .await + .vortex_expect("GPU filter failed") + .into_host() + .await? + .into_array(); + + assert_eq!(cpu_result.len(), gpu_result.len()); + assert_arrays_eq!(cpu_result, gpu_result); + + Ok(()) + } +} diff --git a/vortex-cuda/src/kernel/filter/mod.rs b/vortex-cuda/src/kernel/filter/mod.rs new file mode 100644 index 00000000000..3a38111546a --- /dev/null +++ b/vortex-cuda/src/kernel/filter/mod.rs @@ -0,0 +1,164 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! GPU filter implementation using CUB DeviceSelect::Flagged. + +mod decimal; +mod primitive; +mod varbinview; + +use std::ffi::c_void; +use std::sync::Arc; + +use async_trait::async_trait; +use cudarc::driver::DevicePtr; +use cudarc::driver::DevicePtrMut; +use cudarc::driver::DeviceRepr; +use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::arrays::FilterArrayParts; +use vortex_array::arrays::FilterVTable; +use vortex_array::buffer::BufferHandle; +use vortex_cub::filter::CubFilterable; +use vortex_cub::filter::cudaStream_t; +use vortex_dtype::match_each_decimal_value_type; +use vortex_dtype::match_each_native_simd_ptype; +use vortex_error::VortexResult; +use vortex_error::vortex_err; +use vortex_mask::Mask; + +use crate::CudaDeviceBuffer; +use crate::executor::CudaArrayExt; +use crate::executor::CudaExecute; +use crate::executor::CudaExecutionCtx; +use crate::kernel::filter::decimal::filter_decimal; +use crate::kernel::filter::primitive::filter_primitive; +use crate::kernel::filter::varbinview::filter_varbinview; +use crate::stream::await_stream_callback; + +/// CUDA executor for FilterArray using CUB DeviceSelect::Flagged. +#[derive(Debug)] +pub struct FilterExecutor; + +#[async_trait] +impl CudaExecute for FilterExecutor { + async fn execute( + &self, + array: ArrayRef, + ctx: &mut CudaExecutionCtx, + ) -> VortexResult { + let filter_array = array + .try_into::() + .map_err(|_| vortex_err!("Expected FilterArray"))?; + + let FilterArrayParts { child, mask } = filter_array.into_parts(); + + // Early return for trivial cases. + match mask { + Mask::AllTrue(_) => { + // No data filtered => execute child without any post-processing + child.execute_cuda(ctx).await + } + Mask::AllFalse(_) => { + // All data filtered => empty canonical + Ok(Canonical::empty(child.dtype())) + } + m @ Mask::Values(_) => { + let canonical = child.execute_cuda(ctx).await?; + match canonical { + Canonical::Primitive(prim) => { + match_each_native_simd_ptype!(prim.ptype(), |T| { + filter_primitive::(prim, m, ctx).await + }) + } + Canonical::Decimal(decimal) => { + match_each_decimal_value_type!(decimal.values_type(), |D| { + filter_decimal::(decimal, m, ctx).await + }) + } + Canonical::VarBinView(varbinview) => { + filter_varbinview(varbinview, m, ctx).await + } + _ => unimplemented!(), + } + } + } + } +} + +async fn filter_sized( + input: BufferHandle, + mask: Mask, + ctx: &mut CudaExecutionCtx, +) -> VortexResult { + // Return a buffer handle back once this has completed. + let d_input = if input.is_on_device() { + input + } else { + ctx.move_to_device::(input)?.await? + }; + + // Construct the inputs for the cub::DeviceSelect::Flagged call. + let output_len = mask.true_count(); + let (offset, len, flags) = mask.into_bit_buffer().into_inner(); + + let d_flags = ctx.copy_to_device(flags.to_vec())?.await?; + + let offset = offset as u64; + let len = len as i64; + + let temp_bytes = + T::get_temp_size(len).map_err(|e| vortex_err!("CUB filter_temp_size failed: {}", e))?; + + // Allocate device buffers for input, output, mask, and temp space + let d_temp = ctx.device_alloc::(temp_bytes.max(1))?; + let mut d_output = ctx.device_alloc::(output_len)?; + let mut d_num_selected = ctx.device_alloc::(1)?; + // Get raw pointers for FFI. + let stream = ctx.stream(); + let stream_ptr = stream.cu_stream() as cudaStream_t; + + // Downcast input buffer to get device pointer. + let d_input_cuda = d_input + .as_device() + .as_any() + .downcast_ref::>() + .ok_or_else(|| vortex_err!("Expected CudaDeviceBuffer for input"))?; + let d_input_ptr = d_input_cuda.as_view().device_ptr(stream).0 as *const T; + + // Downcast to get device pointer. + let d_packed_cuda = d_flags + .as_device() + .as_any() + .downcast_ref::>() + .ok_or_else(|| vortex_err!("Expected CudaDeviceBuffer for packed flags"))?; + let d_packed_ptr = d_packed_cuda.as_view().device_ptr(stream).0 as *const u8; + + let d_temp_ptr = d_temp.device_ptr(stream).0 as *mut c_void; + let d_output_ptr = d_output.device_ptr_mut(stream).0 as *mut T; + let d_num_selected_ptr = d_num_selected.device_ptr_mut(stream).0 as *mut i64; + + // CUB uses TransformInputIterator internally to read bits on-the-fly. + unsafe { + T::filter_bitmask( + d_temp_ptr, + temp_bytes, + d_input_ptr, + d_packed_ptr, + offset, + d_output_ptr, + d_num_selected_ptr, + len, + stream_ptr, + ) + .map_err(|e| vortex_err!("CUB filter_bitmask failed: {}", e))?; + } + + // Wait for completion + await_stream_callback(stream).await?; + + // Wrap the device buffer of outputs back up into a BufferHandle. + Ok(BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new( + d_output, + )))) +} diff --git a/vortex-cuda/src/kernel/filter/primitive.rs b/vortex-cuda/src/kernel/filter/primitive.rs new file mode 100644 index 00000000000..88270b7df9f --- /dev/null +++ b/vortex-cuda/src/kernel/filter/primitive.rs @@ -0,0 +1,136 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use cudarc::driver::DeviceRepr; +use vortex_array::Canonical; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::PrimitiveArrayParts; +use vortex_cub::filter::CubFilterable; +use vortex_cuda_macros::cuda_tests; +use vortex_dtype::NativePType; +use vortex_error::VortexResult; +use vortex_mask::Mask; + +use crate::CudaExecutionCtx; +use crate::kernel::filter::filter_sized; + +/// Execute a filter operation over the primitive array on a GPU. +pub(super) async fn filter_primitive( + array: PrimitiveArray, + mask: Mask, + ctx: &mut CudaExecutionCtx, +) -> VortexResult +where + T: NativePType + DeviceRepr + CubFilterable + Send + Sync + 'static, +{ + let PrimitiveArrayParts { + buffer, validity, .. + } = array.into_parts(); + + let filtered_validity = validity.filter(&mask)?; + let filtered_values = filter_sized::(buffer, mask, ctx).await?; + + Ok(Canonical::Primitive(PrimitiveArray::from_buffer_handle( + filtered_values, + T::PTYPE, + filtered_validity, + ))) +} + +#[cuda_tests] +mod tests { + use rstest::rstest; + use vortex_array::IntoArray; + use vortex_array::arrays::FilterArray; + use vortex_array::arrays::PrimitiveArray; + use vortex_array::assert_arrays_eq; + use vortex_error::VortexExpect; + use vortex_error::VortexResult; + use vortex_mask::Mask; + use vortex_session::VortexSession; + + use crate::CanonicalCudaExt; + use crate::FilterExecutor; + use crate::executor::CudaExecute; + use crate::session::CudaSession; + + #[rstest] + #[case::i32_sparse( + PrimitiveArray::from_iter([1i32, 2, 3, 4, 5, 6, 7, 8]), + Mask::from_iter([true, false, true, false, true, false, true, false]) + )] + #[case::i32_dense( + PrimitiveArray::from_iter([10i32, 20, 30, 40, 50]), + Mask::from_iter([true, true, true, false, true]) + )] + #[case::i64_large( + PrimitiveArray::from_iter((0..1000i64).collect::>()), + Mask::from_iter((0..1000).map(|i| i % 3 == 0)) + )] + #[case::f64_values( + PrimitiveArray::from_iter([1.1f64, 2.2, 3.3, 4.4, 5.5]), + Mask::from_iter([false, true, false, true, false]) + )] + #[case::u8_all_true( + PrimitiveArray::from_iter([1u8, 2, 3, 4, 5]), + Mask::from_iter([true, true, true, true, true]) + )] + #[case::u32_all_false( + PrimitiveArray::from_iter([1u32, 2, 3, 4, 5]), + Mask::from_iter([false, false, false, false, false]) + )] + #[tokio::test] + async fn test_gpu_filter( + #[case] input: PrimitiveArray, + #[case] mask: Mask, + ) -> VortexResult<()> { + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create CUDA execution context"); + + let filter_array = FilterArray::try_new(input.clone().into_array(), mask.clone())?; + + let cpu_result = filter_array.to_canonical()?.into_array(); + + let gpu_result = FilterExecutor + .execute(filter_array.into_array(), &mut cuda_ctx) + .await + .vortex_expect("GPU filter failed") + .into_host() + .await? + .into_array(); + + assert_arrays_eq!(cpu_result, gpu_result); + + Ok(()) + } + + #[tokio::test] + async fn test_gpu_filter_large_array() -> VortexResult<()> { + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create CUDA execution context"); + + // Create a large array to test multi-block execution + let data: Vec = (0..100_000).collect(); + let input = PrimitiveArray::from_iter(data); + + // Select every 7th element + let mask = Mask::from_iter((0..100_000).map(|i| i % 7 == 0)); + + let filter_array = FilterArray::try_new(input.into_array(), mask)?; + + let cpu_result = filter_array.to_canonical()?.into_array(); + + let gpu_result = FilterExecutor + .execute(filter_array.into_array(), &mut cuda_ctx) + .await + .vortex_expect("GPU filter failed") + .into_host() + .await? + .into_array(); + + assert_eq!(cpu_result.len(), gpu_result.len()); + assert_arrays_eq!(cpu_result, gpu_result); + + Ok(()) + } +} diff --git a/vortex-cuda/src/kernel/filter/varbinview.rs b/vortex-cuda/src/kernel/filter/varbinview.rs new file mode 100644 index 00000000000..8a9b164c9fd --- /dev/null +++ b/vortex-cuda/src/kernel/filter/varbinview.rs @@ -0,0 +1,89 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::Canonical; +use vortex_array::arrays::VarBinViewArray; +use vortex_array::arrays::VarBinViewArrayParts; +use vortex_cuda_macros::cuda_tests; +use vortex_error::VortexResult; +use vortex_mask::Mask; + +use crate::CudaExecutionCtx; +use crate::kernel::filter::filter_sized; + +pub(super) async fn filter_varbinview( + array: VarBinViewArray, + mask: Mask, + ctx: &mut CudaExecutionCtx, +) -> VortexResult { + let VarBinViewArrayParts { + views, + buffers, + validity, + dtype, + } = array.into_parts(); + + let filtered_validity = validity.filter(&mask)?; + let filtered_views = filter_sized::(views, mask, ctx).await?; + + Ok(Canonical::VarBinView(VarBinViewArray::new_handle( + filtered_views, + buffers, + dtype, + filtered_validity, + ))) +} + +#[cuda_tests] +mod tests { + use rstest::rstest; + use vortex_array::IntoArray; + use vortex_array::arrays::FilterArray; + use vortex_array::arrays::VarBinViewArray; + use vortex_array::assert_arrays_eq; + use vortex_error::VortexExpect; + use vortex_error::VortexResult; + use vortex_mask::Mask; + use vortex_session::VortexSession; + + use crate::CanonicalCudaExt; + use crate::FilterExecutor; + use crate::executor::CudaExecute; + use crate::session::CudaSession; + + #[rstest] + #[case::nato( + VarBinViewArray::from_iter_str(["alpha", "bravo", "charlie", "delta"]), + Mask::from_iter([true, false, true, false]) + )] + #[case::planets( + VarBinViewArray::from_iter_str( + ["mercury", "venus", "earth", "mars", "jupiter", "saturn", "uranus", "neptune", "pluto"] + ), + Mask::from_iter([true, true, true, true, true, true, true, true, false]) + )] + #[tokio::test] + async fn test_gpu_filter_strings( + #[case] input: VarBinViewArray, + #[case] mask: Mask, + ) -> VortexResult<()> { + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create CUDA execution context"); + + let filter_array = FilterArray::try_new(input.into_array(), mask.clone())?; + + let cpu_result = filter_array.to_canonical()?.into_array(); + + let gpu_result = FilterExecutor + .execute(filter_array.into_array(), &mut cuda_ctx) + .await + .vortex_expect("GPU filter failed") + .into_host() + .await? + .into_array(); + + assert_arrays_eq!(cpu_result, gpu_result); + + Ok(()) + } +} diff --git a/vortex-duckdb/src/exporter/varbinview.rs b/vortex-duckdb/src/exporter/varbinview.rs index 23066f691d5..16ffe0232fe 100644 --- a/vortex-duckdb/src/exporter/varbinview.rs +++ b/vortex-duckdb/src/exporter/varbinview.rs @@ -46,12 +46,21 @@ pub(crate) fn new_exporter( &LogicalType::try_from(dtype)?, )); } + + let buffers = buffers + .iter() + .cloned() + .map(|b| b.unwrap_host()) + .collect_vec(); + + let buffers: Arc<[ByteBuffer]> = Arc::from(buffers); + Ok(validity::new_exporter( validity, Box::new(VarBinViewExporter { - views, - buffers: buffers.clone(), + views: Buffer::::from_byte_buffer(views.unwrap_host()), vector_buffers: buffers.iter().cloned().map(VectorBuffer::new).collect_vec(), + buffers, }), )) } From e6fd898fcf4948d7f4c228da57b4978040640c0c Mon Sep 17 00:00:00 2001 From: Connor Tsui <87130162+connortsui20@users.noreply.github.com> Date: Wed, 28 Jan 2026 15:09:08 -0500 Subject: [PATCH 2/4] Fix: dont do lazy filters on write (#6194) This should fix the CI failures right now. More detailed discussion that I'll write incoming... Basically we shouldn't be eagerly decompressing validity when filtering it, but for now to make sure things aren't broken this will have to do. Edit: This should fix the break on spiral as well @delta003, as reported by @danking --------- Signed-off-by: Connor Tsui --- vortex-array/src/arrays/filter/execute/mod.rs | 15 ++++----------- vortex-array/src/compute/filter.rs | 2 +- vortex-array/src/patches.rs | 11 +++++++---- vortex-array/src/validity.rs | 14 ++++++++++++-- vortex-btrblocks/src/float.rs | 2 +- vortex-btrblocks/src/string.rs | 2 +- 6 files changed, 26 insertions(+), 20 deletions(-) diff --git a/vortex-array/src/arrays/filter/execute/mod.rs b/vortex-array/src/arrays/filter/execute/mod.rs index 733a6f9a2f4..3eaec409c16 100644 --- a/vortex-array/src/arrays/filter/execute/mod.rs +++ b/vortex-array/src/arrays/filter/execute/mod.rs @@ -31,22 +31,15 @@ mod struct_; mod varbinview; /// Reconstruct a [`Mask`] from an [`Arc`]. -#[inline] fn values_to_mask(values: &Arc) -> Mask { Mask::Values(values.clone()) } -/// A helper function that lazily filters a [`Validity`] with a selection mask. -/// -/// If the validity is a [`Validity::Array`], then this wraps it in a `FilterArray` instead of -/// eagerly filtering the values immediately. +/// A helper function that lazily filters a [`Validity`] with selection mask values. fn filter_validity(validity: Validity, mask: &Arc) -> Validity { - match validity { - v @ (Validity::NonNullable | Validity::AllValid | Validity::AllInvalid) => v, - Validity::Array(arr) => { - Validity::Array(FilterArray::new(arr.clone(), values_to_mask(mask)).into_array()) - } - } + validity + .filter(&values_to_mask(mask)) + .vortex_expect("Somehow unable to wrap filter around a validity array") } /// Check for some fast-path execution conditions before calling [`execute_filter`]. diff --git a/vortex-array/src/compute/filter.rs b/vortex-array/src/compute/filter.rs index fe64704c767..f61c5f6fffe 100644 --- a/vortex-array/src/compute/filter.rs +++ b/vortex-array/src/compute/filter.rs @@ -76,7 +76,7 @@ pub(crate) fn warm_up_vtable() -> usize { /// not the case. pub fn filter(array: &dyn Array, mask: &Mask) -> VortexResult { // TODO(connor): Remove this function completely!!! - array.filter(mask.clone()) + Ok(array.filter(mask.clone())?.to_canonical()?.into_array()) } struct Filter; diff --git a/vortex-array/src/patches.rs b/vortex-array/src/patches.rs index e23e93931a2..39e320ba219 100644 --- a/vortex-array/src/patches.rs +++ b/vortex-array/src/patches.rs @@ -40,6 +40,7 @@ use crate::IntoArray; use crate::ToCanonical; use crate::arrays::PrimitiveArray; use crate::compute::cast; +use crate::compute::filter; use crate::compute::is_sorted; use crate::compute::take; use crate::search_sorted::SearchResult; @@ -626,8 +627,8 @@ impl Patches { } // SAFETY: filtering indices/values with same mask maintains their 1:1 relationship - let filtered_indices = self.indices.filter(filter_mask.clone())?; - let filtered_values = self.values.filter(filter_mask)?; + let filtered_indices = filter(&self.indices, &filter_mask)?; + let filtered_values = filter(&self.values, &filter_mask)?; Ok(Some(Self { array_len: self.array_len, @@ -1148,8 +1149,10 @@ fn filter_patches_with_mask( } let new_patch_indices = new_patch_indices.into_array(); - let new_patch_values = - patch_values.filter(Mask::from_indices(patch_values.len(), new_mask_indices))?; + let new_patch_values = filter( + patch_values, + &Mask::from_indices(patch_values.len(), new_mask_indices), + )?; Ok(Some(Patches::new( true_count, diff --git a/vortex-array/src/validity.rs b/vortex-array/src/validity.rs index 1234a7bede1..838fb0e3a2a 100644 --- a/vortex-array/src/validity.rs +++ b/vortex-array/src/validity.rs @@ -181,9 +181,13 @@ impl Validity { } } - /// Keep only the entries for which the mask is true. + /// Lazily filters a [`Validity`] with a selection mask, which keeps only the entries for which + /// the mask is true. /// /// The result has length equal to the number of true values in mask. + /// + /// If the validity is a [`Validity::Array`], then this lazily wraps it in a `FilterArray` + /// instead of eagerly filtering the values immediately. pub fn filter(&self, mask: &Mask) -> VortexResult { // NOTE(ngates): we take the mask as a reference to avoid the caller cloning unnecessarily // if we happen to be NonNullable, AllValid, or AllInvalid. @@ -191,7 +195,13 @@ impl Validity { v @ (Validity::NonNullable | Validity::AllValid | Validity::AllInvalid) => { Ok(v.clone()) } - Validity::Array(arr) => Ok(Validity::Array(arr.filter(mask.clone())?)), + Validity::Array(arr) => Ok(Validity::Array( + arr.filter(mask.clone())? + // TODO(connor): This is wrong!!! We should not be eagerly decompressing the + // validity array. + .to_canonical()? + .into_array(), + )), } } diff --git a/vortex-btrblocks/src/float.rs b/vortex-btrblocks/src/float.rs index 802439f0738..795d2fb0fcb 100644 --- a/vortex-btrblocks/src/float.rs +++ b/vortex-btrblocks/src/float.rs @@ -571,7 +571,7 @@ mod tests { .display_as(DisplayOptions::MetadataOnly) .to_string() .to_lowercase(); - assert_eq!(display, "vortex.primitive(f32?, len=96)"); + assert_eq!(display, "vortex.sparse(f32?, len=96)"); Ok(()) } diff --git a/vortex-btrblocks/src/string.rs b/vortex-btrblocks/src/string.rs index b7ec69769f5..2ee8d036ea9 100644 --- a/vortex-btrblocks/src/string.rs +++ b/vortex-btrblocks/src/string.rs @@ -502,7 +502,7 @@ mod tests { .display_as(DisplayOptions::MetadataOnly) .to_string() .to_lowercase(); - assert_eq!(display, "vortex.varbinview(utf8?, len=100)"); + assert_eq!(display, "vortex.sparse(utf8?, len=100)"); Ok(()) } From 30c927c1facaf8b1935260da25df365a93f53331 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Tue, 27 Jan 2026 00:39:18 +0000 Subject: [PATCH 3/4] allocators Signed-off-by: Onur Satici Signed-off-by: Joe Isaacs --- Cargo.lock | 3 + vortex-array/src/arrays/decimal/vtable/mod.rs | 12 +- .../src/arrays/primitive/vtable/mod.rs | 17 ++- vortex-array/src/buffer.rs | 38 +++++++ vortex-array/src/serde.rs | 21 ++-- vortex-buffer/src/buffer.rs | 5 +- vortex-cuda/Cargo.toml | 1 + vortex-cuda/benches/dict_cuda.rs | 1 + vortex-cuda/src/device_buffer.rs | 27 +++++ vortex-cuda/src/executor.rs | 90 +++------------ vortex-cuda/src/host_to_device_allocator.rs | 67 +++++++++++ vortex-cuda/src/lib.rs | 2 + vortex-cuda/src/session.rs | 14 ++- vortex-cuda/src/stream.rs | 104 ++++++++++++++++++ vortex-file/Cargo.toml | 1 + vortex-file/src/open.rs | 13 ++- vortex-file/src/read/driver.rs | 4 +- vortex-file/src/read/request.rs | 30 +++-- vortex-file/src/segments/source.rs | 10 +- vortex-file/src/tests.rs | 69 +++++++++++- vortex-io/Cargo.toml | 1 + vortex-io/src/file/object_store.rs | 6 +- vortex-io/src/file/std_file.rs | 9 +- vortex-io/src/read.rs | 21 ++-- vortex-io/src/runtime/tests.rs | 41 ++++--- vortex-layout/src/layouts/chunked/reader.rs | 5 +- vortex-layout/src/layouts/flat/reader.rs | 7 ++ vortex-layout/src/layouts/struct_/reader.rs | 9 +- 28 files changed, 464 insertions(+), 164 deletions(-) create mode 100644 vortex-cuda/src/host_to_device_allocator.rs diff --git a/Cargo.lock b/Cargo.lock index 811a8a8a0db..d67fd625d87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10366,6 +10366,7 @@ dependencies = [ "vortex-dtype", "vortex-error", "vortex-fastlanes", + "vortex-io", "vortex-mask", "vortex-nvcomp", "vortex-scalar", @@ -10613,6 +10614,7 @@ dependencies = [ "vortex-array", "vortex-buffer", "vortex-bytebool", + "vortex-cuda", "vortex-datetime-parts", "vortex-decimal-byte-parts", "vortex-dtype", @@ -10713,6 +10715,7 @@ dependencies = [ "tempfile", "tokio", "tracing", + "vortex-array", "vortex-buffer", "vortex-error", "vortex-metrics", diff --git a/vortex-array/src/arrays/decimal/vtable/mod.rs b/vortex-array/src/arrays/decimal/vtable/mod.rs index f253902813f..7ae29978423 100644 --- a/vortex-array/src/arrays/decimal/vtable/mod.rs +++ b/vortex-array/src/arrays/decimal/vtable/mod.rs @@ -107,13 +107,11 @@ impl VTable for DecimalVTable { match_each_decimal_value_type!(metadata.values_type(), |D| { // Check and reinterpret-cast the buffer - if let Some(buffer) = values.as_host_opt() { - vortex_ensure!( - buffer.is_aligned(Alignment::of::()), - "DecimalArray buffer not aligned for values type {:?}", - D::DECIMAL_TYPE - ); - } + vortex_ensure!( + values.alignment().is_aligned_to(Alignment::of::()), + "DecimalArray buffer not aligned for values type {:?}", + D::DECIMAL_TYPE + ); DecimalArray::try_new_handle(values, metadata.values_type(), *decimal_dtype, validity) }) } diff --git a/vortex-array/src/arrays/primitive/vtable/mod.rs b/vortex-array/src/arrays/primitive/vtable/mod.rs index cf702ca3dad..459f24b433e 100644 --- a/vortex-array/src/arrays/primitive/vtable/mod.rs +++ b/vortex-array/src/arrays/primitive/vtable/mod.rs @@ -101,16 +101,13 @@ impl VTable for PrimitiveVTable { ); } - // For host buffers, we eagerly check alignment on construction. - // TODO(aduffy): check for device buffers. CUDA buffers are generally 256-byte aligned, - // but not sure about other devices. - if let Some(host_buf) = buffer.as_host_opt() { - vortex_ensure!( - host_buf.is_aligned(Alignment::new(ptype.byte_width())), - "PrimitiveArray::build: Buffer must be aligned to {}", - ptype.byte_width() - ); - } + vortex_ensure!( + buffer + .alignment() + .is_aligned_to(Alignment::new(ptype.byte_width())), + "PrimitiveArray::build: Buffer must be aligned to {}", + ptype.byte_width() + ); // SAFETY: checked ahead of time unsafe { diff --git a/vortex-array/src/buffer.rs b/vortex-array/src/buffer.rs index cf209b75f80..35cbf5a233b 100644 --- a/vortex-array/src/buffer.rs +++ b/vortex-array/src/buffer.rs @@ -14,6 +14,7 @@ use vortex_buffer::Alignment; use vortex_buffer::ByteBuffer; use vortex_error::VortexExpect; use vortex_error::VortexResult; +use vortex_error::vortex_bail; use vortex_utils::dyn_traits::DynEq; use vortex_utils::dyn_traits::DynHash; @@ -50,6 +51,9 @@ pub trait DeviceBuffer: 'static + Send + Sync + Debug + DynEq + DynHash { /// Returns the length of the buffer in bytes. fn len(&self) -> usize; + /// Returns the alignment of the buffer. + fn alignment(&self) -> Alignment; + /// Returns true if the buffer is empty. fn is_empty(&self) -> bool { self.len() == 0 @@ -130,6 +134,40 @@ impl BufferHandle { } } + /// Returns the alignment of the buffer. + pub fn alignment(&self) -> Alignment { + match &self.0 { + Inner::Host(bytes) => bytes.alignment(), + Inner::Device(device) => device.alignment(), + } + } + + /// Returns true if the buffer is aligned to the given alignment. + pub fn is_aligned(&self, alignment: Alignment) -> bool { + self.alignment().is_aligned_to(alignment) + } + + /// Ensure the buffer satisfies the requested alignment. + /// + /// Host buffers will be copied if necessary. Device buffers will error if the + /// alignment requirement is not met. + pub fn ensure_aligned(&self, alignment: Alignment) -> VortexResult { + match &self.0 { + Inner::Host(buffer) => Ok(BufferHandle::new_host(buffer.clone().aligned(alignment))), + Inner::Device(device) => { + if device.alignment().is_aligned_to(alignment) { + Ok(self.clone()) + } else { + vortex_bail!( + "Device buffer alignment {} does not satisfy required alignment {}", + device.alignment(), + alignment + ); + } + } + } + } + /// Check if the buffer is empty. pub fn is_empty(&self) -> bool { self.len() == 0 diff --git a/vortex-array/src/serde.rs b/vortex-array/src/serde.rs index a3aa423b177..cba1f2fd0df 100644 --- a/vortex-array/src/serde.rs +++ b/vortex-array/src/serde.rs @@ -490,10 +490,9 @@ impl ArrayParts { array_tree: ByteBuffer, segment: BufferHandle, ) -> VortexResult { - // TODO: this can also work with device buffers. - let segment = segment.try_to_host_sync()?; - // We align each buffer individually, so we remove alignment requirements on the buffer. - let segment = segment.aligned(Alignment::none()); + // We align each buffer individually, so we remove alignment requirements on the segment + // for host-resident buffers. Device buffers are sliced directly. + let segment = segment.ensure_aligned(Alignment::none())?; let fb_buffer = FlatBuffer::align_from(array_tree); @@ -504,7 +503,7 @@ impl ArrayParts { let flatbuffer_loc = fb_root._tab.loc(); let mut offset = 0; - let buffers: Arc<[_]> = fb_array + let buffers = fb_array .buffers() .unwrap_or_default() .iter() @@ -515,15 +514,13 @@ impl ArrayParts { let buffer_len = fb_buf.length() as usize; // Extract a buffer and ensure it's aligned, copying if necessary - let buffer = segment - .slice(offset..(offset + buffer_len)) - .aligned(Alignment::from_exponent(fb_buf.alignment_exponent())); - + let buffer = segment.slice(offset..(offset + buffer_len)); + let buffer = buffer + .ensure_aligned(Alignment::from_exponent(fb_buf.alignment_exponent()))?; offset += buffer_len; - BufferHandle::new_host(buffer) + Ok(buffer) }) - .collect(); - + .collect::>>()?; (flatbuffer_loc, buffers) }; diff --git a/vortex-buffer/src/buffer.rs b/vortex-buffer/src/buffer.rs index ebc435ecca8..f941db6dbec 100644 --- a/vortex-buffer/src/buffer.rs +++ b/vortex-buffer/src/buffer.rs @@ -319,7 +319,10 @@ impl Buffer { let end_byte = end * size_of::(); if !begin_byte.is_multiple_of(*alignment) { - vortex_panic!("range start must be aligned to {alignment:?}"); + vortex_panic!( + "range start must be aligned to {alignment:?}, byte {}", + begin_byte + ); } if !alignment.is_aligned_to(Alignment::of::()) { vortex_panic!("Slice alignment must at least align to type T") diff --git a/vortex-cuda/Cargo.toml b/vortex-cuda/Cargo.toml index a5dcb380c81..ae66521c2d3 100644 --- a/vortex-cuda/Cargo.toml +++ b/vortex-cuda/Cargo.toml @@ -39,6 +39,7 @@ vortex-error = { workspace = true } vortex-fastlanes = { workspace = true } vortex-mask = { workspace = true } vortex-nvcomp = { path = "nvcomp" } +vortex-io = { workspace = true } vortex-session = { workspace = true } vortex-utils = { workspace = true } vortex-zigzag = { workspace = true } diff --git a/vortex-cuda/benches/dict_cuda.rs b/vortex-cuda/benches/dict_cuda.rs index 0142cb8842d..33dd123eb2f 100644 --- a/vortex-cuda/benches/dict_cuda.rs +++ b/vortex-cuda/benches/dict_cuda.rs @@ -17,6 +17,7 @@ use vortex_array::IntoArray; use vortex_array::arrays::DictArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::validity::Validity::NonNullable; +use vortex_buffer::Alignment; use vortex_buffer::Buffer; use vortex_cuda::CudaBufferExt; use vortex_cuda::CudaDeviceBuffer; diff --git a/vortex-cuda/src/device_buffer.rs b/vortex-cuda/src/device_buffer.rs index 2cc6517324d..1bc327c712c 100644 --- a/vortex-cuda/src/device_buffer.rs +++ b/vortex-cuda/src/device_buffer.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::cmp::min; use std::fmt::Debug; use std::ops::Range; use std::sync::Arc; @@ -16,6 +17,7 @@ use vortex_array::buffer::DeviceBuffer; use vortex_buffer::Alignment; use vortex_buffer::BufferMut; use vortex_buffer::ByteBuffer; +use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_err; @@ -27,11 +29,17 @@ pub struct CudaDeviceBuffer { offset: usize, len: usize, device_ptr: u64, + alignment: Alignment, } impl CudaDeviceBuffer { /// Creates a new CUDA device buffer from a [`CudaSlice`]. pub fn new(cuda_slice: CudaSlice) -> Self { + Self::new_aligned(cuda_slice, Alignment::of::()) + } + + pub fn new_aligned(cuda_slice: CudaSlice, alignment: Alignment) -> Self { + assert!(alignment.is_aligned_to(Alignment::of::())); let len = cuda_slice.len(); let device_ptr = cuda_slice.device_ptr(cuda_slice.stream()).0; @@ -40,6 +48,7 @@ impl CudaDeviceBuffer { offset: 0, len, device_ptr, + alignment, } } @@ -109,6 +118,10 @@ impl DeviceBuffer for CudaDeviceBuffer self.len * size_of::() } + fn alignment(&self) -> Alignment { + self.alignment + } + /// Synchronous copy of CUDA device to host memory. /// /// The copy is not started before other operations on the streams are completed. @@ -185,6 +198,19 @@ impl DeviceBuffer for CudaDeviceBuffer fn slice(&self, range: Range) -> Arc { let new_offset = self.offset + range.start; let new_len = range.end - range.start; + let byte_offset = new_offset * size_of::(); + let alignment = if byte_offset == 0 { + self.alignment + } else { + // TODO(joe): self.alignment is an under approx + min( + self.alignment, + Alignment::from_exponent( + u8::try_from((self.device_ptr + byte_offset as u64).trailing_zeros()) + .vortex_expect("impossible"), + ), + ) + }; assert!( range.end <= self.len, @@ -198,6 +224,7 @@ impl DeviceBuffer for CudaDeviceBuffer offset: new_offset, len: new_len, device_ptr: self.device_ptr, + alignment, }) } diff --git a/vortex-cuda/src/executor.rs b/vortex-cuda/src/executor.rs index 63b2af86675..6f2f6b5e921 100644 --- a/vortex-cuda/src/executor.rs +++ b/vortex-cuda/src/executor.rs @@ -10,25 +10,21 @@ use cudarc::driver::CudaEvent; use cudarc::driver::CudaFunction; use cudarc::driver::CudaSlice; use cudarc::driver::CudaStream; -use cudarc::driver::DevicePtrMut; use cudarc::driver::DeviceRepr; use cudarc::driver::LaunchArgs; -use cudarc::driver::result::memcpy_htod_async; use futures::future::BoxFuture; use vortex_array::Array; use vortex_array::ArrayRef; use vortex_array::Canonical; use vortex_array::ExecutionCtx; use vortex_array::buffer::BufferHandle; -use vortex_buffer::Buffer; use vortex_dtype::PType; use vortex_error::VortexResult; use vortex_error::vortex_err; -use crate::CudaDeviceBuffer; use crate::CudaSession; use crate::session::CudaSessionExt; -use crate::stream::await_stream_callback; +use crate::stream::VortexCudaStream; /// CUDA kernel events recorded before and after kernel launch. #[derive(Debug)] @@ -53,14 +49,14 @@ impl CudaKernelEvents { /// Provides access to the CUDA context and stream for kernel execution. /// Handles memory allocation and data transfers between host and device. pub struct CudaExecutionCtx { - stream: Arc, + stream: VortexCudaStream, ctx: ExecutionCtx, cuda_session: CudaSession, } impl CudaExecutionCtx { /// Creates a new CUDA execution context. - pub(crate) fn new(stream: Arc, ctx: ExecutionCtx) -> Self { + pub(crate) fn new(stream: VortexCudaStream, ctx: ExecutionCtx) -> Self { let cuda_session = ctx.session().cuda_session().clone(); Self { stream, @@ -69,24 +65,6 @@ impl CudaExecutionCtx { } } - /// Allocates a typed buffer on the GPU. - /// - /// Note: Allocation is async in case the CUDA driver supports this. - /// - /// The condition for alloc to be async is support for memory pools: - /// `CU_DEVICE_ATTRIBUTE_MEMORY_POOLS_SUPPORTED`. - /// - /// Any kernel submitted to the stream after alloc can safely use the - /// memory, as operations on the stream are ordered sequentially. - pub fn device_alloc(&self, len: usize) -> VortexResult> { - // SAFETY: No safety guarantees for allocations on the GPU. - unsafe { - self.stream - .alloc::(len) - .map_err(|e| vortex_err!("Failed to allocate device memory: {}", e)) - } - } - /// Loads a CUDA kernel function by module name and ptype(s). /// /// # Arguments @@ -143,22 +121,15 @@ impl CudaExecutionCtx { /// /// * `func` - CUDA kernel function to launch pub fn launch_builder<'a>(&'a self, func: &'a CudaFunction) -> LaunchArgs<'a> { - self.stream.launch_builder(func) + self.stream.0.launch_builder(func) } - /// Copies host data to the device asynchronously. - /// - /// Allocates device memory, schedules an async copy, and returns a future - /// that completes when the copy is finished. The source data is moved into - /// the future to ensure it remains valid until the copy completes. - /// - /// # Arguments - /// - /// * `data` - The host data to copy. - /// - /// # Returns - /// - /// A future that resolves to the device buffer handle when the copy completes. + /// See `VortexCudaStream::device_alloc`. + pub fn device_alloc(&self, len: usize) -> VortexResult> { + self.stream.device_alloc(len) + } + + /// See `VortexCudaStream::copy_to_device`. pub fn copy_to_device( &self, data: D, @@ -167,52 +138,20 @@ impl CudaExecutionCtx { T: DeviceRepr + Send + Sync + 'static, D: AsRef<[T]> + Send + 'static, { - let host_slice: &[T] = data.as_ref(); - let mut cuda_slice: CudaSlice = self.device_alloc(host_slice.len())?; - let device_ptr = cuda_slice.device_ptr_mut(&self.stream).0; - - unsafe { - memcpy_htod_async(device_ptr, host_slice, self.stream.cu_stream()) - .map_err(|e| vortex_err!("Failed to schedule async copy to device: {}", e))?; - } - - let cuda_buf = CudaDeviceBuffer::new(cuda_slice); - let stream = Arc::clone(&self.stream); - - Ok(Box::pin(async move { - await_stream_callback(&stream).await?; - - // Keep source memory alive until copy completes. - let _keep_alive = data; - - Ok(BufferHandle::new_device(Arc::new(cuda_buf))) - })) + self.stream.copy_to_device(data) } - /// Moves a host buffer handle to the device asynchronously. - /// - /// # Arguments - /// - /// * `handle` - The host buffer to move. Must be a host buffer. - /// - /// # Returns - /// - /// A future that resolves to the device buffer handle when the copy completes. + /// See `VortexCudaStream::move_to_device`. pub fn move_to_device( &self, handle: BufferHandle, ) -> VortexResult>> { - let host_buffer = handle - .as_host_opt() - .ok_or_else(|| vortex_err!("Buffer is not on host"))?; - - let buffer: Buffer = Buffer::from_byte_buffer(host_buffer.clone()); - self.copy_to_device(buffer) + self.stream.move_to_device::(handle) } /// Returns a reference to the underlying CUDA stream. pub fn stream(&self) -> &Arc { - &self.stream + &self.stream.0 } } @@ -266,6 +205,7 @@ impl CudaArrayExt for ArrayRef { impl CudaExecutionCtx { pub fn synchronize_stream(&self) -> VortexResult<()> { self.stream + .0 .synchronize() .map_err(|e| vortex_err!("cuda error: {e}")) } diff --git a/vortex-cuda/src/host_to_device_allocator.rs b/vortex-cuda/src/host_to_device_allocator.rs new file mode 100644 index 00000000000..4c15c5a5b87 --- /dev/null +++ b/vortex-cuda/src/host_to_device_allocator.rs @@ -0,0 +1,67 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; + +use futures::FutureExt; +use futures::future::BoxFuture; +use vortex_array::buffer::BufferHandle; +use vortex_buffer::Alignment; +use vortex_error::VortexResult; +use vortex_io::CoalesceConfig; +use vortex_io::VortexReadAt; + +use crate::stream::VortexCudaStream; + +/// A wrapper that uses an allocator to produce the returned buffer handle. +#[derive(Clone)] +pub struct CopyDeviceReadAt { + read: T, + stream: VortexCudaStream, +} + +impl CopyDeviceReadAt { + pub fn new(read: T, stream: VortexCudaStream) -> Self { + Self { read, stream } + } +} + +impl VortexReadAt for CopyDeviceReadAt { + fn uri(&self) -> Option<&Arc> { + self.read.uri() + } + + fn coalesce_config(&self) -> Option { + self.read.coalesce_config() + } + + fn concurrency(&self) -> usize { + self.read.concurrency() + } + + fn size(&self) -> BoxFuture<'static, VortexResult> { + self.read.size() + } + + fn read_at( + &self, + offset: u64, + length: usize, + alignment: Alignment, + ) -> BoxFuture<'static, VortexResult> { + println!("read at cuda"); + let read = self.read.clone(); + let stream = self.stream.clone(); + async move { + let handle = read.read_at(offset, length, alignment).await?; + if handle.is_on_device() { + return Ok(handle); + } + + let host_buffer = handle.as_host().clone(); + + stream.copy_to_device(host_buffer)?.await + } + .boxed() + } +} diff --git a/vortex-cuda/src/lib.rs b/vortex-cuda/src/lib.rs index dca0b1aa335..30df645908a 100644 --- a/vortex-cuda/src/lib.rs +++ b/vortex-cuda/src/lib.rs @@ -8,6 +8,7 @@ use std::process::Command; mod canonical; mod device_buffer; pub mod executor; +mod host_to_device_allocator; mod kernel; mod session; mod stream; @@ -17,6 +18,7 @@ pub use device_buffer::CudaBufferExt; pub use device_buffer::CudaDeviceBuffer; pub use executor::CudaExecutionCtx; pub use executor::CudaKernelEvents; +pub use host_to_device_allocator::CopyDeviceReadAt; use kernel::ALPExecutor; use kernel::BitPackedExecutor; use kernel::DecimalBytePartsExecutor; diff --git a/vortex-cuda/src/session.rs b/vortex-cuda/src/session.rs index c83128def3e..8fc0f1e1292 100644 --- a/vortex-cuda/src/session.rs +++ b/vortex-cuda/src/session.rs @@ -16,6 +16,7 @@ use vortex_utils::aliases::dash_map::DashMap; use crate::executor::CudaExecute; pub use crate::executor::CudaExecutionCtx; use crate::kernel::KernelLoader; +use crate::stream::VortexCudaStream; /// CUDA session for GPU accelerated execution. /// @@ -42,17 +43,20 @@ impl CudaSession { pub fn create_execution_ctx( vortex_session: &vortex_session::VortexSession, ) -> VortexResult { - let stream = vortex_session - .cuda_session() - .context - .new_stream() - .map_err(|e| vortex_err!("Failed to create CUDA stream: {}", e))?; + let stream = vortex_session.cuda_session().new_stream()?; Ok(CudaExecutionCtx::new( stream, vortex_session.create_execution_ctx(), )) } + /// Create a new CUDA stream. + pub fn new_stream(&self) -> VortexResult { + Ok(VortexCudaStream(self.context.new_stream().map_err( + |e| vortex_err!("Failed to create CUDA stream: {}", e), + )?)) + } + /// Registers CUDA support for an array encoding. /// /// # Arguments diff --git a/vortex-cuda/src/stream.rs b/vortex-cuda/src/stream.rs index ba1f264ee60..e3080fe6271 100644 --- a/vortex-cuda/src/stream.rs +++ b/vortex-cuda/src/stream.rs @@ -3,12 +3,116 @@ //! CUDA stream utility functions. +use std::sync::Arc; + +use cudarc::driver::CudaSlice; use cudarc::driver::CudaStream; +use cudarc::driver::DevicePtrMut; +use cudarc::driver::DeviceRepr; +use cudarc::driver::result::memcpy_htod_async; use cudarc::driver::result::stream; +use futures::future::BoxFuture; use kanal::Sender; +use vortex_array::buffer::BufferHandle; +use vortex_buffer::Alignment; +use vortex_buffer::Buffer; +use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_err; +use crate::CudaDeviceBuffer; + +#[derive(Clone)] +pub struct VortexCudaStream(pub Arc); + +impl VortexCudaStream { + /// Allocates a typed buffer on the GPU. + /// + /// Note: Allocation is async in case the CUDA driver supports this. + /// + /// The condition for alloc to be async is support for memory pools: + /// `CU_DEVICE_ATTRIBUTE_MEMORY_POOLS_SUPPORTED`. + /// + /// Any kernel submitted to the stream after alloc can safely use the + /// memory, as operations on the stream are ordered sequentially. + pub fn device_alloc(&self, len: usize) -> VortexResult> { + // SAFETY: No safety guarantees for allocations on the GPU. + unsafe { + self.0 + .alloc::(len) + .map_err(|e| vortex_err!("Failed to allocate device memory: {}", e)) + } + } + + /// Copies host data to the device asynchronously. + /// + /// Allocates device memory, schedules an async copy, and returns a future + /// that completes when the copy is finished. The source data is moved into + /// the future to ensure it remains valid until the copy completes. + /// + /// # Arguments + /// + /// * `data` - The host data to copy. + /// + /// # Returns + /// + /// A future that resolves to the device buffer handle when the copy completes. + pub fn copy_to_device( + &self, + data: D, + ) -> VortexResult>> + where + T: DeviceRepr + Send + Sync + 'static, + D: AsRef<[T]> + Send + 'static, + { + let host_slice: &[T] = data.as_ref(); + let mut cuda_slice: CudaSlice = self.device_alloc(host_slice.len())?; + let device_ptr = cuda_slice.device_ptr_mut(&self.0).0; + + unsafe { + memcpy_htod_async(device_ptr, host_slice, self.0.cu_stream()) + .map_err(|e| vortex_err!("Failed to schedule async copy to device: {}", e))?; + } + + println!( + "dev {}", + u8::try_from(device_ptr.trailing_zeros()).vortex_expect("aligment over 2^2^8??") + ); + let cuda_buf = CudaDeviceBuffer::new_aligned(cuda_slice, Alignment::new(256)); + let stream = Arc::clone(&self.0); + + Ok(Box::pin(async move { + await_stream_callback(&stream).await?; + + // Keep source memory alive until copy completes. + let _keep_alive = data; + + Ok(BufferHandle::new_device(Arc::new(cuda_buf))) + })) + } + + /// Moves a host buffer handle to the device asynchronously. + /// + /// # Arguments + /// + /// * `handle` - The host buffer to move. Must be a host buffer. + /// + /// # Returns + /// + /// A future that resolves to the device buffer handle when the copy completes. + pub fn move_to_device( + &self, + handle: BufferHandle, + ) -> VortexResult>> { + let host_buffer = handle + .as_host_opt() + .ok_or_else(|| vortex_err!("Buffer is not on host"))?; + + let buffer: Buffer = Buffer::from_byte_buffer(host_buffer.clone()); + self.copy_to_device(buffer) + } +} + /// Registers a callback and asynchronously waits for its completion. /// /// This function can be used to asynchronously wait for events previously diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index 011cbac39bb..7f9fefd2bd9 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -35,6 +35,7 @@ vortex-alp = { workspace = true } vortex-array = { workspace = true } vortex-buffer = { workspace = true } vortex-bytebool = { workspace = true } +vortex-cuda = { workspace = true } vortex-datetime-parts = { workspace = true } vortex-decimal-byte-parts = { workspace = true } vortex-dtype = { workspace = true } diff --git a/vortex-file/src/open.rs b/vortex-file/src/open.rs index af507f86278..315295cafef 100644 --- a/vortex-file/src/open.rs +++ b/vortex-file/src/open.rs @@ -12,6 +12,7 @@ use vortex_dtype::DType; use vortex_error::VortexError; use vortex_error::VortexExpect; use vortex_error::VortexResult; +use vortex_io::InstrumentedReadAt; use vortex_io::VortexReadAt; use vortex_io::session::RuntimeSessionExt; use vortex_layout::segments::NoOpSegmentCache; @@ -157,6 +158,7 @@ impl VortexOpenOptions { /// An API for opening a [`VortexFile`] using any [`VortexReadAt`] implementation. pub async fn open_read(self, reader: R) -> VortexResult { let metrics = self.metrics.clone().unwrap_or_default(); + let reader = InstrumentedReadAt::new(reader, &metrics); let footer = if let Some(footer) = self.footer { footer } else { @@ -209,7 +211,8 @@ impl VortexOpenOptions { let initial_offset = file_size - initial_read_size as u64; let initial_read: ByteBuffer = read .read_at(initial_offset, initial_read_size, Alignment::none()) - .await?; + .await? + .try_into_host_sync()?; let mut deserializer = Footer::deserializer(initial_read, self.session.clone()) .with_size(file_size) @@ -218,7 +221,10 @@ impl VortexOpenOptions { let footer = loop { match deserializer.deserialize()? { DeserializeStep::NeedMoreData { offset, len } => { - let more_data = read.read_at(offset, len, Alignment::none()).await?; + let more_data = read + .read_at(offset, len, Alignment::none()) + .await? + .try_into_host_sync()?; deserializer.prefix_data(more_data); } DeserializeStep::NeedFileSize => unreachable!("We passed file_size above"), @@ -287,6 +293,7 @@ mod tests { use futures::future::BoxFuture; use vortex_array::IntoArray; + use vortex_array::buffer::BufferHandle; use vortex_array::expr::session::ExprSession; use vortex_array::session::ArraySession; use vortex_buffer::Buffer; @@ -315,7 +322,7 @@ mod tests { offset: u64, length: usize, alignment: Alignment, - ) -> BoxFuture<'static, VortexResult> { + ) -> BoxFuture<'static, VortexResult> { self.total_read.fetch_add(length, Ordering::Relaxed); let _ = self.first_read_len.compare_exchange( 0, diff --git a/vortex-file/src/read/driver.rs b/vortex-file/src/read/driver.rs index f097385445e..18cdfea6a70 100644 --- a/vortex-file/src/read/driver.rs +++ b/vortex-file/src/read/driver.rs @@ -326,7 +326,7 @@ mod tests { use futures::StreamExt; use futures::stream; use vortex_buffer::Alignment; - use vortex_buffer::ByteBuffer; + use vortex_array::buffer::BufferHandle; use vortex_error::VortexResult; use super::*; @@ -336,7 +336,7 @@ mod tests { id: usize, offset: u64, length: usize, - ) -> (ReadRequest, oneshot::Receiver>) { + ) -> (ReadRequest, oneshot::Receiver>) { let (tx, rx) = oneshot::channel(); ( ReadRequest { diff --git a/vortex-file/src/read/request.rs b/vortex-file/src/read/request.rs index 256cb95851d..cdd71670070 100644 --- a/vortex-file/src/read/request.rs +++ b/vortex-file/src/read/request.rs @@ -7,8 +7,8 @@ use std::fmt::Formatter; use std::ops::Range; use std::sync::Arc; +use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; -use vortex_buffer::ByteBuffer; use vortex_error::VortexError; use vortex_error::VortexExpect; use vortex_error::VortexResult; @@ -51,7 +51,7 @@ impl IoRequest { } /// Resolves the request with the given result. - pub fn resolve(self, result: VortexResult) { + pub fn resolve(self, result: VortexResult) { match self.0 { IoRequestInner::Single(req) => req.resolve(result), IoRequestInner::Coalesced(req) => req.resolve(result), @@ -90,7 +90,7 @@ pub struct ReadRequest { pub(crate) offset: u64, pub(crate) length: usize, pub(crate) alignment: Alignment, - pub(crate) callback: oneshot::Sender>, + pub(crate) callback: oneshot::Sender>, } impl Debug for ReadRequest { @@ -106,7 +106,7 @@ impl Debug for ReadRequest { } impl ReadRequest { - pub(crate) fn resolve(self, result: VortexResult) { + pub(crate) fn resolve(self, result: VortexResult) { if let Err(e) = self.callback.send(result) { tracing::debug!("ReadRequest {} dropped before resolving: {e}", self.id); } @@ -132,15 +132,31 @@ impl Debug for CoalescedRequest { } impl CoalescedRequest { - pub fn resolve(self, result: VortexResult) { + pub fn resolve(self, result: VortexResult) { match result { Ok(buffer) => { - let buffer = buffer.aligned(Alignment::none()); + let base = match buffer.ensure_aligned(Alignment::none()) { + Ok(base) => base, + Err(e) => { + let e = Arc::new(e); + for req in self.requests.into_iter() { + req.resolve(Err(VortexError::from(e.clone()))); + } + return; + } + }; + for req in self.requests.into_iter() { let start = usize::try_from(req.offset - self.range.start) .vortex_expect("invalid offset"); let end = start + req.length; - let slice = buffer.slice(start..end).aligned(req.alignment); + let slice = match base.slice(start..end).ensure_aligned(req.alignment) { + Ok(slice) => slice, + Err(e) => { + req.resolve(Err(e)); + continue; + } + }; req.resolve(Ok(slice)); } } diff --git a/vortex-file/src/segments/source.rs b/vortex-file/src/segments/source.rs index a1072af9998..344f805f516 100644 --- a/vortex-file/src/segments/source.rs +++ b/vortex-file/src/segments/source.rs @@ -14,7 +14,6 @@ use futures::StreamExt; use futures::channel::mpsc; use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; -use vortex_buffer::ByteBuffer; use vortex_error::VortexResult; use vortex_error::vortex_err; use vortex_io::VortexReadAt; @@ -99,9 +98,9 @@ impl FileSegmentSource { stream .map(move |req| { - let source = reader.clone(); + let reader = reader.clone(); async move { - let result = source + let result = reader .read_at(req.offset(), req.len(), req.alignment()) .await; req.resolve(result); @@ -162,7 +161,6 @@ impl SegmentSource for FileSegmentSource { maybe_fut .ok_or_else(|| vortex_err!("Missing segment: {}", id))? .await - .map(BufferHandle::new_host) } .boxed() } @@ -174,13 +172,13 @@ impl SegmentSource for FileSegmentSource { /// If dropped, the read request will be canceled where possible. struct ReadFuture { id: usize, - recv: oneshot::Receiver>, + recv: oneshot::Receiver>, polled: bool, events: mpsc::UnboundedSender, } impl Future for ReadFuture { - type Output = VortexResult; + type Output = VortexResult; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if !self.polled { diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index a43102b7d3c..e75301cc44e 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -50,6 +50,11 @@ use vortex_array::validity::Validity; use vortex_buffer::Buffer; use vortex_buffer::ByteBufferMut; use vortex_buffer::buffer; +use vortex_cuda::CanonicalCudaExt; +use vortex_cuda::CopyDeviceReadAt; +use vortex_cuda::CudaSession; +use vortex_cuda::CudaSessionExt; +use vortex_cuda::executor::CudaArrayExt; use vortex_dtype::DType; use vortex_dtype::DecimalDType; use vortex_dtype::ExtDType; @@ -61,7 +66,9 @@ use vortex_dtype::datetime::TIMESTAMP_ID; use vortex_dtype::datetime::TemporalMetadata; use vortex_dtype::datetime::TimeUnit; use vortex_error::VortexResult; +use vortex_io::file::std_file::FileReadAdapter; use vortex_io::session::RuntimeSession; +use vortex_io::session::RuntimeSessionExt; use vortex_layout::session::LayoutSession; use vortex_metrics::VortexMetrics; use vortex_scalar::Scalar; @@ -1635,8 +1642,66 @@ async fn main_test() -> Result<(), Box> { let results = stream.try_collect::>().await; - let err = results.err().unwrap(); - println!("Expected error: {}", err); + assert!(results.is_err()); + + Ok(()) +} + +#[tokio::test] +async fn gpu_scan() -> VortexResult<()> { + use vortex_alp::alp_encode; + + // Create an ALP-encoded array from primitive f64 values + let primitive = PrimitiveArray::from_iter((0..100).map(|i| i as f64 * 1.1)); + let alp_array = alp_encode(&primitive, None)?; + + println!("alp {}", alp_array.display_tree()); + + // Write to a buffer, then to a temp file + let temp_path = std::env::temp_dir().join("gpu_scan_test.vortex"); + let mut buf = Vec::new(); + SESSION + .write_options() + .write(&mut buf, alp_array.to_array_stream()) + .await?; + std::fs::write(&temp_path, &buf)?; + + // Read back via GPU + let handle = SESSION.handle(); + let source = Arc::new(FileReadAdapter::open(&temp_path, handle)?); + let gpu_reader = CopyDeviceReadAt::new(source.clone(), SESSION.cuda_session().new_stream()?); + let cpu_reader = source; + + let cpu_file = SESSION.open_options().open_read(cpu_reader).await?; + + let gpu_file = SESSION + .open_options() + .with_footer(cpu_file.footer) + .open_read(gpu_reader) + .await?; + + let mut cuda_ctx = CudaSession::create_execution_ctx(&SESSION)?; + + let mut res = Vec::new(); + let mut stream = gpu_file.scan()?.into_array_stream()?; + while let Some(a) = stream.next().await { + let a = a?; + // println!("arr {}", a.display_tree()); + let array = a + .execute_cuda(&mut cuda_ctx) + .await? + .into_host() + .await? + .into_array(); + res.push(array); + } + + for a in res { + println!("a {} ", a.display_tree()) + } + + // Cleanup + std::fs::remove_file(&temp_path)?; Ok(()) } diff --git a/vortex-io/Cargo.toml b/vortex-io/Cargo.toml index cef1c69e351..5a98bd28528 100644 --- a/vortex-io/Cargo.toml +++ b/vortex-io/Cargo.toml @@ -35,6 +35,7 @@ handle = "1.0.2" tokio = { workspace = true, features = ["io-util", "rt", "sync"] } tracing = { workspace = true } vortex-buffer = { workspace = true } +vortex-array = { workspace = true } vortex-error = { workspace = true } vortex-metrics = { workspace = true } vortex-session = { workspace = true } diff --git a/vortex-io/src/file/object_store.rs b/vortex-io/src/file/object_store.rs index 0d09cbdcd2b..80c8a9343fe 100644 --- a/vortex-io/src/file/object_store.rs +++ b/vortex-io/src/file/object_store.rs @@ -13,8 +13,8 @@ use object_store::GetRange; use object_store::GetResultPayload; use object_store::ObjectStore; use object_store::path::Path as ObjectPath; +use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; -use vortex_buffer::ByteBuffer; use vortex_buffer::ByteBufferMut; use vortex_error::VortexError; use vortex_error::VortexResult; @@ -108,7 +108,7 @@ impl VortexReadAt for ObjectStoreSource { offset: u64, length: usize, alignment: Alignment, - ) -> BoxFuture<'static, VortexResult> { + ) -> BoxFuture<'static, VortexResult> { let store = self.store.clone(); let path = self.path.clone(); let handle = self.handle.clone(); @@ -161,7 +161,7 @@ impl VortexReadAt for ObjectStoreSource { } }; - Ok(buffer.freeze()) + Ok(BufferHandle::new_host(buffer.freeze())) }) .boxed() } diff --git a/vortex-io/src/file/std_file.rs b/vortex-io/src/file/std_file.rs index 56abd56eb60..77417aea659 100644 --- a/vortex-io/src/file/std_file.rs +++ b/vortex-io/src/file/std_file.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::fs::File; +use std::io; #[cfg(all(not(unix), not(windows)))] use std::io::Read; #[cfg(all(not(unix), not(windows)))] @@ -15,8 +16,8 @@ use std::sync::Arc; use futures::FutureExt; use futures::future::BoxFuture; +use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; -use vortex_buffer::ByteBuffer; use vortex_buffer::ByteBufferMut; use vortex_error::VortexResult; @@ -27,7 +28,7 @@ use crate::runtime::Handle; /// Read exactly `buffer.len()` bytes from `file` starting at `offset`. /// This is a platform-specific helper that uses the most efficient method available. #[cfg(not(target_arch = "wasm32"))] -pub(crate) fn read_exact_at(file: &File, buffer: &mut [u8], offset: u64) -> std::io::Result<()> { +pub(crate) fn read_exact_at(file: &File, buffer: &mut [u8], offset: u64) -> io::Result<()> { #[cfg(unix)] { file.read_exact_at(buffer, offset) @@ -107,7 +108,7 @@ impl VortexReadAt for FileReadAdapter { offset: u64, length: usize, alignment: Alignment, - ) -> BoxFuture<'static, VortexResult> { + ) -> BoxFuture<'static, VortexResult> { let file = self.file.clone(); let handle = self.handle.clone(); async move { @@ -116,7 +117,7 @@ impl VortexReadAt for FileReadAdapter { let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment); unsafe { buffer.set_len(length) }; read_exact_at(&file, &mut buffer, offset)?; - Ok(buffer.freeze()) + Ok(BufferHandle::new_host(buffer.freeze())) }) .await } diff --git a/vortex-io/src/read.rs b/vortex-io/src/read.rs index fbcbd697d45..2cbdf017f9e 100644 --- a/vortex-io/src/read.rs +++ b/vortex-io/src/read.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use futures::FutureExt; use futures::future::BoxFuture; +use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; use vortex_buffer::ByteBuffer; use vortex_error::VortexExpect; @@ -71,7 +72,7 @@ pub trait VortexReadAt: Send + Sync + 'static { /// Asynchronously get the number of bytes of the underlying source. fn size(&self) -> BoxFuture<'static, VortexResult>; - /// Request an asynchronous positional read. Results will be returned as a [`ByteBuffer`]. + /// Request an asynchronous positional read. Results will be returned as a [`BufferHandle`]. /// /// If the reader does not have the requested number of bytes, the returned Future will complete /// with an [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof] error. @@ -80,7 +81,7 @@ pub trait VortexReadAt: Send + Sync + 'static { offset: u64, length: usize, alignment: Alignment, - ) -> BoxFuture<'static, VortexResult>; + ) -> BoxFuture<'static, VortexResult>; } impl VortexReadAt for Arc { @@ -105,7 +106,7 @@ impl VortexReadAt for Arc { offset: u64, length: usize, alignment: Alignment, - ) -> BoxFuture<'static, VortexResult> { + ) -> BoxFuture<'static, VortexResult> { self.as_ref().read_at(offset, length, alignment) } } @@ -132,7 +133,7 @@ impl VortexReadAt for Arc { offset: u64, length: usize, alignment: Alignment, - ) -> BoxFuture<'static, VortexResult> { + ) -> BoxFuture<'static, VortexResult> { self.as_ref().read_at(offset, length, alignment) } @@ -158,7 +159,7 @@ impl VortexReadAt for ByteBuffer { offset: u64, length: usize, alignment: Alignment, - ) -> BoxFuture<'static, VortexResult> { + ) -> BoxFuture<'static, VortexResult> { let buffer = self.clone(); async move { let start = usize::try_from(offset).vortex_expect("start too big for usize"); @@ -172,7 +173,9 @@ impl VortexReadAt for ByteBuffer { buffer.len() ); } - Ok(buffer.slice_unaligned(start..end).aligned(alignment)) + Ok(BufferHandle::new_host( + buffer.slice_unaligned(start..end).aligned(alignment), + )) } .boxed() } @@ -247,7 +250,7 @@ impl VortexReadAt for InstrumentedReadAt { offset: u64, length: usize, alignment: Alignment, - ) -> BoxFuture<'static, VortexResult> { + ) -> BoxFuture<'static, VortexResult> { let durations = self.durations.clone(); let sizes = self.sizes.clone(); let total_size = self.total_size.clone(); @@ -291,7 +294,7 @@ mod tests { let data = ByteBuffer::from(vec![1, 2, 3, 4, 5]); let result = data.read_at(1, 3, Alignment::none()).await.unwrap(); - assert_eq!(result.as_ref(), &[2, 3, 4]); + assert_eq!(result.to_host_sync().as_ref(), &[2, 3, 4]); } #[tokio::test] @@ -307,7 +310,7 @@ mod tests { let data = Arc::new(ByteBuffer::from(vec![1, 2, 3, 4, 5])); let result = data.read_at(2, 3, Alignment::none()).await.unwrap(); - assert_eq!(result.as_ref(), &[3, 4, 5]); + assert_eq!(result.to_host_sync().as_ref(), &[3, 4, 5]); let size = data.size().await.unwrap(); assert_eq!(size, 5); diff --git a/vortex-io/src/runtime/tests.rs b/vortex-io/src/runtime/tests.rs index 10832633983..928fb476406 100644 --- a/vortex-io/src/runtime/tests.rs +++ b/vortex-io/src/runtime/tests.rs @@ -11,6 +11,7 @@ use std::sync::atomic::Ordering; use futures::FutureExt; use futures::future::BoxFuture; use tempfile::NamedTempFile; +use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; use vortex_buffer::ByteBuffer; use vortex_buffer::ByteBufferMut; @@ -42,7 +43,7 @@ fn test_file_read_with_single_thread_runtime() { .await .unwrap(); assert_eq!( - result.as_slice(), + result.to_host_sync().as_slice(), &TEST_DATA[TEST_OFFSET as usize..][..TEST_LEN] ); @@ -51,7 +52,7 @@ fn test_file_read_with_single_thread_runtime() { .read_at(0, TEST_DATA.len(), Alignment::new(1)) .await .unwrap(); - assert_eq!(full.as_slice(), TEST_DATA); + assert_eq!(full.to_host_sync().as_slice(), TEST_DATA); "success" } @@ -70,7 +71,7 @@ async fn test_file_read_with_tokio_runtime() { .await .unwrap(); assert_eq!( - result.as_slice(), + result.to_host_sync().as_slice(), &TEST_DATA[TEST_OFFSET as usize..][..TEST_LEN] ); @@ -79,7 +80,7 @@ async fn test_file_read_with_tokio_runtime() { .read_at(0, TEST_DATA.len(), Alignment::new(1)) .await .unwrap(); - assert_eq!(full.as_slice(), TEST_DATA); + assert_eq!(full.to_host_sync().as_slice(), TEST_DATA); } // ============================================================================ @@ -107,7 +108,7 @@ fn test_file_read_with_real_file_single_thread() { .await .unwrap(); assert_eq!( - result.as_slice(), + result.to_host_sync().as_slice(), &TEST_DATA[TEST_OFFSET as usize..][..TEST_LEN] ); @@ -116,7 +117,7 @@ fn test_file_read_with_real_file_single_thread() { .read_at(0, TEST_DATA.len(), Alignment::new(1)) .await .unwrap(); - assert_eq!(full.as_slice(), TEST_DATA); + assert_eq!(full.to_host_sync().as_slice(), TEST_DATA); "success" } @@ -144,7 +145,7 @@ async fn test_file_read_with_real_file_tokio() { .await .unwrap(); assert_eq!( - result.as_slice(), + result.to_host_sync().as_slice(), &TEST_DATA[TEST_OFFSET as usize..][..TEST_LEN] ); @@ -153,7 +154,7 @@ async fn test_file_read_with_real_file_tokio() { .read_at(0, TEST_DATA.len(), Alignment::new(1)) .await .unwrap(); - assert_eq!(full.as_slice(), TEST_DATA); + assert_eq!(full.to_host_sync().as_slice(), TEST_DATA); } // ============================================================================ @@ -174,10 +175,22 @@ async fn test_concurrent_reads() { let results = futures::future::join_all(futures).await; - assert_eq!(results[0].as_ref().unwrap().as_slice(), &TEST_DATA[0..5]); - assert_eq!(results[1].as_ref().unwrap().as_slice(), &TEST_DATA[5..10]); - assert_eq!(results[2].as_ref().unwrap().as_slice(), &TEST_DATA[10..15]); - assert_eq!(results[3].as_ref().unwrap().as_slice(), &TEST_DATA[15..20]); + assert_eq!( + results[0].as_ref().unwrap().to_host_sync().as_slice(), + &TEST_DATA[0..5] + ); + assert_eq!( + results[1].as_ref().unwrap().to_host_sync().as_slice(), + &TEST_DATA[5..10] + ); + assert_eq!( + results[2].as_ref().unwrap().to_host_sync().as_slice(), + &TEST_DATA[10..15] + ); + assert_eq!( + results[3].as_ref().unwrap().to_host_sync().as_slice(), + &TEST_DATA[15..20] + ); } // ============================================================================ @@ -240,7 +253,7 @@ impl VortexReadAt for CountingReadAt { offset: u64, length: usize, alignment: Alignment, - ) -> BoxFuture<'static, VortexResult> { + ) -> BoxFuture<'static, VortexResult> { self.read_count.fetch_add(1, Ordering::SeqCst); let data = self.data.clone(); async move { @@ -253,7 +266,7 @@ impl VortexReadAt for CountingReadAt { buffer .as_mut_slice() .copy_from_slice(&data.as_slice()[start..start + length]); - Ok(buffer.freeze()) + Ok(BufferHandle::new_host(buffer.freeze())) } .boxed() } diff --git a/vortex-layout/src/layouts/chunked/reader.rs b/vortex-layout/src/layouts/chunked/reader.rs index e4d26c7e808..6ef61a5504d 100644 --- a/vortex-layout/src/layouts/chunked/reader.rs +++ b/vortex-layout/src/layouts/chunked/reader.rs @@ -301,7 +301,10 @@ impl LayoutReader for ChunkedReader { } // Combine the arrays. - Ok(ChunkedArray::try_new(chunks, dtype)?.to_array()) + let x = ChunkedArray::try_new(chunks, dtype)?.to_array(); + println!("{}", x.display_tree()); + + Ok(x) } .boxed()) } diff --git a/vortex-layout/src/layouts/flat/reader.rs b/vortex-layout/src/layouts/flat/reader.rs index 273dfa74d51..05ddbe75754 100644 --- a/vortex-layout/src/layouts/flat/reader.rs +++ b/vortex-layout/src/layouts/flat/reader.rs @@ -72,6 +72,11 @@ impl FlatReader { let array_tree = self.layout.array_tree().cloned(); async move { let segment = segment_fut.await?; + println!( + "segment host {}, device {}", + segment.is_on_host(), + segment.is_on_device() + ); let parts = if let Some(array_tree) = array_tree { // Use the pre-stored flatbuffer from layout metadata combined with segment buffers. ArrayParts::from_flatbuffer_and_segment(array_tree, segment)? @@ -214,6 +219,8 @@ impl LayoutReader for FlatReader { // Evaluate the projection expression. array = array.apply(&expr)?; + println!("array {}", array.display_tree()); + Ok(array) } .boxed()) diff --git a/vortex-layout/src/layouts/struct_/reader.rs b/vortex-layout/src/layouts/struct_/reader.rs index 66c06505ff5..6b23d8c5b19 100644 --- a/vortex-layout/src/layouts/struct_/reader.rs +++ b/vortex-layout/src/layouts/struct_/reader.rs @@ -346,7 +346,7 @@ impl LayoutReader for StructReader { let mask = Mask::from_buffer(validity.to_bool().to_bit_buffer().not()); // If root expression was a pack, then we apply the validity to each child field - if is_pack_merge { + let res = if is_pack_merge { let struct_array = array.to_struct(); let masked_fields: Vec = struct_array .unmasked_fields() @@ -365,9 +365,12 @@ impl LayoutReader for StructReader { // If the root expression was not a pack or merge, e.g. if it's something like // a get_item, then we apply the validity directly to the result vortex_array::compute::mask(array.as_ref(), &mask) - } + }; + res } else { - projected.await + projected + .await + .inspect(|a| println!("ret array {}", a.display_tree())) } })) } From 1674b3e43123593bb3e6a6958827b3ce737ad1b1 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 28 Jan 2026 23:53:06 +0000 Subject: [PATCH 4/4] test with struct/varbin passing Signed-off-by: Andrew Duffy --- vortex-cuda/src/canonical.rs | 29 ++++++++++++++++++++++++++++- vortex-file/src/read/driver.rs | 2 +- vortex-file/src/tests.rs | 21 +++++++++++++++++++-- 3 files changed, 48 insertions(+), 4 deletions(-) diff --git a/vortex-cuda/src/canonical.rs b/vortex-cuda/src/canonical.rs index 0061ba99851..dfc2184e078 100644 --- a/vortex-cuda/src/canonical.rs +++ b/vortex-cuda/src/canonical.rs @@ -5,7 +5,9 @@ use std::sync::Arc; use async_trait::async_trait; use futures::future::try_join_all; +use vortex_array::Array; use vortex_array::Canonical; +use vortex_array::IntoArray; use vortex_array::arrays::BinaryView; use vortex_array::arrays::BoolArray; use vortex_array::arrays::BoolArrayParts; @@ -13,6 +15,8 @@ use vortex_array::arrays::DecimalArray; use vortex_array::arrays::DecimalArrayParts; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::PrimitiveArrayParts; +use vortex_array::arrays::StructArray; +use vortex_array::arrays::StructArrayParts; use vortex_array::arrays::VarBinViewArray; use vortex_array::arrays::VarBinViewArrayParts; use vortex_array::buffer::BufferHandle; @@ -32,6 +36,29 @@ pub trait CanonicalCudaExt { impl CanonicalCudaExt for Canonical { async fn into_host(self) -> VortexResult { match self { + Canonical::Struct(struct_array) => { + // Children should all be canonical now + let len = struct_array.len(); + let StructArrayParts { + fields, + struct_fields, + validity, + .. + } = struct_array.into_parts(); + + // TODO(aduffy): try_join_all + let mut host_fields = vec![]; + for field in fields.iter().cloned() { + host_fields.push(field.to_canonical()?.into_host().await?.into_array()); + } + + Ok(Canonical::Struct(StructArray::new( + struct_fields.names().clone(), + host_fields, + len, + validity, + ))) + } n @ Canonical::Null(_) => Ok(n), Canonical::Bool(bool) => { // NOTE: update to copy to host when adding buffer handle. @@ -102,7 +129,7 @@ impl CanonicalCudaExt for Canonical { VarBinViewArray::new_unchecked(host_views, host_buffers, dtype, validity) })) } - _ => todo!(), + c => todo!("{} not implemented", c.dtype()), } } } diff --git a/vortex-file/src/read/driver.rs b/vortex-file/src/read/driver.rs index 18cdfea6a70..e0aa59666fa 100644 --- a/vortex-file/src/read/driver.rs +++ b/vortex-file/src/read/driver.rs @@ -325,8 +325,8 @@ impl State { mod tests { use futures::StreamExt; use futures::stream; - use vortex_buffer::Alignment; use vortex_array::buffer::BufferHandle; + use vortex_buffer::Alignment; use vortex_error::VortexResult; use super::*; diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index e75301cc44e..fc46d2dc0ad 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -58,6 +58,7 @@ use vortex_cuda::executor::CudaArrayExt; use vortex_dtype::DType; use vortex_dtype::DecimalDType; use vortex_dtype::ExtDType; +use vortex_dtype::FieldNames; use vortex_dtype::Nullability; use vortex_dtype::PType; use vortex_dtype::PType::I32; @@ -1654,15 +1655,27 @@ async fn gpu_scan() -> VortexResult<()> { // Create an ALP-encoded array from primitive f64 values let primitive = PrimitiveArray::from_iter((0..100).map(|i| i as f64 * 1.1)); let alp_array = alp_encode(&primitive, None)?; + let str_array = VarBinViewArray::from_iter_str((0..100).map(|i| format!("number {i}"))); println!("alp {}", alp_array.display_tree()); + let array = StructArray::new( + FieldNames::from(vec!["float_col", "int_col", "str_col"]), + vec![ + primitive.into_array(), + alp_array.into_array(), + str_array.into_array(), + ], + 100, + Validity::NonNullable, + ); + // Write to a buffer, then to a temp file let temp_path = std::env::temp_dir().join("gpu_scan_test.vortex"); let mut buf = Vec::new(); SESSION .write_options() - .write(&mut buf, alp_array.to_array_stream()) + .write(&mut buf, array.to_array_stream()) .await?; std::fs::write(&temp_path, &buf)?; @@ -1683,7 +1696,11 @@ async fn gpu_scan() -> VortexResult<()> { let mut cuda_ctx = CudaSession::create_execution_ctx(&SESSION)?; let mut res = Vec::new(); - let mut stream = gpu_file.scan()?.into_array_stream()?; + let mut stream = gpu_file + .scan()? + // filter with a predefined mask + .with_row_indices(buffer![0, 10, 20, 30, 40, 50, 60, 70, 80, 90]) + .into_array_stream()?; while let Some(a) = stream.next().await { let a = a?; // println!("arr {}", a.display_tree());