diff --git a/Cargo.lock b/Cargo.lock index 9cc10fe4415..104337ecd5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10290,6 +10290,7 @@ name = "vortex-btrblocks" version = "0.1.0" dependencies = [ "codspeed-divan-compat", + "enum-iterator", "getrandom 0.3.4", "itertools 0.14.0", "num-traits", @@ -10297,6 +10298,7 @@ dependencies = [ "rustc-hash", "test-with", "tracing", + "tracing-subscriber", "vortex-alp", "vortex-array", "vortex-buffer", diff --git a/vortex-btrblocks/Cargo.toml b/vortex-btrblocks/Cargo.toml index dedb6fb5ade..679a81f33da 100644 --- a/vortex-btrblocks/Cargo.toml +++ b/vortex-btrblocks/Cargo.toml @@ -14,6 +14,7 @@ rust-version = { workspace = true } version = { workspace = true } [dependencies] +enum-iterator = { workspace = true } getrandom_v03 = { workspace = true } itertools = { workspace = true } num-traits = { workspace = true } @@ -40,6 +41,7 @@ vortex-zigzag = { workspace = true } [dev-dependencies] divan = { workspace = true } test-with = { workspace = true } +tracing-subscriber = { workspace = true } vortex-array = { workspace = true, features = ["_test-harness"] } [features] diff --git a/vortex-btrblocks/benches/compress.rs b/vortex-btrblocks/benches/compress.rs index 0c51c0efdec..75139361ed5 100644 --- a/vortex-btrblocks/benches/compress.rs +++ b/vortex-btrblocks/benches/compress.rs @@ -15,8 +15,7 @@ mod benchmarks { use vortex_array::ArrayRef; use vortex_array::IntoArray; use vortex_array::ToCanonical; - use vortex_btrblocks::Compressor; - use vortex_btrblocks::IntCompressor; + use vortex_btrblocks::BtrBlocksCompressor; use vortex_buffer::buffer_mut; use vortex_utils::aliases::hash_set::HashSet; @@ -42,11 +41,12 @@ mod benchmarks { #[divan::bench] fn btrblocks(bencher: Bencher) { let array = make_clickbench_window_name().to_primitive(); + let compressor = BtrBlocksCompressor::default(); bencher .with_inputs(|| &array) .input_counter(|array| ItemsCount::new(array.len())) .input_counter(|array| BytesCount::of_many::(array.len())) - 
.bench_refs(|array| IntCompressor::compress(array, false, 3, &[]).unwrap()); + .bench_refs(|array| compressor.compress(array.as_ref()).unwrap()); } } diff --git a/vortex-btrblocks/src/builder.rs b/vortex-btrblocks/src/builder.rs new file mode 100644 index 00000000000..201104f9b85 --- /dev/null +++ b/vortex-btrblocks/src/builder.rs @@ -0,0 +1,124 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Builder for configuring `BtrBlocksCompressor` instances. + +use itertools::Itertools; +use vortex_utils::aliases::hash_set::HashSet; + +use crate::BtrBlocksCompressor; +use crate::FloatCode; +use crate::IntCode; +use crate::StringCode; +use crate::compressor::float::ALL_FLOAT_SCHEMES; +use crate::compressor::float::FloatScheme; +use crate::compressor::integer::ALL_INT_SCHEMES; +use crate::compressor::integer::IntegerScheme; +use crate::compressor::string::ALL_STRING_SCHEMES; +use crate::compressor::string::StringScheme; + +/// Builder for creating configured [`BtrBlocksCompressor`] instances. +/// +/// Use this builder to configure which compression schemes are allowed for each data type. +/// By default, all schemes are enabled. 
+/// +/// # Examples +/// +/// ```rust +/// use vortex_btrblocks::{BtrBlocksCompressorBuilder, IntCode, FloatCode}; +/// +/// // Default compressor - all schemes allowed +/// let compressor = BtrBlocksCompressorBuilder::default().build(); +/// +/// // Exclude specific schemes +/// let compressor = BtrBlocksCompressorBuilder::default() +/// .exclude_int([IntCode::Dict]) +/// .build(); +/// +/// // Exclude then re-include +/// let compressor = BtrBlocksCompressorBuilder::default() +/// .exclude_int([IntCode::Dict, IntCode::Rle]) +/// .include_int([IntCode::Dict]) +/// .build(); +/// ``` +#[derive(Debug, Clone)] +pub struct BtrBlocksCompressorBuilder { + int_schemes: HashSet<&'static dyn IntegerScheme>, + float_schemes: HashSet<&'static dyn FloatScheme>, + string_schemes: HashSet<&'static dyn StringScheme>, +} + +impl Default for BtrBlocksCompressorBuilder { + fn default() -> Self { + Self { + int_schemes: ALL_INT_SCHEMES.iter().copied().collect(), + float_schemes: ALL_FLOAT_SCHEMES.iter().copied().collect(), + string_schemes: ALL_STRING_SCHEMES.iter().copied().collect(), + } + } +} + +impl BtrBlocksCompressorBuilder { + /// Excludes the specified integer compression schemes. + pub fn exclude_int(mut self, codes: impl IntoIterator) -> Self { + let codes: HashSet<_> = codes.into_iter().collect(); + self.int_schemes.retain(|s| !codes.contains(&s.code())); + self + } + + /// Excludes the specified float compression schemes. + pub fn exclude_float(mut self, codes: impl IntoIterator) -> Self { + let codes: HashSet<_> = codes.into_iter().collect(); + self.float_schemes.retain(|s| !codes.contains(&s.code())); + self + } + + /// Excludes the specified string compression schemes. + pub fn exclude_string(mut self, codes: impl IntoIterator) -> Self { + let codes: HashSet<_> = codes.into_iter().collect(); + self.string_schemes.retain(|s| !codes.contains(&s.code())); + self + } + + /// Includes the specified integer compression schemes. 
+ pub fn include_int(mut self, codes: impl IntoIterator) -> Self { + let codes: HashSet<_> = codes.into_iter().collect(); + for scheme in ALL_INT_SCHEMES { + if codes.contains(&scheme.code()) { + self.int_schemes.insert(*scheme); + } + } + self + } + + /// Includes the specified float compression schemes. + pub fn include_float(mut self, codes: impl IntoIterator) -> Self { + let codes: HashSet<_> = codes.into_iter().collect(); + for scheme in ALL_FLOAT_SCHEMES { + if codes.contains(&scheme.code()) { + self.float_schemes.insert(*scheme); + } + } + self + } + + /// Includes the specified string compression schemes. + pub fn include_string(mut self, codes: impl IntoIterator) -> Self { + let codes: HashSet<_> = codes.into_iter().collect(); + for scheme in ALL_STRING_SCHEMES { + if codes.contains(&scheme.code()) { + self.string_schemes.insert(*scheme); + } + } + self + } + + /// Builds the configured `BtrBlocksCompressor`. + pub fn build(self) -> BtrBlocksCompressor { + BtrBlocksCompressor { + int_schemes: self.int_schemes.into_iter().collect_vec(), + float_schemes: self.float_schemes.into_iter().collect_vec(), + string_schemes: self.string_schemes.into_iter().collect_vec(), + } + } +} diff --git a/vortex-btrblocks/src/canonical_compressor.rs b/vortex-btrblocks/src/canonical_compressor.rs new file mode 100644 index 00000000000..e977680a9a2 --- /dev/null +++ b/vortex-btrblocks/src/canonical_compressor.rs @@ -0,0 +1,277 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Canonical array compression implementation. 
+ +use vortex_array::Array; +use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::IntoArray; +use vortex_array::ToCanonical; +use vortex_array::arrays::ConstantArray; +use vortex_array::arrays::ExtensionArray; +use vortex_array::arrays::FixedSizeListArray; +use vortex_array::arrays::ListArray; +use vortex_array::arrays::StructArray; +use vortex_array::arrays::TemporalArray; +use vortex_array::arrays::list_from_list_view; +use vortex_array::compute::Cost; +use vortex_array::compute::IsConstantOpts; +use vortex_array::compute::is_constant_opts; +use vortex_array::vtable::ValidityHelper; +use vortex_dtype::DType; +use vortex_dtype::Nullability; +use vortex_dtype::datetime::TemporalMetadata; +use vortex_error::VortexResult; + +use crate::BtrBlocksCompressorBuilder; +use crate::CompressorContext; +use crate::CompressorExt; +use crate::Excludes; +use crate::FloatCompressor; +use crate::IntCode; +use crate::IntCompressor; +use crate::StringCompressor; +use crate::compressor::decimal::compress_decimal; +use crate::compressor::float::FloatScheme; +use crate::compressor::integer::IntegerScheme; +use crate::compressor::string::StringScheme; +use crate::compressor::temporal::compress_temporal; + +/// Trait for compressors that can compress canonical arrays. +/// +/// Provides access to configured compression schemes and the ability to +/// compress canonical arrays recursively. +pub trait CanonicalCompressor { + /// Compresses a canonical array with the specified options. + fn compress_canonical( + &self, + array: Canonical, + ctx: CompressorContext, + excludes: Excludes, + ) -> VortexResult; + + /// Returns the enabled integer compression schemes. + fn int_schemes(&self) -> &[&'static dyn IntegerScheme]; + + /// Returns the enabled float compression schemes. + fn float_schemes(&self) -> &[&'static dyn FloatScheme]; + + /// Returns the enabled string compression schemes. 
+ fn string_schemes(&self) -> &[&'static dyn StringScheme]; +} + +/// The main compressor type implementing BtrBlocks-inspired compression. +/// +/// This compressor applies adaptive compression schemes to arrays based on their data types +/// and characteristics. It recursively compresses nested structures like structs and lists, +/// and chooses optimal compression schemes for primitive types. +/// +/// The compressor works by: +/// 1. Canonicalizing input arrays to a standard representation +/// 2. Analyzing data characteristics to choose optimal compression schemes +/// 3. Recursively compressing nested structures +/// 4. Applying type-specific compression for primitives, strings, and temporal data +/// +/// Use [`BtrBlocksCompressorBuilder`] to configure which compression schemes are enabled. +/// +/// # Examples +/// +/// ```rust +/// use vortex_btrblocks::{BtrBlocksCompressor, BtrBlocksCompressorBuilder, IntCode}; +/// +/// // Default compressor - all schemes allowed +/// let compressor = BtrBlocksCompressor::default(); +/// +/// // Exclude specific schemes using the builder +/// let compressor = BtrBlocksCompressorBuilder::default() +/// .exclude_int([IntCode::Dict]) +/// .build(); +/// ``` +#[derive(Clone)] +pub struct BtrBlocksCompressor { + /// Integer compressor with configured schemes. + pub int_schemes: Vec<&'static dyn IntegerScheme>, + + /// Float compressor with configured schemes. + pub float_schemes: Vec<&'static dyn FloatScheme>, + + /// String compressor with configured schemes. + pub string_schemes: Vec<&'static dyn StringScheme>, +} + +impl Default for BtrBlocksCompressor { + fn default() -> Self { + BtrBlocksCompressorBuilder::default().build() + } +} + +impl BtrBlocksCompressor { + /// Compresses an array using BtrBlocks-inspired compression. + /// + /// First canonicalizes and compacts the array, then applies optimal compression schemes. 
+ pub fn compress(&self, array: &dyn Array) -> VortexResult { + // Canonicalize the array + let canonical = array.to_canonical()?; + + // Compact it, removing any wasted space before we attempt to compress it + let compact = canonical.compact()?; + + self.compress_canonical(compact, CompressorContext::default(), Excludes::none()) + } + + pub(crate) fn integer_compressor(&self) -> IntCompressor<'_> { + IntCompressor { + btr_blocks_compressor: self, + } + } + + pub(crate) fn float_compressor(&self) -> FloatCompressor<'_> { + FloatCompressor { + btr_blocks_compressor: self, + } + } + + pub(crate) fn string_compressor(&self) -> StringCompressor<'_> { + StringCompressor { + btr_blocks_compressor: self, + } + } +} + +impl CanonicalCompressor for BtrBlocksCompressor { + /// Compresses a canonical array by dispatching to type-specific compressors. + /// + /// Recursively compresses nested structures and applies optimal schemes for each data type. + fn compress_canonical( + &self, + array: Canonical, + ctx: CompressorContext, + excludes: Excludes, + ) -> VortexResult { + match array { + Canonical::Null(null_array) => Ok(null_array.into_array()), + // TODO(aduffy): Sparse, other bool compressors. + Canonical::Bool(bool_array) => Ok(bool_array.into_array()), + Canonical::Primitive(primitive) => { + if primitive.ptype().is_int() { + self.integer_compressor() + .compress(self, &primitive, ctx, excludes.int) + } else { + self.float_compressor() + .compress(self, &primitive, ctx, excludes.float) + } + } + Canonical::Decimal(decimal) => compress_decimal(self, &decimal), + Canonical::Struct(struct_array) => { + let fields = struct_array + .unmasked_fields() + .iter() + .map(|field| self.compress(field)) + .collect::, _>>()?; + + Ok(StructArray::try_new( + struct_array.names().clone(), + fields, + struct_array.len(), + struct_array.validity().clone(), + )? 
+ .into_array()) + } + Canonical::List(list_view_array) => { + // TODO(joe): We might want to write list views in the future and chose between + // list and list view. + let list_array = list_from_list_view(list_view_array)?; + + // Reset the offsets to remove garbage data that might prevent us from narrowing our + // offsets (there could be a large amount of trailing garbage data that the current + // views do not reference at all). + let list_array = list_array.reset_offsets(true)?; + + let compressed_elems = self.compress(list_array.elements())?; + + // Note that since the type of our offsets are not encoded in our `DType`, and since + // we guarantee above that all elements are referenced by offsets, we may narrow the + // widths. + + let compressed_offsets = self.compress_canonical( + Canonical::Primitive(list_array.offsets().to_primitive().narrow()?), + ctx, + Excludes::from(&[IntCode::Dict]), + )?; + + Ok(ListArray::try_new( + compressed_elems, + compressed_offsets, + list_array.validity().clone(), + )? + .into_array()) + } + Canonical::FixedSizeList(fsl_array) => { + let compressed_elems = self.compress(fsl_array.elements())?; + + Ok(FixedSizeListArray::try_new( + compressed_elems, + fsl_array.list_size(), + fsl_array.validity().clone(), + fsl_array.len(), + )? + .into_array()) + } + Canonical::VarBinView(strings) => { + if strings + .dtype() + .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable)) + { + self.string_compressor() + .compress(self, &strings, ctx, excludes.string) + } else { + // Binary arrays do not compress + Ok(strings.into_array()) + } + } + Canonical::Extension(ext_array) => { + // We compress Timestamp-level arrays with DateTimeParts compression + if let Ok(temporal_array) = TemporalArray::try_from(ext_array.to_array()) + && let TemporalMetadata::Timestamp(..) = temporal_array.temporal_metadata() + { + if is_constant_opts( + temporal_array.as_ref(), + &IsConstantOpts { + cost: Cost::Canonicalize, + }, + )? 
+ .unwrap_or_default() + { + return Ok(ConstantArray::new( + temporal_array.as_ref().scalar_at(0)?, + ext_array.len(), + ) + .into_array()); + } + return compress_temporal(self, temporal_array); + } + + // Compress the underlying storage array. + let compressed_storage = self.compress(ext_array.storage())?; + + Ok( + ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage) + .into_array(), + ) + } + } + } + + fn int_schemes(&self) -> &[&'static dyn IntegerScheme] { + &self.int_schemes + } + + fn float_schemes(&self) -> &[&'static dyn FloatScheme] { + &self.float_schemes + } + + fn string_schemes(&self) -> &[&'static dyn StringScheme] { + &self.string_schemes + } +} diff --git a/vortex-btrblocks/src/decimal.rs b/vortex-btrblocks/src/compressor/decimal.rs similarity index 73% rename from vortex-btrblocks/src/decimal.rs rename to vortex-btrblocks/src/compressor/decimal.rs index 68711aaf6fd..5170405d10c 100644 --- a/vortex-btrblocks/src/decimal.rs +++ b/vortex-btrblocks/src/compressor/decimal.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use vortex_array::ArrayRef; +use vortex_array::Canonical; use vortex_array::arrays::DecimalArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::narrowed_decimal; @@ -10,13 +11,17 @@ use vortex_decimal_byte_parts::DecimalBytePartsArray; use vortex_error::VortexResult; use vortex_scalar::DecimalType; -use crate::Compressor; -use crate::IntCompressor; -use crate::MAX_CASCADE; +use crate::BtrBlocksCompressor; +use crate::CanonicalCompressor; +use crate::CompressorContext; +use crate::Excludes; // TODO(joe): add support splitting i128/256 buffers into chunks primitive values for compression. 
// 2 for i128 and 4 for i256 -pub fn compress_decimal(decimal: &DecimalArray) -> VortexResult { +pub fn compress_decimal( + compressor: &BtrBlocksCompressor, + decimal: &DecimalArray, +) -> VortexResult { let decimal = narrowed_decimal(decimal.clone()); let validity = decimal.validity(); let prim = match decimal.values_type() { @@ -27,7 +32,11 @@ pub fn compress_decimal(decimal: &DecimalArray) -> VortexResult { _ => return Ok(decimal.to_array()), }; - let compressed = IntCompressor::compress(&prim, false, MAX_CASCADE, &[])?; + let compressed = compressor.compress_canonical( + Canonical::Primitive(prim), + CompressorContext::default(), + Excludes::none(), + )?; DecimalBytePartsArray::try_new(compressed, decimal.decimal_dtype()).map(|d| d.to_array()) } diff --git a/vortex-btrblocks/src/float/dictionary.rs b/vortex-btrblocks/src/compressor/float/dictionary.rs similarity index 96% rename from vortex-btrblocks/src/float/dictionary.rs rename to vortex-btrblocks/src/compressor/float/dictionary.rs index 5b9bfc331cc..9a5955a6daf 100644 --- a/vortex-btrblocks/src/float/dictionary.rs +++ b/vortex-btrblocks/src/compressor/float/dictionary.rs @@ -11,8 +11,8 @@ use vortex_array::vtable::ValidityHelper; use vortex_buffer::Buffer; use vortex_dtype::half::f16; -use crate::float::stats::ErasedDistinctValues; -use crate::float::stats::FloatStats; +use super::stats::ErasedDistinctValues; +use super::stats::FloatStats; macro_rules! 
typed_encode { ($stats:ident, $typed:ident, $validity:ident, $typ:ty) => {{ @@ -104,9 +104,9 @@ mod tests { use vortex_array::validity::Validity; use vortex_buffer::buffer; + use super::super::FloatStats; use crate::CompressorStats; - use crate::float::dictionary::dictionary_encode; - use crate::float::stats::FloatStats; + use crate::compressor::float::dictionary::dictionary_encode; #[test] fn test_float_dict_encode() { diff --git a/vortex-btrblocks/src/float.rs b/vortex-btrblocks/src/compressor/float/mod.rs similarity index 61% rename from vortex-btrblocks/src/float.rs rename to vortex-btrblocks/src/compressor/float/mod.rs index 795d2fb0fcb..205bb8472b0 100644 --- a/vortex-btrblocks/src/float.rs +++ b/vortex-btrblocks/src/compressor/float/mod.rs @@ -2,19 +2,26 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors pub(crate) mod dictionary; -mod stats; +pub(super) mod stats; +use std::hash::Hash; +use std::hash::Hasher; + +use enum_iterator::Sequence; use vortex_alp::ALPArray; use vortex_alp::ALPVTable; use vortex_alp::RDEncoder; use vortex_alp::alp_encode; use vortex_array::ArrayRef; +use vortex_array::Canonical; use vortex_array::IntoArray; use vortex_array::ToCanonical; use vortex_array::arrays::ConstantArray; use vortex_array::arrays::DictArray; +use vortex_array::arrays::DictArrayParts; use vortex_array::arrays::MaskedArray; use vortex_array::arrays::PrimitiveVTable; +use vortex_array::vtable::VTable; use vortex_array::vtable::ValidityHelper; use vortex_dtype::PType; use vortex_error::VortexResult; @@ -23,100 +30,174 @@ use vortex_scalar::Scalar; use vortex_sparse::SparseArray; use vortex_sparse::SparseVTable; +use self::dictionary::dictionary_encode; pub use self::stats::FloatStats; +use super::integer::DictScheme as IntDictScheme; +use super::integer::RunEndScheme as IntRunEndScheme; +use super::integer::SparseScheme as IntSparseScheme; +use crate::BtrBlocksCompressor; +use crate::CanonicalCompressor; use crate::Compressor; +use 
crate::CompressorContext; use crate::CompressorStats; +use crate::Excludes; use crate::GenerateStatsOptions; +use crate::IntCode; use crate::Scheme; -use crate::estimate_compression_ratio_with_sampling; -use crate::float::dictionary::dictionary_encode; -use crate::integer; -use crate::integer::IntCompressor; -use crate::integer::IntegerStats; -use crate::patches::compress_patches; -use crate::rle::RLEScheme; +use crate::SchemeExt; +use crate::compressor::patches::compress_patches; +use crate::compressor::rle; +use crate::compressor::rle::RLEScheme; + +pub trait FloatScheme: Scheme + Send + Sync {} + +impl FloatScheme for T where T: Scheme + Send + Sync +{} + +impl PartialEq for dyn FloatScheme { + fn eq(&self, other: &Self) -> bool { + self.code() == other.code() + } +} -pub trait FloatScheme: Scheme {} +impl Eq for dyn FloatScheme {} + +impl Hash for dyn FloatScheme { + fn hash(&self, state: &mut H) { + self.code().hash(state) + } +} -impl FloatScheme for T where T: Scheme {} +/// All available float compression schemes. +pub const ALL_FLOAT_SCHEMES: &[&dyn FloatScheme] = &[ + &UncompressedScheme, + &ConstantScheme, + &ALPScheme, + &ALPRDScheme, + &DictScheme, + &NullDominated, + &RLE_FLOAT_SCHEME, +]; /// [`Compressor`] for floating-point numbers. -pub struct FloatCompressor; +#[derive(Clone, Copy)] +pub struct FloatCompressor<'a> { + /// Reference to the parent compressor. 
+ pub btr_blocks_compressor: &'a dyn CanonicalCompressor, +} -impl Compressor for FloatCompressor { +impl<'a> Compressor for FloatCompressor<'a> { type ArrayVTable = PrimitiveVTable; type SchemeType = dyn FloatScheme; type StatsType = FloatStats; - fn schemes() -> &'static [&'static Self::SchemeType] { - &[ - &UncompressedScheme, - &ConstantScheme, - &ALPScheme, - &ALPRDScheme, - &DictScheme, - &NullDominated, - &RLE_FLOAT_SCHEME, - ] + fn gen_stats(&self, array: &::Array) -> Self::StatsType { + if self + .btr_blocks_compressor + .float_schemes() + .iter() + .any(|s| s.code() == DictScheme.code()) + { + FloatStats::generate_opts( + array, + GenerateStatsOptions { + count_distinct_values: true, + }, + ) + } else { + FloatStats::generate_opts( + array, + GenerateStatsOptions { + count_distinct_values: false, + }, + ) + } } - fn default_scheme() -> &'static Self::SchemeType { - &UncompressedScheme + fn schemes(&self) -> &[&'static dyn FloatScheme] { + self.btr_blocks_compressor.float_schemes() } - fn dict_scheme_code() -> FloatCode { - DICT_SCHEME + fn default_scheme(&self) -> &'static Self::SchemeType { + &UncompressedScheme } } -const UNCOMPRESSED_SCHEME: FloatCode = FloatCode(0); -const CONSTANT_SCHEME: FloatCode = FloatCode(1); -const ALP_SCHEME: FloatCode = FloatCode(2); -const ALPRD_SCHEME: FloatCode = FloatCode(3); -const DICT_SCHEME: FloatCode = FloatCode(4); -const RUN_END_SCHEME: FloatCode = FloatCode(5); -const RUN_LENGTH_SCHEME: FloatCode = FloatCode(6); - -const SPARSE_SCHEME: FloatCode = FloatCode(7); +/// Unique identifier for float compression schemes. +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Sequence)] +pub enum FloatCode { + /// No compression applied. + Uncompressed, + /// Constant encoding for arrays with a single distinct value. + Constant, + /// ALP (Adaptive Lossless floating-Point) encoding. + Alp, + /// ALPRD (ALP with Right Division) encoding variant. + AlpRd, + /// Dictionary encoding for low-cardinality float values. 
+ Dict, + /// Run-end encoding. + RunEnd, + /// RLE encoding - generic run-length encoding. + Rle, + /// Sparse encoding for null-dominated arrays. + Sparse, +} -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] struct UncompressedScheme; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] struct ConstantScheme; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] struct ALPScheme; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] struct ALPRDScheme; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] struct DictScheme; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct NullDominated; -pub const RLE_FLOAT_SCHEME: RLEScheme = RLEScheme::new( - RUN_LENGTH_SCHEME, - |values, is_sample, allowed_cascading, excludes| { - FloatCompressor::compress(values, is_sample, allowed_cascading, excludes) - }, -); +/// Configuration for float RLE compression. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct FloatRLEConfig; + +impl rle::RLEConfig for FloatRLEConfig { + type Stats = FloatStats; + type Code = FloatCode; + + const CODE: FloatCode = FloatCode::Rle; + + fn compress_values( + compressor: &BtrBlocksCompressor, + values: &vortex_array::arrays::PrimitiveArray, + ctx: CompressorContext, + excludes: &[FloatCode], + ) -> VortexResult { + compressor.compress_canonical(Canonical::Primitive(values.clone()), ctx, excludes.into()) + } +} + +/// RLE scheme for float compression. 
+pub const RLE_FLOAT_SCHEME: RLEScheme = RLEScheme::new(); impl Scheme for UncompressedScheme { type StatsType = FloatStats; type CodeType = FloatCode; fn code(&self) -> FloatCode { - UNCOMPRESSED_SCHEME + FloatCode::Uncompressed } fn expected_compression_ratio( &self, + _compressor: &BtrBlocksCompressor, _stats: &Self::StatsType, - _is_sample: bool, - _allowed_cascading: usize, + _ctx: CompressorContext, _excludes: &[FloatCode], ) -> VortexResult { Ok(1.0) @@ -124,9 +205,9 @@ impl Scheme for UncompressedScheme { fn compress( &self, + _btr_blocks_compressor: &BtrBlocksCompressor, stats: &Self::StatsType, - _is_sample: bool, - _allowed_cascading: usize, + _ctx: CompressorContext, _excludes: &[FloatCode], ) -> VortexResult { Ok(stats.source().to_array()) @@ -138,18 +219,18 @@ impl Scheme for ConstantScheme { type CodeType = FloatCode; fn code(&self) -> FloatCode { - CONSTANT_SCHEME + FloatCode::Constant } fn expected_compression_ratio( &self, + _btr_blocks_compressor: &BtrBlocksCompressor, stats: &Self::StatsType, - is_sample: bool, - _allowed_cascading: usize, + ctx: CompressorContext, _excludes: &[FloatCode], ) -> VortexResult { // Never select Constant when sampling - if is_sample { + if ctx.is_sample { return Ok(0.0); } @@ -167,9 +248,9 @@ impl Scheme for ConstantScheme { fn compress( &self, + _btr_blocks_compressor: &BtrBlocksCompressor, stats: &Self::StatsType, - _is_sample: bool, - _allowed_cascading: usize, + _ctx: CompressorContext, _excludes: &[FloatCode], ) -> VortexResult { let scalar_idx = @@ -194,22 +275,19 @@ impl Scheme for ConstantScheme { } } -#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] -pub struct FloatCode(u8); - impl Scheme for ALPScheme { type StatsType = FloatStats; type CodeType = FloatCode; fn code(&self) -> FloatCode { - ALP_SCHEME + FloatCode::Alp } fn expected_compression_ratio( &self, + compressor: &BtrBlocksCompressor, stats: &Self::StatsType, - is_sample: bool, - allowed_cascading: usize, + ctx: CompressorContext, excludes: 
&[FloatCode], ) -> VortexResult { // We don't support ALP for f16 @@ -217,26 +295,20 @@ impl Scheme for ALPScheme { return Ok(0.0); } - if allowed_cascading == 0 { + if ctx.allowed_cascading == 0 { // ALP does not compress on its own, we need to be able to cascade it with // an integer compressor. return Ok(0.0); } - estimate_compression_ratio_with_sampling( - self, - stats, - is_sample, - allowed_cascading, - excludes, - ) + self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes) } fn compress( &self, + compressor: &BtrBlocksCompressor, stats: &FloatStats, - is_sample: bool, - allowed_cascading: usize, + ctx: CompressorContext, excludes: &[FloatCode], ) -> VortexResult { let alp_encoded = alp_encode(&stats.source().to_primitive(), None)?; @@ -247,15 +319,18 @@ impl Scheme for ALPScheme { // Patches are not compressed. They should be infrequent, and if they are not then we want // to keep them linear for easy indexing. let mut int_excludes = Vec::new(); - if excludes.contains(&DICT_SCHEME) { - int_excludes.push(integer::DictScheme.code()); + if excludes.contains(&FloatCode::Dict) { + int_excludes.push(IntDictScheme.code()); } - if excludes.contains(&RUN_END_SCHEME) { - int_excludes.push(integer::RunEndScheme.code()); + if excludes.contains(&FloatCode::RunEnd) { + int_excludes.push(IntRunEndScheme.code()); } - let compressed_alp_ints = - IntCompressor::compress(&alp_ints, is_sample, allowed_cascading - 1, &int_excludes)?; + let compressed_alp_ints = compressor.compress_canonical( + Canonical::Primitive(alp_ints), + ctx.descend(), + Excludes::int_only(&int_excludes), + )?; let patches = alp.patches().map(compress_patches).transpose()?; @@ -268,34 +343,28 @@ impl Scheme for ALPRDScheme { type CodeType = FloatCode; fn code(&self) -> FloatCode { - ALPRD_SCHEME + FloatCode::AlpRd } fn expected_compression_ratio( &self, + compressor: &BtrBlocksCompressor, stats: &Self::StatsType, - is_sample: bool, - allowed_cascading: usize, + ctx: 
CompressorContext, excludes: &[FloatCode], ) -> VortexResult { if stats.source().ptype() == PType::F16 { return Ok(0.0); } - estimate_compression_ratio_with_sampling( - self, - stats, - is_sample, - allowed_cascading, - excludes, - ) + self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes) } fn compress( &self, + _compressor: &BtrBlocksCompressor, stats: &Self::StatsType, - _is_sample: bool, - _allowed_cascading: usize, + _ctx: CompressorContext, _excludes: &[FloatCode], ) -> VortexResult { let encoder = match stats.source().ptype() { @@ -321,14 +390,14 @@ impl Scheme for DictScheme { type CodeType = FloatCode; fn code(&self) -> FloatCode { - DICT_SCHEME + FloatCode::Dict } fn expected_compression_ratio( &self, + compressor: &BtrBlocksCompressor, stats: &Self::StatsType, - is_sample: bool, - allowed_cascading: usize, + ctx: CompressorContext, excludes: &[FloatCode], ) -> VortexResult { if stats.value_count == 0 { @@ -341,56 +410,38 @@ impl Scheme for DictScheme { } // Take a sample and run compression on the sample to determine before/after size. - estimate_compression_ratio_with_sampling( - self, - stats, - is_sample, - allowed_cascading, - excludes, - ) + self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes) } fn compress( &self, + compressor: &BtrBlocksCompressor, stats: &Self::StatsType, - is_sample: bool, - allowed_cascading: usize, - _excludes: &[FloatCode], + ctx: CompressorContext, + _excludes: &[Self::CodeType], ) -> VortexResult { - let dict_array = dictionary_encode(stats); - - // Only compress the codes. 
- let codes_stats = IntegerStats::generate_opts( - &dict_array.codes().to_primitive().narrow()?, - GenerateStatsOptions { - count_distinct_values: false, - }, - ); - let codes_scheme = IntCompressor::choose_scheme( - &codes_stats, - is_sample, - allowed_cascading - 1, - &[integer::DictScheme.code(), integer::SequenceScheme.code()], - )?; - let compressed_codes = codes_scheme.compress( - &codes_stats, - is_sample, - allowed_cascading - 1, - &[integer::DictScheme.code()], + let dict = dictionary_encode(stats); + let has_all_values_referenced = dict.has_all_values_referenced(); + let DictArrayParts { codes, values, .. } = dict.into_parts(); + + let compressed_codes = compressor.compress_canonical( + Canonical::Primitive(codes.to_primitive()), + ctx.descend(), + Excludes::int_only(&[IntCode::Dict, IntCode::Sequence]), )?; - let compressed_values = FloatCompressor::compress( - &dict_array.values().to_primitive(), - is_sample, - allowed_cascading - 1, - &[DICT_SCHEME], + assert!(values.is_canonical()); + let compressed_values = compressor.compress_canonical( + Canonical::Primitive(values.to_primitive()), + ctx.descend(), + Excludes::from(&[FloatCode::Dict]), )?; // SAFETY: compressing codes or values does not alter the invariants unsafe { Ok( DictArray::new_unchecked(compressed_codes, compressed_values) - .set_all_values_referenced(dict_array.has_all_values_referenced()) + .set_all_values_referenced(has_all_values_referenced) .into_array(), ) } @@ -402,18 +453,18 @@ impl Scheme for NullDominated { type CodeType = FloatCode; fn code(&self) -> Self::CodeType { - SPARSE_SCHEME + FloatCode::Sparse } fn expected_compression_ratio( &self, + _compressor: &BtrBlocksCompressor, stats: &Self::StatsType, - _is_sample: bool, - allowed_cascading: usize, + ctx: CompressorContext, _excludes: &[Self::CodeType], ) -> VortexResult { // Only use `SparseScheme` if we can cascade. 
- if allowed_cascading == 0 { + if ctx.allowed_cascading == 0 { return Ok(0.0); } @@ -433,28 +484,27 @@ impl Scheme for NullDominated { fn compress( &self, + compressor: &BtrBlocksCompressor, stats: &Self::StatsType, - is_sample: bool, - allowed_cascading: usize, + ctx: CompressorContext, _excludes: &[Self::CodeType], ) -> VortexResult { - assert!(allowed_cascading > 0); + assert!(ctx.allowed_cascading > 0); // We pass None as we only run this pathway for NULL-dominated float arrays let sparse_encoded = SparseArray::encode(stats.src.as_ref(), None)?; if let Some(sparse) = sparse_encoded.as_opt::() { // Compress the values - let new_excludes = vec![integer::SparseScheme.code()]; + let new_excludes = [IntSparseScheme.code()]; // Don't attempt to compress the non-null values let indices = sparse.patches().indices().to_primitive().narrow()?; - let compressed_indices = IntCompressor::compress_no_dict( - &indices, - is_sample, - allowed_cascading - 1, - &new_excludes, + let compressed_indices = compressor.compress_canonical( + Canonical::Primitive(indices.to_primitive()), + ctx.descend(), + Excludes::int_only(&new_excludes), )?; SparseArray::try_new( @@ -489,20 +539,21 @@ mod tests { use vortex_dtype::Nullability; use vortex_error::VortexResult; - use crate::Compressor; + use super::RLE_FLOAT_SCHEME; + use crate::BtrBlocksCompressor; + use crate::CompressorContext; + use crate::CompressorExt; use crate::CompressorStats; - use crate::MAX_CASCADE; use crate::Scheme; - use crate::float::FloatCompressor; - use crate::float::RLE_FLOAT_SCHEME; #[test] fn test_empty() -> VortexResult<()> { // Make sure empty array compression does not fail - let result = FloatCompressor::compress( + let btr = BtrBlocksCompressor::default(); + let result = btr.float_compressor().compress( + &btr, &PrimitiveArray::new(Buffer::::empty(), Validity::NonNullable), - false, - 3, + CompressorContext::default(), &[], )?; @@ -522,7 +573,10 @@ mod tests { } let floats = values.into_array().to_primitive(); 
- let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[])?; + let btr = BtrBlocksCompressor::default(); + let compressed = + btr.float_compressor() + .compress(&btr, &floats, CompressorContext::default(), &[])?; assert_eq!(compressed.len(), 1024); let display = compressed @@ -542,8 +596,10 @@ mod tests { values.extend(iter::repeat_n(3.15f32, 150)); let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); - let stats = crate::float::FloatStats::generate(&array); - let compressed = RLE_FLOAT_SCHEME.compress(&stats, false, 3, &[])?; + let stats = super::FloatStats::generate(&array); + let btr = BtrBlocksCompressor::default(); + let compressed = + RLE_FLOAT_SCHEME.compress(&btr, &stats, CompressorContext::default(), &[])?; let decoded = compressed; let expected = Buffer::copy_from(&values).into_array(); @@ -563,8 +619,10 @@ mod tests { array.append_nulls(90); let floats = array.finish_into_primitive(); - - let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[])?; + let btr = BtrBlocksCompressor::default(); + let compressed = + btr.float_compressor() + .compress(&btr, &floats, CompressorContext::default(), &[])?; assert_eq!(compressed.len(), 96); let display = compressed @@ -592,14 +650,18 @@ mod scheme_selection_tests { use vortex_dtype::Nullability; use vortex_error::VortexResult; - use crate::Compressor; - use crate::float::FloatCompressor; + use crate::BtrBlocksCompressor; + use crate::CompressorContext; + use crate::CompressorExt; #[test] fn test_constant_compressed() -> VortexResult<()> { let values: Vec = vec![42.5; 100]; let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); - let compressed = FloatCompressor::compress(&array, false, 3, &[])?; + let btr = BtrBlocksCompressor::default(); + let compressed = + btr.float_compressor() + .compress(&btr, &array, CompressorContext::default(), &[])?; assert!(compressed.is::()); Ok(()) } @@ -608,7 +670,10 @@ mod 
scheme_selection_tests { fn test_alp_compressed() -> VortexResult<()> { let values: Vec = (0..1000).map(|i| (i as f64) * 0.01).collect(); let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); - let compressed = FloatCompressor::compress(&array, false, 3, &[])?; + let btr = BtrBlocksCompressor::default(); + let compressed = + btr.float_compressor() + .compress(&btr, &array, CompressorContext::default(), &[])?; assert!(compressed.is::()); Ok(()) } @@ -620,7 +685,10 @@ mod scheme_selection_tests { .map(|i| distinct_values[i % distinct_values.len()]) .collect(); let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); - let compressed = FloatCompressor::compress(&array, false, 3, &[])?; + let btr = BtrBlocksCompressor::default(); + let compressed = + btr.float_compressor() + .compress(&btr, &array, CompressorContext::default(), &[])?; assert!(compressed.is::()); Ok(()) } @@ -633,7 +701,10 @@ mod scheme_selection_tests { } builder.append_nulls(95); let array = builder.finish_into_primitive(); - let compressed = FloatCompressor::compress(&array, false, 3, &[])?; + let btr = BtrBlocksCompressor::default(); + let compressed = + btr.float_compressor() + .compress(&btr, &array, CompressorContext::default(), &[])?; // Verify the compressed array preserves values. 
assert_eq!(compressed.len(), 100); Ok(()) diff --git a/vortex-btrblocks/src/float/stats.rs b/vortex-btrblocks/src/compressor/float/stats.rs similarity index 96% rename from vortex-btrblocks/src/float/stats.rs rename to vortex-btrblocks/src/compressor/float/stats.rs index a1dcaa35d98..eb9c337bc6a 100644 --- a/vortex-btrblocks/src/float/stats.rs +++ b/vortex-btrblocks/src/compressor/float/stats.rs @@ -22,7 +22,7 @@ use vortex_utils::aliases::hash_set::HashSet; use crate::CompressorStats; use crate::GenerateStatsOptions; -use crate::rle::RLEStats; +use crate::compressor::rle::RLEStats; use crate::sample::sample; #[derive(Debug, Clone)] @@ -54,15 +54,15 @@ impl_from_typed!(f64, ErasedDistinctValues::F64); /// Array of floating-point numbers and relevant stats for compression. #[derive(Debug, Clone)] pub struct FloatStats { - pub(super) src: PrimitiveArray, + pub(crate) src: PrimitiveArray, // cache for validity.false_count() - pub(super) null_count: u32, + pub(crate) null_count: u32, // cache for validity.true_count() - pub(super) value_count: u32, + pub(crate) value_count: u32, #[allow(dead_code)] - pub(super) average_run_length: u32, - pub(super) distinct_values: ErasedDistinctValues, - pub(super) distinct_values_count: u32, + pub(crate) average_run_length: u32, + pub(crate) distinct_values: ErasedDistinctValues, + pub(crate) distinct_values_count: u32, } impl FloatStats { @@ -233,8 +233,8 @@ mod tests { use vortex_array::validity::Validity; use vortex_buffer::buffer; + use super::FloatStats; use crate::CompressorStats; - use crate::float::stats::FloatStats; #[test] fn test_float_stats() { diff --git a/vortex-btrblocks/src/integer/dictionary.rs b/vortex-btrblocks/src/compressor/integer/dictionary.rs similarity index 97% rename from vortex-btrblocks/src/integer/dictionary.rs rename to vortex-btrblocks/src/compressor/integer/dictionary.rs index b441240ece0..681bf6c5811 100644 --- a/vortex-btrblocks/src/integer/dictionary.rs +++ 
b/vortex-btrblocks/src/compressor/integer/dictionary.rs @@ -10,8 +10,8 @@ use vortex_array::validity::Validity; use vortex_array::vtable::ValidityHelper; use vortex_buffer::Buffer; -use crate::integer::IntegerStats; -use crate::integer::stats::ErasedStats; +use super::IntegerStats; +use super::stats::ErasedStats; macro_rules! typed_encode { ($stats:ident, $typed:ident, $validity:ident, $typ:ty) => {{ @@ -120,9 +120,9 @@ mod tests { use vortex_array::validity::Validity; use vortex_buffer::buffer; + use super::IntegerStats; + use super::dictionary_encode; use crate::CompressorStats; - use crate::integer::IntegerStats; - use crate::integer::dictionary::dictionary_encode; #[test] fn test_dict_encode_integer_stats() { diff --git a/vortex-btrblocks/src/integer.rs b/vortex-btrblocks/src/compressor/integer/mod.rs similarity index 66% rename from vortex-btrblocks/src/integer.rs rename to vortex-btrblocks/src/compressor/integer/mod.rs index 459b067b8fe..bbc96ab362b 100644 --- a/vortex-btrblocks/src/integer.rs +++ b/vortex-btrblocks/src/compressor/integer/mod.rs @@ -1,14 +1,16 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -pub mod dictionary; -mod stats; +pub(crate) mod dictionary; +pub(super) mod stats; -use std::fmt::Debug; use std::hash::Hash; +use std::hash::Hasher; +use enum_iterator::Sequence; pub use stats::IntegerStats; use vortex_array::ArrayRef; +use vortex_array::Canonical; use vortex_array::IntoArray; use vortex_array::ToCanonical; use vortex_array::arrays::ConstantArray; @@ -16,6 +18,7 @@ use vortex_array::arrays::DictArray; use vortex_array::arrays::MaskedArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::PrimitiveVTable; +use vortex_array::vtable::VTable; use vortex_array::vtable::ValidityHelper; use vortex_error::VortexExpect; use vortex_error::VortexResult; @@ -34,141 +37,196 @@ use vortex_sparse::SparseVTable; use vortex_zigzag::ZigZagArray; use vortex_zigzag::zigzag_encode; 
+use self::dictionary::dictionary_encode; +use crate::BtrBlocksCompressor; +use crate::CanonicalCompressor; use crate::Compressor; +use crate::CompressorContext; use crate::CompressorStats; +use crate::Excludes; use crate::GenerateStatsOptions; use crate::Scheme; -use crate::estimate_compression_ratio_with_sampling; -use crate::integer::dictionary::dictionary_encode; -use crate::patches::compress_patches; -use crate::rle::RLEScheme; +use crate::SchemeExt; +use crate::compressor::patches::compress_patches; +use crate::compressor::rle; +use crate::compressor::rle::RLEScheme; + +/// All available integer compression schemes. +pub const ALL_INT_SCHEMES: &[&dyn IntegerScheme] = &[ + &ConstantScheme, + &FORScheme, + &ZigZagScheme, + &BitPackingScheme, + &SparseScheme, + &DictScheme, + &RunEndScheme, + &SequenceScheme, + &RLE_INTEGER_SCHEME, +]; /// [`Compressor`] for signed and unsigned integers. -pub struct IntCompressor; +#[derive(Clone, Copy)] +pub struct IntCompressor<'a> { + /// Reference to the parent compressor. 
+ pub btr_blocks_compressor: &'a dyn CanonicalCompressor, +} -impl Compressor for IntCompressor { +impl<'a> Compressor for IntCompressor<'a> { type ArrayVTable = PrimitiveVTable; type SchemeType = dyn IntegerScheme; type StatsType = IntegerStats; - fn schemes() -> &'static [&'static dyn IntegerScheme] { - &[ - &ConstantScheme, - &FORScheme, - &ZigZagScheme, - &BitPackingScheme, - &SparseScheme, - &DictScheme, - &RunEndScheme, - &SequenceScheme, - &RLE_INTEGER_SCHEME, - ] + fn schemes(&self) -> &[&'static dyn IntegerScheme] { + self.btr_blocks_compressor.int_schemes() } - fn default_scheme() -> &'static Self::SchemeType { + fn default_scheme(&self) -> &'static Self::SchemeType { &UncompressedScheme } - fn dict_scheme_code() -> IntCode { - DICT_SCHEME + fn gen_stats(&self, array: &::Array) -> Self::StatsType { + if self + .btr_blocks_compressor + .int_schemes() + .iter() + .any(|s| s.code() == IntCode::Dict) + { + IntegerStats::generate_opts( + array, + GenerateStatsOptions { + count_distinct_values: true, + }, + ) + } else { + IntegerStats::generate_opts( + array, + GenerateStatsOptions { + count_distinct_values: false, + }, + ) + } } } -impl IntCompressor { - pub(crate) fn compress_no_dict( - array: &PrimitiveArray, - is_sample: bool, - allowed_cascading: usize, - excludes: &[IntCode], - ) -> VortexResult { - let stats = IntegerStats::generate_opts( - array, - GenerateStatsOptions { - count_distinct_values: false, - }, - ); +pub trait IntegerScheme: + Scheme + Send + Sync +{ +} - let scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?; - let output = scheme.compress(&stats, is_sample, allowed_cascading, excludes)?; +// Auto-impl +impl IntegerScheme for T where + T: Scheme + Send + Sync +{ +} - if output.nbytes() < array.nbytes() { - Ok(output) - } else { - tracing::debug!("resulting tree too large: {}", output.display_tree()); - Ok(array.to_array()) - } +impl PartialEq for dyn IntegerScheme { + fn eq(&self, other: &Self) -> bool { + 
self.code() == other.code() } } -pub trait IntegerScheme: Scheme {} +impl Eq for dyn IntegerScheme {} + +impl Hash for dyn IntegerScheme { + fn hash(&self, state: &mut H) { + self.code().hash(state) + } +} + +/// Unique identifier for integer compression schemes. +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Sequence)] +pub enum IntCode { + /// No compression applied. + Uncompressed, + /// Constant encoding for arrays with a single distinct value. + Constant, + /// Frame of Reference encoding - subtracts minimum value then bitpacks. + For, + /// ZigZag encoding - transforms negative integers to positive for better bitpacking. + ZigZag, + /// BitPacking encoding - compresses non-negative integers by reducing bit width. + BitPacking, + /// Sparse encoding - optimizes null-dominated or single-value-dominated arrays. + Sparse, + /// Dictionary encoding - creates a dictionary of unique values. + Dict, + /// Run-end encoding - run-length encoding with end positions. + RunEnd, + /// Sequence encoding - detects sequential patterns. + Sequence, + /// RLE encoding - generic run-length encoding. 
+ Rle, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] -// Auto-impl -impl IntegerScheme for T where T: Scheme {} - -#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] -pub struct IntCode(u8); - -const UNCOMPRESSED_SCHEME: IntCode = IntCode(0); -const CONSTANT_SCHEME: IntCode = IntCode(1); -const FOR_SCHEME: IntCode = IntCode(2); -const ZIGZAG_SCHEME: IntCode = IntCode(3); -const BITPACKING_SCHEME: IntCode = IntCode(4); -const SPARSE_SCHEME: IntCode = IntCode(5); -const DICT_SCHEME: IntCode = IntCode(6); -const RUN_END_SCHEME: IntCode = IntCode(7); -const SEQUENCE_SCHEME: IntCode = IntCode(8); -const RUN_LENGTH_SCHEME: IntCode = IntCode(9); - -#[derive(Debug, Copy, Clone)] pub struct UncompressedScheme; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] + pub struct ConstantScheme; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] + pub struct FORScheme; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct ZigZagScheme; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct BitPackingScheme; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct SparseScheme; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct DictScheme; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct RunEndScheme; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct SequenceScheme; /// Threshold for the average run length in an array before we consider run-end encoding. const RUN_END_THRESHOLD: u32 = 4; -pub const RLE_INTEGER_SCHEME: RLEScheme = RLEScheme::new( - RUN_LENGTH_SCHEME, - |values, is_sample, allowed_cascading, excludes| { - IntCompressor::compress_no_dict(values, is_sample, allowed_cascading, excludes) - }, -); +/// Configuration for integer RLE compression. 
+#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct IntRLEConfig; + +impl rle::RLEConfig for IntRLEConfig { + type Stats = IntegerStats; + type Code = IntCode; + + const CODE: IntCode = IntCode::Rle; + + fn compress_values( + compressor: &BtrBlocksCompressor, + values: &PrimitiveArray, + ctx: CompressorContext, + excludes: &[IntCode], + ) -> VortexResult { + compressor.compress_canonical(Canonical::Primitive(values.clone()), ctx, excludes.into()) + } +} + +/// RLE scheme for integer compression. +pub const RLE_INTEGER_SCHEME: RLEScheme = RLEScheme::new(); impl Scheme for UncompressedScheme { type StatsType = IntegerStats; type CodeType = IntCode; fn code(&self) -> IntCode { - UNCOMPRESSED_SCHEME + IntCode::Uncompressed } fn expected_compression_ratio( &self, + _compressor: &BtrBlocksCompressor, _stats: &IntegerStats, - _is_sample: bool, - _allowed_cascading: usize, + _ctx: CompressorContext, _excludes: &[IntCode], ) -> VortexResult { // no compression @@ -177,9 +235,9 @@ impl Scheme for UncompressedScheme { fn compress( &self, + _compressor: &BtrBlocksCompressor, stats: &IntegerStats, - _is_sample: bool, - _allowed_cascading: usize, + _ctx: CompressorContext, _excludes: &[IntCode], ) -> VortexResult { Ok(stats.source().to_array()) @@ -191,7 +249,7 @@ impl Scheme for ConstantScheme { type CodeType = IntCode; fn code(&self) -> IntCode { - CONSTANT_SCHEME + IntCode::Constant } fn is_constant(&self) -> bool { @@ -200,13 +258,13 @@ impl Scheme for ConstantScheme { fn expected_compression_ratio( &self, + _compressor: &BtrBlocksCompressor, stats: &IntegerStats, - is_sample: bool, - _allowed_cascading: usize, + ctx: CompressorContext, _excludes: &[IntCode], ) -> VortexResult { // Never yield ConstantScheme for a sample, it could be a false-positive. 
- if is_sample { + if ctx.is_sample { return Ok(0.0); } @@ -220,9 +278,9 @@ impl Scheme for ConstantScheme { fn compress( &self, + _compressor: &BtrBlocksCompressor, stats: &IntegerStats, - _is_sample: bool, - _allowed_cascading: usize, + _ctx: CompressorContext, _excludes: &[IntCode], ) -> VortexResult { let scalar_idx = @@ -252,18 +310,18 @@ impl Scheme for FORScheme { type CodeType = IntCode; fn code(&self) -> IntCode { - FOR_SCHEME + IntCode::For } fn expected_compression_ratio( &self, + _compressor: &BtrBlocksCompressor, stats: &IntegerStats, - _is_sample: bool, - allowed_cascading: usize, + ctx: CompressorContext, _excludes: &[IntCode], ) -> VortexResult { // Only apply if we are not at the leaf - if allowed_cascading == 0 { + if ctx.allowed_cascading == 0 { return Ok(0.0); } @@ -301,9 +359,9 @@ impl Scheme for FORScheme { fn compress( &self, + compressor: &BtrBlocksCompressor, stats: &IntegerStats, - is_sample: bool, - _allowed_cascading: usize, + ctx: CompressorContext, excludes: &[IntCode], ) -> VortexResult { let for_array = FoRArray::encode(stats.src.clone())?; @@ -319,7 +377,12 @@ impl Scheme for FORScheme { // of bitpacking. // NOTE: we could delegate in the future if we had another downstream codec that performs // as well. 
- let compressed = BitPackingScheme.compress(&biased_stats, is_sample, 0, excludes)?; + let leaf_ctx = CompressorContext { + is_sample: ctx.is_sample, + allowed_cascading: 0, + }; + let compressed = + BitPackingScheme.compress(compressor, &biased_stats, leaf_ctx, excludes)?; let for_compressed = FoRArray::try_new(compressed, for_array.reference_scalar().clone())?; for_compressed @@ -335,18 +398,18 @@ impl Scheme for ZigZagScheme { type CodeType = IntCode; fn code(&self) -> IntCode { - ZIGZAG_SCHEME + IntCode::ZigZag } fn expected_compression_ratio( &self, + compressor: &BtrBlocksCompressor, stats: &IntegerStats, - is_sample: bool, - allowed_cascading: usize, + ctx: CompressorContext, excludes: &[IntCode], ) -> VortexResult { // ZigZag is only useful when we cascade it with another encoding - if allowed_cascading == 0 { + if ctx.allowed_cascading == 0 { return Ok(0.0); } @@ -361,20 +424,14 @@ impl Scheme for ZigZagScheme { } // Run compression on a sample to see how it performs. - estimate_compression_ratio_with_sampling( - self, - stats, - is_sample, - allowed_cascading, - excludes, - ) + self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes) } fn compress( &self, + compressor: &BtrBlocksCompressor, stats: &IntegerStats, - is_sample: bool, - allowed_cascading: usize, + ctx: CompressorContext, excludes: &[IntCode], ) -> VortexResult { // Zigzag encode the values, then recursively compress the inner values. 
@@ -391,8 +448,11 @@ impl Scheme for ZigZagScheme { ]; new_excludes.extend_from_slice(excludes); - let compressed = - IntCompressor::compress(&encoded, is_sample, allowed_cascading - 1, &new_excludes)?; + let compressed = compressor.compress_canonical( + Canonical::Primitive(encoded), + ctx.descend(), + Excludes::int_only(&new_excludes), + )?; tracing::debug!("zigzag output: {}", compressed.display_tree()); @@ -405,14 +465,14 @@ impl Scheme for BitPackingScheme { type CodeType = IntCode; fn code(&self) -> IntCode { - BITPACKING_SCHEME + IntCode::BitPacking } fn expected_compression_ratio( &self, + compressor: &BtrBlocksCompressor, stats: &IntegerStats, - is_sample: bool, - allowed_cascading: usize, + ctx: CompressorContext, excludes: &[IntCode], ) -> VortexResult { // BitPacking only works for non-negative values @@ -425,20 +485,14 @@ impl Scheme for BitPackingScheme { return Ok(0.0); } - estimate_compression_ratio_with_sampling( - self, - stats, - is_sample, - allowed_cascading, - excludes, - ) + self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes) } fn compress( &self, + _compressor: &BtrBlocksCompressor, stats: &IntegerStats, - _is_sample: bool, - _allowed_cascading: usize, + _ctx: CompressorContext, _excludes: &[IntCode], ) -> VortexResult { let histogram = bit_width_histogram(stats.source())?; @@ -461,19 +515,19 @@ impl Scheme for SparseScheme { type CodeType = IntCode; fn code(&self) -> IntCode { - SPARSE_SCHEME + IntCode::Sparse } // We can avoid asserting the encoding tree instead. fn expected_compression_ratio( &self, + _compressor: &BtrBlocksCompressor, stats: &IntegerStats, - _is_sample: bool, - allowed_cascading: usize, + ctx: CompressorContext, _excludes: &[IntCode], ) -> VortexResult { // Only use `SparseScheme` if we can cascade. 
- if allowed_cascading == 0 { + if ctx.allowed_cascading == 0 { return Ok(0.0); } @@ -506,12 +560,12 @@ impl Scheme for SparseScheme { fn compress( &self, + compressor: &BtrBlocksCompressor, stats: &IntegerStats, - is_sample: bool, - allowed_cascading: usize, + ctx: CompressorContext, excludes: &[IntCode], ) -> VortexResult { - assert!(allowed_cascading > 0); + assert!(ctx.allowed_cascading > 0); let (top_pvalue, top_count) = stats.typed.top_value_and_count(); if top_count as usize == stats.src.len() { // top_value is the only value, use ConstantScheme @@ -537,23 +591,21 @@ impl Scheme for SparseScheme { if let Some(sparse) = sparse_encoded.as_opt::() { // Compress the values - let mut new_excludes = vec![SparseScheme.code()]; + let mut new_excludes = vec![SparseScheme.code(), IntCode::Dict]; new_excludes.extend_from_slice(excludes); - let compressed_values = IntCompressor::compress_no_dict( - &sparse.patches().values().to_primitive(), - is_sample, - allowed_cascading - 1, - &new_excludes, + let compressed_values = compressor.compress_canonical( + Canonical::Primitive(sparse.patches().values().to_primitive()), + ctx.descend(), + Excludes::int_only(&new_excludes), )?; let indices = sparse.patches().indices().to_primitive().narrow()?; - let compressed_indices = IntCompressor::compress_no_dict( - &indices, - is_sample, - allowed_cascading - 1, - &new_excludes, + let compressed_indices = compressor.compress_canonical( + Canonical::Primitive(indices), + ctx.descend(), + Excludes::int_only(&new_excludes), )?; SparseArray::try_new( @@ -574,18 +626,18 @@ impl Scheme for DictScheme { type CodeType = IntCode; fn code(&self) -> IntCode { - DICT_SCHEME + IntCode::Dict } fn expected_compression_ratio( &self, + _compressor: &BtrBlocksCompressor, stats: &IntegerStats, - _is_sample: bool, - allowed_cascading: usize, + ctx: CompressorContext, _excludes: &[IntCode], ) -> VortexResult { // Dict should not be terminal. 
- if allowed_cascading == 0 { + if ctx.allowed_cascading == 0 { return Ok(0.0); } @@ -619,12 +671,12 @@ impl Scheme for DictScheme { fn compress( &self, + compressor: &BtrBlocksCompressor, stats: &IntegerStats, - is_sample: bool, - allowed_cascading: usize, + ctx: CompressorContext, excludes: &[IntCode], ) -> VortexResult { - assert!(allowed_cascading > 0); + assert!(ctx.allowed_cascading > 0); // TODO(aduffy): we can be more prescriptive: we know that codes will EITHER be // RLE or FOR + BP. Cascading probably wastes some time here. @@ -633,14 +685,13 @@ impl Scheme for DictScheme { // Cascade the codes child // Don't allow SequenceArray as the codes child as it merely adds extra indirection without actually compressing data. - let mut new_excludes = vec![DICT_SCHEME, SEQUENCE_SCHEME]; + let mut new_excludes = vec![IntCode::Dict, IntCode::Sequence]; new_excludes.extend_from_slice(excludes); - let compressed_codes = IntCompressor::compress_no_dict( - &dict.codes().to_primitive().narrow()?, - is_sample, - allowed_cascading - 1, - &new_excludes, + let compressed_codes = compressor.compress_canonical( + Canonical::Primitive(dict.codes().to_primitive().narrow()?), + ctx.descend(), + Excludes::int_only(&new_excludes), )?; // SAFETY: compressing codes does not change their values @@ -659,14 +710,14 @@ impl Scheme for RunEndScheme { type CodeType = IntCode; fn code(&self) -> IntCode { - RUN_END_SCHEME + IntCode::RunEnd } fn expected_compression_ratio( &self, + compressor: &BtrBlocksCompressor, stats: &IntegerStats, - is_sample: bool, - allowed_cascading: usize, + ctx: CompressorContext, excludes: &[IntCode], ) -> VortexResult { // If the run length is below the threshold, drop it. @@ -674,28 +725,22 @@ impl Scheme for RunEndScheme { return Ok(0.0); } - if allowed_cascading == 0 { + if ctx.allowed_cascading == 0 { return Ok(0.0); } // Run compression on a sample, see how it performs. 
- estimate_compression_ratio_with_sampling( - self, - stats, - is_sample, - allowed_cascading, - excludes, - ) + self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes) } fn compress( &self, + compressor: &BtrBlocksCompressor, stats: &IntegerStats, - is_sample: bool, - allowed_cascading: usize, + ctx: CompressorContext, excludes: &[IntCode], ) -> VortexResult { - assert!(allowed_cascading > 0); + assert!(ctx.allowed_cascading > 0); // run-end encode the ends let (ends, values) = runend_encode(&stats.src); @@ -703,26 +748,16 @@ impl Scheme for RunEndScheme { let mut new_excludes = vec![RunEndScheme.code(), DictScheme.code()]; new_excludes.extend_from_slice(excludes); - let ends_stats = IntegerStats::generate_opts( - &ends.to_primitive(), - GenerateStatsOptions { - count_distinct_values: false, - }, - ); - let ends_scheme = IntCompressor::choose_scheme( - &ends_stats, - is_sample, - allowed_cascading - 1, - &new_excludes, + let compressed_ends = compressor.compress_canonical( + Canonical::Primitive(ends.to_primitive()), + ctx.descend(), + Excludes::int_only(&new_excludes), )?; - let compressed_ends = - ends_scheme.compress(&ends_stats, is_sample, allowed_cascading - 1, &new_excludes)?; - - let compressed_values = IntCompressor::compress_no_dict( - &values.to_primitive(), - is_sample, - allowed_cascading - 1, - &new_excludes, + + let compressed_values = compressor.compress_canonical( + Canonical::Primitive(values.to_primitive()), + ctx.descend(), + Excludes::int_only(&new_excludes), )?; // SAFETY: compression doesn't affect invariants @@ -740,19 +775,28 @@ impl Scheme for SequenceScheme { type CodeType = IntCode; fn code(&self) -> Self::CodeType { - SEQUENCE_SCHEME + IntCode::Sequence } fn expected_compression_ratio( &self, + _compressor: &BtrBlocksCompressor, stats: &Self::StatsType, - _is_sample: bool, - _allowed_cascading: usize, + _ctx: CompressorContext, _excludes: &[Self::CodeType], ) -> VortexResult { if stats.null_count > 0 { return 
Ok(0.0); } + + // If the distinct_values_count was computed (!= u32::MAX) + // Then all values in a sequence must be unique. + if stats.distinct_values_count != u32::MAX + && stats.distinct_values_count as usize != stats.src.len() + { + return Ok(0.0); + } + // Since two values are required to store base and multiplier the // compression ratio is divided by 2. Ok(sequence_encode(&stats.src)? @@ -762,9 +806,9 @@ impl Scheme for SequenceScheme { fn compress( &self, + _compressor: &BtrBlocksCompressor, stats: &Self::StatsType, - _is_sample: bool, - _allowed_cascading: usize, + _ctx: CompressorContext, _excludes: &[Self::CodeType], ) -> VortexResult { if stats.null_count > 0 { @@ -797,28 +841,29 @@ mod tests { use vortex_sequence::SequenceVTable; use vortex_sparse::SparseVTable; - use crate::Compressor; + use super::IntegerStats; + use super::RLE_INTEGER_SCHEME; + use super::SequenceScheme; + use super::SparseScheme; + use crate::BtrBlocksCompressor; + use crate::CompressorContext; + use crate::CompressorExt; use crate::CompressorStats; - use crate::FloatCompressor; use crate::Scheme; - use crate::integer::IntCompressor; - use crate::integer::IntegerStats; - use crate::integer::RLE_INTEGER_SCHEME; - use crate::integer::SequenceScheme; - use crate::integer::SparseScheme; #[test] - fn test_empty() { + fn test_empty() -> VortexResult<()> { // Make sure empty array compression does not fail - let result = IntCompressor::compress( + let btr = BtrBlocksCompressor::default(); + let result = btr.integer_compressor().compress( + &btr, &PrimitiveArray::new(Buffer::::empty(), Validity::NonNullable), - false, - 3, + CompressorContext::default(), &[], - ) - .unwrap(); + )?; assert!(result.is_empty()); + Ok(()) } #[test] @@ -842,7 +887,13 @@ mod tests { } let primitive = codes.freeze().into_array().to_primitive(); - let compressed = IntCompressor::compress(&primitive, false, 3, &[])?; + let btr = BtrBlocksCompressor::default(); + let compressed = btr.integer_compressor().compress( + 
&btr, + &primitive, + CompressorContext::default(), + &[], + )?; assert!(compressed.is::()); Ok(()) } @@ -853,7 +904,13 @@ mod tests { buffer![189u8, 189, 189, 0, 46], Validity::from_iter(vec![true, true, true, true, false]), ); - let compressed = SparseScheme.compress(&IntegerStats::generate(&array), false, 3, &[])?; + let btr = BtrBlocksCompressor::default(); + let compressed = SparseScheme.compress( + &btr, + &IntegerStats::generate(&array), + CompressorContext::default(), + &[], + )?; assert!(compressed.is::()); let decoded = compressed.clone(); let expected = @@ -871,7 +928,13 @@ mod tests { false, false, false, false, false, false, false, false, false, false, true, ]), ); - let compressed = SparseScheme.compress(&IntegerStats::generate(&array), false, 3, &[])?; + let btr = BtrBlocksCompressor::default(); + let compressed = SparseScheme.compress( + &btr, + &IntegerStats::generate(&array), + CompressorContext::default(), + &[], + )?; assert!(compressed.is::()); let decoded = compressed.clone(); let expected = PrimitiveArray::new( @@ -887,7 +950,13 @@ mod tests { fn nullable_sequence() -> VortexResult<()> { let values = (0i32..20).step_by(7).collect_vec(); let array = PrimitiveArray::from_option_iter(values.clone().into_iter().map(Some)); - let compressed = SequenceScheme.compress(&IntegerStats::generate(&array), false, 3, &[])?; + let btr = BtrBlocksCompressor::default(); + let compressed = SequenceScheme.compress( + &btr, + &IntegerStats::generate(&array), + CompressorContext::default(), + &[], + )?; assert!(compressed.is::()); let decoded = compressed; let expected = PrimitiveArray::from_option_iter(values.into_iter().map(Some)).into_array(); @@ -903,8 +972,13 @@ mod tests { values.extend(iter::repeat_n(987i32, 150)); let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); - let compressed = - RLE_INTEGER_SCHEME.compress(&IntegerStats::generate(&array), false, 3, &[])?; + let btr = BtrBlocksCompressor::default(); + let compressed = 
RLE_INTEGER_SCHEME.compress( + &btr, + &IntegerStats::generate(&array), + CompressorContext::default(), + &[], + )?; let decoded = compressed; let expected = Buffer::copy_from(&values).into_array(); @@ -922,9 +996,11 @@ mod tests { .flat_map(|list_idx| { (0..ELEMENTS_PER_LIST).map(move |elem_idx| (list_idx * 1000 + elem_idx) as f64) }) - .collect::(); + .collect::() + .into_array(); - drop(FloatCompressor::compress(&prim, false, 3, &[])?); + let btr = BtrBlocksCompressor::default(); + drop(btr.compress(prim.as_ref())?); Ok(()) } @@ -940,6 +1016,7 @@ mod scheme_selection_tests { use vortex_array::arrays::PrimitiveArray; use vortex_array::validity::Validity; use vortex_buffer::Buffer; + use vortex_error::VortexResult; use vortex_fastlanes::BitPackedVTable; use vortex_fastlanes::FoRVTable; use vortex_fastlanes::RLEVTable; @@ -947,35 +1024,48 @@ mod scheme_selection_tests { use vortex_sequence::SequenceVTable; use vortex_sparse::SparseVTable; - use crate::Compressor; - use crate::integer::IntCompressor; + use crate::BtrBlocksCompressor; + use crate::CompressorContext; + use crate::CompressorExt; #[test] - fn test_constant_compressed() { + fn test_constant_compressed() -> VortexResult<()> { let values: Vec = iter::repeat_n(42, 100).collect(); let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); - let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap(); + let btr = BtrBlocksCompressor::default(); + let compressed = + btr.integer_compressor() + .compress(&btr, &array, CompressorContext::default(), &[])?; assert!(compressed.is::()); + Ok(()) } #[test] - fn test_for_compressed() { + fn test_for_compressed() -> VortexResult<()> { let values: Vec = (0..1000).map(|i| 1_000_000 + ((i * 37) % 100)).collect(); let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); - let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap(); + let btr = BtrBlocksCompressor::default(); + let compressed = + 
btr.integer_compressor() + .compress(&btr, &array, CompressorContext::default(), &[])?; assert!(compressed.is::()); + Ok(()) } #[test] - fn test_bitpacking_compressed() { + fn test_bitpacking_compressed() -> VortexResult<()> { let values: Vec = (0..1000).map(|i| i % 16).collect(); let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); - let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap(); + let btr = BtrBlocksCompressor::default(); + let compressed = + btr.integer_compressor() + .compress(&btr, &array, CompressorContext::default(), &[])?; assert!(compressed.is::()); + Ok(()) } #[test] - fn test_sparse_compressed() { + fn test_sparse_compressed() -> VortexResult<()> { let mut values: Vec = Vec::new(); for i in 0..1000 { if i % 20 == 0 { @@ -985,12 +1075,16 @@ mod scheme_selection_tests { } } let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); - let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap(); + let btr = BtrBlocksCompressor::default(); + let compressed = + btr.integer_compressor() + .compress(&btr, &array, CompressorContext::default(), &[])?; assert!(compressed.is::()); + Ok(()) } #[test] - fn test_dict_compressed() { + fn test_dict_compressed() -> VortexResult<()> { use rand::RngCore; use rand::SeedableRng; use rand::rngs::StdRng; @@ -1011,37 +1105,53 @@ mod scheme_selection_tests { } let array = PrimitiveArray::new(Buffer::copy_from(&codes), Validity::NonNullable); - let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap(); + let btr = BtrBlocksCompressor::default(); + let compressed = + btr.integer_compressor() + .compress(&btr, &array, CompressorContext::default(), &[])?; assert!(compressed.is::()); + Ok(()) } #[test] - fn test_runend_compressed() { + fn test_runend_compressed() -> VortexResult<()> { let mut values: Vec = Vec::new(); for i in 0..100 { values.extend(iter::repeat_n((i32::MAX - 50).wrapping_add(i), 10)); } let array = 
PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); - let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap(); + let btr = BtrBlocksCompressor::default(); + let compressed = + btr.integer_compressor() + .compress(&btr, &array, CompressorContext::default(), &[])?; assert!(compressed.is::()); + Ok(()) } #[test] - fn test_sequence_compressed() { + fn test_sequence_compressed() -> VortexResult<()> { let values: Vec = (0..1000).map(|i| i * 7).collect(); let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); - let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap(); + let btr = BtrBlocksCompressor::default(); + let compressed = + btr.integer_compressor() + .compress(&btr, &array, CompressorContext::default(), &[])?; assert!(compressed.is::()); + Ok(()) } #[test] - fn test_rle_compressed() { + fn test_rle_compressed() -> VortexResult<()> { let mut values: Vec = Vec::new(); for i in 0..10 { values.extend(iter::repeat_n(i, 100)); } let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable); - let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap(); + let btr = BtrBlocksCompressor::default(); + let compressed = + btr.integer_compressor() + .compress(&btr, &array, CompressorContext::default(), &[])?; assert!(compressed.is::()); + Ok(()) } } diff --git a/vortex-btrblocks/src/integer/stats.rs b/vortex-btrblocks/src/compressor/integer/stats.rs similarity index 99% rename from vortex-btrblocks/src/integer/stats.rs rename to vortex-btrblocks/src/compressor/integer/stats.rs index 954e100d114..cff8fcf8901 100644 --- a/vortex-btrblocks/src/integer/stats.rs +++ b/vortex-btrblocks/src/compressor/integer/stats.rs @@ -23,7 +23,7 @@ use vortex_utils::aliases::hash_map::HashMap; use crate::CompressorStats; use crate::GenerateStatsOptions; -use crate::rle::RLEStats; +use crate::compressor::rle::RLEStats; use crate::sample::sample; #[derive(Clone, Debug)] @@ -437,9 
+437,9 @@ mod tests { use vortex_buffer::buffer; use vortex_error::VortexResult; + use super::IntegerStats; + use super::typed_int_stats; use crate::CompressorStats; - use crate::integer::IntegerStats; - use crate::integer::stats::typed_int_stats; #[test] fn test_naive_count_distinct_values() -> VortexResult<()> { diff --git a/vortex-btrblocks/src/compressor/mod.rs b/vortex-btrblocks/src/compressor/mod.rs new file mode 100644 index 00000000000..1e0c7983dc4 --- /dev/null +++ b/vortex-btrblocks/src/compressor/mod.rs @@ -0,0 +1,156 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Compressor traits for type-specific compression. + +use vortex_array::ArrayRef; +use vortex_array::vtable::VTable; +use vortex_error::VortexResult; + +use crate::BtrBlocksCompressor; +use crate::CompressorContext; +use crate::CompressorStats; +use crate::Scheme; + +pub(crate) mod decimal; +pub(crate) mod float; +pub(crate) mod integer; +mod patches; +mod rle; +pub(crate) mod string; +pub(crate) mod temporal; + +/// Maximum cascade depth for compression. +pub(crate) const MAX_CASCADE: usize = 3; + +/// A compressor for a particular input type. +/// +/// This trait defines the interface for type-specific compressors that can adaptively +/// choose and apply compression schemes based on data characteristics. Compressors +/// analyze input arrays, select optimal compression schemes, and handle cascading +/// compression with multiple encoding layers. +/// +/// The compressor works by generating statistics on the input data, evaluating +/// available compression schemes, and selecting the one with the best compression ratio. +pub trait Compressor { + /// The VTable type for arrays this compressor operates on. + type ArrayVTable: VTable; + /// The compression scheme type used by this compressor. + type SchemeType: Scheme + ?Sized; + /// The statistics type used to analyze arrays for compression. 
+ type StatsType: CompressorStats; + + /// Generates statistics for the given array to guide compression scheme selection. + fn gen_stats(&self, array: &::Array) -> Self::StatsType; + + /// Returns all available compression schemes for this compressor. + fn schemes(&self) -> &[&'static Self::SchemeType]; + /// Returns the default fallback compression scheme. + fn default_scheme(&self) -> &'static Self::SchemeType; +} + +/// Extension trait providing scheme selection and compression for compressors. +pub trait CompressorExt: Compressor +where + Self::SchemeType: 'static, +{ + /// Selects the best compression scheme based on expected compression ratios. + /// + /// Evaluates all available schemes against the provided statistics and returns + /// the one with the highest compression ratio. Falls back to the default scheme + /// if no scheme provides compression benefits. + #[allow(clippy::cognitive_complexity)] + fn choose_scheme( + &self, + compressor: &BtrBlocksCompressor, + stats: &Self::StatsType, + ctx: CompressorContext, + excludes: &[::CodeType], + ) -> VortexResult<&'static Self::SchemeType> { + let mut best_ratio = 1.0; + let mut best_scheme: Option<&'static Self::SchemeType> = None; + + // logging helpers + let depth = MAX_CASCADE - ctx.allowed_cascading; + + for scheme in self.schemes().iter() { + // Skip excluded schemes + if excludes.contains(&scheme.code()) { + continue; + } + + // We never choose Constant for a sample + if ctx.is_sample && scheme.is_constant() { + continue; + } + + tracing::trace!( + is_sample = ctx.is_sample, + depth, + is_constant = scheme.is_constant(), + ?scheme, + "Trying compression scheme" + ); + + let ratio = scheme.expected_compression_ratio(compressor, stats, ctx, excludes)?; + tracing::trace!( + is_sample = ctx.is_sample, + depth, + ratio, + ?scheme, + "Expected compression result" + ); + + if !(ratio.is_subnormal() || ratio.is_infinite() || ratio.is_nan()) { + if ratio > best_ratio { + best_ratio = ratio; + best_scheme = 
Some(*scheme); + } + } else { + tracing::trace!( + "Calculated invalid compression ratio {ratio} for scheme: {scheme:?}. Must not be sub-normal, infinite or nan." + ); + } + } + + tracing::trace!(depth, scheme = ?best_scheme, ratio = best_ratio, "best scheme found"); + + if let Some(best) = best_scheme { + Ok(best) + } else { + Ok(self.default_scheme()) + } + } + + /// Compresses an array using this compressor. + /// + /// Generates statistics on the input array, selects the best compression scheme, + /// and applies it. Returns the original array if compression would increase size. + fn compress( + &self, + btr_blocks_compressor: &BtrBlocksCompressor, + array: &<::ArrayVTable as VTable>::Array, + ctx: CompressorContext, + excludes: &[::CodeType], + ) -> VortexResult { + // Avoid compressing empty arrays. + if array.is_empty() { + return Ok(array.to_array()); + } + + // Generate stats on the array directly. + let stats = self.gen_stats(array); + let best_scheme = self.choose_scheme(btr_blocks_compressor, &stats, ctx, excludes)?; + + let output = best_scheme.compress(btr_blocks_compressor, &stats, ctx, excludes)?; + if output.nbytes() < array.nbytes() { + Ok(output) + } else { + tracing::debug!("resulting tree too large: {}", output.display_tree()); + Ok(array.to_array()) + } + } +} + +// Blanket implementation for all Compressor types with 'static SchemeType +impl CompressorExt for T where T::SchemeType: 'static {} diff --git a/vortex-btrblocks/src/patches.rs b/vortex-btrblocks/src/compressor/patches.rs similarity index 100% rename from vortex-btrblocks/src/patches.rs rename to vortex-btrblocks/src/compressor/patches.rs diff --git a/vortex-btrblocks/src/compressor/rle.rs b/vortex-btrblocks/src/compressor/rle.rs new file mode 100644 index 00000000000..b9e21652489 --- /dev/null +++ b/vortex-btrblocks/src/compressor/rle.rs @@ -0,0 +1,159 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::fmt::Debug; 
+use std::hash::Hash; +use std::marker::PhantomData; + +use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::IntoArray; +use vortex_array::ToCanonical; +use vortex_array::arrays::PrimitiveArray; +use vortex_error::VortexResult; +use vortex_fastlanes::RLEArray; + +use crate::BtrBlocksCompressor; +use crate::CanonicalCompressor; +use crate::CompressorContext; +use crate::CompressorStats; +use crate::Excludes; +use crate::IntCode; +use crate::Scheme; +use crate::SchemeExt; + +/// Threshold for the average run length in an array before we consider run-length encoding. +pub const RUN_LENGTH_THRESHOLD: u32 = 4; + +/// Trait for accessing RLE-specific statistics. +pub trait RLEStats { + fn value_count(&self) -> u32; + fn average_run_length(&self) -> u32; + fn source(&self) -> &PrimitiveArray; +} + +/// Configuration trait for RLE schemes. +/// +/// Implement this trait to define the behavior of an RLE scheme for a specific +/// stats and code type combination. +pub trait RLEConfig: Debug + Send + Sync + 'static { + /// The statistics type used by this RLE scheme. + type Stats: RLEStats + CompressorStats; + /// The code type used to identify schemes. + type Code: Copy + Clone + Debug + Hash + PartialEq + Eq; + + /// The unique code identifying this RLE scheme. + const CODE: Self::Code; + + /// Compress the values array after RLE encoding. + fn compress_values( + compressor: &BtrBlocksCompressor, + values: &PrimitiveArray, + ctx: CompressorContext, + excludes: &[Self::Code], + ) -> VortexResult; +} + +/// RLE scheme that is generic over a configuration type. +/// +/// This is a ZST (zero-sized type) - all behavior is defined by the `RLEConfig` trait. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct RLEScheme(PhantomData); + +impl RLEScheme { + /// Creates a new RLE scheme. 
+ pub const fn new() -> Self { + Self(PhantomData) + } +} + +impl Default for RLEScheme { + fn default() -> Self { + Self::new() + } +} + +impl Scheme for RLEScheme { + type StatsType = C::Stats; + type CodeType = C::Code; + + fn code(&self) -> C::Code { + C::CODE + } + + fn expected_compression_ratio( + &self, + compressor: &BtrBlocksCompressor, + stats: &Self::StatsType, + ctx: CompressorContext, + excludes: &[C::Code], + ) -> VortexResult { + // RLE is only useful when we cascade it with another encoding. + if ctx.allowed_cascading == 0 { + return Ok(0.0); + } + + // Don't compress all-null or empty arrays. + if stats.value_count() == 0 { + return Ok(0.0); + } + + // Check whether RLE is a good fit, based on the average run length. + if stats.average_run_length() < RUN_LENGTH_THRESHOLD { + return Ok(0.0); + } + + // Run compression on a sample to see how it performs. + self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes) + } + + fn compress( + &self, + compressor: &BtrBlocksCompressor, + stats: &Self::StatsType, + ctx: CompressorContext, + excludes: &[C::Code], + ) -> VortexResult { + let rle_array = RLEArray::encode(RLEStats::source(stats))?; + + if ctx.allowed_cascading == 0 { + return Ok(rle_array.into_array()); + } + + // Prevent RLE recursion. + let mut new_excludes = vec![self.code()]; + new_excludes.extend_from_slice(excludes); + + let compressed_values = C::compress_values( + compressor, + &rle_array.values().to_primitive(), + ctx.descend(), + &new_excludes, + )?; + + let compressed_indices = compressor.compress_canonical( + Canonical::Primitive(rle_array.indices().to_primitive().narrow()?), + ctx.descend(), + Excludes::from(&[IntCode::Dict]), + )?; + + let compressed_offsets = compressor.compress_canonical( + Canonical::Primitive(rle_array.values_idx_offsets().to_primitive().narrow()?), + ctx.descend(), + Excludes::from(&[IntCode::Dict]), + )?; + + // SAFETY: Recursive compression doesn't affect the invariants. 
+ unsafe { + Ok(RLEArray::new_unchecked( + compressed_values, + compressed_indices, + compressed_offsets, + rle_array.dtype().clone(), + rle_array.offset(), + rle_array.len(), + ) + .into_array()) + } + } +} diff --git a/vortex-btrblocks/src/string.rs b/vortex-btrblocks/src/compressor/string.rs similarity index 69% rename from vortex-btrblocks/src/string.rs rename to vortex-btrblocks/src/compressor/string.rs index 2ee8d036ea9..8e52c48f355 100644 --- a/vortex-btrblocks/src/string.rs +++ b/vortex-btrblocks/src/compressor/string.rs @@ -1,7 +1,12 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::hash::Hash; +use std::hash::Hasher; + +use enum_iterator::Sequence; use vortex_array::ArrayRef; +use vortex_array::Canonical; use vortex_array::IntoArray; use vortex_array::ToCanonical; use vortex_array::arrays::ConstantArray; @@ -12,6 +17,7 @@ use vortex_array::arrays::VarBinViewArray; use vortex_array::arrays::VarBinViewVTable; use vortex_array::builders::dict::dict_encode; use vortex_array::compute::is_constant; +use vortex_array::vtable::VTable; use vortex_array::vtable::ValidityHelper; use vortex_error::VortexExpect; use vortex_error::VortexResult; @@ -24,13 +30,19 @@ use vortex_sparse::SparseArray; use vortex_sparse::SparseVTable; use vortex_utils::aliases::hash_set::HashSet; +use super::integer::DictScheme as IntDictScheme; +use super::integer::SequenceScheme as IntSequenceScheme; +use super::integer::SparseScheme as IntSparseScheme; +use crate::BtrBlocksCompressor; +use crate::CanonicalCompressor; use crate::Compressor; +use crate::CompressorContext; use crate::CompressorStats; +use crate::Excludes; use crate::GenerateStatsOptions; +use crate::IntCode; use crate::Scheme; -use crate::estimate_compression_ratio_with_sampling; -use crate::integer; -use crate::integer::IntCompressor; +use crate::SchemeExt; use crate::sample::sample; /// Array of variable-length byte arrays, and relevant stats for compression. 
@@ -105,75 +117,126 @@ impl CompressorStats for StringStats { } } +/// All available string compression schemes. +pub const ALL_STRING_SCHEMES: &[&dyn StringScheme] = &[ + &UncompressedScheme, + &DictScheme, + &FSSTScheme, + &ConstantScheme, + &NullDominated, +]; + /// [`Compressor`] for strings. -pub struct StringCompressor; +#[derive(Clone, Copy)] +pub struct StringCompressor<'a> { + /// Reference to the parent compressor. + pub btr_blocks_compressor: &'a dyn CanonicalCompressor, +} -impl Compressor for StringCompressor { +impl<'a> Compressor for StringCompressor<'a> { type ArrayVTable = VarBinViewVTable; type SchemeType = dyn StringScheme; type StatsType = StringStats; - fn schemes() -> &'static [&'static Self::SchemeType] { - &[ - &UncompressedScheme, - &DictScheme, - &FSSTScheme, - &ConstantScheme, - &NullDominated, - ] + fn gen_stats(&self, array: &::Array) -> Self::StatsType { + if self + .btr_blocks_compressor + .string_schemes() + .iter() + .any(|s| s.code() == DictScheme.code()) + { + StringStats::generate_opts( + array, + GenerateStatsOptions { + count_distinct_values: true, + }, + ) + } else { + StringStats::generate_opts( + array, + GenerateStatsOptions { + count_distinct_values: false, + }, + ) + } } - fn default_scheme() -> &'static Self::SchemeType { + fn schemes(&self) -> &[&'static dyn StringScheme] { + self.btr_blocks_compressor.string_schemes() + } + + fn default_scheme(&self) -> &'static Self::SchemeType { &UncompressedScheme } +} - fn dict_scheme_code() -> StringCode { - DICT_SCHEME +pub trait StringScheme: + Scheme + Send + Sync +{ +} + +impl StringScheme for T where + T: Scheme + Send + Sync +{ +} + +impl PartialEq for dyn StringScheme { + fn eq(&self, other: &Self) -> bool { + self.code() == other.code() } } -pub trait StringScheme: Scheme {} +impl Eq for dyn StringScheme {} -impl StringScheme for T where T: Scheme {} +impl Hash for dyn StringScheme { + fn hash(&self, state: &mut H) { + self.code().hash(state) + } +} -#[derive(Debug, Copy, 
Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct UncompressedScheme; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct DictScheme; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct FSSTScheme; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct ConstantScheme; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct NullDominated; -#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] -pub struct StringCode(u8); - -const UNCOMPRESSED_SCHEME: StringCode = StringCode(0); -const DICT_SCHEME: StringCode = StringCode(1); -const FSST_SCHEME: StringCode = StringCode(2); -const CONSTANT_SCHEME: StringCode = StringCode(3); - -const SPARSE_SCHEME: StringCode = StringCode(4); +/// Unique identifier for string compression schemes. +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Sequence)] +pub enum StringCode { + /// No compression applied. + Uncompressed, + /// Dictionary encoding for low-cardinality strings. + Dict, + /// FSST (Fast Static Symbol Table) compression. + Fsst, + /// Constant encoding for arrays with a single distinct value. + Constant, + /// Sparse encoding for null-dominated arrays. 
+ Sparse, +} impl Scheme for UncompressedScheme { type StatsType = StringStats; type CodeType = StringCode; fn code(&self) -> StringCode { - UNCOMPRESSED_SCHEME + StringCode::Uncompressed } fn expected_compression_ratio( &self, + _compressor: &BtrBlocksCompressor, _stats: &Self::StatsType, - _is_sample: bool, - _allowed_cascading: usize, + _ctx: CompressorContext, _excludes: &[StringCode], ) -> VortexResult { Ok(1.0) @@ -181,9 +244,9 @@ impl Scheme for UncompressedScheme { fn compress( &self, + _compressor: &BtrBlocksCompressor, stats: &Self::StatsType, - _is_sample: bool, - _allowed_cascading: usize, + _ctx: CompressorContext, _excludes: &[StringCode], ) -> VortexResult { Ok(stats.source().to_array()) @@ -195,14 +258,14 @@ impl Scheme for DictScheme { type CodeType = StringCode; fn code(&self) -> StringCode { - DICT_SCHEME + StringCode::Dict } fn expected_compression_ratio( &self, + compressor: &BtrBlocksCompressor, stats: &Self::StatsType, - is_sample: bool, - allowed_cascading: usize, + ctx: CompressorContext, excludes: &[StringCode], ) -> VortexResult { // If we don't have a sufficiently high number of distinct values, do not attempt Dict. @@ -215,44 +278,36 @@ impl Scheme for DictScheme { return Ok(0.0); } - estimate_compression_ratio_with_sampling( - self, - stats, - is_sample, - allowed_cascading, - excludes, - ) + self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes) } fn compress( &self, + compressor: &BtrBlocksCompressor, stats: &Self::StatsType, - is_sample: bool, - allowed_cascading: usize, + ctx: CompressorContext, _excludes: &[StringCode], ) -> VortexResult { let dict = dict_encode(&stats.source().clone().into_array())?; // If we are not allowed to cascade, do not attempt codes or values compression. 
- if allowed_cascading == 0 { + if ctx.allowed_cascading == 0 { return Ok(dict.into_array()); } // Find best compressor for codes and values separately - let compressed_codes = IntCompressor::compress( - &dict.codes().to_primitive(), - is_sample, - allowed_cascading - 1, - &[integer::DictScheme.code(), integer::SequenceScheme.code()], + let compressed_codes = compressor.compress_canonical( + Canonical::Primitive(dict.codes().to_primitive()), + ctx.descend(), + Excludes::from(&[IntDictScheme.code(), IntSequenceScheme.code()]), )?; // Attempt to compress the values with non-Dict compression. // Currently this will only be FSST. - let compressed_values = StringCompressor::compress( - &dict.values().to_varbinview(), - is_sample, - allowed_cascading - 1, - &[DictScheme.code()], + let compressed_values = compressor.compress_canonical( + Canonical::VarBinView(dict.values().to_varbinview()), + ctx.descend(), + Excludes::from(&[DictScheme.code()]), )?; // SAFETY: compressing codes or values does not alter the invariants @@ -271,31 +326,31 @@ impl Scheme for FSSTScheme { type CodeType = StringCode; fn code(&self) -> StringCode { - FSST_SCHEME + StringCode::Fsst } fn compress( &self, + compressor: &BtrBlocksCompressor, stats: &Self::StatsType, - is_sample: bool, - allowed_cascading: usize, + ctx: CompressorContext, _excludes: &[StringCode], ) -> VortexResult { - let compressor = fsst_train_compressor(&stats.src); - let fsst = fsst_compress(&stats.src, &compressor); - - let compressed_original_lengths = IntCompressor::compress( - &fsst.uncompressed_lengths().to_primitive().narrow()?, - is_sample, - allowed_cascading, - &[], + let fsst = { + let compressor = fsst_train_compressor(&stats.src); + fsst_compress(&stats.src, &compressor) + }; + + let compressed_original_lengths = compressor.compress_canonical( + Canonical::Primitive(fsst.uncompressed_lengths().to_primitive().narrow()?), + ctx, + Excludes::none(), )?; - let compressed_codes_offsets = IntCompressor::compress( - 
&fsst.codes().offsets().to_primitive().narrow()?, - is_sample, - allowed_cascading, - &[], + let compressed_codes_offsets = compressor.compress_canonical( + Canonical::Primitive(fsst.codes().offsets().to_primitive().narrow()?), + ctx, + Excludes::none(), )?; let compressed_codes = VarBinArray::try_new( compressed_codes_offsets, @@ -321,7 +376,7 @@ impl Scheme for ConstantScheme { type CodeType = StringCode; fn code(&self) -> Self::CodeType { - CONSTANT_SCHEME + StringCode::Constant } fn is_constant(&self) -> bool { @@ -330,12 +385,12 @@ impl Scheme for ConstantScheme { fn expected_compression_ratio( &self, + _compressor: &BtrBlocksCompressor, stats: &Self::StatsType, - is_sample: bool, - _allowed_cascading: usize, + ctx: CompressorContext, _excludes: &[Self::CodeType], ) -> VortexResult { - if is_sample { + if ctx.is_sample { return Ok(0.0); } @@ -350,9 +405,9 @@ impl Scheme for ConstantScheme { fn compress( &self, + _compressor: &BtrBlocksCompressor, stats: &Self::StatsType, - _is_sample: bool, - _allowed_cascading: usize, + _ctx: CompressorContext, _excludes: &[Self::CodeType], ) -> VortexResult { let scalar_idx = @@ -382,18 +437,18 @@ impl Scheme for NullDominated { type CodeType = StringCode; fn code(&self) -> Self::CodeType { - SPARSE_SCHEME + StringCode::Sparse } fn expected_compression_ratio( &self, + _compressor: &BtrBlocksCompressor, stats: &Self::StatsType, - _is_sample: bool, - allowed_cascading: usize, + ctx: CompressorContext, _excludes: &[Self::CodeType], ) -> VortexResult { // Only use `SparseScheme` if we can cascade. 
- if allowed_cascading == 0 { + if ctx.allowed_cascading == 0 { return Ok(0.0); } @@ -413,27 +468,25 @@ impl Scheme for NullDominated { fn compress( &self, + compressor: &BtrBlocksCompressor, stats: &Self::StatsType, - is_sample: bool, - allowed_cascading: usize, + ctx: CompressorContext, _excludes: &[Self::CodeType], ) -> VortexResult { - assert!(allowed_cascading > 0); + assert!(ctx.allowed_cascading > 0); // We pass None as we only run this pathway for NULL-dominated string arrays let sparse_encoded = SparseArray::encode(stats.src.as_ref(), None)?; if let Some(sparse) = sparse_encoded.as_opt::() { - // Compress the values - let new_excludes = vec![integer::SparseScheme.code()]; + // Compress the indices only (not the values for strings) + let new_excludes = vec![IntSparseScheme.code(), IntCode::Dict]; - // Don't attempt to compress the non-null values let indices = sparse.patches().indices().to_primitive().narrow()?; - let compressed_indices = IntCompressor::compress_no_dict( - &indices, - is_sample, - allowed_cascading - 1, - &new_excludes, + let compressed_indices = compressor.compress_canonical( + Canonical::Primitive(indices), + ctx.descend(), + Excludes::int_only(&new_excludes), )?; SparseArray::try_new( @@ -459,9 +512,7 @@ mod tests { use vortex_dtype::Nullability; use vortex_error::VortexResult; - use crate::Compressor; - use crate::MAX_CASCADE; - use crate::string::StringCompressor; + use crate::BtrBlocksCompressor; #[test] fn test_strings() -> VortexResult<()> { @@ -474,7 +525,7 @@ mod tests { } let strings = VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable)); - let compressed = StringCompressor::compress(&strings, false, 3, &[])?; + let compressed = BtrBlocksCompressor::default().compress(strings.as_ref())?; assert_eq!(compressed.len(), 2048); let display = compressed @@ -495,7 +546,7 @@ mod tests { let strings = strings.finish_into_varbinview(); - let compressed = StringCompressor::compress(&strings, false, MAX_CASCADE, &[])?; 
+ let compressed = BtrBlocksCompressor::default().compress(strings.as_ref())?; assert_eq!(compressed.len(), 100); let display = compressed @@ -519,14 +570,13 @@ mod scheme_selection_tests { use vortex_error::VortexResult; use vortex_fsst::FSSTVTable; - use crate::Compressor; - use crate::string::StringCompressor; + use crate::BtrBlocksCompressor; #[test] fn test_constant_compressed() -> VortexResult<()> { let strings: Vec> = vec![Some("constant_value"); 100]; let array = VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable)); - let compressed = StringCompressor::compress(&array, false, 3, &[])?; + let compressed = BtrBlocksCompressor::default().compress(array.as_ref())?; assert!(compressed.is::()); Ok(()) } @@ -539,7 +589,7 @@ mod scheme_selection_tests { strings.push(Some(distinct_values[i % 3])); } let array = VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable)); - let compressed = StringCompressor::compress(&array, false, 3, &[])?; + let compressed = BtrBlocksCompressor::default().compress(array.as_ref())?; assert!(compressed.is::()); Ok(()) } @@ -553,7 +603,7 @@ mod scheme_selection_tests { ))); } let array = VarBinViewArray::from_iter(strings, DType::Utf8(Nullability::NonNullable)); - let compressed = StringCompressor::compress(&array, false, 3, &[])?; + let compressed = BtrBlocksCompressor::default().compress(array.as_ref())?; assert!(compressed.is::()); Ok(()) } diff --git a/vortex-btrblocks/src/compressor/temporal.rs b/vortex-btrblocks/src/compressor/temporal.rs new file mode 100644 index 00000000000..6fb917be58d --- /dev/null +++ b/vortex-btrblocks/src/compressor/temporal.rs @@ -0,0 +1,52 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Specialized compressor for DateTimeParts metadata. 
+ +use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::IntoArray; +use vortex_array::ToCanonical; +use vortex_array::arrays::TemporalArray; +use vortex_datetime_parts::DateTimePartsArray; +use vortex_datetime_parts::TemporalParts; +use vortex_datetime_parts::split_temporal; +use vortex_error::VortexResult; + +use crate::BtrBlocksCompressor; +use crate::CanonicalCompressor; +use crate::CompressorContext; +use crate::Excludes; + +/// Compress a temporal array into a `DateTimePartsArray`. +pub fn compress_temporal( + compressor: &BtrBlocksCompressor, + array: TemporalArray, +) -> VortexResult { + let dtype = array.dtype().clone(); + let TemporalParts { + days, + seconds, + subseconds, + } = split_temporal(array)?; + + let ctx = CompressorContext::default().descend(); + + let days = compressor.compress_canonical( + Canonical::Primitive(days.to_primitive().narrow()?), + ctx, + Excludes::none(), + )?; + let seconds = compressor.compress_canonical( + Canonical::Primitive(seconds.to_primitive().narrow()?), + ctx, + Excludes::none(), + )?; + let subseconds = compressor.compress_canonical( + Canonical::Primitive(subseconds.to_primitive().narrow()?), + ctx, + Excludes::none(), + )?; + + Ok(DateTimePartsArray::try_new(dtype, days, seconds, subseconds)?.into_array()) +} diff --git a/vortex-btrblocks/src/ctx.rs b/vortex-btrblocks/src/ctx.rs new file mode 100644 index 00000000000..f2cb6a37102 --- /dev/null +++ b/vortex-btrblocks/src/ctx.rs @@ -0,0 +1,136 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Compression context types for recursive compression. + +use crate::FloatCode; +use crate::IntCode; +use crate::MAX_CASCADE; +use crate::StringCode; + +/// Holds references to exclude lists for each compression code type. +/// +/// This struct is passed through recursive compression calls to specify +/// which schemes should be excluded at each level. 
+#[derive(Debug, Clone, Copy, Default)] +pub struct Excludes<'a> { + /// Integer schemes to exclude. + pub int: &'a [IntCode], + /// Float schemes to exclude. + pub float: &'a [FloatCode], + /// String schemes to exclude. + pub string: &'a [StringCode], +} + +impl<'a> Excludes<'a> { + /// Creates an empty excludes (no exclusions). + pub const fn none() -> Self { + Self { + int: &[], + float: &[], + string: &[], + } + } + + /// Creates excludes with only integer exclusions. + pub const fn int_only(int: &'a [IntCode]) -> Self { + Self { + int, + float: &[], + string: &[], + } + } + + /// Creates excludes with only float exclusions. + pub const fn float_only(float: &'a [FloatCode]) -> Self { + Self { + int: &[], + float, + string: &[], + } + } + + /// Creates excludes with only string exclusions. + pub const fn string_only(string: &'a [StringCode]) -> Self { + Self { + int: &[], + float: &[], + string, + } + } +} + +impl<'a> From<&'a [IntCode]> for Excludes<'a> { + fn from(int: &'a [IntCode]) -> Self { + Self::int_only(int) + } +} + +impl<'a, const N: usize> From<&'a [IntCode; N]> for Excludes<'a> { + fn from(int: &'a [IntCode; N]) -> Self { + Self::int_only(int) + } +} + +impl<'a> From<&'a [FloatCode]> for Excludes<'a> { + fn from(float: &'a [FloatCode]) -> Self { + Self::float_only(float) + } +} + +impl<'a, const N: usize> From<&'a [FloatCode; N]> for Excludes<'a> { + fn from(float: &'a [FloatCode; N]) -> Self { + Self::float_only(float) + } +} + +impl<'a> From<&'a [StringCode]> for Excludes<'a> { + fn from(string: &'a [StringCode]) -> Self { + Self::string_only(string) + } +} + +impl<'a, const N: usize> From<&'a [StringCode; N]> for Excludes<'a> { + fn from(string: &'a [StringCode; N]) -> Self { + Self::string_only(string) + } +} + +/// Context passed through recursive compression calls. +/// +/// Bundles `is_sample` and `allowed_cascading` which always travel together. +/// Excludes are passed separately since they're type-specific. 
+#[derive(Debug, Clone, Copy)] +pub struct CompressorContext { + /// Whether we're compressing a sample (for ratio estimation). + pub is_sample: bool, + /// Remaining cascade depth allowed. + pub allowed_cascading: usize, +} + +impl Default for CompressorContext { + fn default() -> Self { + Self { + is_sample: false, + allowed_cascading: MAX_CASCADE, + } + } +} + +impl CompressorContext { + /// Descend one level in the cascade (decrements `allowed_cascading`). + pub fn descend(self) -> Self { + Self { + allowed_cascading: self.allowed_cascading.saturating_sub(1), + ..self + } + } + + /// Returns a context marked as sample compression (for ratio estimation). + pub fn as_sample(self) -> Self { + Self { + is_sample: true, + ..self + } + } +} diff --git a/vortex-btrblocks/src/lib.rs b/vortex-btrblocks/src/lib.rs index 7888fa671dd..6d542352024 100644 --- a/vortex-btrblocks/src/lib.rs +++ b/vortex-btrblocks/src/lib.rs @@ -21,504 +21,46 @@ //! # Example //! //! ```rust -//! use vortex_btrblocks::BtrBlocksCompressor; +//! use vortex_btrblocks::{BtrBlocksCompressor, BtrBlocksCompressorBuilder, IntCode}; //! use vortex_array::Array; //! +//! // Default compressor with all schemes enabled //! let compressor = BtrBlocksCompressor::default(); -//! // let compressed = compressor.compress(&array)?; +//! +//! // Configure with builder to exclude specific schemes +//! let compressor = BtrBlocksCompressorBuilder::default() +//! .exclude_int([IntCode::Dict]) +//! .build(); //! ``` //! //! 
[BtrBlocks]: https://www.cs.cit.tum.de/fileadmin/w00cfj/dis/papers/btrblocks.pdf -use std::fmt::Debug; -use std::hash::Hash; - -use vortex_array::Array; -use vortex_array::ArrayRef; -use vortex_array::Canonical; -use vortex_array::IntoArray; -use vortex_array::ToCanonical; -use vortex_array::arrays::ConstantArray; -use vortex_array::arrays::ExtensionArray; -use vortex_array::arrays::FixedSizeListArray; -use vortex_array::arrays::ListArray; -use vortex_array::arrays::StructArray; -use vortex_array::arrays::TemporalArray; -use vortex_array::arrays::list_from_list_view; -use vortex_array::compute::Cost; -use vortex_array::compute::IsConstantOpts; -use vortex_array::compute::is_constant_opts; -use vortex_array::vtable::VTable; -use vortex_array::vtable::ValidityHelper; -use vortex_dtype::DType; -use vortex_dtype::Nullability; -use vortex_dtype::datetime::Timestamp; -use vortex_error::VortexExpect; -use vortex_error::VortexResult; - -use crate::decimal::compress_decimal; -pub use crate::float::FloatCompressor; -pub use crate::float::FloatStats; -pub use crate::float::dictionary::dictionary_encode as float_dictionary_encode; -pub use crate::integer::IntCompressor; -pub use crate::integer::IntegerStats; -pub use crate::integer::dictionary::dictionary_encode as integer_dictionary_encode; -pub use crate::string::StringCompressor; -pub use crate::string::StringStats; -pub use crate::temporal::compress_temporal; - -mod decimal; -mod float; -mod integer; -mod patches; -mod rle; +pub use compressor::float::FloatCode; +use compressor::float::FloatCompressor; +pub use compressor::integer::IntCode; +use compressor::integer::IntCompressor; +pub use compressor::string::StringCode; +use compressor::string::StringCompressor; + +mod builder; +mod canonical_compressor; +mod compressor; +mod ctx; mod sample; -mod string; -mod temporal; - -/// Configures how stats are generated. -pub struct GenerateStatsOptions { - /// Should distinct values should be counted during stats generation. 
- pub count_distinct_values: bool, - // pub count_runs: bool, - // should this be scheme-specific? -} - -impl Default for GenerateStatsOptions { - fn default() -> Self { - Self { - count_distinct_values: true, - // count_runs: true, - } - } -} - -/// The size of each sampled run. -const SAMPLE_SIZE: u32 = 64; -/// The number of sampled runs. -/// -/// # Warning -/// -/// The product of SAMPLE_SIZE and SAMPLE_COUNT should be (roughly) a multiple of 1024 so that -/// fastlanes bitpacking of sampled vectors does not introduce (large amounts of) padding. -const SAMPLE_COUNT: u32 = 16; - -/// Stats for the compressor. -pub trait CompressorStats: Debug + Clone { - /// The type of the underlying source array vtable. - type ArrayVTable: VTable; - - /// Generates stats with default options. - fn generate(input: &::Array) -> Self { - Self::generate_opts(input, GenerateStatsOptions::default()) - } - - /// Generates stats with provided options. - fn generate_opts( - input: &::Array, - opts: GenerateStatsOptions, - ) -> Self; - - /// Returns the underlying source array that statistics were generated from. - fn source(&self) -> &::Array; - - /// Sample the array with default options. - fn sample(&self, sample_size: u32, sample_count: u32) -> Self { - self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default()) - } - - /// Sample the array with provided options. - fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self; -} - -/// Top-level compression scheme trait. -/// -/// Variants are specialized for each data type, e.g. see `IntegerScheme`, `FloatScheme`, etc. -pub trait Scheme: Debug { - /// Type of the stats generated by the compression scheme. - type StatsType: CompressorStats; - /// Type of the code used to uniquely identify the compression scheme. - type CodeType: Copy + Eq + Hash; - - /// Scheme unique identifier. 
- fn code(&self) -> Self::CodeType; - - /// True if this is the singular Constant scheme for this data type. - fn is_constant(&self) -> bool { - false - } - - /// Estimate the compression ratio for running this scheme (and its children) - /// for the given input. - /// - /// Depth is the depth in the encoding tree we've already reached before considering this - /// scheme. - /// - /// Returns the estimated compression ratio as well as the tree of compressors to use. - fn expected_compression_ratio( - &self, - stats: &Self::StatsType, - is_sample: bool, - allowed_cascading: usize, - excludes: &[Self::CodeType], - ) -> VortexResult { - estimate_compression_ratio_with_sampling( - self, - stats, - is_sample, - allowed_cascading, - excludes, - ) - } - - /// Compress the input with this scheme, yielding a new array. - fn compress( - &self, - stats: &Self::StatsType, - is_sample: bool, - allowed_cascading: usize, - excludes: &[Self::CodeType], - ) -> VortexResult; -} - -fn estimate_compression_ratio_with_sampling( - compressor: &T, - stats: &T::StatsType, - is_sample: bool, - allowed_cascading: usize, - excludes: &[T::CodeType], -) -> VortexResult { - let sample = if is_sample { - stats.clone() - } else { - // We want to sample about 1% of data - let source_len = stats.source().len(); - - // We want to sample about 1% of data, while keeping a minimal sample of 1024 values. - let approximately_one_percent = (source_len / 100) - / usize::try_from(SAMPLE_SIZE).vortex_expect("SAMPLE_SIZE must fit in usize"); - let sample_count = u32::max( - u32::next_multiple_of( - approximately_one_percent - .try_into() - .vortex_expect("sample count must fit in u32"), - 16, - ), - SAMPLE_COUNT, - ); - - tracing::trace!( - "Sampling {} values out of {}", - SAMPLE_SIZE as u64 * sample_count as u64, - source_len - ); - - stats.sample(SAMPLE_SIZE, sample_count) - }; - - let after = compressor - .compress(&sample, true, allowed_cascading, excludes)? 
- .nbytes(); - let before = sample.source().nbytes(); - - tracing::debug!( - "estimate_compression_ratio_with_sampling(compressor={compressor:#?} is_sample={is_sample}, allowed_cascading={allowed_cascading}) = {}", - before as f64 / after as f64 - ); - - Ok(before as f64 / after as f64) -} - -const MAX_CASCADE: usize = 3; - -/// A compressor for a particular input type. -/// -/// This trait defines the interface for type-specific compressors that can adaptively -/// choose and apply compression schemes based on data characteristics. Compressors -/// analyze input arrays, select optimal compression schemes, and handle cascading -/// compression with multiple encoding layers. -/// -/// The compressor works by generating statistics on the input data, evaluating -/// available compression schemes, and selecting the one with the best compression ratio. -pub trait Compressor { - /// The VTable type for arrays this compressor operates on. - type ArrayVTable: VTable; - /// The compression scheme type used by this compressor. - type SchemeType: Scheme + ?Sized; - /// The statistics type used to analyze arrays for compression. - type StatsType: CompressorStats; - - /// Returns all available compression schemes for this compressor. - fn schemes() -> &'static [&'static Self::SchemeType]; - /// Returns the default fallback compression scheme. - fn default_scheme() -> &'static Self::SchemeType; - /// Returns the scheme code for dictionary compression. - fn dict_scheme_code() -> ::CodeType; - - /// Compresses an array using the optimal compression scheme. - /// - /// Generates statistics on the input array, selects the best compression scheme, - /// and applies it. Returns the original array if compression would increase size. - fn compress( - array: &::Array, - is_sample: bool, - allowed_cascading: usize, - excludes: &[::CodeType], - ) -> VortexResult - where - Self::SchemeType: 'static, - { - // Avoid compressing empty arrays. 
- if array.is_empty() { - return Ok(array.to_array()); - } - - // Generate stats on the array directly. - let stats = if excludes.contains(&Self::dict_scheme_code()) { - Self::StatsType::generate_opts( - array, - GenerateStatsOptions { - count_distinct_values: false, - }, - ) - } else { - Self::StatsType::generate(array) - }; - let best_scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?; - - let output = best_scheme.compress(&stats, is_sample, allowed_cascading, excludes)?; - if output.nbytes() < array.nbytes() { - Ok(output) - } else { - tracing::debug!("resulting tree too large: {}", output.display_tree()); - Ok(array.to_array()) - } - } - - /// Selects the best compression scheme based on expected compression ratios. - /// - /// Evaluates all available schemes against the provided statistics and returns - /// the one with the highest compression ratio. Falls back to the default scheme - /// if no scheme provides compression benefits. - #[allow(clippy::cognitive_complexity)] - fn choose_scheme( - stats: &Self::StatsType, - is_sample: bool, - allowed_cascading: usize, - excludes: &[::CodeType], - ) -> VortexResult<&'static Self::SchemeType> { - let mut best_ratio = 1.0; - let mut best_scheme: Option<&'static Self::SchemeType> = None; - - // logging helpers - let depth = MAX_CASCADE - allowed_cascading; - - for scheme in Self::schemes().iter() { - if excludes.contains(&scheme.code()) { - continue; - } - - // We never choose Constant for a sample - if is_sample && scheme.is_constant() { - continue; - } - - tracing::trace!(is_sample, depth, ?scheme, "Trying compression scheme"); - - let ratio = - scheme.expected_compression_ratio(stats, is_sample, allowed_cascading, excludes)?; - tracing::trace!( - is_sample, - depth, - ratio, - ?scheme, - "Expected compression result" - ); - - if !(ratio.is_subnormal() || ratio.is_infinite() || ratio.is_nan()) { - if ratio > best_ratio { - best_ratio = ratio; - best_scheme = Some(*scheme); - } - } else { - 
tracing::trace!( - "Calculated invalid compression ratio {ratio} for scheme: {scheme:?}. Must not be sub-normal, infinite or nan." - ); - } - } - - tracing::trace!(depth, scheme = ?best_scheme, ratio = best_ratio, "best scheme found"); - - if let Some(best) = best_scheme { - Ok(best) - } else { - Ok(Self::default_scheme()) - } - } -} - -/// The main compressor type implementing BtrBlocks-inspired compression. -/// -/// This compressor applies adaptive compression schemes to arrays based on their data types -/// and characteristics. It recursively compresses nested structures like structs and lists, -/// and chooses optimal compression schemes for primitive types. -/// -/// The compressor works by: -/// 1. Canonicalizing input arrays to a standard representation -/// 2. Analyzing data characteristics to choose optimal compression schemes -/// 3. Recursively compressing nested structures -/// 4. Applying type-specific compression for primitives, strings, and temporal data -/// -/// # Examples -/// -/// ```rust -/// use vortex_btrblocks::BtrBlocksCompressor; -/// use vortex_array::Array; -/// -/// let compressor = BtrBlocksCompressor::default(); -/// // let compressed = compressor.compress(&array)?; -/// ``` -#[derive(Default, Debug, Clone)] -pub struct BtrBlocksCompressor { - /// Whether to exclude ints from dictionary encoding. - /// - /// When `true`, integer arrays will not use dictionary compression schemes, - /// which can be useful when the data has high cardinality or when dictionary - /// overhead would exceed compression benefits. - pub exclude_int_dict_encoding: bool, -} - -impl BtrBlocksCompressor { - /// Compresses an array using BtrBlocks-inspired compression. - /// - /// First canonicalizes and compacts the array, then applies optimal compression schemes. 
- pub fn compress(&self, array: &dyn Array) -> VortexResult { - // Canonicalize the array - let canonical = array.to_canonical()?; - - // Compact it, removing any wasted space before we attempt to compress it - let compact = canonical.compact()?; - - self.compress_canonical(compact) - } - - /// Compresses a canonical array by dispatching to type-specific compressors. - /// - /// Recursively compresses nested structures and applies optimal schemes for each data type. - pub fn compress_canonical(&self, array: Canonical) -> VortexResult { - match array { - Canonical::Null(null_array) => Ok(null_array.into_array()), - // TODO(aduffy): Sparse, other bool compressors. - Canonical::Bool(bool_array) => Ok(bool_array.into_array()), - Canonical::Primitive(primitive) => { - if primitive.ptype().is_int() { - if self.exclude_int_dict_encoding { - IntCompressor::compress_no_dict(&primitive, false, MAX_CASCADE, &[]) - } else { - IntCompressor::compress(&primitive, false, MAX_CASCADE, &[]) - } - } else { - FloatCompressor::compress(&primitive, false, MAX_CASCADE, &[]) - } - } - Canonical::Decimal(decimal) => compress_decimal(&decimal), - Canonical::Struct(struct_array) => { - let fields = struct_array - .unmasked_fields() - .iter() - .map(|field| self.compress(field)) - .collect::, _>>()?; - - Ok(StructArray::try_new( - struct_array.names().clone(), - fields, - struct_array.len(), - struct_array.validity().clone(), - )? - .into_array()) - } - Canonical::List(list_view_array) => { - // TODO(joe): We might want to write list views in the future and chose between - // list and list view. - let list_array = list_from_list_view(list_view_array)?; - - // Reset the offsets to remove garbage data that might prevent us from narrowing our - // offsets (there could be a large amount of trailing garbage data that the current - // views do not reference at all). 
- let list_array = list_array.reset_offsets(true)?; - - let compressed_elems = self.compress(list_array.elements())?; - - // Note that since the type of our offsets are not encoded in our `DType`, and since - // we guarantee above that all elements are referenced by offsets, we may narrow the - // widths. - - let compressed_offsets = IntCompressor::compress_no_dict( - &list_array.offsets().to_primitive().narrow()?, - false, - MAX_CASCADE, - &[], - )?; - - Ok(ListArray::try_new( - compressed_elems, - compressed_offsets, - list_array.validity().clone(), - )? - .into_array()) - } - Canonical::FixedSizeList(fsl_array) => { - let compressed_elems = self.compress(fsl_array.elements())?; - - Ok(FixedSizeListArray::try_new( - compressed_elems, - fsl_array.list_size(), - fsl_array.validity().clone(), - fsl_array.len(), - )? - .into_array()) - } - Canonical::VarBinView(strings) => { - if strings - .dtype() - .eq_ignore_nullability(&DType::Utf8(Nullability::NonNullable)) - { - StringCompressor::compress(&strings, false, MAX_CASCADE, &[]) - } else { - // Binary arrays do not compress - Ok(strings.into_array()) - } - } - Canonical::Extension(ext_array) => { - // We compress Timestamp-level arrays with DateTimeParts compression - if ext_array.ext_dtype().is::() { - if is_constant_opts( - ext_array.as_ref(), - &IsConstantOpts { - cost: Cost::Canonicalize, - }, - )? - .unwrap_or_default() - { - return Ok(ConstantArray::new( - ext_array.as_ref().scalar_at(0)?, - ext_array.len(), - ) - .into_array()); - } - - let temporal_array = TemporalArray::try_from(ext_array)?; - return compress_temporal(temporal_array); - } - - // Compress the underlying storage array. 
- let compressed_storage = self.compress(ext_array.storage())?; - - Ok( - ExtensionArray::new(ext_array.ext_dtype().clone(), compressed_storage) - .into_array(), - ) - } - } - } -} +mod scheme; +mod stats; + +pub use builder::BtrBlocksCompressorBuilder; +pub use canonical_compressor::BtrBlocksCompressor; +pub use canonical_compressor::CanonicalCompressor; +use compressor::Compressor; +use compressor::CompressorExt; +use compressor::MAX_CASCADE; +pub use compressor::integer::IntegerStats; +pub use compressor::integer::dictionary::dictionary_encode as integer_dictionary_encode; +use ctx::CompressorContext; +use ctx::Excludes; +use scheme::Scheme; +use scheme::SchemeExt; +pub use stats::CompressorStats; +pub use stats::GenerateStatsOptions; diff --git a/vortex-btrblocks/src/rle.rs b/vortex-btrblocks/src/rle.rs deleted file mode 100644 index 507e8cf8c0e..00000000000 --- a/vortex-btrblocks/src/rle.rs +++ /dev/null @@ -1,177 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::fmt::Debug; -use std::hash::Hash; - -use vortex_array::ArrayRef; -use vortex_array::IntoArray; -use vortex_array::ToCanonical; -use vortex_array::arrays::PrimitiveArray; -use vortex_error::VortexResult; -use vortex_fastlanes::RLEArray; - -use crate::CompressorStats; -use crate::Scheme; -use crate::estimate_compression_ratio_with_sampling; -use crate::integer::IntCompressor; - -/// Threshold for the average run length in an array before we consider run-length encoding. -pub const RUN_LENGTH_THRESHOLD: u32 = 4; - -pub trait RLEStats { - fn value_count(&self) -> u32; - fn average_run_length(&self) -> u32; - fn source(&self) -> &PrimitiveArray; -} - -/// RLE scheme that is generic over stats and code. 
-#[derive(Debug, Clone, Copy)] -pub struct RLEScheme { - pub code: Code, - /// Function to compress values - pub compress_values_fn: fn(&PrimitiveArray, bool, usize, &[Code]) -> VortexResult, - /// Phantom data to tie the scheme to specific stats type - _phantom: std::marker::PhantomData, -} - -impl RLEScheme { - pub const fn new( - code: C, - compress_values_fn: fn(&PrimitiveArray, bool, usize, &[C]) -> VortexResult, - ) -> Self { - Self { - code, - compress_values_fn, - _phantom: std::marker::PhantomData, - } - } -} - -impl Scheme for RLEScheme -where - S: RLEStats + CompressorStats, - C: Copy + Clone + Debug + Hash + PartialEq + Eq, -{ - type StatsType = S; - type CodeType = C; - - fn code(&self) -> C { - self.code - } - - fn expected_compression_ratio( - &self, - stats: &Self::StatsType, - is_sample: bool, - allowed_cascading: usize, - excludes: &[C], - ) -> VortexResult { - // RLE is only useful when we cascade it with another encoding. - if allowed_cascading == 0 { - return Ok(0.0); - } - - // Don't compress all-null or empty arrays. - if stats.value_count() == 0 { - return Ok(0.0); - } - - // Check whether RLE is a good fit, based on the average run length. - if stats.average_run_length() < RUN_LENGTH_THRESHOLD { - return Ok(0.0); - } - - // Run compression on a sample to see how it performs. - estimate_compression_ratio_with_sampling( - self, - stats, - is_sample, - allowed_cascading, - excludes, - ) - } - - fn compress( - &self, - stats: &Self::StatsType, - is_sample: bool, - allowed_cascading: usize, - excludes: &[C], - ) -> VortexResult { - let rle_array = RLEArray::encode(RLEStats::source(stats))?; - - if allowed_cascading == 0 { - return Ok(rle_array.into_array()); - } - - // Prevent RLE recursion. 
- let mut new_excludes = vec![self.code()]; - new_excludes.extend_from_slice(excludes); - - let compressed_values = (self.compress_values_fn)( - &rle_array.values().to_primitive(), - is_sample, - allowed_cascading - 1, - &new_excludes, - )?; - - // NOTE(aduffy): this encoding appears to be faulty, and was causing Undefined Behavior - // checks to trigger in the gharchive benchmark dataset decompression. - // Delta in an unstable encoding, once we deem it stable we can switch over to this always. - // #[cfg(feature = "unstable_encodings")] - // // For indices and offsets, we always use integer compression without dictionary encoding. - // let compressed_indices = try_compress_delta( - // &rle_array.indices().to_primitive().narrow()?, - // is_sample, - // allowed_cascading - 1, - // &[], - // )?; - - // #[cfg(not(feature = "unstable_encodings"))] - let compressed_indices = IntCompressor::compress_no_dict( - &rle_array.indices().to_primitive().narrow()?, - is_sample, - allowed_cascading - 1, - &[], - )?; - - let compressed_offsets = IntCompressor::compress_no_dict( - &rle_array.values_idx_offsets().to_primitive().narrow()?, - is_sample, - allowed_cascading - 1, - &[], - )?; - - // SAFETY: Recursive compression doesn't affect the invariants. 
- unsafe { - Ok(RLEArray::new_unchecked( - compressed_values, - compressed_indices, - compressed_offsets, - rle_array.dtype().clone(), - rle_array.offset(), - rle_array.len(), - ) - .into_array()) - } - } -} - -// #[cfg(feature = "unstable_encodings")] -// fn try_compress_delta( -// primitive_array: &PrimitiveArray, -// is_sample: bool, -// allowed_cascading: usize, -// excludes: &[IntCode], -// ) -> VortexResult { -// use vortex_fastlanes::{DeltaArray, delta_compress}; -// -// let (bases, deltas) = delta_compress(primitive_array)?; -// let compressed_bases = IntCompressor::compress(&bases, is_sample, allowed_cascading, excludes)?; -// let compressed_deltas = -// IntCompressor::compress_no_dict(&deltas, is_sample, allowed_cascading, excludes)?; -// -// DeltaArray::try_from_delta_compress_parts(compressed_bases, compressed_deltas) -// .map(DeltaArray::into_array) -// } diff --git a/vortex-btrblocks/src/scheme.rs b/vortex-btrblocks/src/scheme.rs new file mode 100644 index 00000000000..8dbd4d1a010 --- /dev/null +++ b/vortex-btrblocks/src/scheme.rs @@ -0,0 +1,132 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Compression scheme traits. + +use std::fmt::Debug; +use std::hash::Hash; +use std::hash::Hasher; + +use vortex_array::ArrayRef; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; + +use crate::BtrBlocksCompressor; +use crate::CompressorContext; +use crate::CompressorStats; +use crate::stats::SAMPLE_COUNT; +use crate::stats::SAMPLE_SIZE; + +/// Top-level compression scheme trait. +/// +/// Variants are specialized for each data type, e.g. see `IntegerScheme`, `FloatScheme`, etc. +pub trait Scheme: Debug { + /// Type of the stats generated by the compression scheme. + type StatsType: CompressorStats; + /// Type of the code used to uniquely identify the compression scheme. + type CodeType: Copy + Eq + Hash; + + /// Scheme unique identifier. 
+ fn code(&self) -> Self::CodeType; + + /// True if this is the singular Constant scheme for this data type. + fn is_constant(&self) -> bool { + false + } + + /// Estimate the compression ratio for running this scheme (and its children) + /// for the given input. + /// + /// Depth is the depth in the encoding tree we've already reached before considering this + /// scheme. + /// + /// Returns the estimated compression ratio as well as the tree of compressors to use. + fn expected_compression_ratio( + &self, + compressor: &BtrBlocksCompressor, + stats: &Self::StatsType, + ctx: CompressorContext, + excludes: &[Self::CodeType], + ) -> VortexResult { + self.estimate_compression_ratio_with_sampling(compressor, stats, ctx, excludes) + } + + /// Compress the input with this scheme, yielding a new array. + fn compress( + &self, + compressor: &BtrBlocksCompressor, + stats: &Self::StatsType, + ctx: CompressorContext, + excludes: &[Self::CodeType], + ) -> VortexResult; +} + +impl PartialEq for dyn Scheme { + fn eq(&self, other: &Self) -> bool { + self.code() == other.code() + } +} +impl Eq for dyn Scheme {} +impl Hash for dyn Scheme { + fn hash(&self, state: &mut H) { + self.code().hash(state) + } +} + +/// Extension trait providing sampling-based compression ratio estimation for schemes. +pub trait SchemeExt: Scheme { + /// Estimates compression ratio by compressing a sample of the data. + /// + /// This method samples approximately 1% of the data (with a minimum of 1024 values) + /// and compresses it to estimate the overall compression ratio. 
+ fn estimate_compression_ratio_with_sampling( + &self, + btr_blocks_compressor: &BtrBlocksCompressor, + stats: &Self::StatsType, + ctx: CompressorContext, + excludes: &[Self::CodeType], + ) -> VortexResult { + let sample = if ctx.is_sample { + stats.clone() + } else { + // We want to sample about 1% of data + let source_len = stats.source().len(); + + // We want to sample about 1% of data, while keeping a minimal sample of 1024 values. + let approximately_one_percent = (source_len / 100) + / usize::try_from(SAMPLE_SIZE).vortex_expect("SAMPLE_SIZE must fit in usize"); + let sample_count = u32::max( + u32::next_multiple_of( + approximately_one_percent + .try_into() + .vortex_expect("sample count must fit in u32"), + 16, + ), + SAMPLE_COUNT, + ); + + tracing::trace!( + "Sampling {} values out of {}", + SAMPLE_SIZE as u64 * sample_count as u64, + source_len + ); + + stats.sample(SAMPLE_SIZE, sample_count) + }; + + let after = self + .compress(btr_blocks_compressor, &sample, ctx.as_sample(), excludes)? + .nbytes(); + let before = sample.source().nbytes(); + + tracing::debug!( + "estimate_compression_ratio_with_sampling(compressor={self:#?} ctx={ctx:?}) = {}", + before as f64 / after as f64 + ); + + Ok(before as f64 / after as f64) + } +} + +// Blanket implementation for all Scheme types +impl SchemeExt for T {} diff --git a/vortex-btrblocks/src/stats.rs b/vortex-btrblocks/src/stats.rs new file mode 100644 index 00000000000..b3e25cfb8d6 --- /dev/null +++ b/vortex-btrblocks/src/stats.rs @@ -0,0 +1,63 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Compression statistics types. + +use std::fmt::Debug; + +use vortex_array::vtable::VTable; + +/// Configures how stats are generated. +pub struct GenerateStatsOptions { + /// Whether distinct values should be counted during stats generation. + pub count_distinct_values: bool, + // pub count_runs: bool, + // should this be scheme-specific? 
+} + +impl Default for GenerateStatsOptions { + fn default() -> Self { + Self { + count_distinct_values: true, + // count_runs: true, + } + } +} + +/// The size of each sampled run. +pub(crate) const SAMPLE_SIZE: u32 = 64; +/// The number of sampled runs. +/// +/// # Warning +/// +/// The product of SAMPLE_SIZE and SAMPLE_COUNT should be (roughly) a multiple of 1024 so that +/// fastlanes bitpacking of sampled vectors does not introduce (large amounts of) padding. +pub(crate) const SAMPLE_COUNT: u32 = 16; + +/// Stats for the compressor. +pub trait CompressorStats: Debug + Clone { + /// The type of the underlying source array vtable. + type ArrayVTable: VTable; + + /// Generates stats with default options. + fn generate(input: &::Array) -> Self { + Self::generate_opts(input, GenerateStatsOptions::default()) + } + + /// Generates stats with provided options. + fn generate_opts( + input: &::Array, + opts: GenerateStatsOptions, + ) -> Self; + + /// Returns the underlying source array that statistics were generated from. + fn source(&self) -> &::Array; + + /// Sample the array with default options. + fn sample(&self, sample_size: u32, sample_count: u32) -> Self { + self.sample_opts(sample_size, sample_count, GenerateStatsOptions::default()) + } + + /// Sample the array with provided options. + fn sample_opts(&self, sample_size: u32, sample_count: u32, opts: GenerateStatsOptions) -> Self; +} diff --git a/vortex-btrblocks/src/temporal.rs b/vortex-btrblocks/src/temporal.rs deleted file mode 100644 index 67468c1b311..00000000000 --- a/vortex-btrblocks/src/temporal.rs +++ /dev/null @@ -1,44 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -//! Specialized compressor for DateTimeParts metadata. 
- -use vortex_array::ArrayRef; -use vortex_array::IntoArray; -use vortex_array::ToCanonical; -use vortex_array::arrays::TemporalArray; -use vortex_datetime_parts::DateTimePartsArray; -use vortex_datetime_parts::TemporalParts; -use vortex_datetime_parts::split_temporal; -use vortex_error::VortexResult; - -use crate::Compressor; -use crate::MAX_CASCADE; -use crate::integer::IntCompressor; - -/// Compress a temporal array into a `DateTimePartsArray`. -pub fn compress_temporal(array: TemporalArray) -> VortexResult { - let dtype = array.dtype().clone(); - let TemporalParts { - days, - seconds, - subseconds, - } = split_temporal(array)?; - - let days = - IntCompressor::compress(&days.to_primitive().narrow()?, false, MAX_CASCADE - 1, &[])?; - let seconds = IntCompressor::compress( - &seconds.to_primitive().narrow()?, - false, - MAX_CASCADE - 1, - &[], - )?; - let subseconds = IntCompressor::compress( - &subseconds.to_primitive().narrow()?, - false, - MAX_CASCADE - 1, - &[], - )?; - - Ok(DateTimePartsArray::try_new(dtype, days, seconds, subseconds)?.into_array()) -} diff --git a/vortex-layout/src/layouts/compressed.rs b/vortex-layout/src/layouts/compressed.rs index ca6000afc2f..9f45b5b25df 100644 --- a/vortex-layout/src/layouts/compressed.rs +++ b/vortex-layout/src/layouts/compressed.rs @@ -10,6 +10,8 @@ use vortex_array::ArrayContext; use vortex_array::ArrayRef; use vortex_array::expr::stats::Stat; use vortex_btrblocks::BtrBlocksCompressor; +use vortex_btrblocks::BtrBlocksCompressorBuilder; +use vortex_btrblocks::IntCode; use vortex_error::VortexResult; use vortex_io::runtime::Handle; @@ -75,12 +77,14 @@ impl CompressingStrategy { /// Set `exclude_int_dict_encoding` to true to prevent dictionary encoding of integer arrays, /// which is useful when compressing dictionary codes to avoid recursive dictionary encoding. 
pub fn new_btrblocks(child: S, exclude_int_dict_encoding: bool) -> Self { - Self::new( - child, - Arc::new(BtrBlocksCompressor { - exclude_int_dict_encoding, - }), - ) + let compressor = if exclude_int_dict_encoding { + BtrBlocksCompressorBuilder::default() + .exclude_int([IntCode::Dict]) + .build() + } else { + BtrBlocksCompressor::default() + }; + Self::new(child, Arc::new(compressor)) } /// Create a new writer that compresses using a `CompactCompressor` to compress chunks.