Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fuzz/fuzz_targets/file_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fuzz_target!(|fuzz: FuzzFileAction| -> Corpus {
let write_options = match compressor_strategy {
CompressorStrategy::Default => SESSION.write_options(),
CompressorStrategy::Compact => {
let strategy = WriteStrategyBuilder::new()
let strategy = WriteStrategyBuilder::default()
.with_compressor(CompactCompressor::default())
.build();
SESSION.write_options().with_strategy(strategy)
Expand Down
1 change: 1 addition & 0 deletions vortex-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub mod mask;
mod mask_future;
pub mod matchers;
mod metadata;
pub mod normalize;
pub mod optimizer;
mod partial_ord;
pub mod patches;
Expand Down
52 changes: 52 additions & 0 deletions vortex-array/src/normalize.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use itertools::Itertools;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_session::registry::Id;

use crate::Array;
use crate::ArrayRef;
use crate::ArrayVisitor;
use crate::session::ArrayRegistry;

/// Options for normalizing an array.
pub struct NormalizeOptions<'a> {
/// The set of allowed array encodings (in addition to the canonical ones) that are permitted
/// in the normalized array.
pub allowed: &'a ArrayRegistry,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this just be Vec?

/// The operation to perform when a non-allowed encoding is encountered.
pub operation: Operation,
}

/// The operation to perform when a non-allowed encoding is encountered.
pub enum Operation {
Error,
// TODO(joe): add into canonical variant
}

impl dyn Array + '_ {
/// Normalize the array according to given options.
///
/// This operation performs a recursive traversal of the array. Any non-allowed encoding is
/// normalized per the configured operation.
pub fn normalize(self: ArrayRef, options: &mut NormalizeOptions) -> VortexResult<ArrayRef> {
let array_ids = options.allowed.ids().collect_vec();
self.normalize_with_error(&array_ids)?;
// Note this takes ownership so we can at a later date remove non-allowed encodings.
Ok(self)
}

fn normalize_with_error(self: &ArrayRef, allowed: &[Id]) -> VortexResult<()> {
if !allowed.contains(&self.encoding_id()) {
vortex_bail!(AssertionFailed: "normalize forbids encoding ({})", self.encoding_id())
}

for child in ArrayVisitor::children(self) {
let child: ArrayRef = child;
child.normalize_with_error(allowed)?
}
Ok(())
}
}
2 changes: 1 addition & 1 deletion vortex-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl CompactionStrategy {
pub fn apply_options(&self, options: VortexWriteOptions) -> VortexWriteOptions {
match self {
CompactionStrategy::Compact => options.with_strategy(
WriteStrategyBuilder::new()
WriteStrategyBuilder::default()
.with_compressor(CompactCompressor::default())
.build(),
),
Expand Down
117 changes: 105 additions & 12 deletions vortex-file/src/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,37 @@
//! This module defines the default layout strategy for a Vortex file.

use std::sync::Arc;

use std::sync::LazyLock;

// Compressed encodings from encoding crates
use vortex_alp::ALPRDVTable;
use vortex_alp::ALPVTable;
// Canonical array encodings from vortex-array
use vortex_array::arrays::BoolVTable;
use vortex_array::arrays::ChunkedVTable;
use vortex_array::arrays::ConstantVTable;
use vortex_array::arrays::DecimalVTable;
use vortex_array::arrays::DictVTable;
use vortex_array::arrays::ExtensionVTable;
use vortex_array::arrays::FixedSizeListVTable;
use vortex_array::arrays::ListVTable;
use vortex_array::arrays::ListViewVTable;
use vortex_array::arrays::MaskedVTable;
use vortex_array::arrays::NullVTable;
use vortex_array::arrays::PrimitiveVTable;
use vortex_array::arrays::StructVTable;
use vortex_array::arrays::VarBinVTable;
use vortex_array::arrays::VarBinViewVTable;
use vortex_array::session::ArrayRegistry;
use vortex_bytebool::ByteBoolVTable;
use vortex_datetime_parts::DateTimePartsVTable;
use vortex_decimal_byte_parts::DecimalBytePartsVTable;
use vortex_dtype::FieldPath;
use vortex_fastlanes::BitPackedVTable;
use vortex_fastlanes::DeltaVTable;
use vortex_fastlanes::FoRVTable;
use vortex_fastlanes::RLEVTable;
use vortex_fsst::FSSTVTable;
use vortex_layout::LayoutStrategy;
use vortex_layout::layouts::buffered::BufferedStrategy;
use vortex_layout::layouts::chunked::writer::ChunkedLayoutStrategy;
Expand All @@ -19,10 +48,64 @@ use vortex_layout::layouts::repartition::RepartitionWriterOptions;
use vortex_layout::layouts::table::TableStrategy;
use vortex_layout::layouts::zoned::writer::ZonedLayoutOptions;
use vortex_layout::layouts::zoned::writer::ZonedStrategy;
use vortex_pco::PcoVTable;
use vortex_runend::RunEndVTable;
use vortex_sequence::SequenceVTable;
use vortex_sparse::SparseVTable;
use vortex_utils::aliases::hash_map::HashMap;
use vortex_zigzag::ZigZagVTable;
#[cfg(feature = "zstd")]
use vortex_zstd::ZstdVTable;

const ONE_MEG: u64 = 1 << 20;

/// Static registry of all allowed array encodings for file writing.
///
/// This includes all canonical encodings from vortex-array plus all compressed
/// encodings from the various encoding crates.
pub static ALLOWED_ENCODINGS: LazyLock<ArrayRegistry> = LazyLock::new(|| {
let registry = ArrayRegistry::default();

// Canonical encodings from vortex-array
registry.register(NullVTable::ID, NullVTable);
registry.register(BoolVTable::ID, BoolVTable);
registry.register(PrimitiveVTable::ID, PrimitiveVTable);
registry.register(DecimalVTable::ID, DecimalVTable);
registry.register(VarBinVTable::ID, VarBinVTable);
registry.register(VarBinViewVTable::ID, VarBinViewVTable);
registry.register(ListVTable::ID, ListVTable);
registry.register(ListViewVTable::ID, ListViewVTable);
registry.register(FixedSizeListVTable::ID, FixedSizeListVTable);
registry.register(StructVTable::ID, StructVTable);
registry.register(ExtensionVTable::ID, ExtensionVTable);
registry.register(ChunkedVTable::ID, ChunkedVTable);
registry.register(ConstantVTable::ID, ConstantVTable);
registry.register(MaskedVTable::ID, MaskedVTable);
registry.register(DictVTable::ID, DictVTable);

// Compressed encodings from encoding crates
registry.register(ALPVTable::ID, ALPVTable);
registry.register(ALPRDVTable::ID, ALPRDVTable);
registry.register(BitPackedVTable::ID, BitPackedVTable);
registry.register(ByteBoolVTable::ID, ByteBoolVTable);
registry.register(DateTimePartsVTable::ID, DateTimePartsVTable);
registry.register(DecimalBytePartsVTable::ID, DecimalBytePartsVTable);
registry.register(DeltaVTable::ID, DeltaVTable);
registry.register(FoRVTable::ID, FoRVTable);
registry.register(FSSTVTable::ID, FSSTVTable);
registry.register(PcoVTable::ID, PcoVTable);
registry.register(RLEVTable::ID, RLEVTable);
registry.register(RunEndVTable::ID, RunEndVTable);
registry.register(SequenceVTable::ID, SequenceVTable);
registry.register(SparseVTable::ID, SparseVTable);
registry.register(ZigZagVTable::ID, ZigZagVTable);

#[cfg(feature = "zstd")]
registry.register(ZstdVTable::ID, ZstdVTable);

registry
});

/// Build a new [writer strategy][LayoutStrategy] to compress and reorganize chunks of a Vortex file.
///
/// Vortex provides an out-of-the-box file writer that optimizes the layout of chunks on-disk,
Expand All @@ -32,25 +115,23 @@ pub struct WriteStrategyBuilder {
compressor: Option<Arc<dyn CompressorPlugin>>,
row_block_size: usize,
field_writers: HashMap<FieldPath, Arc<dyn LayoutStrategy>>,
allow_encodings: Option<ArrayRegistry>,
}

impl Default for WriteStrategyBuilder {
/// Create a new empty builder. It can be further configured,
/// and then finally built yielding the [`LayoutStrategy`].
fn default() -> Self {
Self::new()
}
}

impl WriteStrategyBuilder {
/// Create a new empty builder. It can be further configured, and then finally built
/// yielding the [`LayoutStrategy`].
pub fn new() -> Self {
Self {
compressor: None,
row_block_size: 8192,
field_writers: HashMap::new(),
allow_encodings: None,
}
}
}

impl WriteStrategyBuilder {
/// Override the [compressor][CompressorPlugin] used for compressing chunks in the file.
///
/// If not provided, this will use a BtrBlocks-style cascading compressor that tries to balance
Expand All @@ -77,11 +158,23 @@ impl WriteStrategyBuilder {
self
}

/// Override the allowed array encodings for normalization.
pub fn with_allow_encodings(mut self, allow_encodings: ArrayRegistry) -> Self {
self.allow_encodings = Some(allow_encodings);
self
}

/// Builds the canonical [`LayoutStrategy`] implementation, with the configured overrides
/// applied.
pub fn build(self) -> Arc<dyn LayoutStrategy> {
let flat = if let Some(allow_encodings) = self.allow_encodings {
FlatLayoutStrategy::default().with_allow_encodings(allow_encodings)
} else {
FlatLayoutStrategy::default()
};

// 7. for each chunk create a flat layout
let chunked = ChunkedLayoutStrategy::new(FlatLayoutStrategy::default());
let chunked = ChunkedLayoutStrategy::new(flat.clone());
// 6. buffer chunks so they end up with closer segment ids physically
let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); // 2MB
// 5. compress each chunk
Expand Down Expand Up @@ -110,9 +203,9 @@ impl WriteStrategyBuilder {

// 2.1. | 3.1. compress stats tables and dict values.
let compress_then_flat = if let Some(ref compressor) = self.compressor {
CompressingStrategy::new_opaque(FlatLayoutStrategy::default(), compressor.clone())
CompressingStrategy::new_opaque(flat, compressor.clone())
} else {
CompressingStrategy::new_btrblocks(FlatLayoutStrategy::default(), false)
CompressingStrategy::new_btrblocks(flat, false)
};

// 3. apply dict encoding or fallback
Expand Down
7 changes: 4 additions & 3 deletions vortex-file/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ pub struct VortexWriteOptions {
pub trait WriteOptionsSessionExt: SessionExt {
/// Create [`VortexWriteOptions`] for writing to a Vortex file.
fn write_options(&self) -> VortexWriteOptions {
let session = self.session();
VortexWriteOptions {
session: self.session(),
strategy: WriteStrategyBuilder::new().build(),
strategy: WriteStrategyBuilder::default().build(),
session,
exclude_dtype: false,
file_statistics: PRUNING_STATS.to_vec(),
max_variable_length_statistics_size: 64,
Expand All @@ -84,8 +85,8 @@ impl VortexWriteOptions {
/// Create a new [`VortexWriteOptions`] with the given session.
pub fn new(session: VortexSession) -> Self {
VortexWriteOptions {
strategy: WriteStrategyBuilder::default().build(),
session,
strategy: WriteStrategyBuilder::new().build(),
exclude_dtype: false,
file_statistics: PRUNING_STATS.to_vec(),
max_variable_length_statistics_size: 64,
Expand Down
Loading
Loading