Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions datafusion-examples/examples/query_planning/optimizer_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,10 @@ pub async fn optimizer_rule() -> Result<()> {
let plan = ctx.sql(sql).await?.into_optimized_plan()?;

// We can see the effect of our rewrite on the output plan that the filter
// has been rewritten to `my_eq`
// has been rewritten to `my_eq` (consolidated into TableScan as unsupported_filters)
assert_eq!(
plan.display_indent().to_string(),
"Filter: my_eq(person.age, Int32(22))\
\n TableScan: person projection=[name, age]"
"TableScan: person projection=[name, age], unsupported_filters=[my_eq(person.age, Int32(22))]"
);

// The query below doesn't respect a filter `where age = 22` because
Expand Down
6 changes: 4 additions & 2 deletions datafusion/core/src/dataframe/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,11 @@ mod tests {
.select_columns(&["bool_col", "int_col"])?;

let plan = df.explain(false, false)?.collect().await?;
// Filters all the way to Parquet
// Filters all the way to Parquet - the physical planner creates FilterExec
// for inexact filters (parquet predicate pushdown is inexact). Column index
// is @2 because we expand projection to include filter columns.
let formatted = pretty::pretty_format_batches(&plan)?.to_string();
assert!(formatted.contains("FilterExec: id@0 = 1"), "{formatted}");
assert!(formatted.contains("FilterExec: id@2 = 1"), "{formatted}");

Ok(())
}
Expand Down
15 changes: 9 additions & 6 deletions datafusion/core/src/datasource/view_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,13 @@ mod tests {

let plan = df.explain(false, false)?.collect().await?;

// Filters all the way to Parquet
// Filters all the way to Parquet - physical planner creates FilterExec
// for inexact filters. Column index is @2 because we expand projection
// to include filter columns.
let formatted = arrow::util::pretty::pretty_format_batches(&plan)
.unwrap()
.to_string();
assert!(formatted.contains("FilterExec: id@0 = 1"));
assert!(formatted.contains("FilterExec: id@2 = 1"), "{formatted}");
Ok(())
}

Expand Down Expand Up @@ -396,23 +398,24 @@ mod tests {
.await?;
let plan = dataframe.into_optimized_plan()?;
let actual = format!("{}", plan.display_indent());
// Filters are now pushed to TableScan.filters during logical optimization
// (classification happens in physical planner)
let expected = "\
Explain\
\n CreateView: Bare { table: \"xyz\" }\
\n Filter: abc.column2 = Int64(5)\
\n TableScan: abc projection=[column1, column2, column3]";
\n TableScan: abc projection=[column1, column2, column3], unsupported_filters=[abc.column2 = Int64(5)]";
assert_eq!(expected, actual);

let dataframe = session_ctx
.sql("EXPLAIN CREATE VIEW xyz AS SELECT column1, column2 FROM abc WHERE column2 = 5")
.await?;
let plan = dataframe.into_optimized_plan()?;
let actual = format!("{}", plan.display_indent());
// Filters are now pushed to TableScan.filters during logical optimization
let expected = "\
Explain\
\n CreateView: Bare { table: \"xyz\" }\
\n Filter: abc.column2 = Int64(5)\
\n TableScan: abc projection=[column1, column2]";
\n TableScan: abc projection=[column1, column2], unsupported_filters=[abc.column2 = Int64(5)]";
assert_eq!(expected, actual);

Ok(())
Expand Down
250 changes: 238 additions & 12 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,18 +462,133 @@ impl DefaultPhysicalPlanner {
fetch,
..
}) => {
let source = source_as_provider(source)?;
use datafusion_expr::TableProviderFilterPushDown;
use datafusion_expr::utils::conjunction;

let provider = source_as_provider(source)?;
let source_schema = provider.schema();

// Remove all qualifiers from the scan as the provider
// doesn't know (nor should care) how the relation was
// referred to in the query
let filters = unnormalize_cols(filters.iter().cloned());
let filters_vec = filters.into_iter().collect::<Vec<_>>();
let filters: Vec<Expr> = unnormalize_cols(filters.iter().cloned());

// Separate volatile filters (never push to source)
let (volatile_filters, non_volatile_filters): (Vec<Expr>, Vec<Expr>) =
filters.into_iter().partition(|pred| pred.is_volatile());

// Classify non-volatile filters using provider
let filter_refs: Vec<&Expr> = non_volatile_filters.iter().collect();
let supported = provider.supports_filters_pushdown(&filter_refs)?;

assert_eq!(
non_volatile_filters.len(),
supported.len(),
"supports_filters_pushdown returned {} results for {} filters",
supported.len(),
non_volatile_filters.len()
);

// Separate into pushable (Exact/Inexact) and post-scan filters
let mut pushable_filters = Vec::new();
let mut post_scan_filters = Vec::new();

for (filter, support) in
non_volatile_filters.into_iter().zip(supported.iter())
{
match support {
TableProviderFilterPushDown::Exact => {
pushable_filters.push(filter);
}
TableProviderFilterPushDown::Inexact => {
pushable_filters.push(filter.clone());
post_scan_filters.push(filter);
}
TableProviderFilterPushDown::Unsupported => {
post_scan_filters.push(filter);
}
}
}

// Add volatile filters to post-scan (never pushed)
post_scan_filters.extend(volatile_filters);

// Compute scan projection that includes columns needed for post-scan filters
let user_projection_len = projection.as_ref().map(|p| p.len());
let (scan_projection, needs_final_projection) = self
.compute_scan_projection_with_filters(
projection,
&post_scan_filters,
source_schema.as_ref(),
)?;

// Limit can only be pushed if no post-filtering needed
let has_inexact =
supported.contains(&TableProviderFilterPushDown::Inexact);
let scan_limit = if has_inexact || !post_scan_filters.is_empty() {
None
} else {
*fetch
};

// Create the scan
let opts = ScanArgs::default()
.with_projection(projection.as_deref())
.with_filters(Some(&filters_vec))
.with_limit(*fetch);
let res = source.scan_with_args(session_state, opts).await?;
Arc::clone(res.plan())
.with_projection(scan_projection.as_deref())
.with_filters(if pushable_filters.is_empty() {
None
} else {
Some(&pushable_filters)
})
.with_limit(scan_limit);
let scan_result = provider.scan_with_args(session_state, opts).await?;
let mut plan: Arc<dyn ExecutionPlan> = Arc::clone(scan_result.plan());

// Wrap with FilterExec if needed
if !post_scan_filters.is_empty()
&& let Some(filter_expr) = conjunction(post_scan_filters.clone())
{
let scan_df_schema =
DFSchema::try_from(plan.schema().as_ref().clone())?;
let num_scan_columns = plan.schema().fields().len();
plan = self.create_filter_exec(
&filter_expr,
plan,
&scan_df_schema,
session_state,
num_scan_columns,
)?;
}

// If we expanded the projection for filter columns, project back to
// only the columns the user requested
if needs_final_projection && let Some(orig_len) = user_projection_len {
let schema = plan.schema();
let proj_exprs: Vec<ProjectionExpr> = (0..orig_len)
.map(|i| {
let field = schema.field(i);
let col_expr: Arc<dyn PhysicalExpr> = Arc::new(
datafusion_physical_expr::expressions::Column::new(
field.name(),
i,
),
);
ProjectionExpr {
expr: col_expr,
alias: field.name().to_string(),
}
})
.collect();
plan = Arc::new(ProjectionExec::try_new(proj_exprs, plan)?);
}

// Apply limit if not pushed to scan
if let Some(limit) = *fetch
&& scan_limit.is_none()
{
plan = Arc::new(GlobalLimitExec::new(plan, 0, Some(limit)));
}

plan
}
LogicalPlan::Values(Values { values, schema }) => {
let exprs = values
Expand Down Expand Up @@ -1959,7 +2074,7 @@ fn get_physical_expr_pair(
}

/// Extract filter predicates from a DML input plan (DELETE/UPDATE).
/// Walks the logical plan tree and collects Filter predicates,
/// Walks the logical plan tree and collects Filter predicates and TableScan filters,
/// splitting AND conjunctions into individual expressions.
/// Column qualifiers are stripped so expressions can be evaluated against
/// the TableProvider's schema.
Expand All @@ -1968,9 +2083,17 @@ fn extract_dml_filters(input: &Arc<LogicalPlan>) -> Result<Vec<Expr>> {
let mut filters = Vec::new();

input.apply(|node| {
if let LogicalPlan::Filter(filter) = node {
// Split AND predicates into individual expressions
filters.extend(split_conjunction(&filter.predicate).into_iter().cloned());
match node {
LogicalPlan::Filter(filter) => {
// Split AND predicates into individual expressions
filters.extend(split_conjunction(&filter.predicate).into_iter().cloned());
}
LogicalPlan::TableScan(scan) => {
// Also extract filters from TableScan (now that all filters are
// pushed to TableScan.filters during logical optimization)
filters.extend(scan.filters.iter().cloned());
}
_ => {}
}
Ok(TreeNodeRecursion::Continue)
})?;
Expand Down Expand Up @@ -2679,6 +2802,109 @@ impl DefaultPhysicalPlanner {
}
}

/// Compute column indices needed for the scan, including columns from
/// projection AND post-scan filters.
///
/// Returns (scan_projection, needs_final_projection) where:
/// - scan_projection: The projection to pass to the scan
/// - needs_final_projection: true if we added extra columns for filtering
/// that need to be projected away afterwards
fn compute_scan_projection_with_filters(
&self,
projection: &Option<Vec<usize>>,
post_filters: &[Expr],
source_schema: &Schema,
) -> Result<(Option<Vec<usize>>, bool)> {
use datafusion_expr::utils::expr_to_columns;
use std::collections::HashSet;

// If no projection, we use all columns anyway
if projection.is_none() {
return Ok((None, false));
}

// If no post-filters, just use the original projection
if post_filters.is_empty() {
return Ok((projection.clone(), false));
}

// Collect columns referenced by post-scan filters
let mut filter_columns = HashSet::new();
for filter in post_filters {
expr_to_columns(filter, &mut filter_columns)?;
}

// Get filter column indices in the source schema
let filter_indices: Vec<usize> = filter_columns
.iter()
.filter_map(|col| source_schema.index_of(col.name()).ok())
.collect();

let proj_indices = projection.as_ref().unwrap();

// Check if all filter columns are already in the projection
let proj_set: HashSet<usize> = proj_indices.iter().copied().collect();
let missing_indices: Vec<usize> = filter_indices
.iter()
.filter(|idx| !proj_set.contains(idx))
.copied()
.collect();

if missing_indices.is_empty() {
// All filter columns are in the projection, no expansion needed
return Ok((projection.clone(), false));
}

// Build expanded projection: original projection + missing filter columns
// We keep the original projection columns first (in their original order)
// then append the missing filter columns
let mut expanded: Vec<usize> = proj_indices.clone();
expanded.extend(missing_indices);

Ok((Some(expanded), true))
}

/// Create a FilterExec that handles async UDFs properly.
fn create_filter_exec(
&self,
predicate: &Expr,
input: Arc<dyn ExecutionPlan>,
input_dfschema: &DFSchema,
session_state: &SessionState,
num_input_columns: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
let runtime_expr =
self.create_physical_expr(predicate, input_dfschema, session_state)?;
let input_schema = input.schema();

let filter = match self.try_plan_async_exprs(
num_input_columns,
PlannedExprResult::Expr(vec![runtime_expr]),
input_schema.as_ref(),
)? {
PlanAsyncExpr::Sync(PlannedExprResult::Expr(exprs)) => {
FilterExecBuilder::new(Arc::clone(&exprs[0]), input)
.with_batch_size(session_state.config().batch_size())
.build()?
}
PlanAsyncExpr::Async(async_map, PlannedExprResult::Expr(exprs)) => {
let async_exec = AsyncFuncExec::try_new(async_map.async_exprs, input)?;
FilterExecBuilder::new(Arc::clone(&exprs[0]), Arc::new(async_exec))
.apply_projection(Some((0..num_input_columns).collect()))?
.with_batch_size(session_state.config().batch_size())
.build()?
}
_ => return internal_err!("Unexpected result from try_plan_async_exprs"),
};

let selectivity = session_state
.config()
.options()
.optimizer
.default_filter_selectivity;
Ok(Arc::new(filter.with_default_selectivity(selectivity)?))
}

fn try_plan_async_exprs(
&self,
num_input_columns: usize,
Expand Down
Loading
Loading