
Commit 8305bcd

feat: heuristic rule for fuse parquet dictionary page (#19024)
* wip
* enable dictionary page heuristic rules (not for stream write yet)
* fix typos
* wip
* wip
* wip
* apply heuristic rule to stream write
* fix: parquet writer not initialized in time
* use correct num_rows to init arrow writer
* combine ndv stats before init writer
* minor refactor
* refine
* add unit tests
* fix ArrowParquetWriter::in_progress_size: return zero if the writer is not initialized, rather than panicking
* remove unnecessary schema conversions
* cleanup
* fix ColumnPath
* refactor column path extraction
* refine collect_leaf_column_paths
* minor refactor
* cleanup
* enable dictionary page by default
* refine doc
* tweak logic tests
* fix filter_nulls.test
1 parent 67548ed commit 8305bcd

File tree

17 files changed: +482 −71 lines
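Before the per-file diffs, a quick sketch of the heuristic this commit introduces: dictionary encoding stays on by default, but a leaf column whose NDV-to-row-count ratio exceeds 0.1 (the `HIGH_CARDINALITY_RATIO_THRESHOLD` added below) gets dictionary encoding switched off. The helper below is hypothetical and only illustrates the decision rule; the real change applies it per Parquet `ColumnPath` via `set_column_dictionary_enabled`, as shown in `parquet_rs.rs`.

// Hypothetical standalone sketch of the per-leaf-column decision rule.
const HIGH_CARDINALITY_RATIO_THRESHOLD: f64 = 0.1;

/// Returns true when dictionary encoding should stay enabled for a column,
/// given its number of distinct values (NDV) and the block's row count.
fn keep_dictionary(ndv: u64, num_rows: usize) -> bool {
    // Without rows (or without stats) the writer keeps the global default.
    if num_rows == 0 {
        return true;
    }
    (ndv as f64 / num_rows as f64) <= HIGH_CARDINALITY_RATIO_THRESHOLD
}

fn main() {
    assert!(!keep_dictionary(500, 1000)); // 50% distinct: dictionary disabled
    assert!(keep_dictionary(50, 1000)); // 5% distinct: dictionary kept
}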


src/query/expression/src/converts/arrow/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -14,6 +14,7 @@
 
 mod from;
 mod to;
+pub use self::to::table_schema_arrow_leaf_paths;
 
 pub const EXTENSION_KEY: &str = "Extension";
 pub const ARROW_EXT_TYPE_EMPTY_ARRAY: &str = "EmptyArray";

src/query/expression/src/converts/arrow/to.rs

Lines changed: 82 additions & 2 deletions
@@ -41,6 +41,7 @@ use super::ARROW_EXT_TYPE_VARIANT;
 use super::ARROW_EXT_TYPE_VECTOR;
 use super::EXTENSION_KEY;
 use crate::infer_table_schema;
+use crate::schema::is_internal_column_id;
 use crate::types::DataType;
 use crate::types::DecimalColumn;
 use crate::types::DecimalDataType;
@@ -50,6 +51,7 @@ use crate::types::VectorColumn;
 use crate::types::VectorDataType;
 use crate::with_number_type;
 use crate::Column;
+use crate::ColumnId;
 use crate::DataBlock;
 use crate::DataField;
 use crate::DataSchema;
@@ -82,6 +84,57 @@ impl From<&TableSchema> for Schema {
     }
 }
 
+pub fn table_schema_arrow_leaf_paths(table_schema: &TableSchema) -> Vec<(ColumnId, Vec<String>)> {
+    let mut arrow_paths = Vec::new();
+    for field in table_schema.fields() {
+        if is_internal_column_id(field.column_id()) {
+            continue;
+        }
+
+        let arrow_field = Field::from(field);
+        let mut current_path = vec![arrow_field.name().clone()];
+        collect_arrow_leaf_paths(&arrow_field, &mut current_path, &mut arrow_paths);
+    }
+
+    let leaf_fields = table_schema.leaf_fields();
+    debug_assert_eq!(leaf_fields.len(), arrow_paths.len());
+
+    leaf_fields
+        .into_iter()
+        .zip(arrow_paths)
+        .map(|(field, path)| (field.column_id(), path))
+        .collect()
+}
+
+fn collect_arrow_leaf_paths(
+    arrow_field: &Field,
+    current_path: &mut Vec<String>,
+    paths: &mut Vec<Vec<String>>,
+) {
+    match arrow_field.data_type() {
+        ArrowDataType::Struct(children) => {
+            for child in children {
+                current_path.push(child.name().clone());
+                collect_arrow_leaf_paths(child.as_ref(), current_path, paths);
+                current_path.pop();
+            }
+        }
+        ArrowDataType::Map(child, _) => {
+            current_path.push(child.name().clone());
+            collect_arrow_leaf_paths(child.as_ref(), current_path, paths);
+            current_path.pop();
+        }
+        ArrowDataType::LargeList(child)
+        | ArrowDataType::List(child)
+        | ArrowDataType::FixedSizeList(child, _) => {
+            current_path.push(child.name().clone());
+            collect_arrow_leaf_paths(child.as_ref(), current_path, paths);
+            current_path.pop();
+        }
+        _ => paths.push(current_path.clone()),
+    }
+}
+
 impl From<&DataType> for ArrowDataType {
     fn from(ty: &DataType) -> Self {
         let fields = DataField::new("dummy", ty.clone());
@@ -93,7 +146,6 @@ impl From<&DataType> for ArrowDataType {
 impl From<&TableField> for Field {
     fn from(f: &TableField) -> Self {
         let mut metadata = HashMap::new();
-
         let ty = match &f.data_type {
             TableDataType::Null => ArrowDataType::Null,
             TableDataType::EmptyArray => {
@@ -264,6 +316,30 @@ impl DataBlock {
         self.to_record_batch(&table_schema)
     }
 
+    pub fn to_record_batch_with_arrow_schema(
+        self,
+        arrow_schema: &Arc<arrow_schema::Schema>,
+    ) -> Result<RecordBatch> {
+        let num_fields = arrow_schema.fields.len();
+        if self.columns().len() != num_fields {
+            return Err(ErrorCode::Internal(format!(
+                "The number of columns in the data block does not match the number of fields in the table schema, block_columns: {}, table_schema_fields: {}",
+                self.columns().len(),
+                num_fields,
+            )));
+        }
+
+        if num_fields == 0 {
+            return Ok(RecordBatch::try_new_with_options(
+                Arc::new(Schema::empty()),
+                vec![],
+                &RecordBatchOptions::default().with_row_count(Some(self.num_rows())),
+            )?);
+        }
+
+        self.build_record_batch(arrow_schema.clone())
+    }
+
     pub fn to_record_batch(self, table_schema: &TableSchema) -> Result<RecordBatch> {
         if self.columns().len() != table_schema.num_fields() {
             return Err(ErrorCode::Internal(format!(
@@ -282,14 +358,18 @@
         }
 
         let arrow_schema = Schema::from(table_schema);
+        self.build_record_batch(Arc::new(arrow_schema))
+    }
+
+    fn build_record_batch(self, arrow_schema: Arc<arrow_schema::Schema>) -> Result<RecordBatch> {
         let mut arrays = Vec::with_capacity(self.columns().len());
         for (entry, arrow_field) in self.take_columns().into_iter().zip(arrow_schema.fields()) {
             let array = entry.to_column().maybe_gc().into_arrow_rs();
 
             // Adjust struct array names
             arrays.push(Self::adjust_nested_array(array, arrow_field.as_ref()));
         }
-        Ok(RecordBatch::try_new(Arc::new(arrow_schema), arrays)?)
+        Ok(RecordBatch::try_new(arrow_schema, arrays)?)
     }
 
     fn adjust_nested_array(array: Arc<dyn Array>, arrow_field: &Field) -> Arc<dyn Array> {
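For context, a hedged usage sketch of the new `table_schema_arrow_leaf_paths` helper: it pairs each leaf column id with the Arrow name components that make up its Parquet `ColumnPath`. The schema below is illustrative only and mirrors the constructors used by this commit's unit tests; exact path components for list and map leaves follow the Arrow child field names produced by the conversion.

use databend_common_expression::converts::arrow::table_schema_arrow_leaf_paths;
use databend_common_expression::types::number::NumberDataType;
use databend_common_expression::TableDataType;
use databend_common_expression::TableField;
use databend_common_expression::TableSchema;
use parquet::schema::types::ColumnPath;

fn main() {
    // Illustrative schema: a scalar column plus a struct with two leaves.
    let schema = TableSchema::new(vec![
        TableField::new("id", TableDataType::Number(NumberDataType::Int64)),
        TableField::new("person", TableDataType::Tuple {
            fields_name: vec!["name".to_string(), "age".to_string()],
            fields_type: vec![
                TableDataType::String,
                TableDataType::Number(NumberDataType::UInt8),
            ],
        }),
    ]);

    // Each leaf column id maps to a path such as ["person", "name"], which
    // converts directly into the ColumnPath used for per-column writer options.
    for (column_id, components) in table_schema_arrow_leaf_paths(&schema) {
        let column_path = ColumnPath::from(components);
        println!("column {column_id} -> {column_path:?}");
    }
}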

src/query/storages/common/blocks/src/parquet_rs.rs

Lines changed: 200 additions & 11 deletions
@@ -15,17 +15,23 @@
 use std::sync::Arc;
 
 use databend_common_exception::Result;
+use databend_common_expression::converts::arrow::table_schema_arrow_leaf_paths;
+use databend_common_expression::ColumnId;
 use databend_common_expression::DataBlock;
 use databend_common_expression::TableSchema;
+use databend_storages_common_table_meta::meta::StatisticsOfColumns;
 use databend_storages_common_table_meta::table::TableCompression;
 use parquet::arrow::ArrowWriter;
 use parquet::basic::Encoding;
 use parquet::file::metadata::KeyValue;
 use parquet::file::properties::EnabledStatistics;
 use parquet::file::properties::WriterProperties;
-use parquet::file::properties::WriterPropertiesBuilder;
 use parquet::file::properties::WriterVersion;
 use parquet::format::FileMetaData;
+use parquet::schema::types::ColumnPath;
+
+/// Disable dictionary encoding once the NDV-to-row ratio is greater than this threshold.
+const HIGH_CARDINALITY_RATIO_THRESHOLD: f64 = 0.1;
 
 /// Serialize data blocks to parquet format.
 pub fn blocks_to_parquet(
@@ -35,16 +41,58 @@ pub fn blocks_to_parquet(
     compression: TableCompression,
     enable_dictionary: bool,
     metadata: Option<Vec<KeyValue>>,
+) -> Result<FileMetaData> {
+    blocks_to_parquet_with_stats(
+        table_schema,
+        blocks,
+        write_buffer,
+        compression,
+        enable_dictionary,
+        metadata,
+        None,
+    )
+}
+
+/// Serialize blocks while optionally tuning dictionary behavior via NDV statistics.
+///
+/// * `table_schema` - Logical schema used to build Arrow batches.
+/// * `blocks` - In-memory blocks that will be serialized into a single Parquet file.
+/// * `write_buffer` - Destination buffer that receives the serialized Parquet bytes.
+/// * `compression` - Compression algorithm specified by table-level settings.
+/// * `enable_dictionary` - Enables dictionary encoding globally before per-column overrides.
+/// * `metadata` - Additional user metadata embedded into the Parquet footer.
+/// * `column_stats` - Optional NDV stats from the first block, used to configure writer properties
+///   before ArrowWriter instantiation disables further changes.
+pub fn blocks_to_parquet_with_stats(
+    table_schema: &TableSchema,
+    blocks: Vec<DataBlock>,
+    write_buffer: &mut Vec<u8>,
+    compression: TableCompression,
+    enable_dictionary: bool,
+    metadata: Option<Vec<KeyValue>>,
+    column_stats: Option<&StatisticsOfColumns>,
 ) -> Result<FileMetaData> {
     assert!(!blocks.is_empty());
-    let builder = parquet_writer_properties_builder(compression, enable_dictionary, metadata);
 
-    let props = builder.build();
+    // Writer properties cannot be tweaked after ArrowWriter creation, so we mirror the behavior of
+    // the streaming writer and only rely on the first block's NDV (and row count) snapshot.
+    let num_rows = blocks[0].num_rows();
+    let arrow_schema = Arc::new(table_schema.into());
+
+    let props = build_parquet_writer_properties(
+        compression,
+        enable_dictionary,
+        column_stats,
+        metadata,
+        num_rows,
+        table_schema,
+    );
+
     let batches = blocks
         .into_iter()
-        .map(|block| block.to_record_batch(table_schema))
+        .map(|block| block.to_record_batch_with_arrow_schema(&arrow_schema))
         .collect::<Result<Vec<_>>>()?;
-    let arrow_schema = Arc::new(table_schema.into());
+
     let mut writer = ArrowWriter::try_new(write_buffer, arrow_schema, Some(props))?;
     for batch in batches {
         writer.write(&batch)?;
@@ -53,12 +101,16 @@
     Ok(file_meta)
 }
 
-pub fn parquet_writer_properties_builder(
+/// Create writer properties, optionally disabling dictionaries for high-cardinality columns.
+pub fn build_parquet_writer_properties(
     compression: TableCompression,
     enable_dictionary: bool,
+    cols_stats: Option<impl NdvProvider>,
     metadata: Option<Vec<KeyValue>>,
-) -> WriterPropertiesBuilder {
-    let builder = WriterProperties::builder()
+    num_rows: usize,
+    table_schema: &TableSchema,
+) -> WriterProperties {
+    let mut builder = WriterProperties::builder()
         .set_compression(compression.into())
         // use `usize::MAX` to effectively limit the number of row groups to 1
        .set_max_row_group_size(usize::MAX)
@@ -68,10 +120,147 @@
         .set_key_value_metadata(metadata);
 
     if enable_dictionary {
-        builder
+        // Enable dictionary for all columns
+        builder = builder
             .set_writer_version(WriterVersion::PARQUET_2_0)
-            .set_dictionary_enabled(true)
+            .set_dictionary_enabled(true);
+        if let Some(cols_stats) = cols_stats {
+            // Disable dictionary of columns that have high cardinality
+            for (column_id, components) in table_schema_arrow_leaf_paths(table_schema) {
+                if let Some(ndv) = cols_stats.column_ndv(&column_id) {
+                    if num_rows > 0
+                        && (ndv as f64 / num_rows as f64) > HIGH_CARDINALITY_RATIO_THRESHOLD
+                    {
+                        builder = builder
+                            .set_column_dictionary_enabled(ColumnPath::from(components), false);
+                    }
+                }
+            }
+        }
+        builder.build()
     } else {
-        builder.set_dictionary_enabled(false)
+        builder.set_dictionary_enabled(false).build()
+    }
+}
+
+/// Provides per column NDV statistics
+pub trait NdvProvider {
+    fn column_ndv(&self, column_id: &ColumnId) -> Option<u64>;
+}
+
+impl NdvProvider for &StatisticsOfColumns {
+    fn column_ndv(&self, column_id: &ColumnId) -> Option<u64> {
+        self.get(column_id).and_then(|item| item.distinct_of_values)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use databend_common_expression::types::number::NumberDataType;
+    use databend_common_expression::TableDataType;
+    use databend_common_expression::TableField;
+
+    use super::*;
+
+    struct TestNdvProvider {
+        ndv: HashMap<ColumnId, u64>,
+    }
+
+    impl NdvProvider for TestNdvProvider {
+        fn column_ndv(&self, column_id: &ColumnId) -> Option<u64> {
+            self.ndv.get(column_id).copied()
+        }
+    }
+
+    fn sample_schema() -> TableSchema {
+        TableSchema::new(vec![
+            TableField::new("simple", TableDataType::Number(NumberDataType::Int32)),
+            TableField::new("nested", TableDataType::Tuple {
+                fields_name: vec!["leaf".to_string(), "arr".to_string()],
+                fields_type: vec![
+                    TableDataType::Number(NumberDataType::Int64),
+                    TableDataType::Array(Box::new(TableDataType::Number(NumberDataType::UInt64))),
+                ],
+            }),
+            TableField::new("no_stats", TableDataType::String),
+        ])
+    }
+
+    fn column_id(schema: &TableSchema, name: &str) -> ColumnId {
+        schema
+            .leaf_fields()
+            .into_iter()
+            .find(|field| field.name() == name)
+            .unwrap_or_else(|| panic!("missing field {}", name))
+            .column_id()
+    }
+
+    #[test]
+    fn test_build_parquet_writer_properties_handles_nested_leaves() {
+        let schema = sample_schema();
+
+        let mut ndv = HashMap::new();
+        ndv.insert(column_id(&schema, "simple"), 500);
+        ndv.insert(column_id(&schema, "nested:leaf"), 50);
+        ndv.insert(column_id(&schema, "nested:arr:0"), 400);
+
+        let column_paths: HashMap<ColumnId, ColumnPath> = table_schema_arrow_leaf_paths(&schema)
+            .into_iter()
+            .map(|(id, path)| (id, ColumnPath::from(path)))
+            .collect();
+
+        let props = build_parquet_writer_properties(
+            TableCompression::Zstd,
+            true,
+            Some(TestNdvProvider { ndv }),
+            None,
+            1000,
+            &schema,
+        );
+
+        assert!(
+            !props.dictionary_enabled(&column_paths[&column_id(&schema, "simple")]),
+            "high cardinality top-level column should disable dictionary"
+        );
+        assert!(
+            props.dictionary_enabled(&column_paths[&column_id(&schema, "nested:leaf")]),
+            "low cardinality nested column should keep dictionary"
+        );
+        assert!(
+            !props.dictionary_enabled(&column_paths[&column_id(&schema, "nested:arr:0")]),
+            "high cardinality nested array element should disable dictionary"
+        );
+        assert!(
+            props.dictionary_enabled(&column_paths[&column_id(&schema, "no_stats")]),
+            "columns without NDV stats keep the default dictionary behavior"
+        );
+    }
+
+    #[test]
+    fn test_build_parquet_writer_properties_disabled_globally() {
+        let schema = sample_schema();
+
+        let column_paths: HashMap<ColumnId, ColumnPath> = table_schema_arrow_leaf_paths(&schema)
+            .into_iter()
+            .map(|(id, path)| (id, ColumnPath::from(path)))
+            .collect();
+
+        let props = build_parquet_writer_properties(
+            TableCompression::Zstd,
+            false,
+            None::<TestNdvProvider>,
+            None,
+            1000,
+            &schema,
+        );
+
+        for field in schema.leaf_fields() {
+            assert!(
+                !props.dictionary_enabled(&column_paths[&field.column_id()]),
+                "dictionary must remain disabled when enable_dictionary is false",
+            );
+        }
     }
 }
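Finally, a hedged sketch of how a call site might opt into the heuristic. The crate path used for `blocks_to_parquet_with_stats` below is assumed rather than taken from this diff, and `col_stats` stands for the `StatisticsOfColumns` the fuse writer already collects for the first block; existing callers of `blocks_to_parquet` keep their behavior because it now simply forwards `None` for the stats.

use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_expression::TableSchema;
use databend_storages_common_table_meta::meta::StatisticsOfColumns;
use databend_storages_common_table_meta::table::TableCompression;
use parquet::format::FileMetaData;

// Assumed crate path for the function added in parquet_rs.rs above.
use databend_storages_common_blocks::blocks_to_parquet_with_stats;

/// Hypothetical wrapper showing the new parameter order: passing the first
/// block's NDV statistics lets the writer disable dictionary encoding for
/// high-cardinality leaf columns before the ArrowWriter is created.
fn serialize_blocks(
    schema: &TableSchema,
    blocks: Vec<DataBlock>,
    col_stats: Option<&StatisticsOfColumns>,
    buf: &mut Vec<u8>,
) -> Result<FileMetaData> {
    blocks_to_parquet_with_stats(
        schema,
        blocks,
        buf,
        TableCompression::Zstd,
        true, // enable_dictionary: global default before per-column overrides
        None, // no extra key-value metadata in this sketch
        col_stats,
    )
}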
