Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions encodings/fsst/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ impl VTable for FSSTVTable {

// Decompress the whole block of data into a new buffer, and create some views
// from it instead.
let (buffer, views) = fsst_decode_views(array, builder.completed_block_count(), ctx)?;
let (buffers, views) = fsst_decode_views(array, builder.completed_block_count(), ctx)?;

builder.push_buffer_and_adjusted_views(&[buffer], &views, array.validity_mask());
builder.push_buffer_and_adjusted_views(&buffers, &views, array.validity_mask());
Ok(())
}

Expand Down
42 changes: 14 additions & 28 deletions encodings/fsst/src/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ use vortex_array::Canonical;
use vortex_array::ExecutionCtx;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::arrays::build_views::MAX_BUFFER_LEN;
use vortex_array::arrays::build_views::build_views;
use vortex_array::vtable::ValidityHelper;
use vortex_buffer::Buffer;
use vortex_buffer::BufferMut;
use vortex_buffer::ByteBuffer;
use vortex_buffer::ByteBufferMut;
use vortex_dtype::match_each_integer_ptype;
Expand All @@ -22,28 +23,24 @@ pub(super) fn canonicalize_fsst(
array: &FSSTArray,
ctx: &mut ExecutionCtx,
) -> VortexResult<Canonical> {
let (buffer, views) = fsst_decode_views(array, 0, ctx)?;
let (buffers, views) = fsst_decode_views(array, 0, ctx)?;
// SAFETY: FSST already validates the bytes for binary/UTF-8. We build views directly on
// top of them, so the view pointers will all be valid.
Ok(unsafe {
Canonical::VarBinView(VarBinViewArray::new_unchecked(
views,
Arc::new([buffer]),
Arc::from(buffers),
array.dtype().clone(),
array.codes().validity().clone(),
))
})
}

