diff --git a/encodings/fsst/src/array.rs b/encodings/fsst/src/array.rs index d4e760f259e..32e5aa83b78 100644 --- a/encodings/fsst/src/array.rs +++ b/encodings/fsst/src/array.rs @@ -188,9 +188,9 @@ impl VTable for FSSTVTable { // Decompress the whole block of data into a new buffer, and create some views // from it instead. - let (buffer, views) = fsst_decode_views(array, builder.completed_block_count(), ctx)?; + let (buffers, views) = fsst_decode_views(array, builder.completed_block_count(), ctx)?; - builder.push_buffer_and_adjusted_views(&[buffer], &views, array.validity_mask()); + builder.push_buffer_and_adjusted_views(&buffers, &views, array.validity_mask()); Ok(()) } diff --git a/encodings/fsst/src/canonical.rs b/encodings/fsst/src/canonical.rs index d41ebab786b..4cb8d96bb00 100644 --- a/encodings/fsst/src/canonical.rs +++ b/encodings/fsst/src/canonical.rs @@ -7,9 +7,10 @@ use vortex_array::Canonical; use vortex_array::ExecutionCtx; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::VarBinViewArray; +use vortex_array::arrays::build_views::MAX_BUFFER_LEN; +use vortex_array::arrays::build_views::build_views; use vortex_array::vtable::ValidityHelper; use vortex_buffer::Buffer; -use vortex_buffer::BufferMut; use vortex_buffer::ByteBuffer; use vortex_buffer::ByteBufferMut; use vortex_dtype::match_each_integer_ptype; @@ -22,28 +23,24 @@ pub(super) fn canonicalize_fsst( array: &FSSTArray, ctx: &mut ExecutionCtx, ) -> VortexResult { - let (buffer, views) = fsst_decode_views(array, 0, ctx)?; + let (buffers, views) = fsst_decode_views(array, 0, ctx)?; // SAFETY: FSST already validates the bytes for binary/UTF-8. We build views directly on // top of them, so the view pointers will all be valid. Ok(unsafe { Canonical::VarBinView(VarBinViewArray::new_unchecked( views, - Arc::new([buffer]), + Arc::from(buffers), array.dtype().clone(), array.codes().validity().clone(), )) }) } -#[expect( - clippy::cast_possible_truncation, - reason = "truncation is intentional for buffer index" -)] -pub(super) fn fsst_decode_views( +pub(crate) fn fsst_decode_views( fsst_array: &FSSTArray, - buf_index: u32, + start_buf_index: u32, ctx: &mut ExecutionCtx, -) -> VortexResult<(ByteBuffer, Buffer)> { +) -> VortexResult<(Vec, Buffer)> { // FSSTArray has two child arrays: // 1. A VarBinArray, which holds the string heap of the compressed codes. // 2. An uncompressed_lengths primitive array, storing the length of each original @@ -58,7 +55,6 @@ pub(super) fn fsst_decode_views( .clone() .execute::(ctx)?; - // Decompress the full dataset. #[allow(clippy::cast_possible_truncation)] let total_size: usize = match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| { uncompressed_lens_array @@ -76,24 +72,14 @@ pub(super) fn fsst_decode_views( unsafe { uncompressed_bytes.set_len(len) }; // Directly create the binary views. - let mut views = BufferMut::::with_capacity(uncompressed_lens_array.len()); - match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| { - let mut offset = 0; - for len in uncompressed_lens_array.as_slice::

() { - let len = *len as usize; - let view = BinaryView::make_view( - &uncompressed_bytes[offset..][..len], - buf_index, - offset as u32, - ); - // SAFETY: we reserved the right capacity beforehand - unsafe { views.push_unchecked(view) }; - offset += len; - } - }); - - Ok((uncompressed_bytes.freeze(), views.freeze())) + Ok(build_views( + start_buf_index, + MAX_BUFFER_LEN, + uncompressed_bytes, + uncompressed_lens_array.as_slice::

(), + )) + }) } #[cfg(test)] diff --git a/vortex-array/src/arrays/varbin/vtable/canonical.rs b/vortex-array/src/arrays/varbin/vtable/canonical.rs index 2499235a16d..d92068b028a 100644 --- a/vortex-array/src/arrays/varbin/vtable/canonical.rs +++ b/vortex-array/src/arrays/varbin/vtable/canonical.rs @@ -3,17 +3,16 @@ use std::sync::Arc; -use num_traits::AsPrimitive; -use vortex_buffer::Buffer; -use vortex_buffer::BufferMut; use vortex_dtype::match_each_integer_ptype; use vortex_error::VortexResult; -use vortex_vector::binaryview::BinaryView; use crate::Canonical; use crate::ExecutionCtx; use crate::arrays::PrimitiveArray; use crate::arrays::VarBinViewArray; +use crate::arrays::build_views::MAX_BUFFER_LEN; +use crate::arrays::build_views::build_views; +use crate::arrays::build_views::offsets_to_lengths; use crate::arrays::varbin::VarBinArray; /// Converts a VarBinArray to its canonical form (VarBinViewArray). @@ -27,29 +26,21 @@ pub(crate) fn varbin_to_canonical( let array = array.clone().zero_offsets(); let (dtype, bytes, offsets, validity) = array.into_parts(); + // offsets_to_lengths let offsets = offsets.execute::(ctx)?; + let bytes = bytes.into_mut(); - // Build views directly from offsets - #[expect(clippy::cast_possible_truncation, reason = "BinaryView offset is u32")] - let views: Buffer = match_each_integer_ptype!(offsets.ptype(), |O| { - let offsets_slice = offsets.as_slice::(); - let bytes_slice = bytes.as_ref(); - - let mut views = BufferMut::::with_capacity(offsets_slice.len() - 1); - for window in offsets_slice.windows(2) { - let start: usize = window[0].as_(); - let end: usize = window[1].as_(); - let value = &bytes_slice[start..end]; - views.push(BinaryView::make_view(value, 0, start as u32)); - } - views.freeze() - }); - - // Create VarBinViewArray with the original bytes buffer and computed views - // SAFETY: views are correctly computed from valid offsets - let varbinview = - unsafe { VarBinViewArray::new_unchecked(views, Arc::from([bytes]), dtype, validity) }; - Ok(Canonical::VarBinView(varbinview)) + match_each_integer_ptype!(offsets.ptype(), |P| { + let lens = offsets_to_lengths(offsets.as_slice::

