Parquet: Push down supported list predicates (array_has/any/all) during decoding #19545
Conversation
Document supported nested pushdown semantics and update row-level predicate construction to utilize leaf-based projection masks. Enable list-aware predicates like array_has_all while maintaining unsupported nested structures on the fallback path. Expand filter candidate building for root and leaf projections of nested columns, facilitating cost estimation and mask creation aligned with Parquet leaf layouts. Include struct/list pushdown checks and add a new integration test to validate array_has_all pushdown behavior against Parquet row filters. Introduce dev dependencies for nested function helpers and temporary file creation used in the new tests.
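As a rough illustration of the root-vs-leaf distinction mentioned above, here is a minimal sketch using the parquet crate's ProjectionMask API; it is not the PR's actual code, and the column indices are hypothetical.

```rust
use parquet::arrow::ProjectionMask;
use parquet::schema::types::SchemaDescriptor;

// Build both mask flavors for comparison; the indices below are made up.
fn example_masks(schema_descr: &SchemaDescriptor) -> (ProjectionMask, ProjectionMask) {
    // Root-based mask: index 1 selects an entire top-level column,
    // e.g. a whole List<Utf8> tree.
    let root_mask = ProjectionMask::roots(schema_descr, [1]);

    // Leaf-based mask: indices address individual Parquet leaf columns, which is
    // what a nested list column's decoding-time filter actually needs to project.
    let leaf_mask = ProjectionMask::leaves(schema_descr, [2, 3]);

    (root_mask, leaf_mask)
}
```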
Extract supports_list_predicates() into its own module and create a SUPPORTED_ARRAY_FUNCTIONS constant registry for improved management. Add is_supported_list_predicate() helper function for easier extensibility, along with comprehensive documentation and unit tests. Refactor check_single_column() using intermediate variables to clarify logic for handling structs and unsupported lists. Introduce a new test case for mixed primitive and struct predicates to ensure proper functionality and validation of pushable predicates.
Extract common test logic into a test_array_predicate_pushdown helper function to reduce duplication and ensure parity across all three supported array functions (array_has, array_has_all, array_has_any). This makes it easier to maintain and extend test coverage for new array functions in the future. Benefits:
- Reduces code duplication from ~70 lines × 3 to ~10 lines × 3
- Ensures consistent test methodology across all array functions
- Clear documentation of expected behavior for each function
- Easier to add tests for new supported functions
Add detailed rustdoc examples to can_expr_be_pushed_down_with_schemas() showing three key scenarios:
1. Primitive column filters (allowed) - e.g., age > 30
2. Struct column filters (blocked) - e.g., person IS NOT NULL
3. List column filters with supported predicates (allowed) - e.g., array_has_all(tags, ['rust'])
These examples help users understand when filter pushdown to the Parquet decoder is available and guide them in writing efficient queries. Benefits:
- Clear documentation of supported and unsupported cases
- Helps users optimize query performance
- Provides copy-paste examples for common patterns
- Updated to reflect new list column support
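A hedged sketch of what such examples can look like (not the actual rustdoc); it uses DataFusion's logical expression builders, and the column names age, person, and tags are illustrative.

```rust
use datafusion::prelude::{col, lit};

fn example_filters() {
    // 1. Primitive column filter (allowed): age > 30 can be evaluated while decoding.
    let _primitive = col("age").gt(lit(30));

    // 2. Struct column filter (blocked): person IS NOT NULL is evaluated only after
    //    batches are materialized.
    let _struct_filter = col("person").is_not_null();

    // 3. List column filter with a supported predicate (allowed), e.g. in SQL:
    //    SELECT * FROM t WHERE array_has_all(tags, ['rust']);
}
```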
- Replace 'while let Some(batch) = reader.next()' with idiomatic 'for batch in reader'
- Remove unnecessary mut from reader variable
- Addresses clippy::while_let_on_iterator warning
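For reference, a small sketch of the pattern change, generic over any fallible batch iterator; the function and names are illustrative, not the PR's code.

```rust
// Old shape: `let mut reader = ...; while let Some(batch) = reader.next() { ... }`
// triggers clippy::while_let_on_iterator. The `for` loop below is equivalent and
// needs no explicit `mut` binding.
fn drain_reader<I, B, E>(reader: I) -> Result<usize, E>
where
    I: IntoIterator<Item = Result<B, E>>,
{
    let mut rows = 0;
    for batch in reader {
        let _batch = batch?; // each item is a Result, so propagate errors
        rows += 1;
    }
    Ok(rows)
}
```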
- Document function name detection assumption in supported_predicates
- Note reliance on exact string matching
- Suggest trait-based approach for future robustness
- Explain ProjectionMask::leaves() choice for nested columns
- Clarify why leaf indices are needed for nested structures
- Helps reviewers understand Parquet schema descriptor usage
These comments address Low Priority suggestions from code review, improving maintainability and onboarding for future contributors.
Remove SUPPORTED_ARRAY_FUNCTIONS array. Introduce dedicated predicate functions for NULL checks and scalar function support. Utilize pattern matching with matches! macro instead of array lookups. Enhance code clarity and idiomatic Rust usage with is_some_and() for condition checks and simplify recursion using a single expression.
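A minimal sketch of the matches!-based detection described above, with hypothetical helper names (the PR's actual helpers live in its supported_predicates module):

```rust
// Name-based check for the list-aware functions that are safe to push down.
fn is_supported_array_function(name: &str) -> bool {
    matches!(name, "array_has" | "array_has_all" | "array_has_any")
}

// `is_some_and` keeps the Option handling concise: None (no function name) means
// the expression is not a supported scalar-function predicate.
fn is_supported_scalar_function(fn_name: Option<&str>) -> bool {
    fn_name.is_some_and(is_supported_array_function)
}
```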
Extract helper functions to reduce code duplication in array pushdown and physical plan tests. Consolidate similar assertions and checks, simplifying tests from ~50 to ~30 lines. Transform display tests into a single parameterized test, maintaining coverage while eliminating repeated code.
…monstrations" This reverts commit 94f1a99cee4e44e5176450156a684a2316af78e1.
Extract handle_nested_type() to encapsulate logic for determining if a nested type prevents pushdown. Introduce is_nested_type_supported() to isolate type checking for List/LargeList/FixedSizeList and predicate support. Simplify check_single_column() by reducing nesting depth and delegating nested type logic to helper methods.
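A hedged sketch of the type-level check that split introduces; the function name mirrors the commit message, but the body here is illustrative rather than the PR's code.

```rust
use arrow::datatypes::DataType;

// A list-like column only allows pushdown when the predicate over it is one of the
// supported list-aware functions; all other nested types stay on the fallback path.
fn is_nested_type_supported(data_type: &DataType, predicate_is_supported: bool) -> bool {
    match data_type {
        DataType::List(_) | DataType::LargeList(_) | DataType::FixedSizeList(_, _) => {
            predicate_is_supported
        }
        _ => false,
    }
}
```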
Thank you @kosiew, I will review it soon!
Great work on implementing list type pushdown! I suggest adding a benchmark to demonstrate the performance improvement more clearly. Here's a proposed test scenario: Benchmark Setup:
Expected Results:
Implement a new Criterion benchmark to test row-level pushdown for array_has predicates on a 100K-row dataset. Compare pushdown versus baseline scans and assert 90% row pruning. Enable the benchmark target and update the proposed dependencies in the Parquet datasource crate.
Extend list-predicate detection to include function names from scalar UDF expressions, ensuring array_has, array_has_all, and array_has_any qualify for Parquet list pushdown. Add row filter tests to verify support for list predicates and confirm correct row filters for list columns.
Introduce an additional large binary payload column in the Parquet nested filter pushdown benchmark. Update dataset generation and batch construction to populate the new column while maintaining existing pruning assertions. Benchmark results show improved performance with pushdown, averaging ~9.4 ms compared to ~37.7 ms without it.
…low criterion best practices Move schema and predicate setup outside the benchmark loop to measure only execution time, not plan creation. This follows the pattern used in topk_aggregate benchmarks where: - setup_reader() creates the schema once per case - create_predicate() builds the filter expression once per case - scan_with_predicate() performs the actual file scan and filtering inside the loop This ensures consistent benchmark measurements focused on filter pushdown effectiveness rather than setup overhead.
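A structural sketch of that benchmark layout, assuming the helper names from the message above; their bodies here are trivial stand-ins, not the benchmark's real code.

```rust
use criterion::{criterion_group, criterion_main, Criterion};

// Stand-ins for the real helpers; in the benchmark they build the Arrow schema,
// the array_has predicate, and perform the Parquet scan respectively.
fn setup_reader() -> String { String::from("schema") }
fn create_predicate(_schema: &str) -> String { String::from("array_has(tags, 'x')") }
fn scan_with_predicate(_schema: &str, _predicate: &str) -> usize { 0 }

fn bench_array_has_pushdown(c: &mut Criterion) {
    // Setup runs once per case, outside the timed loop.
    let schema = setup_reader();
    let predicate = create_predicate(&schema);

    c.bench_function("array_has pushdown", |b| {
        // Only the scan + filtering work is measured.
        b.iter(|| scan_with_predicate(&schema, &predicate));
    });
}

criterion_group!(benches, bench_array_has_pushdown);
criterion_main!(benches);
```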
The issue of cargo audit failing is being addressed in #19657.

Hi @zhuqi-lucas, I added a benchmark.
zhuqi-lucas
left a comment
Nice work, looks good to me, left some comments.
// `ScalarUDFExpr` is currently an alias of `ScalarFunctionExpr` in this crate,
// but keep a separate type to support potential future divergence.
type ScalarUDFExpr = ScalarFunctionExpr;
I can't see a difference here, why not just use ScalarFunctionExpr?
I'll remove it.
/// Does the expression reference any columns not present in the file schema?
projected_columns: bool,
/// Indices into the file schema of columns required to evaluate the expression.
required_columns: BTreeSet<usize>,
Not related to this PR, but could we optimize required_columns to a Vec for small column sets?
Tracking issue - #19673
Thank you @kosiew !
run benchmarks

🤖

🤖: Benchmark completed
run benchmarks
I restarted the benchmarks to see if the results are reproducible -- I also merged up to get the CI green

🤖

🤖: Benchmark completed
Co-authored-by: Qi Zhu <821684824@qq.com>
@zhuqi-lucas

Thank you @kosiew, great work, LGTM now.
adriangb
left a comment
Can you elaborate on why some predicates are "safe" and others are not "safe"? What is unsafe about reading a single field of a struct column? Won't you also run into #9066?
/// // Can safely push down to Parquet decoder
/// }
/// ```
pub trait SupportsListPushdown {
I don't understand why this is a trait. Do any concrete types implement this trait? Why not just write fn supports_list_pushdown(expr: &dyn PhysicalExpr)?
The rationale is that the trait-based form reads more naturally:

// Current (trait-based) - natural and discoverable
if expr.supports_list_pushdown() { ... }

// Alternative (function-only) - requires searching for a function
if supports_list_pushdown(&expr) { ... }
But it's only used within this one module. Either way I think this may all be changed soon by #19556

Which issue does this PR close?
Rationale for this change
DataFusion’s Parquet row-level filter pushdown previously rejected all nested Arrow types (lists/structs), which prevented common and performance-sensitive filters on list columns (for example array_has, array_has_all, array_has_any) from being evaluated during Parquet decoding.

Enabling safe pushdown for a small, well-defined set of list-aware predicates allows Parquet decoding to apply these filters earlier, reducing materialization work and improving scan performance, while still keeping unsupported nested projections (notably structs) evaluated after batches are materialized.
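For illustration, a minimal sketch of the kind of query this lets the decoder filter; the table name, file path, and tags column are hypothetical.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_parquet("t", "data/t.parquet", ParquetReadOptions::default())
        .await?;

    // With list-predicate pushdown, this filter can be applied while decoding the
    // Parquet file instead of on fully materialized batches.
    let df = ctx
        .sql("SELECT * FROM t WHERE array_has_all(tags, ['rust', 'parquet'])")
        .await?;
    df.show().await?;
    Ok(())
}
```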
What changes are included in this PR?
Allow a registry of list-aware predicates to be considered pushdown-compatible:
- array_has, array_has_all, array_has_any
- IS NULL / IS NOT NULL

Introduce a supported_predicates module to detect whether an expression tree contains supported list predicates.

Update Parquet filter candidate selection to:
- Switch Parquet projection mask construction from root indices to leaf indices (ProjectionMask::leaves) so nested list filters project the correct leaf columns for decoding-time evaluation.
- Expand root column indices to leaf indices for nested columns using the Parquet SchemaDescriptor.

Add unit tests verifying:
- array_has, array_has_all, array_has_any actually filter rows during decoding using a temp Parquet file.

Add sqllogictest coverage proving both correctness and plan behavior:
- EXPLAIN shows predicates pushed into DataSourceExec for Parquet.

Are these changes tested?
Yes.
Rust unit tests in datafusion/datasource-parquet/src/row_filter.rs.

SQL logic tests in datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt:
- array_has, array_has_all, array_has_any and combinations (OR / AND with other predicates).
- EXPLAIN output includes the pushed predicate (DataSourceExec ... predicate=...).

Are there any user-facing changes?
Yes.
Parquet filter pushdown now supports list columns for the following predicates:
- array_has, array_has_all, array_has_any
- IS NULL, IS NOT NULL

This can improve query performance for workloads that filter on array/list columns.
No breaking changes are introduced; unsupported nested types (for example structs) continue to be evaluated after decoding.
LLM-generated code disclosure
This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested.