From 469d55bdea3de87233be35421559e5bf78dc2752 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 14 Jan 2026 11:25:36 -0500 Subject: [PATCH 1/3] fix: FSST canonicalize when > 2GB of buffers Signed-off-by: Andrew Duffy --- encodings/fsst/src/array.rs | 4 +- encodings/fsst/src/canonical.rs | 122 +++++++++++++++++++----- vortex-array/src/builders/varbinview.rs | 8 +- 3 files changed, 104 insertions(+), 30 deletions(-) diff --git a/encodings/fsst/src/array.rs b/encodings/fsst/src/array.rs index d4e760f259e..32e5aa83b78 100644 --- a/encodings/fsst/src/array.rs +++ b/encodings/fsst/src/array.rs @@ -188,9 +188,9 @@ impl VTable for FSSTVTable { // Decompress the whole block of data into a new buffer, and create some views // from it instead. - let (buffer, views) = fsst_decode_views(array, builder.completed_block_count(), ctx)?; + let (buffers, views) = fsst_decode_views(array, builder.completed_block_count(), ctx)?; - builder.push_buffer_and_adjusted_views(&[buffer], &views, array.validity_mask()); + builder.push_buffer_and_adjusted_views(&buffers, &views, array.validity_mask()); Ok(()) } diff --git a/encodings/fsst/src/canonical.rs b/encodings/fsst/src/canonical.rs index d41ebab786b..22697ecbb7c 100644 --- a/encodings/fsst/src/canonical.rs +++ b/encodings/fsst/src/canonical.rs @@ -3,6 +3,7 @@ use std::sync::Arc; +use num_traits::AsPrimitive; use vortex_array::Canonical; use vortex_array::ExecutionCtx; use vortex_array::arrays::PrimitiveArray; @@ -12,6 +13,7 @@ use vortex_buffer::Buffer; use vortex_buffer::BufferMut; use vortex_buffer::ByteBuffer; use vortex_buffer::ByteBufferMut; +use vortex_dtype::NativePType; use vortex_dtype::match_each_integer_ptype; use vortex_error::VortexResult; use vortex_vector::binaryview::BinaryView; @@ -22,28 +24,27 @@ pub(super) fn canonicalize_fsst( array: &FSSTArray, ctx: &mut ExecutionCtx, ) -> VortexResult { - let (buffer, views) = fsst_decode_views(array, 0, ctx)?; + let (buffers, views) = fsst_decode_views(array, 0, ctx)?; // SAFETY: FSST already validates the bytes for binary/UTF-8. We build views directly on // top of them, so the view pointers will all be valid. Ok(unsafe { Canonical::VarBinView(VarBinViewArray::new_unchecked( views, - Arc::new([buffer]), + Arc::from(buffers), array.dtype().clone(), array.codes().validity().clone(), )) }) } -#[expect( - clippy::cast_possible_truncation, - reason = "truncation is intentional for buffer index" -)] -pub(super) fn fsst_decode_views( +/// Maximum number of buffer bytes that can be referenced by a single `BinaryView` +const MAX_BUFFER_LEN: usize = i32::MAX as usize; + +pub(crate) fn fsst_decode_views( fsst_array: &FSSTArray, - buf_index: u32, + start_buf_index: u32, ctx: &mut ExecutionCtx, -) -> VortexResult<(ByteBuffer, Buffer)> { +) -> VortexResult<(Vec, Buffer)> { // FSSTArray has two child arrays: // 1. A VarBinArray, which holds the string heap of the compressed codes. // 2. An uncompressed_lengths primitive array, storing the length of each original @@ -76,24 +77,57 @@ pub(super) fn fsst_decode_views( unsafe { uncompressed_bytes.set_len(len) }; // Directly create the binary views. - let mut views = BufferMut::::with_capacity(uncompressed_lens_array.len()); - match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| { - let mut offset = 0; - for len in uncompressed_lens_array.as_slice::

() { - let len = *len as usize; - let view = BinaryView::make_view( - &uncompressed_bytes[offset..][..len], - buf_index, - offset as u32, - ); - // SAFETY: we reserved the right capacity beforehand - unsafe { views.push_unchecked(view) }; - offset += len; + Ok(build_views( + start_buf_index, + MAX_BUFFER_LEN, + uncompressed_bytes, + uncompressed_lens_array.as_slice::

(), + )) + }) +} + +fn build_views>( + start_buf_index: u32, + max_buffer_len: usize, + mut uncompressed_bytes: ByteBufferMut, + uncompressed_lens: &[P], +) -> (Vec, Buffer) { + let mut views = BufferMut::::with_capacity(uncompressed_lens.len()); + + let mut buffers = Vec::new(); + let mut buf_index = start_buf_index; + + let mut offset = 0; + for &len in uncompressed_lens { + let len = len.as_(); + assert!(len <= max_buffer_len, "values cannot exceed max_buffer_len"); + + if (offset + len) > max_buffer_len { + // Roll the buffer every 2GiB, to avoid overflowing VarBinView offset field + let rest = uncompressed_bytes.split_off(offset); + + buffers.push(uncompressed_bytes.freeze()); + buf_index += 1; + offset = 0; + + uncompressed_bytes = rest; } - }); + let view = BinaryView::make_view( + &uncompressed_bytes[offset..][..len], + buf_index, + offset.as_(), + ); + // SAFETY: we reserved the right capacity beforehand + unsafe { views.push_unchecked(view) }; + offset += len; + } + + if !uncompressed_bytes.is_empty() { + buffers.push(uncompressed_bytes.freeze()); + } - Ok((uncompressed_bytes.freeze(), views.freeze())) + (buffers, views.freeze()) } #[cfg(test)] @@ -113,11 +147,15 @@ mod tests { use vortex_array::builders::ArrayBuilder; use vortex_array::builders::VarBinViewBuilder; use vortex_array::session::ArraySession; + use vortex_buffer::ByteBuffer; + use vortex_buffer::ByteBufferMut; use vortex_dtype::DType; use vortex_dtype::Nullability; use vortex_error::VortexResult; use vortex_session::VortexSession; + use vortex_vector::binaryview::BinaryView; + use crate::canonical::build_views; use crate::fsst_compress; use crate::fsst_train_compressor; @@ -196,4 +234,40 @@ mod tests { }; Ok(()) } + + #[test] + fn test_to_canonical_large() { + // We are testing generating views for raw data that should look like + // + // aaaaaaaaaaaaa ("a"*13) + // bbbbbbbbbbbbb ("b"*13) + // ccccccccccccc ("c"*13) + // ddddddddddddd ("d"*13) + // + // In real code, this would all fit in one buffer, but to unit test the splitting logic + // we split buffers at length 26, which should result in two buffers for the output array. + let raw_data = + ByteBufferMut::copy_from("aaaaaaaaaaaaabbbbbbbbbbbbbcccccccccccccddddddddddddd"); + let lens = vec![13u8; 4]; + + let (buffers, views) = build_views(0, 26, raw_data, &lens); + + assert_eq!( + buffers, + vec![ + ByteBuffer::copy_from("aaaaaaaaaaaaabbbbbbbbbbbbb"), + ByteBuffer::copy_from("cccccccccccccddddddddddddd"), + ] + ); + + assert_eq!( + views.as_slice(), + &[ + BinaryView::make_view(b"aaaaaaaaaaaaa", 0, 0), + BinaryView::make_view(b"bbbbbbbbbbbbb", 0, 13), + BinaryView::make_view(b"ccccccccccccc", 1, 0), + BinaryView::make_view(b"ddddddddddddd", 1, 13), + ] + ) + } } diff --git a/vortex-array/src/builders/varbinview.rs b/vortex-array/src/builders/varbinview.rs index 5dd96e9ba6c..42c9b805e8b 100644 --- a/vortex-array/src/builders/varbinview.rs +++ b/vortex-array/src/builders/varbinview.rs @@ -150,7 +150,7 @@ impl VarBinViewBuilder { /// Pushes buffers and pre-adjusted views into the builder. /// - /// The provided `buffer` slices contain sections of data from a `VarBinViewArray`, and the + /// The provided `buffers` contain sections of data from a `VarBinViewArray`, and the /// `views` are `BinaryView`s that have already been adjusted to reference the correct buffer /// indices and offsets for this builder. All views must point to valid sections within the /// provided buffers, and the validity length must match the view length. @@ -166,14 +166,14 @@ impl VarBinViewBuilder { /// exist in this builder. pub fn push_buffer_and_adjusted_views( &mut self, - buffer: &[ByteBuffer], + buffers: &[ByteBuffer], views: &Buffer, validity_mask: Mask, ) { self.flush_in_progress(); - let expected_completed_len = self.completed.len() as usize + buffer.len(); - self.completed.extend_from_slice_unchecked(buffer); + let expected_completed_len = self.completed.len() as usize + buffers.len(); + self.completed.extend_from_slice_unchecked(buffers); assert_eq!( self.completed.len() as usize, expected_completed_len, From b504028e9b55564117465fea26853714a1c36116 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 20 Jan 2026 12:26:35 -0800 Subject: [PATCH 2/3] extract out build_views to use with varbin Signed-off-by: Andrew Duffy --- encodings/fsst/src/canonical.rs | 52 +-------------- .../src/arrays/varbin/vtable/canonical.rs | 41 +++++------- .../src/arrays/varbinview/build_views.rs | 64 +++++++++++++++++++ vortex-array/src/arrays/varbinview/mod.rs | 1 + 4 files changed, 83 insertions(+), 75 deletions(-) create mode 100644 vortex-array/src/arrays/varbinview/build_views.rs diff --git a/encodings/fsst/src/canonical.rs b/encodings/fsst/src/canonical.rs index 22697ecbb7c..3b7cdde41b0 100644 --- a/encodings/fsst/src/canonical.rs +++ b/encodings/fsst/src/canonical.rs @@ -3,17 +3,16 @@ use std::sync::Arc; -use num_traits::AsPrimitive; use vortex_array::Canonical; use vortex_array::ExecutionCtx; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::VarBinViewArray; +use vortex_array::arrays::build_views::MAX_BUFFER_LEN; +use vortex_array::arrays::build_views::build_views; use vortex_array::vtable::ValidityHelper; use vortex_buffer::Buffer; -use vortex_buffer::BufferMut; use vortex_buffer::ByteBuffer; use vortex_buffer::ByteBufferMut; -use vortex_dtype::NativePType; use vortex_dtype::match_each_integer_ptype; use vortex_error::VortexResult; use vortex_vector::binaryview::BinaryView; @@ -37,9 +36,6 @@ pub(super) fn canonicalize_fsst( }) } -/// Maximum number of buffer bytes that can be referenced by a single `BinaryView` -const MAX_BUFFER_LEN: usize = i32::MAX as usize; - pub(crate) fn fsst_decode_views( fsst_array: &FSSTArray, start_buf_index: u32, @@ -59,7 +55,6 @@ pub(crate) fn fsst_decode_views( .clone() .execute::(ctx)?; - // Decompress the full dataset. #[allow(clippy::cast_possible_truncation)] let total_size: usize = match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| { uncompressed_lens_array @@ -87,49 +82,6 @@ pub(crate) fn fsst_decode_views( }) } -fn build_views>( - start_buf_index: u32, - max_buffer_len: usize, - mut uncompressed_bytes: ByteBufferMut, - uncompressed_lens: &[P], -) -> (Vec, Buffer) { - let mut views = BufferMut::::with_capacity(uncompressed_lens.len()); - - let mut buffers = Vec::new(); - let mut buf_index = start_buf_index; - - let mut offset = 0; - for &len in uncompressed_lens { - let len = len.as_(); - assert!(len <= max_buffer_len, "values cannot exceed max_buffer_len"); - - if (offset + len) > max_buffer_len { - // Roll the buffer every 2GiB, to avoid overflowing VarBinView offset field - let rest = uncompressed_bytes.split_off(offset); - - buffers.push(uncompressed_bytes.freeze()); - buf_index += 1; - offset = 0; - - uncompressed_bytes = rest; - } - let view = BinaryView::make_view( - &uncompressed_bytes[offset..][..len], - buf_index, - offset.as_(), - ); - // SAFETY: we reserved the right capacity beforehand - unsafe { views.push_unchecked(view) }; - offset += len; - } - - if !uncompressed_bytes.is_empty() { - buffers.push(uncompressed_bytes.freeze()); - } - - (buffers, views.freeze()) -} - #[cfg(test)] mod tests { use std::sync::LazyLock; diff --git a/vortex-array/src/arrays/varbin/vtable/canonical.rs b/vortex-array/src/arrays/varbin/vtable/canonical.rs index 2499235a16d..d92068b028a 100644 --- a/vortex-array/src/arrays/varbin/vtable/canonical.rs +++ b/vortex-array/src/arrays/varbin/vtable/canonical.rs @@ -3,17 +3,16 @@ use std::sync::Arc; -use num_traits::AsPrimitive; -use vortex_buffer::Buffer; -use vortex_buffer::BufferMut; use vortex_dtype::match_each_integer_ptype; use vortex_error::VortexResult; -use vortex_vector::binaryview::BinaryView; use crate::Canonical; use crate::ExecutionCtx; use crate::arrays::PrimitiveArray; use crate::arrays::VarBinViewArray; +use crate::arrays::build_views::MAX_BUFFER_LEN; +use crate::arrays::build_views::build_views; +use crate::arrays::build_views::offsets_to_lengths; use crate::arrays::varbin::VarBinArray; /// Converts a VarBinArray to its canonical form (VarBinViewArray). @@ -27,29 +26,21 @@ pub(crate) fn varbin_to_canonical( let array = array.clone().zero_offsets(); let (dtype, bytes, offsets, validity) = array.into_parts(); + // offsets_to_lengths let offsets = offsets.execute::(ctx)?; + let bytes = bytes.into_mut(); - // Build views directly from offsets - #[expect(clippy::cast_possible_truncation, reason = "BinaryView offset is u32")] - let views: Buffer = match_each_integer_ptype!(offsets.ptype(), |O| { - let offsets_slice = offsets.as_slice::(); - let bytes_slice = bytes.as_ref(); - - let mut views = BufferMut::::with_capacity(offsets_slice.len() - 1); - for window in offsets_slice.windows(2) { - let start: usize = window[0].as_(); - let end: usize = window[1].as_(); - let value = &bytes_slice[start..end]; - views.push(BinaryView::make_view(value, 0, start as u32)); - } - views.freeze() - }); - - // Create VarBinViewArray with the original bytes buffer and computed views - // SAFETY: views are correctly computed from valid offsets - let varbinview = - unsafe { VarBinViewArray::new_unchecked(views, Arc::from([bytes]), dtype, validity) }; - Ok(Canonical::VarBinView(varbinview)) + match_each_integer_ptype!(offsets.ptype(), |P| { + let lens = offsets_to_lengths(offsets.as_slice::

()); + let (buffers, views) = build_views(0, MAX_BUFFER_LEN, bytes, lens.as_slice()); + + let varbinview = + unsafe { VarBinViewArray::new_unchecked(views, Arc::from(buffers), dtype, validity) }; + + // Create VarBinViewArray with the original bytes buffer and computed views + // SAFETY: views are correctly computed from valid offsets + Ok(Canonical::VarBinView(varbinview)) + }) } #[cfg(test)] diff --git a/vortex-array/src/arrays/varbinview/build_views.rs b/vortex-array/src/arrays/varbinview/build_views.rs new file mode 100644 index 00000000000..c0f6e0e9212 --- /dev/null +++ b/vortex-array/src/arrays/varbinview/build_views.rs @@ -0,0 +1,64 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use itertools::Itertools; +use num_traits::AsPrimitive; +use vortex_buffer::Buffer; +use vortex_buffer::BufferMut; +use vortex_buffer::ByteBuffer; +use vortex_buffer::ByteBufferMut; +use vortex_dtype::NativePType; +use vortex_vector::binaryview::BinaryView; + +/// Convert an offsets buffer to a buffer of element lengths. +#[inline] +pub fn offsets_to_lengths(offsets: &[P]) -> Buffer

{ + offsets + .iter() + .tuple_windows::<(_, _)>() + .map(|(&start, &end)| end - start) + .collect() +} + +/// Maximum number of buffer bytes that can be referenced by a single `BinaryView` +pub const MAX_BUFFER_LEN: usize = i32::MAX as usize; + +/// Split a large buffer of input `bytes` holding string data +pub fn build_views>( + start_buf_index: u32, + max_buffer_len: usize, + mut bytes: ByteBufferMut, + lens: &[P], +) -> (Vec, Buffer) { + let mut views = BufferMut::::with_capacity(lens.len()); + + let mut buffers = Vec::new(); + let mut buf_index = start_buf_index; + + let mut offset = 0; + for &len in lens { + let len = len.as_(); + assert!(len <= max_buffer_len, "values cannot exceed max_buffer_len"); + + if (offset + len) > max_buffer_len { + // Roll the buffer every 2GiB, to avoid overflowing VarBinView offset field + let rest = bytes.split_off(offset); + + buffers.push(bytes.freeze()); + buf_index += 1; + offset = 0; + + bytes = rest; + } + let view = BinaryView::make_view(&bytes[offset..][..len], buf_index, offset.as_()); + // SAFETY: we reserved the right capacity beforehand + unsafe { views.push_unchecked(view) }; + offset += len; + } + + if !bytes.is_empty() { + buffers.push(bytes.freeze()); + } + + (buffers, views.freeze()) +} diff --git a/vortex-array/src/arrays/varbinview/mod.rs b/vortex-array/src/arrays/varbinview/mod.rs index af6be468c18..0b8cb5c82ae 100644 --- a/vortex-array/src/arrays/varbinview/mod.rs +++ b/vortex-array/src/arrays/varbinview/mod.rs @@ -13,5 +13,6 @@ mod compute; mod vtable; pub use vtable::VarBinViewVTable; +pub mod build_views; #[cfg(test)] mod tests; From 78cea9cb0925340c67ba7989183dd7781928be24 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 20 Jan 2026 12:28:51 -0800 Subject: [PATCH 3/3] move unit test Signed-off-by: Andrew Duffy --- encodings/fsst/src/canonical.rs | 40 ----------------- .../src/arrays/varbinview/build_views.rs | 45 +++++++++++++++++++ 2 files changed, 45 insertions(+), 40 deletions(-) diff --git a/encodings/fsst/src/canonical.rs b/encodings/fsst/src/canonical.rs index 3b7cdde41b0..4cb8d96bb00 100644 --- a/encodings/fsst/src/canonical.rs +++ b/encodings/fsst/src/canonical.rs @@ -99,15 +99,11 @@ mod tests { use vortex_array::builders::ArrayBuilder; use vortex_array::builders::VarBinViewBuilder; use vortex_array::session::ArraySession; - use vortex_buffer::ByteBuffer; - use vortex_buffer::ByteBufferMut; use vortex_dtype::DType; use vortex_dtype::Nullability; use vortex_error::VortexResult; use vortex_session::VortexSession; - use vortex_vector::binaryview::BinaryView; - use crate::canonical::build_views; use crate::fsst_compress; use crate::fsst_train_compressor; @@ -186,40 +182,4 @@ mod tests { }; Ok(()) } - - #[test] - fn test_to_canonical_large() { - // We are testing generating views for raw data that should look like - // - // aaaaaaaaaaaaa ("a"*13) - // bbbbbbbbbbbbb ("b"*13) - // ccccccccccccc ("c"*13) - // ddddddddddddd ("d"*13) - // - // In real code, this would all fit in one buffer, but to unit test the splitting logic - // we split buffers at length 26, which should result in two buffers for the output array. - let raw_data = - ByteBufferMut::copy_from("aaaaaaaaaaaaabbbbbbbbbbbbbcccccccccccccddddddddddddd"); - let lens = vec![13u8; 4]; - - let (buffers, views) = build_views(0, 26, raw_data, &lens); - - assert_eq!( - buffers, - vec![ - ByteBuffer::copy_from("aaaaaaaaaaaaabbbbbbbbbbbbb"), - ByteBuffer::copy_from("cccccccccccccddddddddddddd"), - ] - ); - - assert_eq!( - views.as_slice(), - &[ - BinaryView::make_view(b"aaaaaaaaaaaaa", 0, 0), - BinaryView::make_view(b"bbbbbbbbbbbbb", 0, 13), - BinaryView::make_view(b"ccccccccccccc", 1, 0), - BinaryView::make_view(b"ddddddddddddd", 1, 13), - ] - ) - } } diff --git a/vortex-array/src/arrays/varbinview/build_views.rs b/vortex-array/src/arrays/varbinview/build_views.rs index c0f6e0e9212..b1bdb9013a0 100644 --- a/vortex-array/src/arrays/varbinview/build_views.rs +++ b/vortex-array/src/arrays/varbinview/build_views.rs @@ -62,3 +62,48 @@ pub fn build_views>( (buffers, views.freeze()) } + +#[cfg(test)] +mod tests { + use vortex_buffer::ByteBuffer; + use vortex_buffer::ByteBufferMut; + use vortex_vector::binaryview::BinaryView; + + use crate::arrays::build_views::build_views; + + #[test] + fn test_to_canonical_large() { + // We are testing generating views for raw data that should look like + // + // aaaaaaaaaaaaa ("a"*13) + // bbbbbbbbbbbbb ("b"*13) + // ccccccccccccc ("c"*13) + // ddddddddddddd ("d"*13) + // + // In real code, this would all fit in one buffer, but to unit test the splitting logic + // we split buffers at length 26, which should result in two buffers for the output array. + let raw_data = + ByteBufferMut::copy_from("aaaaaaaaaaaaabbbbbbbbbbbbbcccccccccccccddddddddddddd"); + let lens = vec![13u8; 4]; + + let (buffers, views) = build_views(0, 26, raw_data, &lens); + + assert_eq!( + buffers, + vec![ + ByteBuffer::copy_from("aaaaaaaaaaaaabbbbbbbbbbbbb"), + ByteBuffer::copy_from("cccccccccccccddddddddddddd"), + ] + ); + + assert_eq!( + views.as_slice(), + &[ + BinaryView::make_view(b"aaaaaaaaaaaaa", 0, 0), + BinaryView::make_view(b"bbbbbbbbbbbbb", 0, 13), + BinaryView::make_view(b"ccccccccccccc", 1, 0), + BinaryView::make_view(b"ddddddddddddd", 1, 13), + ] + ) + } +}