diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs similarity index 99% rename from datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs rename to datafusion/core/tests/physical_optimizer/filter_pushdown.rs index bb9e03c837c1e..3a0015068567b 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -64,15 +64,13 @@ use datafusion_physical_plan::{ sorts::sort::SortExec, }; +use super::pushdown_utils::{ + OptimizationTest, TestNode, TestScanBuilder, TestSource, format_plan_for_test, +}; use datafusion_physical_plan::union::UnionExec; use futures::StreamExt; use object_store::{ObjectStore, memory::InMemory}; use regex::Regex; -use util::{OptimizationTest, TestNode, TestScanBuilder, format_plan_for_test}; - -use crate::physical_optimizer::filter_pushdown::util::TestSource; - -mod util; #[test] fn test_pushdown_into_scan() { diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs index d11322cd26be9..cf179cb727cf1 100644 --- a/datafusion/core/tests/physical_optimizer/mod.rs +++ b/datafusion/core/tests/physical_optimizer/mod.rs @@ -24,7 +24,6 @@ mod combine_partial_final_agg; mod enforce_distribution; mod enforce_sorting; mod enforce_sorting_monotonicity; -#[expect(clippy::needless_pass_by_value)] mod filter_pushdown; mod join_selection; #[expect(clippy::needless_pass_by_value)] @@ -38,3 +37,5 @@ mod sanity_checker; #[expect(clippy::needless_pass_by_value)] mod test_utils; mod window_optimize; + +mod pushdown_utils; diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/pushdown_utils.rs similarity index 92% rename from datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs rename to datafusion/core/tests/physical_optimizer/pushdown_utils.rs index 1afdc4823f0a4..524d33ae6edb6 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/pushdown_utils.rs @@ -24,6 +24,7 @@ use datafusion_datasource::{ file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture, file_stream::FileOpener, source::DataSourceExec, }; +use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::filter::batch_filter; @@ -50,7 +51,7 @@ use std::{ pub struct TestOpener { batches: Vec, batch_size: Option, - projection: Option>, + projection: Option, predicate: Option>, } @@ -60,6 +61,7 @@ impl FileOpener for TestOpener { if self.batches.is_empty() { return Ok((async { Ok(TestStream::new(vec![]).boxed()) }).boxed()); } + let schema = self.batches[0].schema(); if let Some(batch_size) = self.batch_size { let batch = concat_batches(&batches[0].schema(), &batches)?; let mut new_batches = Vec::new(); @@ -83,9 +85,10 @@ impl FileOpener for TestOpener { batches = new_batches; if let Some(projection) = &self.projection { + let projector = projection.make_projector(&schema)?; batches = batches .into_iter() - .map(|batch| batch.project(projection).unwrap()) + .map(|batch| projector.project_batch(&batch).unwrap()) .collect(); } @@ -103,14 +106,13 @@ pub struct TestSource { batch_size: Option, batches: Vec, metrics: ExecutionPlanMetricsSet, - projection: Option>, + projection: Option, table_schema: datafusion_datasource::TableSchema, } impl TestSource { pub fn new(schema: SchemaRef, support: bool, batches: Vec) -> Self { - let table_schema = - datafusion_datasource::TableSchema::new(Arc::clone(&schema), vec![]); + let table_schema = datafusion_datasource::TableSchema::new(schema, vec![]); Self { support, metrics: ExecutionPlanMetricsSet::new(), @@ -210,6 +212,30 @@ impl FileSource for TestSource { } } + fn try_pushdown_projection( + &self, + projection: &ProjectionExprs, + ) -> Result>> { + if let Some(existing_projection) = &self.projection { + // Combine existing projection with new projection + let combined_projection = existing_projection.try_merge(projection)?; + Ok(Some(Arc::new(TestSource { + projection: Some(combined_projection), + table_schema: self.table_schema.clone(), + ..self.clone() + }))) + } else { + Ok(Some(Arc::new(TestSource { + projection: Some(projection.clone()), + ..self.clone() + }))) + } + } + + fn projection(&self) -> Option<&ProjectionExprs> { + self.projection.as_ref() + } + fn table_schema(&self) -> &datafusion_datasource::TableSchema { &self.table_schema } @@ -332,6 +358,7 @@ pub struct OptimizationTest { } impl OptimizationTest { + #[expect(clippy::needless_pass_by_value)] pub fn new( input_plan: Arc, opt: O,