Row group limit pruning for row groups that entirely match predicates #18868
I'd also like to construct some good-fit cases to show some benchmark results.
I plan to review this PR carefully tomorrow.
alamb left a comment:
Thank you very much for this PR @xudong963
The idea is very cool, and a nicely done PR; I have several suggestions, but I think it is very close.
Suggestions:
1. Rename the flag to `preserve_order`
2. Make this new flag on TableScan visible in EXPLAIN plans so we have a better chance of understanding when it is being applied/not applied
3. Implement an end-to-end test for the behavior you want to see (specifically, a test that shows a query on a parquet file and demonstrates that only a single row group is fetched with this flag, and more than one is fetched without the flag; a sketch of what such a check could look like follows)

In my mind, 3 is the most important.
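For flavor, here is a hedged sketch of what such an end-to-end check might look like (the file name, schema, query, and reliance on `EXPLAIN ANALYZE` metric output are all assumptions, not the PR's actual test):

```rust
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Assumed: a parquet file with several row groups that all match `a > 0`.
    ctx.register_parquet(
        "t",
        "multi_row_group.parquet",
        ParquetReadOptions::default(),
    )
    .await?;
    // With limit pruning active, the scan metrics should show a single row
    // group fetched; with it disabled, more than one.
    let batches = ctx
        .sql("EXPLAIN ANALYZE SELECT * FROM t WHERE a > 0 LIMIT 10")
        .await?
        .collect()
        .await?;
    println!("{}", pretty_format_batches(&batches)?);
    Ok(())
}
```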
Thanks for the review @alamb, I addressed some suggestions. A few things are left for later in the PR.
I was mentioning this PR to @crepererum and he also noted there is another interesting potential optimization when we know the predicate is true for the entire row group: we can skip evaluating the predicate for the row group entirely (as the filter can often be quite expensive itself). @xudong963, is this something else you are contemplating? This PR likely lays the foundation for such an optimization. If you think it is worthwhile, I'll file a ticket.
I just reviewed this -- it looks nice 👍
Thank you @alamb, I addressed your newest comments in the commit. I'd also like to just merge into main and give it some "bake time".
Thank you @adriangb, no worries, looking forward to your thoughts!
```rust
/// If should keep the output rows in order
pub preserve_order: bool,
```
Does this need to be `pub`? Is it `pub` only for tests?
No, we need the `pub` like the other fields, because in `create_file_opener` the opener is created via struct-literal initialization:
```rust
ParquetOpener {
    partition_index: partition,
    projection: self.projection.clone(),
    batch_size: self
        .batch_size
        .expect("Batch size must set before creating ParquetOpener"),
    limit: base_config.limit,
    preserve_order: base_config.preserve_order,
    predicate: self.predicate.clone(),
    table_schema: self.table_schema.clone(),
    metadata_size_hint: self.metadata_size_hint,
    metrics: self.metrics().clone(),
    parquet_file_reader_factory,
    pushdown_filters: self.pushdown_filters(),
    reorder_filters: self.reorder_filters(),
    force_filter_selections: self.force_filter_selections(),
    enable_page_index: self.enable_page_index(),
    enable_bloom_filter: self.bloom_filter_on_read(),
    enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
    coerce_int96,
    #[cfg(feature = "parquet_encryption")]
    file_decryption_properties,
    expr_adapter_factory,
    #[cfg(feature = "parquet_encryption")]
    encryption_factory: self.get_encryption_factory_with_config(),
    max_predicate_cache_size: self.max_predicate_cache_size(),
    reverse_row_groups: self.reverse_row_groups,
});
```
We could add a new method later, then see if some fields don't need the `pub`.
Can't it be `pub(crate)` then?
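As a standalone illustration (toy module and names, not the actual DataFusion types), `pub(crate)` would still allow the struct-literal initialization shown above, as long as `create_file_opener` lives in the same crate:

```rust
mod opener {
    pub struct ParquetOpener {
        // Visible everywhere inside this crate, but not to downstream crates.
        pub(crate) preserve_order: bool,
    }
}

// Same-crate code can still initialize the struct with a literal.
fn create_file_opener(preserve_order: bool) -> opener::ParquetOpener {
    opener::ParquetOpener { preserve_order }
}

fn main() {
    let opener = create_file_opener(true);
    assert!(opener.preserve_order);
}
```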
```rust
// Simplify the NOT expression (e.g., NOT(c1 = 0) -> c1 != 0)
// before building the pruning predicate
let simplifier = PhysicalExprSimplifier::new(arrow_schema);
```
I wonder if we could short circuit by adding a `not()` to `PruningPredicate` or something like that, to take an existing `PruningPredicate` and negate it by cloning the inner expression `Arc` or something.
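A toy sketch of the shape of that idea, using stand-in types (`not()` is hypothetical, not an existing `PruningPredicate` method): negation just clones the inner expression `Arc` and wraps it, rather than rebuilding the predicate from scratch.

```rust
use std::sync::Arc;

// Stand-in expression type, just enough to show the wrapping.
enum Expr {
    Col(&'static str),
    Not(Arc<Expr>),
}

struct PruningPredicate {
    expr: Arc<Expr>,
}

impl PruningPredicate {
    // Hypothetical `not()`: cheap, since it only clones the Arc and wraps
    // it in a NOT node instead of re-deriving a new pruning expression.
    fn not(&self) -> Self {
        PruningPredicate {
            expr: Arc::new(Expr::Not(Arc::clone(&self.expr))),
        }
    }
}

fn main() {
    let p = PruningPredicate {
        expr: Arc::new(Expr::Col("c1")),
    };
    let negated = p.not();
    // The negated predicate shares the original inner expression.
    assert!(matches!(&*negated.expr, Expr::Not(_)));
}
```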
```rust
/// Whether the scan's limit is order sensitive
pub preserve_order: bool,
```
Seeing this in isolation is a bit confusing. I think adding 2-3 lines of docstring summarizing the explanation from the PR description or something would be helpful.
Done in fd95aaf.
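For context, one possible shape of such a docstring, paraphrasing the ordering discussion later in this thread (illustrative only; not the actual wording added in fd95aaf, and the wrapper struct name is made up):

```rust
pub struct ScanConfigExcerpt {
    /// Whether the scan's limit is order sensitive.
    ///
    /// Set when a LIMIT was pushed into the scan while the plan still relies
    /// on the scan's output order (e.g. EnforceSorting removed a Sort because
    /// the data was already sorted). Row-group limit pruning must then be
    /// bypassed so the required global ordering is preserved.
    pub preserve_order: bool,
}
```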
```rust
let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() else {
    return plan;
};
```
It saddens me every time I see a downcast. Maybe you decided not to because it's more code churn or something, but could we record in the `FileScanConfig` if an order was pushed down into it, and if so set the `preserve_order` flag? As opposed to the case where the scan has an ordering but it isn't required by upstream operators.
We could set the `preserve_order` flag to true when an order was pushed down into `FileScanConfig`.
But here, I'm trying to solve the case described in #18868 (comment)
However, a complication arises: if the data distribution and physical ordering of table t already satisfy ORDER BY a, the EnforceSorting phase in the Physical Optimizer will remove the Sort node. Subsequently, the LIMIT is pushed down to the DataSource during the LimitPushdown phase.
In this scenario, we cannot rely solely on the presence of the LIMIT at the Parquet level to decide whether to prune. If we prune based on a limit that was originally associated with a removed Sort, we might violate the required global ordering.
Proposed Solution: To address this fundamentally, when a Sort node is removed because the distribution already matches the requirements, we should mark the resulting Limit node as "Order-Sensitive." During the subsequent LimitPushdown, we can detect this flag and set the preserve_order attribute to true in the ParquetOpener. This ensures that Limit Pruning is bypassed when the global order must be maintained.
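To make the scenario concrete, here is a hedged sketch using the public DataFusion API (the table DDL, file path, and data are assumptions):

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Declare that the file data is already sorted on `a`, so the optimizer
    // (EnforceSorting) may remove the Sort node for `ORDER BY a`.
    ctx.sql(
        "CREATE EXTERNAL TABLE t (a INT) \
         STORED AS PARQUET WITH ORDER (a ASC) LOCATION 't.parquet'",
    )
    .await?;
    // After the Sort is removed, LimitPushdown pushes the LIMIT into the
    // scan. The scan must still return the *first* 10 rows in `a` order, so
    // limit pruning of fully-matching row groups must be bypassed here.
    let df = ctx.sql("SELECT a FROM t ORDER BY a LIMIT 10").await?;
    df.show().await?;
    Ok(())
}
```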
I see, yeah. I wonder if we can tweak this with the new sort pushdown rule?
My thought is that the scan can have a sort order that was discovered or user provided but not promise to upstream nodes that it's going to output sorted results until the upstream nodes ask it to do so. At that point it would set this internal flag so when we get a limit we can remember that we promised our parent nodes we'd be outputting sorted results.
Put another way, at the same time that a file scan node sets its equivalence properties to output sorted results, it should set this flag. So maybe all we need to do is, in ListingTable, set this flag if we also give the scan node a specified ordering? Then it gets set one of two ways:
- In the listing step, if we discover a sort order from Parquet metadata or if the user provided a sort order.
- Via the new sort pushdown rule.
So either way, if the scan node is outputting sorted data the flag is set?
I just dislike this sort of downcast matching in a generic engine like DataFusion. It means that custom scan nodes can't benefit from this feature, etc.
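A toy sketch of the two-path flag setting described in that comment (stand-in types and names; not DataFusion's actual API):

```rust
// The flag is set whenever the scan promises sorted output to its parents,
// regardless of which path made that promise.
#[derive(Default)]
struct ScanNode {
    /// Ordering discovered from file metadata or provided by the user.
    known_ordering: Option<String>,
    /// Set once the scan has promised sorted output; limit pruning must
    /// then preserve that order.
    preserve_order: bool,
}

impl ScanNode {
    // Path 1: ListingTable gives the scan a specified ordering up front.
    fn with_user_ordering(ordering: &str) -> Self {
        ScanNode {
            known_ordering: Some(ordering.to_string()),
            preserve_order: true,
        }
    }

    // Path 2: the sort pushdown rule asks the scan to output sorted results.
    fn push_down_sort(&mut self, ordering: &str) -> bool {
        if self.known_ordering.as_deref() == Some(ordering) {
            self.preserve_order = true;
        }
        self.preserve_order
    }
}

fn main() {
    let a = ScanNode::with_user_ordering("a ASC");
    assert!(a.preserve_order);

    let mut b = ScanNode {
        known_ordering: Some("a ASC".to_string()),
        ..Default::default()
    };
    assert!(b.push_down_sort("a ASC"));
}
```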
Which issue does this PR close?
Rationale for this change
See #18860 (comment)
What changes are included in this PR?
Are these changes tested?
Yes
Are there any user-facing changes?
No, no new configs or API changes.