#[expect(
clippy::cast_possible_truncation,
reason = "truncation is intentional for buffer index"
)]
pub(super) fn fsst_decode_views(
pub(crate) fn fsst_decode_views(
fsst_array: &FSSTArray,
buf_index: u32,
start_buf_index: u32,
ctx: &mut ExecutionCtx,
) -> VortexResult<(ByteBuffer, Buffer<BinaryView>)> {
) -> VortexResult<(Vec<ByteBuffer>, Buffer<BinaryView>)> {
// FSSTArray has two child arrays:
// 1. A VarBinArray, which holds the string heap of the compressed codes.
// 2. An uncompressed_lengths primitive array, storing the length of each original
Expand All @@ -58,7 +55,6 @@ pub(super) fn fsst_decode_views(
.clone()
.execute::<PrimitiveArray>(ctx)?;

// Decompress the full dataset.
#[allow(clippy::cast_possible_truncation)]
let total_size: usize = match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
uncompressed_lens_array
Expand All @@ -76,24 +72,14 @@ pub(super) fn fsst_decode_views(
unsafe { uncompressed_bytes.set_len(len) };

// Directly create the binary views.
let mut views = BufferMut::<BinaryView>::with_capacity(uncompressed_lens_array.len());

match_each_integer_ptype!(uncompressed_lens_array.ptype(), |P| {
let mut offset = 0;
for len in uncompressed_lens_array.as_slice::<P>() {
let len = *len as usize;
let view = BinaryView::make_view(
&uncompressed_bytes[offset..][..len],
buf_index,
offset as u32,
);
// SAFETY: we reserved the right capacity beforehand
unsafe { views.push_unchecked(view) };
offset += len;
}
});

Ok((uncompressed_bytes.freeze(), views.freeze()))
Ok(build_views(
start_buf_index,
MAX_BUFFER_LEN,
uncompressed_bytes,
uncompressed_lens_array.as_slice::<P>(),
))
})
}

#[cfg(test)]
Expand Down
41 changes: 16 additions & 25 deletions vortex-array/src/arrays/varbin/vtable/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@

use std::sync::Arc;

use num_traits::AsPrimitive;
use vortex_buffer::Buffer;
use vortex_buffer::BufferMut;
use vortex_dtype::match_each_integer_ptype;
use vortex_error::VortexResult;
use vortex_vector::binaryview::BinaryView;

use crate::Canonical;
use crate::ExecutionCtx;
use crate::arrays::PrimitiveArray;
use crate::arrays::VarBinViewArray;
use crate::arrays::build_views::MAX_BUFFER_LEN;
use crate::arrays::build_views::build_views;
use crate::arrays::build_views::offsets_to_lengths;
use crate::arrays::varbin::VarBinArray;

/// Converts a VarBinArray to its canonical form (VarBinViewArray).
Expand All @@ -27,29 +26,21 @@ pub(crate) fn varbin_to_canonical(
let array = array.clone().zero_offsets();
let (dtype, bytes, offsets, validity) = array.into_parts();

// offsets_to_lengths
let offsets = offsets.execute::<PrimitiveArray>(ctx)?;
let bytes = bytes.into_mut();

// Build views directly from offsets
#[expect(clippy::cast_possible_truncation, reason = "BinaryView offset is u32")]
let views: Buffer<BinaryView> = match_each_integer_ptype!(offsets.ptype(), |O| {
let offsets_slice = offsets.as_slice::<O>();
let bytes_slice = bytes.as_ref();

let mut views = BufferMut::<BinaryView>::with_capacity(offsets_slice.len() - 1);
for window in offsets_slice.windows(2) {
let start: usize = window[0].as_();
let end: usize = window[1].as_();
let value = &bytes_slice[start..end];
views.push(BinaryView::make_view(value, 0, start as u32));
}
views.freeze()
});

// Create VarBinViewArray with the original bytes buffer and computed views
// SAFETY: views are correctly computed from valid offsets
let varbinview =
unsafe { VarBinViewArray::new_unchecked(views, Arc::from([bytes]), dtype, validity) };
Ok(Canonical::VarBinView(varbinview))
match_each_integer_ptype!(offsets.ptype(), |P| {
let lens = offsets_to_lengths(offsets.as_slice::<P>());
let (buffers, views) = build_views(0, MAX_BUFFER_LEN, bytes, lens.as_slice());

let varbinview =
unsafe { VarBinViewArray::new_unchecked(views, Arc::from(buffers), dtype, validity) };

// Create VarBinViewArray with the original bytes buffer and computed views
// SAFETY: views are correctly computed from valid offsets
Ok(Canonical::VarBinView(varbinview))
})
}

#[cfg(test)]
Expand Down
109 changes: 109 additions & 0 deletions vortex-array/src/arrays/varbinview/build_views.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use itertools::Itertools;
use num_traits::AsPrimitive;
use vortex_buffer::Buffer;
use vortex_buffer::BufferMut;
use vortex_buffer::ByteBuffer;
use vortex_buffer::ByteBufferMut;
use vortex_dtype::NativePType;
use vortex_vector::binaryview::BinaryView;

/// Compute the length of every element described by an offsets buffer.
///
/// Each output value is the difference between two adjacent entries of `offsets`, so the
/// result has one entry fewer than the input. An input with fewer than two offsets yields an
/// empty buffer.
#[inline]
pub fn offsets_to_lengths<P: NativePType>(offsets: &[P]) -> Buffer<P> {
    let adjacent_pairs = offsets.iter().copied().tuple_windows::<(P, P)>();
    adjacent_pairs.map(|(lo, hi)| hi - lo).collect()
}

/// Maximum number of buffer bytes that can be referenced by a single `BinaryView`.
///
/// The value is `i32::MAX`, i.e. one byte short of 2 GiB.
pub const MAX_BUFFER_LEN: usize = i32::MAX as usize;

/// Build `BinaryView`s over a contiguous buffer of values, splitting the buffer into several
/// smaller ones when it is too large to be addressed as a single buffer.
///
/// `bytes` holds the values back to back and `lens[i]` is the byte length of the `i`-th value.
/// Values are placed into buffers in order: whenever appending the next value would push the
/// current buffer past `max_buffer_len` bytes, the buffer is cut at the start of that value
/// and a new buffer begins. A value is never split across two buffers.
///
/// The first buffer is numbered `start_buf_index` and every following buffer receives the next
/// index. `max_buffer_len` is clamped to `u32::MAX`, because view offsets are stored as `u32`.
///
/// Returns the frozen buffers in index order together with one view per entry of `lens`. No
/// empty trailing buffer is emitted, so the buffer list is empty when `bytes` is empty.
///
/// # Panics
///
/// Panics if any length exceeds the (clamped) `max_buffer_len`, or if `lens` describes more
/// bytes than `bytes` contains.
pub fn build_views<P: NativePType + AsPrimitive<usize>>(
    start_buf_index: u32,
    max_buffer_len: usize,
    mut bytes: ByteBufferMut,
    lens: &[P],
) -> (Vec<ByteBuffer>, Buffer<BinaryView>) {
    // View offsets are `u32`. Without this clamp a larger caller-supplied limit would let
    // `offset` grow past `u32::MAX` and be silently truncated by the `as_()` conversion below.
    let max_buffer_len = max_buffer_len.min(usize::try_from(u32::MAX).unwrap_or(usize::MAX));

    let mut views = BufferMut::<BinaryView>::with_capacity(lens.len());

    let mut buffers = Vec::new();
    let mut buf_index = start_buf_index;

    // Byte position of the next value inside the buffer that is currently being filled.
    let mut offset: usize = 0;
    for &len in lens {
        let len = len.as_();
        assert!(len <= max_buffer_len, "values cannot exceed max_buffer_len");

        if (offset + len) > max_buffer_len {
            // The next value does not fit: close the current buffer at this value's start and
            // continue in a fresh buffer, so that view offsets stay within `max_buffer_len`.
            let rest = bytes.split_off(offset);

            buffers.push(bytes.freeze());
            buf_index += 1;
            offset = 0;

            bytes = rest;
        }
        let view = BinaryView::make_view(&bytes[offset..][..len], buf_index, offset.as_());
        // SAFETY: `views` was allocated with capacity `lens.len()` and exactly one view is
        // pushed per entry of `lens`.
        unsafe { views.push_unchecked(view) };
        offset += len;
    }

    // Keep whatever is left as the final buffer, but never emit an empty one.
    if !bytes.is_empty() {
        buffers.push(bytes.freeze());
    }

    (buffers, views.freeze())
}

#[cfg(test)]
mod tests {
use vortex_buffer::ByteBuffer;
use vortex_buffer::ByteBufferMut;
use vortex_vector::binaryview::BinaryView;

use crate::arrays::build_views::build_views;

#[test]
fn test_to_canonical_large() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this the test that takes a while to run?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this one is very small because I artificially constrain the max buffer length.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But now that varbin and FSST both go through the same code path, it's OK.

// We are testing generating views for raw data that should look like
//
// aaaaaaaaaaaaa ("a"*13)
// bbbbbbbbbbbbb ("b"*13)
// ccccccccccccc ("c"*13)
// ddddddddddddd ("d"*13)
//
// In real code, this would all fit in one buffer, but to unit test the splitting logic
// we split buffers at length 26, which should result in two buffers for the output array.
let raw_data =
ByteBufferMut::copy_from("aaaaaaaaaaaaabbbbbbbbbbbbbcccccccccccccddddddddddddd");
let lens = vec![13u8; 4];

let (buffers, views) = build_views(0, 26, raw_data, &lens);

assert_eq!(
buffers,
vec![
ByteBuffer::copy_from("aaaaaaaaaaaaabbbbbbbbbbbbb"),
ByteBuffer::copy_from("cccccccccccccddddddddddddd"),
]
);

assert_eq!(
views.as_slice(),
&[
BinaryView::make_view(b"aaaaaaaaaaaaa", 0, 0),
BinaryView::make_view(b"bbbbbbbbbbbbb", 0, 13),
BinaryView::make_view(b"ccccccccccccc", 1, 0),
BinaryView::make_view(b"ddddddddddddd", 1, 13),
]
)
}
}
1 change: 1 addition & 0 deletions vortex-array/src/arrays/varbinview/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ mod compute;
mod vtable;
pub use vtable::VarBinViewVTable;

pub mod build_views;
#[cfg(test)]
mod tests;
8 changes: 4 additions & 4 deletions vortex-array/src/builders/varbinview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl VarBinViewBuilder {

/// Pushes buffers and pre-adjusted views into the builder.
///
/// The provided `buffer` slices contain sections of data from a `VarBinViewArray`, and the
/// The provided `buffers` contain sections of data from a `VarBinViewArray`, and the
/// `views` are `BinaryView`s that have already been adjusted to reference the correct buffer
/// indices and offsets for this builder. All views must point to valid sections within the
/// provided buffers, and the validity length must match the view length.
Expand All @@ -166,14 +166,14 @@ impl VarBinViewBuilder {
/// exist in this builder.
pub fn push_buffer_and_adjusted_views(
&mut self,
buffer: &[ByteBuffer],
buffers: &[ByteBuffer],
views: &Buffer<BinaryView>,
validity_mask: Mask,
) {
self.flush_in_progress();

let expected_completed_len = self.completed.len() as usize + buffer.len();
self.completed.extend_from_slice_unchecked(buffer);
let expected_completed_len = self.completed.len() as usize + buffers.len();
self.completed.extend_from_slice_unchecked(buffers);
assert_eq!(
self.completed.len() as usize,
expected_completed_len,
Expand Down
Loading