()); + let (buffers, views) = build_views(0, MAX_BUFFER_LEN, bytes, lens.as_slice()); + + let varbinview = + unsafe { VarBinViewArray::new_unchecked(views, Arc::from(buffers), dtype, validity) }; + + // Create VarBinViewArray with the original bytes buffer and computed views + // SAFETY: views are correctly computed from valid offsets + Ok(Canonical::VarBinView(varbinview)) + }) } #[cfg(test)] diff --git a/vortex-array/src/arrays/varbinview/build_views.rs b/vortex-array/src/arrays/varbinview/build_views.rs new file mode 100644 index 00000000000..b1bdb9013a0 --- /dev/null +++ b/vortex-array/src/arrays/varbinview/build_views.rs @@ -0,0 +1,109 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use itertools::Itertools; +use num_traits::AsPrimitive; +use vortex_buffer::Buffer; +use vortex_buffer::BufferMut; +use vortex_buffer::ByteBuffer; +use vortex_buffer::ByteBufferMut; +use vortex_dtype::NativePType; +use vortex_vector::binaryview::BinaryView; + +/// Convert an offsets buffer to a buffer of element lengths. +#[inline] +pub fn offsets_to_lengths(offsets: &[P]) -> Buffer

{ + offsets + .iter() + .tuple_windows::<(_, _)>() + .map(|(&start, &end)| end - start) + .collect() +} + +/// Maximum number of buffer bytes that can be referenced by a single `BinaryView` +pub const MAX_BUFFER_LEN: usize = i32::MAX as usize; + +/// Split a large buffer of input `bytes` holding string data +pub fn build_views>( + start_buf_index: u32, + max_buffer_len: usize, + mut bytes: ByteBufferMut, + lens: &[P], +) -> (Vec, Buffer) { + let mut views = BufferMut::::with_capacity(lens.len()); + + let mut buffers = Vec::new(); + let mut buf_index = start_buf_index; + + let mut offset = 0; + for &len in lens { + let len = len.as_(); + assert!(len <= max_buffer_len, "values cannot exceed max_buffer_len"); + + if (offset + len) > max_buffer_len { + // Roll the buffer every 2GiB, to avoid overflowing VarBinView offset field + let rest = bytes.split_off(offset); + + buffers.push(bytes.freeze()); + buf_index += 1; + offset = 0; + + bytes = rest; + } + let view = BinaryView::make_view(&bytes[offset..][..len], buf_index, offset.as_()); + // SAFETY: we reserved the right capacity beforehand + unsafe { views.push_unchecked(view) }; + offset += len; + } + + if !bytes.is_empty() { + buffers.push(bytes.freeze()); + } + + (buffers, views.freeze()) +} + +#[cfg(test)] +mod tests { + use vortex_buffer::ByteBuffer; + use vortex_buffer::ByteBufferMut; + use vortex_vector::binaryview::BinaryView; + + use crate::arrays::build_views::build_views; + + #[test] + fn test_to_canonical_large() { + // We are testing generating views for raw data that should look like + // + // aaaaaaaaaaaaa ("a"*13) + // bbbbbbbbbbbbb ("b"*13) + // ccccccccccccc ("c"*13) + // ddddddddddddd ("d"*13) + // + // In real code, this would all fit in one buffer, but to unit test the splitting logic + // we split buffers at length 26, which should result in two buffers for the output array. + let raw_data = + ByteBufferMut::copy_from("aaaaaaaaaaaaabbbbbbbbbbbbbcccccccccccccddddddddddddd"); + let lens = vec![13u8; 4]; + + let (buffers, views) = build_views(0, 26, raw_data, &lens); + + assert_eq!( + buffers, + vec![ + ByteBuffer::copy_from("aaaaaaaaaaaaabbbbbbbbbbbbb"), + ByteBuffer::copy_from("cccccccccccccddddddddddddd"), + ] + ); + + assert_eq!( + views.as_slice(), + &[ + BinaryView::make_view(b"aaaaaaaaaaaaa", 0, 0), + BinaryView::make_view(b"bbbbbbbbbbbbb", 0, 13), + BinaryView::make_view(b"ccccccccccccc", 1, 0), + BinaryView::make_view(b"ddddddddddddd", 1, 13), + ] + ) + } +} diff --git a/vortex-array/src/arrays/varbinview/mod.rs b/vortex-array/src/arrays/varbinview/mod.rs index af6be468c18..0b8cb5c82ae 100644 --- a/vortex-array/src/arrays/varbinview/mod.rs +++ b/vortex-array/src/arrays/varbinview/mod.rs @@ -13,5 +13,6 @@ mod compute; mod vtable; pub use vtable::VarBinViewVTable; +pub mod build_views; #[cfg(test)] mod tests; diff --git a/vortex-array/src/builders/varbinview.rs b/vortex-array/src/builders/varbinview.rs index 5dd96e9ba6c..42c9b805e8b 100644 --- a/vortex-array/src/builders/varbinview.rs +++ b/vortex-array/src/builders/varbinview.rs @@ -150,7 +150,7 @@ impl VarBinViewBuilder { /// Pushes buffers and pre-adjusted views into the builder. /// - /// The provided `buffer` slices contain sections of data from a `VarBinViewArray`, and the + /// The provided `buffers` contain sections of data from a `VarBinViewArray`, and the /// `views` are `BinaryView`s that have already been adjusted to reference the correct buffer /// indices and offsets for this builder. All views must point to valid sections within the /// provided buffers, and the validity length must match the view length. @@ -166,14 +166,14 @@ impl VarBinViewBuilder { /// exist in this builder. pub fn push_buffer_and_adjusted_views( &mut self, - buffer: &[ByteBuffer], + buffers: &[ByteBuffer], views: &Buffer, validity_mask: Mask, ) { self.flush_in_progress(); - let expected_completed_len = self.completed.len() as usize + buffer.len(); - self.completed.extend_from_slice_unchecked(buffer); + let expected_completed_len = self.completed.len() as usize + buffers.len(); + self.completed.extend_from_slice_unchecked(buffers); assert_eq!( self.completed.len() as usize, expected_completed_len,