From 6954497600e42523c493d6ca3a18ea8ae28d3a92 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Sat, 3 Jan 2026 13:57:36 +0530 Subject: [PATCH 01/14] fix(accumulators): preserve state in evaluate() for window frame queries This commit fixes issue #19612 where accumulators that don't implement retract_batch exhibit buggy behavior in window frame queries. ## Problem When aggregate functions are used with window frames like `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, DataFusion uses PlainAggregateWindowExpr which calls evaluate() multiple times on the same accumulator instance. Accumulators that use std::mem::take() in their evaluate() method consume their internal state, causing incorrect results on subsequent calls. ## Solution 1. **percentile_cont**: Modified evaluate() to use mutable reference instead of consuming the Vec. Added retract_batch() support for both PercentileContAccumulator and DistinctPercentileContAccumulator. 2. **string_agg**: Changed SimpleStringAggAccumulator::evaluate() to clone the accumulated string instead of taking it. ## Changes - datafusion/functions-aggregate/src/percentile_cont.rs: - Changed calculate_percentile() to take &mut [T::Native] instead of Vec - Updated PercentileContAccumulator::evaluate() to pass reference - Updated DistinctPercentileContAccumulator::evaluate() to clone values - Added retract_batch() implementation using HashMap for efficient removal - Updated PercentileContGroupsAccumulator::evaluate() for consistency - datafusion/functions-aggregate/src/string_agg.rs: - Changed evaluate() to use clone() instead of std::mem::take() - datafusion/sqllogictest/test_files/aggregate.slt: - Added test cases for percentile_cont with window frames - Added test comparing median() vs percentile_cont(0.5) behavior - Added test for string_agg cumulative window frame - docs/source/library-user-guide/functions/adding-udfs.md: - Added documentation about window-compatible accumulators - Explained evaluate() state preservation requirements - Documented retract_batch() implementation guidance Closes #19612 --- .../src/percentile_cont.rs | 88 ++++++++++-- .../functions-aggregate/src/string_agg.rs | 11 +- .../sqllogictest/test_files/aggregate.slt | 134 ++++++++++++++++++ .../functions/adding-udfs.md | 65 +++++++++ 4 files changed, 280 insertions(+), 18 deletions(-) diff --git a/datafusion/functions-aggregate/src/percentile_cont.rs b/datafusion/functions-aggregate/src/percentile_cont.rs index d6c8eabb459e6..b857d9a4217ca 100644 --- a/datafusion/functions-aggregate/src/percentile_cont.rs +++ b/datafusion/functions-aggregate/src/percentile_cont.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::mem::{size_of, size_of_val}; use std::sync::Arc; @@ -52,7 +53,7 @@ use datafusion_expr::{ }; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::accumulate; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filtered_null_mask; -use datafusion_functions_aggregate_common::utils::GenericDistinctBuffer; +use datafusion_functions_aggregate_common::utils::{GenericDistinctBuffer, Hashable}; use datafusion_macros::user_doc; use crate::utils::validate_percentile_expr; @@ -533,14 +534,57 @@ impl Accumulator for PercentileContAccumulator { } fn evaluate(&mut self) -> Result { - let d = std::mem::take(&mut self.all_values); - let value = calculate_percentile::(d, self.percentile); + let value = calculate_percentile::(&mut self.all_values, self.percentile); ScalarValue::new_primitive::(value, &self.data_type) } fn size(&self) -> usize { size_of_val(self) + self.all_values.capacity() * size_of::() } + + fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + // Cast to target type if needed (e.g., integer to Float64) + let values = if values[0].data_type() != &self.data_type { + arrow::compute::cast(&values[0], &self.data_type)? + } else { + Arc::clone(&values[0]) + }; + + let mut to_remove: HashMap = HashMap::new(); + for i in 0..values.len() { + let v = ScalarValue::try_from_array(&values, i)?; + if !v.is_null() { + *to_remove.entry(v).or_default() += 1; + } + } + + let mut i = 0; + while i < self.all_values.len() { + let k = ScalarValue::new_primitive::( + Some(self.all_values[i]), + &self.data_type, + )?; + if let Some(count) = to_remove.get_mut(&k) + && *count > 0 + { + self.all_values.swap_remove(i); + *count -= 1; + if *count == 0 { + to_remove.remove(&k); + if to_remove.is_empty() { + break; + } + } + } else { + i += 1; + } + } + Ok(()) + } + + fn supports_retract_batch(&self) -> bool { + true + } } /// The percentile_cont groups accumulator accumulates the raw input values @@ -665,13 +709,13 @@ impl GroupsAccumulator fn evaluate(&mut self, emit_to: EmitTo) -> Result { // Emit values - let emit_group_values = emit_to.take_needed(&mut self.group_values); + let mut emit_group_values = emit_to.take_needed(&mut self.group_values); // Calculate percentile for each group let mut evaluate_result_builder = PrimitiveBuilder::::new().with_data_type(self.data_type.clone()); - for values in emit_group_values { - let value = calculate_percentile::(values, self.percentile); + for values in &mut emit_group_values { + let value = calculate_percentile::(values.as_mut_slice(), self.percentile); evaluate_result_builder.append_option(value); } @@ -768,17 +812,35 @@ impl Accumulator for DistinctPercentileContAccumula } fn evaluate(&mut self) -> Result { - let d = std::mem::take(&mut self.distinct_values.values) - .into_iter() + let mut values: Vec = self + .distinct_values + .values + .iter() .map(|v| v.0) - .collect::>(); - let value = calculate_percentile::(d, self.percentile); + .collect(); + let value = calculate_percentile::(&mut values, self.percentile); ScalarValue::new_primitive::(value, &self.data_type) } fn size(&self) -> usize { size_of_val(self) + self.distinct_values.size() } + + fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + if values.is_empty() { + return Ok(()); + } + + let arr = values[0].as_primitive::(); + for value in arr.iter().flatten() { + self.distinct_values.values.remove(&Hashable(value)); + } + Ok(()) + } + + fn supports_retract_batch(&self) -> bool { + true + } } /// Calculate the percentile value for a given set of values. @@ -788,8 +850,12 @@ impl Accumulator for DistinctPercentileContAccumula /// For percentile p and n values: /// - If p * (n-1) is an integer, return the value at that position /// - Otherwise, interpolate between the two closest values +/// +/// Note: This function takes a mutable slice and sorts it in place, but does not +/// consume the data. This is important for window frame queries where evaluate() +/// may be called multiple times on the same accumulator state. fn calculate_percentile( - mut values: Vec, + values: &mut [T::Native], percentile: f64, ) -> Option { let cmp = |x: &T::Native, y: &T::Native| x.compare(*y); diff --git a/datafusion/functions-aggregate/src/string_agg.rs b/datafusion/functions-aggregate/src/string_agg.rs index 77e9f60afd3cf..468b50e86a959 100644 --- a/datafusion/functions-aggregate/src/string_agg.rs +++ b/datafusion/functions-aggregate/src/string_agg.rs @@ -384,14 +384,11 @@ impl Accumulator for SimpleStringAggAccumulator { } fn evaluate(&mut self) -> Result { - let result = if self.has_value { - ScalarValue::LargeUtf8(Some(std::mem::take(&mut self.accumulated_string))) + if self.has_value { + Ok(ScalarValue::LargeUtf8(Some(self.accumulated_string.clone()))) } else { - ScalarValue::LargeUtf8(None) - }; - - self.has_value = false; - Ok(result) + Ok(ScalarValue::LargeUtf8(None)) + } } fn size(&self) -> usize { diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 2a4daeb92979d..796ce1cdbdf61 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -8241,3 +8241,137 @@ NULL NULL NULL NULL statement ok drop table distinct_avg; + +########### +# Issue #19612: Test that percentile_cont and median produce identical results +# in window frame queries with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. +# Previously percentile_cont consumed its internal state during evaluate(), +# causing incorrect results when called multiple times in window queries. +########### + +# Test percentile_cont window frame behavior (fix for issue #19612) +statement ok +CREATE TABLE percentile_window_test ( + timestamp INT, + tags VARCHAR, + value DOUBLE +); + +statement ok +INSERT INTO percentile_window_test (timestamp, tags, value) VALUES +(1, 'tag1', 10.0), +(2, 'tag1', 20.0), +(3, 'tag1', 30.0), +(4, 'tag1', 40.0), +(5, 'tag1', 50.0), +(1, 'tag2', 60.0), +(2, 'tag2', 70.0), +(3, 'tag2', 80.0), +(4, 'tag2', 90.0), +(5, 'tag2', 100.0); + +# Test that median and percentile_cont(0.5) produce the same results +# with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW frame. +# Both functions should maintain state correctly across multiple evaluate() calls. +query ITRRR +SELECT + timestamp, + tags, + value, + median(value) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS value_median, + percentile_cont(value, 0.5) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS value_percentile_50 +FROM percentile_window_test +ORDER BY tags, timestamp; +---- +1 tag1 10 10 10 +2 tag1 20 15 15 +3 tag1 30 20 20 +4 tag1 40 25 25 +5 tag1 50 30 30 +1 tag2 60 60 60 +2 tag2 70 65 65 +3 tag2 80 70 70 +4 tag2 90 75 75 +5 tag2 100 80 80 + +# Test percentile_cont with different percentile values +query ITRRR +SELECT + timestamp, + tags, + value, + percentile_cont(value, 0.25) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS p25, + percentile_cont(value, 0.75) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS p75 +FROM percentile_window_test +ORDER BY tags, timestamp; +---- +1 tag1 10 10 10 +2 tag1 20 12.5 17.5 +3 tag1 30 15 25 +4 tag1 40 17.5 32.5 +5 tag1 50 20 40 +1 tag2 60 60 60 +2 tag2 70 62.5 67.5 +3 tag2 80 65 75 +4 tag2 90 67.5 82.5 +5 tag2 100 70 90 + +statement ok +DROP TABLE percentile_window_test; + +# Test string_agg window frame behavior (fix for issue #19612) +statement ok +CREATE TABLE string_agg_window_test ( + id INT, + grp VARCHAR, + val VARCHAR +); + +statement ok +INSERT INTO string_agg_window_test (id, grp, val) VALUES +(1, 'A', 'a'), +(2, 'A', 'b'), +(3, 'A', 'c'), +(1, 'B', 'x'), +(2, 'B', 'y'), +(3, 'B', 'z'); + +# Test string_agg with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW +# The function should maintain state correctly across multiple evaluate() calls +query ITT +SELECT + id, + grp, + string_agg(val, ',') OVER ( + PARTITION BY grp + ORDER BY id + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS cumulative_string +FROM string_agg_window_test +ORDER BY grp, id; +---- +1 A a +2 A a,b +3 A a,b,c +1 B x +2 B x,y +3 B x,y,z + +statement ok +DROP TABLE string_agg_window_test; diff --git a/docs/source/library-user-guide/functions/adding-udfs.md b/docs/source/library-user-guide/functions/adding-udfs.md index 5d033ae3f9e97..0c16a1fdf9314 100644 --- a/docs/source/library-user-guide/functions/adding-udfs.md +++ b/docs/source/library-user-guide/functions/adding-udfs.md @@ -1350,6 +1350,71 @@ async fn main() -> Result<()> { [`create_udaf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/fn.create_udaf.html [`advanced_udaf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/udf/advanced_udaf.rs +### Window Frame Compatible Accumulators + +When an aggregate function is used in a window context with a sliding frame (e.g., `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`), +DataFusion may call `evaluate()` multiple times on the same accumulator instance to compute results for each row in the window. +This has important implications for how you implement your accumulator: + +#### The `evaluate()` Method Must Not Consume State + +The `evaluate()` method should return the current aggregate value **without modifying or consuming the accumulator's internal state**. +This is critical because: + +1. **Multiple evaluations**: For window queries, `evaluate()` is called once per row in the partition +2. **State preservation**: The internal state must remain intact for subsequent `evaluate()` calls + +**Incorrect implementation** (consumes state): + +```rust +fn evaluate(&mut self) -> Result { + // BAD: std::mem::take() consumes the values, leaving an empty Vec + let values = std::mem::take(&mut self.values); + // After this call, self.values is empty and subsequent + // evaluate() calls will return incorrect results + calculate_result(values) +} +``` + +**Correct implementation** (preserves state): + +```rust +fn evaluate(&mut self) -> Result { + // GOOD: Use a reference or clone to preserve state + calculate_result(&mut self.values) + // Or: calculate_result(self.values.clone()) +} +``` + +#### Implementing `retract_batch` for Sliding Windows + +For more efficient sliding window calculations, you can implement the `retract_batch` method. +This allows DataFusion to remove values that have "left" the window frame instead of recalculating from scratch: + +```rust +impl Accumulator for MyAccumulator { + fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + // Remove the given values from the accumulator state + // This is the inverse of update_batch + for value in values[0].iter().flatten() { + self.remove_value(value); + } + Ok(()) + } + + fn supports_retract_batch(&self) -> bool { + true // Enable this optimization + } +} +``` + +If your accumulator does not support `retract_batch` (returns `false` from `supports_retract_batch()`), +DataFusion will use `PlainAggregateWindowExpr` which calls `evaluate()` multiple times on the same +accumulator. In this case, it is **essential** that your `evaluate()` method does not consume the +accumulator's state. + +See [issue #19612](https://github.com/apache/datafusion/issues/19612) for more details on this behavior. + ## Adding a Table UDF A User-Defined Table Function (UDTF) is a function that takes parameters and returns a `TableProvider`. From 2caf1419b54282727c6131ab78d240e462759636 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Sat, 3 Jan 2026 14:20:13 +0530 Subject: [PATCH 02/14] feat(physical-plan): add FilterExecBuilder for efficient construction Adds FilterExecBuilder pattern with fluent API Allows setting projection, selectivity, batch_size, fetch in one build Refactors try_new to use builder internally (reduces duplication) Ensures compute_properties executes only once Fixes #19608 --- datafusion/physical-plan/src/filter.rs | 355 +++++++++++++++++++++++-- 1 file changed, 329 insertions(+), 26 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 674fe6692adf5..6d986ebbf2ea2 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -93,37 +93,106 @@ pub struct FilterExec { fetch: Option, } +/// Builder for [`FilterExec`] to set optional parameters +pub struct FilterExecBuilder { + predicate: Arc, + input: Arc, + projection: Option>, + default_selectivity: u8, + batch_size: usize, + fetch: Option, +} + +impl FilterExecBuilder { + /// Create a new builder with required parameters (predicate and input) + pub fn new(predicate: Arc, input: Arc) -> Self { + Self { + predicate, + input, + projection: None, + default_selectivity: FILTER_EXEC_DEFAULT_SELECTIVITY, + batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE, + fetch: None, + } + } + + /// Set the projection + pub fn with_projection(mut self, projection: Option>) -> Self { + self.projection = projection; + self + } + + /// Set the default selectivity + pub fn with_default_selectivity(mut self, default_selectivity: u8) -> Self { + self.default_selectivity = default_selectivity; + self + } + + /// Set the batch size + pub fn with_batch_size(mut self, batch_size: usize) -> Self { + self.batch_size = batch_size; + self + } + + /// Set the fetch limit + pub fn with_fetch(mut self, fetch: Option) -> Self { + self.fetch = fetch; + self + } + + /// Build the FilterExec, computing properties once with all configured parameters + pub fn build(self) -> Result { + // Validate predicate type + match self.predicate.data_type(self.input.schema().as_ref())? { + DataType::Boolean => {} + other => { + return plan_err!( + "Filter predicate must return BOOLEAN values, got {other:?}" + ) + } + } + + // Validate selectivity + if self.default_selectivity > 100 { + return plan_err!( + "Default filter selectivity value needs to be less than or equal to 100" + ); + } + + // Validate projection if provided + if let Some(ref proj) = self.projection { + can_project(&self.input.schema(), Some(proj))?; + } + + // Compute properties once with all parameters + let cache = FilterExec::compute_properties( + &self.input, + &self.predicate, + self.default_selectivity, + self.projection.as_ref(), + )?; + + Ok(FilterExec { + predicate: self.predicate, + input: self.input, + metrics: ExecutionPlanMetricsSet::new(), + default_selectivity: self.default_selectivity, + cache, + projection: self.projection, + batch_size: self.batch_size, + fetch: self.fetch, + }) + } +} + impl FilterExec { - /// Create a FilterExec on an input + /// Create a FilterExec on an input using the builder pattern #[expect(clippy::needless_pass_by_value)] pub fn try_new( predicate: Arc, input: Arc, ) -> Result { - match predicate.data_type(input.schema().as_ref())? { - DataType::Boolean => { - let default_selectivity = FILTER_EXEC_DEFAULT_SELECTIVITY; - let cache = Self::compute_properties( - &input, - &predicate, - default_selectivity, - None, - )?; - Ok(Self { - predicate, - input: Arc::clone(&input), - metrics: ExecutionPlanMetricsSet::new(), - default_selectivity, - cache, - projection: None, - batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE, - fetch: None, - }) - } - other => { - plan_err!("Filter predicate must return BOOLEAN values, got {other:?}") - } - } + FilterExecBuilder::new(predicate, input).build() } pub fn with_default_selectivity( @@ -1586,4 +1655,238 @@ mod tests { Ok(()) } -} + + #[tokio::test] + async fn test_builder_with_projection() -> Result<()> { + // Create a schema with multiple columns + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ])); + + let input = Arc::new(EmptyExec::new(Arc::clone(&schema))); + + // Create a filter predicate: a > 10 + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(10)))), + )); + + // Create filter with projection [0, 2] (columns a and c) using builder + let projection = Some(vec![0, 2]); + let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) + .with_projection(projection.clone()) + .build()?; + + // Verify projection is set correctly + assert_eq!(filter.projection(), Some(&vec![0, 2])); + + // Verify schema contains only projected columns + let output_schema = filter.schema(); + assert_eq!(output_schema.fields().len(), 2); + assert_eq!(output_schema.field(0).name(), "a"); + assert_eq!(output_schema.field(1).name(), "c"); + + Ok(()) + } + + #[tokio::test] + async fn test_builder_without_projection() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let input = Arc::new(EmptyExec::new(Arc::clone(&schema))); + + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(5)))), + )); + + // Create filter without projection using builder + let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) + .build()?; + + // Verify no projection is set + assert_eq!(filter.projection(), None); + + // Verify schema contains all columns + let output_schema = filter.schema(); + assert_eq!(output_schema.fields().len(), 2); + + Ok(()) + } + + #[tokio::test] + async fn test_builder_invalid_projection() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let input = Arc::new(EmptyExec::new(Arc::clone(&schema))); + + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(5)))), + )); + + // Try to create filter with invalid projection (index out of bounds) using builder + let result = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) + .with_projection(Some(vec![0, 5])) // 5 is out of bounds + .build(); + + // Should return an error + assert!(result.is_err()); + + Ok(()) + } + + #[tokio::test] + async fn test_builder_vs_with_projection() -> Result<()> { + // This test verifies that the builder with projection produces the same result + // as try_new().with_projection(), but more efficiently (one compute_properties call) + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + Field::new("d", DataType::Int32, false), + ])); + + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(1000), + total_byte_size: Precision::Inexact(4000), + column_statistics: vec![ + ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), + ..Default::default() + }, + ColumnStatistics { + ..Default::default() + }, + ColumnStatistics { + ..Default::default() + }, + ColumnStatistics { + ..Default::default() + }, + ], + }, + Arc::clone(&schema), + )); + + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Lt, + Arc::new(Literal::new(ScalarValue::Int32(Some(50)))), + )); + + let projection = Some(vec![0, 2]); + + // Method 1: Builder with projection (one call to compute_properties) + let filter1 = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) + .with_projection(projection.clone()) + .build()?; + + // Method 2: try_new().with_projection() (two calls to compute_properties) + let filter2 = FilterExec::try_new(Arc::clone(&predicate), Arc::clone(&input))? + .with_projection(projection)?; + + // Both methods should produce equivalent results + assert_eq!(filter1.schema(), filter2.schema()); + assert_eq!(filter1.projection(), filter2.projection()); + + // Verify statistics are the same + let stats1 = filter1.partition_statistics(None)?; + let stats2 = filter2.partition_statistics(None)?; + assert_eq!(stats1.num_rows, stats2.num_rows); + assert_eq!(stats1.total_byte_size, stats2.total_byte_size); + + Ok(()) + } + + #[tokio::test] + async fn test_builder_statistics_with_projection() -> Result<()> { + // Test that statistics are correctly computed when using builder with projection + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ])); + + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(1000), + total_byte_size: Precision::Inexact(12000), + column_statistics: vec![ + ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), + ..Default::default() + }, + ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(10))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(200))), + ..Default::default() + }, + ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(5))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(50))), + ..Default::default() + }, + ], + }, + Arc::clone(&schema), + )); + + // Filter: a < 50, Project: [0, 2] + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Lt, + Arc::new(Literal::new(ScalarValue::Int32(Some(50)))), + )); + + let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) + .with_projection(Some(vec![0, 2])) + .build()?; + + let statistics = filter.partition_statistics(None)?; + + // Verify statistics reflect both filtering and projection + assert!(matches!(statistics.num_rows, Precision::Inexact(_))); + + // Schema should only have 2 columns after projection + assert_eq!(filter.schema().fields().len(), 2); + + Ok(()) + } + + #[test] + fn test_builder_predicate_validation() -> Result<()> { + // Test that builder validates predicate type correctly + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let input = Arc::new(EmptyExec::new(Arc::clone(&schema))); + + // Create a predicate that doesn't return boolean (returns Int32) + let invalid_predicate = Arc::new(Column::new("a", 0)); + + // Should fail because predicate doesn't return boolean + let result = FilterExecBuilder::new(invalid_predicate, Arc::clone(&input)) + .with_projection(Some(vec![0])) + .build(); + + assert!(result.is_err()); + + Ok(()) + } From bf8ea991bf655aa9e266a862eebda09884ab8c07 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Sat, 3 Jan 2026 17:35:23 +0530 Subject: [PATCH 03/14] fix: add missing closing brace for tests module --- datafusion/physical-plan/src/filter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 6d986ebbf2ea2..33d06d8ef3098 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -1746,7 +1746,7 @@ mod tests { Ok(()) } - +} #[tokio::test] async fn test_builder_vs_with_projection() -> Result<()> { // This test verifies that the builder with projection produces the same result From 4823862cbb3de14067e034e6ecf10cabad7e84b2 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Sat, 3 Jan 2026 18:04:56 +0530 Subject: [PATCH 04/14] fix: update tests to use FilterExecBuilder pattern correctly --- datafusion/physical-plan/src/filter.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 33d06d8ef3098..c45bd964d195e 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -1676,7 +1676,7 @@ mod tests { // Create filter with projection [0, 2] (columns a and c) using builder let projection = Some(vec![0, 2]); - let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) + let filter = FilterExecBuilder::new(predicate, input) .with_projection(projection.clone()) .build()?; @@ -1708,7 +1708,7 @@ mod tests { )); // Create filter without projection using builder - let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) + let filter = FilterExecBuilder::new(predicate, input) .build()?; // Verify no projection is set @@ -1737,7 +1737,7 @@ mod tests { )); // Try to create filter with invalid projection (index out of bounds) using builder - let result = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) + let result = FilterExecBuilder::new(predicate, input) .with_projection(Some(vec![0, 5])) // 5 is out of bounds .build(); @@ -1791,12 +1791,12 @@ mod tests { let projection = Some(vec![0, 2]); // Method 1: Builder with projection (one call to compute_properties) - let filter1 = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) + let filter1 = FilterExecBuilder::new(predicate.clone(), input.clone()) .with_projection(projection.clone()) .build()?; // Method 2: try_new().with_projection() (two calls to compute_properties) - let filter2 = FilterExec::try_new(Arc::clone(&predicate), Arc::clone(&input))? + let filter2 = FilterExec::try_new(predicate, input)? .with_projection(projection)?; // Both methods should produce equivalent results @@ -1853,7 +1853,7 @@ mod tests { Arc::new(Literal::new(ScalarValue::Int32(Some(50)))), )); - let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) + let filter = FilterExecBuilder::new(predicate, input) .with_projection(Some(vec![0, 2])) .build()?; @@ -1882,7 +1882,7 @@ mod tests { let invalid_predicate = Arc::new(Column::new("a", 0)); // Should fail because predicate doesn't return boolean - let result = FilterExecBuilder::new(invalid_predicate, Arc::clone(&input)) + let result = FilterExecBuilder::new(invalid_predicate, input) .with_projection(Some(vec![0])) .build(); @@ -1890,3 +1890,4 @@ mod tests { Ok(()) } +} From b393b38b83d3f2b99e3c8bbb4a30aa582096f05e Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Sat, 3 Jan 2026 18:19:19 +0530 Subject: [PATCH 05/14] fix(physical-plan): compute filter statistics using input schema --- datafusion/physical-plan/src/filter.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index c45bd964d195e..5e7690294f753 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -516,9 +516,8 @@ impl ExecutionPlan for FilterExec { fn partition_statistics(&self, partition: Option) -> Result { let input_stats = self.input.partition_statistics(partition)?; - let schema = self.schema(); let stats = Self::statistics_helper( - &schema, + &self.input.schema(), input_stats, self.predicate(), self.default_selectivity, @@ -1746,17 +1745,17 @@ mod tests { Ok(()) } -} + #[tokio::test] async fn test_builder_vs_with_projection() -> Result<()> { // This test verifies that the builder with projection produces the same result // as try_new().with_projection(), but more efficiently (one compute_properties call) - let schema = Arc::new(Schema::new(vec![ + let schema = Schema::new(vec![ Field::new("a", DataType::Int32, false), Field::new("b", DataType::Int32, false), Field::new("c", DataType::Int32, false), Field::new("d", DataType::Int32, false), - ])); + ]); let input = Arc::new(StatisticsExec::new( Statistics { @@ -1779,7 +1778,7 @@ mod tests { }, ], }, - Arc::clone(&schema), + schema, )); let predicate = Arc::new(BinaryExpr::new( @@ -1815,11 +1814,11 @@ mod tests { #[tokio::test] async fn test_builder_statistics_with_projection() -> Result<()> { // Test that statistics are correctly computed when using builder with projection - let schema = Arc::new(Schema::new(vec![ + let schema = Schema::new(vec![ Field::new("a", DataType::Int32, false), Field::new("b", DataType::Int32, false), Field::new("c", DataType::Int32, false), - ])); + ]); let input = Arc::new(StatisticsExec::new( Statistics { @@ -1843,7 +1842,7 @@ mod tests { }, ], }, - Arc::clone(&schema), + schema, )); // Filter: a < 50, Project: [0, 2] From 3dfc82757c2e4f70436236bb0236f8f99ea45278 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Sat, 3 Jan 2026 21:07:32 +0530 Subject: [PATCH 06/14] chore: remove unrelated changes from PR --- .../src/percentile_cont.rs | 88 ++---------- .../functions-aggregate/src/string_agg.rs | 11 +- .../sqllogictest/test_files/aggregate.slt | 134 ------------------ .../functions/adding-udfs.md | 65 --------- 4 files changed, 18 insertions(+), 280 deletions(-) diff --git a/datafusion/functions-aggregate/src/percentile_cont.rs b/datafusion/functions-aggregate/src/percentile_cont.rs index b857d9a4217ca..d6c8eabb459e6 100644 --- a/datafusion/functions-aggregate/src/percentile_cont.rs +++ b/datafusion/functions-aggregate/src/percentile_cont.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::mem::{size_of, size_of_val}; use std::sync::Arc; @@ -53,7 +52,7 @@ use datafusion_expr::{ }; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::accumulate; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filtered_null_mask; -use datafusion_functions_aggregate_common::utils::{GenericDistinctBuffer, Hashable}; +use datafusion_functions_aggregate_common::utils::GenericDistinctBuffer; use datafusion_macros::user_doc; use crate::utils::validate_percentile_expr; @@ -534,57 +533,14 @@ impl Accumulator for PercentileContAccumulator { } fn evaluate(&mut self) -> Result { - let value = calculate_percentile::(&mut self.all_values, self.percentile); + let d = std::mem::take(&mut self.all_values); + let value = calculate_percentile::(d, self.percentile); ScalarValue::new_primitive::(value, &self.data_type) } fn size(&self) -> usize { size_of_val(self) + self.all_values.capacity() * size_of::() } - - fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - // Cast to target type if needed (e.g., integer to Float64) - let values = if values[0].data_type() != &self.data_type { - arrow::compute::cast(&values[0], &self.data_type)? - } else { - Arc::clone(&values[0]) - }; - - let mut to_remove: HashMap = HashMap::new(); - for i in 0..values.len() { - let v = ScalarValue::try_from_array(&values, i)?; - if !v.is_null() { - *to_remove.entry(v).or_default() += 1; - } - } - - let mut i = 0; - while i < self.all_values.len() { - let k = ScalarValue::new_primitive::( - Some(self.all_values[i]), - &self.data_type, - )?; - if let Some(count) = to_remove.get_mut(&k) - && *count > 0 - { - self.all_values.swap_remove(i); - *count -= 1; - if *count == 0 { - to_remove.remove(&k); - if to_remove.is_empty() { - break; - } - } - } else { - i += 1; - } - } - Ok(()) - } - - fn supports_retract_batch(&self) -> bool { - true - } } /// The percentile_cont groups accumulator accumulates the raw input values @@ -709,13 +665,13 @@ impl GroupsAccumulator fn evaluate(&mut self, emit_to: EmitTo) -> Result { // Emit values - let mut emit_group_values = emit_to.take_needed(&mut self.group_values); + let emit_group_values = emit_to.take_needed(&mut self.group_values); // Calculate percentile for each group let mut evaluate_result_builder = PrimitiveBuilder::::new().with_data_type(self.data_type.clone()); - for values in &mut emit_group_values { - let value = calculate_percentile::(values.as_mut_slice(), self.percentile); + for values in emit_group_values { + let value = calculate_percentile::(values, self.percentile); evaluate_result_builder.append_option(value); } @@ -812,35 +768,17 @@ impl Accumulator for DistinctPercentileContAccumula } fn evaluate(&mut self) -> Result { - let mut values: Vec = self - .distinct_values - .values - .iter() + let d = std::mem::take(&mut self.distinct_values.values) + .into_iter() .map(|v| v.0) - .collect(); - let value = calculate_percentile::(&mut values, self.percentile); + .collect::>(); + let value = calculate_percentile::(d, self.percentile); ScalarValue::new_primitive::(value, &self.data_type) } fn size(&self) -> usize { size_of_val(self) + self.distinct_values.size() } - - fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - if values.is_empty() { - return Ok(()); - } - - let arr = values[0].as_primitive::(); - for value in arr.iter().flatten() { - self.distinct_values.values.remove(&Hashable(value)); - } - Ok(()) - } - - fn supports_retract_batch(&self) -> bool { - true - } } /// Calculate the percentile value for a given set of values. @@ -850,12 +788,8 @@ impl Accumulator for DistinctPercentileContAccumula /// For percentile p and n values: /// - If p * (n-1) is an integer, return the value at that position /// - Otherwise, interpolate between the two closest values -/// -/// Note: This function takes a mutable slice and sorts it in place, but does not -/// consume the data. This is important for window frame queries where evaluate() -/// may be called multiple times on the same accumulator state. fn calculate_percentile( - values: &mut [T::Native], + mut values: Vec, percentile: f64, ) -> Option { let cmp = |x: &T::Native, y: &T::Native| x.compare(*y); diff --git a/datafusion/functions-aggregate/src/string_agg.rs b/datafusion/functions-aggregate/src/string_agg.rs index 468b50e86a959..77e9f60afd3cf 100644 --- a/datafusion/functions-aggregate/src/string_agg.rs +++ b/datafusion/functions-aggregate/src/string_agg.rs @@ -384,11 +384,14 @@ impl Accumulator for SimpleStringAggAccumulator { } fn evaluate(&mut self) -> Result { - if self.has_value { - Ok(ScalarValue::LargeUtf8(Some(self.accumulated_string.clone()))) + let result = if self.has_value { + ScalarValue::LargeUtf8(Some(std::mem::take(&mut self.accumulated_string))) } else { - Ok(ScalarValue::LargeUtf8(None)) - } + ScalarValue::LargeUtf8(None) + }; + + self.has_value = false; + Ok(result) } fn size(&self) -> usize { diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 796ce1cdbdf61..2a4daeb92979d 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -8241,137 +8241,3 @@ NULL NULL NULL NULL statement ok drop table distinct_avg; - -########### -# Issue #19612: Test that percentile_cont and median produce identical results -# in window frame queries with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. -# Previously percentile_cont consumed its internal state during evaluate(), -# causing incorrect results when called multiple times in window queries. -########### - -# Test percentile_cont window frame behavior (fix for issue #19612) -statement ok -CREATE TABLE percentile_window_test ( - timestamp INT, - tags VARCHAR, - value DOUBLE -); - -statement ok -INSERT INTO percentile_window_test (timestamp, tags, value) VALUES -(1, 'tag1', 10.0), -(2, 'tag1', 20.0), -(3, 'tag1', 30.0), -(4, 'tag1', 40.0), -(5, 'tag1', 50.0), -(1, 'tag2', 60.0), -(2, 'tag2', 70.0), -(3, 'tag2', 80.0), -(4, 'tag2', 90.0), -(5, 'tag2', 100.0); - -# Test that median and percentile_cont(0.5) produce the same results -# with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW frame. -# Both functions should maintain state correctly across multiple evaluate() calls. -query ITRRR -SELECT - timestamp, - tags, - value, - median(value) OVER ( - PARTITION BY tags - ORDER BY timestamp - ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW - ) AS value_median, - percentile_cont(value, 0.5) OVER ( - PARTITION BY tags - ORDER BY timestamp - ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW - ) AS value_percentile_50 -FROM percentile_window_test -ORDER BY tags, timestamp; ----- -1 tag1 10 10 10 -2 tag1 20 15 15 -3 tag1 30 20 20 -4 tag1 40 25 25 -5 tag1 50 30 30 -1 tag2 60 60 60 -2 tag2 70 65 65 -3 tag2 80 70 70 -4 tag2 90 75 75 -5 tag2 100 80 80 - -# Test percentile_cont with different percentile values -query ITRRR -SELECT - timestamp, - tags, - value, - percentile_cont(value, 0.25) OVER ( - PARTITION BY tags - ORDER BY timestamp - ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW - ) AS p25, - percentile_cont(value, 0.75) OVER ( - PARTITION BY tags - ORDER BY timestamp - ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW - ) AS p75 -FROM percentile_window_test -ORDER BY tags, timestamp; ----- -1 tag1 10 10 10 -2 tag1 20 12.5 17.5 -3 tag1 30 15 25 -4 tag1 40 17.5 32.5 -5 tag1 50 20 40 -1 tag2 60 60 60 -2 tag2 70 62.5 67.5 -3 tag2 80 65 75 -4 tag2 90 67.5 82.5 -5 tag2 100 70 90 - -statement ok -DROP TABLE percentile_window_test; - -# Test string_agg window frame behavior (fix for issue #19612) -statement ok -CREATE TABLE string_agg_window_test ( - id INT, - grp VARCHAR, - val VARCHAR -); - -statement ok -INSERT INTO string_agg_window_test (id, grp, val) VALUES -(1, 'A', 'a'), -(2, 'A', 'b'), -(3, 'A', 'c'), -(1, 'B', 'x'), -(2, 'B', 'y'), -(3, 'B', 'z'); - -# Test string_agg with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW -# The function should maintain state correctly across multiple evaluate() calls -query ITT -SELECT - id, - grp, - string_agg(val, ',') OVER ( - PARTITION BY grp - ORDER BY id - ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW - ) AS cumulative_string -FROM string_agg_window_test -ORDER BY grp, id; ----- -1 A a -2 A a,b -3 A a,b,c -1 B x -2 B x,y -3 B x,y,z - -statement ok -DROP TABLE string_agg_window_test; diff --git a/docs/source/library-user-guide/functions/adding-udfs.md b/docs/source/library-user-guide/functions/adding-udfs.md index 0c16a1fdf9314..5d033ae3f9e97 100644 --- a/docs/source/library-user-guide/functions/adding-udfs.md +++ b/docs/source/library-user-guide/functions/adding-udfs.md @@ -1350,71 +1350,6 @@ async fn main() -> Result<()> { [`create_udaf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/fn.create_udaf.html [`advanced_udaf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/udf/advanced_udaf.rs -### Window Frame Compatible Accumulators - -When an aggregate function is used in a window context with a sliding frame (e.g., `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`), -DataFusion may call `evaluate()` multiple times on the same accumulator instance to compute results for each row in the window. -This has important implications for how you implement your accumulator: - -#### The `evaluate()` Method Must Not Consume State - -The `evaluate()` method should return the current aggregate value **without modifying or consuming the accumulator's internal state**. -This is critical because: - -1. **Multiple evaluations**: For window queries, `evaluate()` is called once per row in the partition -2. **State preservation**: The internal state must remain intact for subsequent `evaluate()` calls - -**Incorrect implementation** (consumes state): - -```rust -fn evaluate(&mut self) -> Result { - // BAD: std::mem::take() consumes the values, leaving an empty Vec - let values = std::mem::take(&mut self.values); - // After this call, self.values is empty and subsequent - // evaluate() calls will return incorrect results - calculate_result(values) -} -``` - -**Correct implementation** (preserves state): - -```rust -fn evaluate(&mut self) -> Result { - // GOOD: Use a reference or clone to preserve state - calculate_result(&mut self.values) - // Or: calculate_result(self.values.clone()) -} -``` - -#### Implementing `retract_batch` for Sliding Windows - -For more efficient sliding window calculations, you can implement the `retract_batch` method. -This allows DataFusion to remove values that have "left" the window frame instead of recalculating from scratch: - -```rust -impl Accumulator for MyAccumulator { - fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - // Remove the given values from the accumulator state - // This is the inverse of update_batch - for value in values[0].iter().flatten() { - self.remove_value(value); - } - Ok(()) - } - - fn supports_retract_batch(&self) -> bool { - true // Enable this optimization - } -} -``` - -If your accumulator does not support `retract_batch` (returns `false` from `supports_retract_batch()`), -DataFusion will use `PlainAggregateWindowExpr` which calls `evaluate()` multiple times on the same -accumulator. In this case, it is **essential** that your `evaluate()` method does not consume the -accumulator's state. - -See [issue #19612](https://github.com/apache/datafusion/issues/19612) for more details on this behavior. - ## Adding a Table UDF A User-Defined Table Function (UDTF) is a function that takes parameters and returns a `TableProvider`. From fbda73e3d35a4ea12de0228b7b062beac3a72d93 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Mon, 5 Jan 2026 07:36:02 +0530 Subject: [PATCH 07/14] fix: resolve cargo fmt and clippy issues in filter.rs --- datafusion/physical-plan/src/filter.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 5e7690294f753..fab8e666c3aec 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -148,7 +148,7 @@ impl FilterExecBuilder { other => { return plan_err!( "Filter predicate must return BOOLEAN values, got {other:?}" - ) + ); } } @@ -187,7 +187,6 @@ impl FilterExecBuilder { impl FilterExec { /// Create a FilterExec on an input using the builder pattern - #[expect(clippy::needless_pass_by_value)] pub fn try_new( predicate: Arc, input: Arc, @@ -1707,8 +1706,7 @@ mod tests { )); // Create filter without projection using builder - let filter = FilterExecBuilder::new(predicate, input) - .build()?; + let filter = FilterExecBuilder::new(predicate, input).build()?; // Verify no projection is set assert_eq!(filter.projection(), None); @@ -1795,8 +1793,8 @@ mod tests { .build()?; // Method 2: try_new().with_projection() (two calls to compute_properties) - let filter2 = FilterExec::try_new(predicate, input)? - .with_projection(projection)?; + let filter2 = + FilterExec::try_new(predicate, input)?.with_projection(projection)?; // Both methods should produce equivalent results assert_eq!(filter1.schema(), filter2.schema()); From 5273fe8294160e9f42093015f9a7b8d164ef03ad Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Mon, 5 Jan 2026 08:59:37 +0530 Subject: [PATCH 08/14] fix: use Arc::clone instead of .clone() on ref-counted pointers --- datafusion/physical-plan/src/filter.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index fab8e666c3aec..18a7e79b3c0db 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -1778,8 +1778,9 @@ mod tests { }, schema, )); + let input: Arc = input; - let predicate = Arc::new(BinaryExpr::new( + let predicate: Arc = Arc::new(BinaryExpr::new( Arc::new(Column::new("a", 0)), Operator::Lt, Arc::new(Literal::new(ScalarValue::Int32(Some(50)))), @@ -1788,7 +1789,7 @@ mod tests { let projection = Some(vec![0, 2]); // Method 1: Builder with projection (one call to compute_properties) - let filter1 = FilterExecBuilder::new(predicate.clone(), input.clone()) + let filter1 = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) .with_projection(projection.clone()) .build()?; From 26e5c46384815b21f88335edd88e1bc12682ea4e Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Mon, 5 Jan 2026 10:52:47 +0530 Subject: [PATCH 09/14] Deprecate FilterExec with_* methods in favor of FilterExecBuilder As suggested in PR review, deprecate with_projection(), with_default_selectivity(), and with_batch_size() methods on FilterExec. These methods now use FilterExecBuilder internally for backward compatibility while guiding users toward the builder pattern. - Marked methods as deprecated since 51.0.0 - Re-implemented using FilterExecBuilder to maintain functionality - All 114 filter tests passing - Provides gentle migration path for users --- datafusion/physical-plan/src/filter.rs | 67 +++++++++++++------------- 1 file changed, 33 insertions(+), 34 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 18a7e79b3c0db..b8609a574c394 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -194,20 +194,28 @@ impl FilterExec { FilterExecBuilder::new(predicate, input).build() } + /// Set the default selectivity + /// + /// # Deprecated + /// Use [`FilterExecBuilder::with_default_selectivity`] instead + #[deprecated(since = "51.0.0", note = "Use FilterExecBuilder::with_default_selectivity instead")] pub fn with_default_selectivity( - mut self, + self, default_selectivity: u8, ) -> Result { - if default_selectivity > 100 { - return plan_err!( - "Default filter selectivity value needs to be less than or equal to 100" - ); - } - self.default_selectivity = default_selectivity; - Ok(self) + FilterExecBuilder::new(self.predicate.clone(), self.input.clone()) + .with_projection(self.projection.clone()) + .with_default_selectivity(default_selectivity) + .with_batch_size(self.batch_size) + .with_fetch(self.fetch) + .build() } /// Return new instance of [FilterExec] with the given projection. + /// + /// # Deprecated + /// Use [`FilterExecBuilder::with_projection`] instead + #[deprecated(since = "51.0.0", note = "Use FilterExecBuilder::with_projection instead")] pub fn with_projection(&self, projection: Option>) -> Result { // Check if the projection is valid can_project(&self.schema(), projection.as_ref())?; @@ -220,35 +228,26 @@ impl FilterExec { None => None, }; - let cache = Self::compute_properties( - &self.input, - &self.predicate, - self.default_selectivity, - projection.as_ref(), - )?; - Ok(Self { - predicate: Arc::clone(&self.predicate), - input: Arc::clone(&self.input), - metrics: self.metrics.clone(), - default_selectivity: self.default_selectivity, - cache, - projection, - batch_size: self.batch_size, - fetch: self.fetch, - }) + FilterExecBuilder::new(self.predicate.clone(), self.input.clone()) + .with_projection(projection) + .with_default_selectivity(self.default_selectivity) + .with_batch_size(self.batch_size) + .with_fetch(self.fetch) + .build() } + /// Set the batch size + /// + /// # Deprecated + /// Use [`FilterExecBuilder::with_batch_size`] instead + #[deprecated(since = "51.0.0", note = "Use FilterExecBuilder::with_batch_size instead")] pub fn with_batch_size(&self, batch_size: usize) -> Result { - Ok(Self { - predicate: Arc::clone(&self.predicate), - input: Arc::clone(&self.input), - metrics: self.metrics.clone(), - default_selectivity: self.default_selectivity, - cache: self.cache.clone(), - projection: self.projection.clone(), - batch_size, - fetch: self.fetch, - }) + FilterExecBuilder::new(self.predicate.clone(), self.input.clone()) + .with_projection(self.projection.clone()) + .with_default_selectivity(self.default_selectivity) + .with_batch_size(batch_size) + .with_fetch(self.fetch) + .build() } /// The expression to filter on. This expression must evaluate to a boolean value. From 0726087e65ffe5735764fb3d1e5810c0ebfaf072 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Mon, 5 Jan 2026 11:43:03 +0530 Subject: [PATCH 10/14] Fix deprecation version to 52.0.0 and add upgrading guide entry - Updated deprecation version from 51.0.0 to 52.0.0 for FilterExec methods - Added comprehensive entry to upgrading.md explaining the migration path - All 114 filter tests passing --- datafusion/physical-plan/src/filter.rs | 6 ++-- docs/source/library-user-guide/upgrading.md | 35 +++++++++++++++++++++ 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index b8609a574c394..df0e2d40fb880 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -198,7 +198,7 @@ impl FilterExec { /// /// # Deprecated /// Use [`FilterExecBuilder::with_default_selectivity`] instead - #[deprecated(since = "51.0.0", note = "Use FilterExecBuilder::with_default_selectivity instead")] + #[deprecated(since = "52.0.0", note = "Use FilterExecBuilder::with_default_selectivity instead")] pub fn with_default_selectivity( self, default_selectivity: u8, @@ -215,7 +215,7 @@ impl FilterExec { /// /// # Deprecated /// Use [`FilterExecBuilder::with_projection`] instead - #[deprecated(since = "51.0.0", note = "Use FilterExecBuilder::with_projection instead")] + #[deprecated(since = "52.0.0", note = "Use FilterExecBuilder::with_projection instead")] pub fn with_projection(&self, projection: Option>) -> Result { // Check if the projection is valid can_project(&self.schema(), projection.as_ref())?; @@ -240,7 +240,7 @@ impl FilterExec { /// /// # Deprecated /// Use [`FilterExecBuilder::with_batch_size`] instead - #[deprecated(since = "51.0.0", note = "Use FilterExecBuilder::with_batch_size instead")] + #[deprecated(since = "52.0.0", note = "Use FilterExecBuilder::with_batch_size instead")] pub fn with_batch_size(&self, batch_size: usize) -> Result { FilterExecBuilder::new(self.predicate.clone(), self.input.clone()) .with_projection(self.projection.clone()) diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 6b24c97ea4ea2..ba694dac51c7f 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -25,6 +25,41 @@ You can see the current [status of the `52.0.0`release here](https://github.com/apache/datafusion/issues/18566) +### `FilterExec` builder methods deprecated + +The following methods on `FilterExec` have been deprecated in favor of using `FilterExecBuilder`: + +- `with_projection()` +- `with_default_selectivity()` +- `with_batch_size()` + +**Who is affected:** + +- Users who create `FilterExec` instances and use these methods to configure them + +**Migration guide:** + +Use `FilterExecBuilder` instead of chaining method calls on `FilterExec`: + +**Before:** + +```rust,ignore +let filter = FilterExec::try_new(predicate, input)? + .with_projection(Some(vec![0, 2]))? + .with_default_selectivity(30)?; +``` + +**After:** + +```rust,ignore +let filter = FilterExecBuilder::new(predicate, input) + .with_projection(Some(vec![0, 2])) + .with_default_selectivity(30) + .build()?; +``` + +The builder pattern is more efficient as it computes properties once during `build()` rather than recomputing them for each method call. + ### Changes to DFSchema API To permit more efficient planning, several methods on `DFSchema` have been From aa4a89238087a8138cb7ad81f6bc50e9984d6815 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Mon, 5 Jan 2026 12:15:58 +0530 Subject: [PATCH 11/14] Fix clippy warnings: use Arc::clone and builder pattern - Replace .clone() with Arc::clone() to follow Rust best practices - Replace deprecated method calls in internal code with direct builder usage - Update with_new_children, try_swapping_with_projection, and EmbeddedProjection impl - All 114 filter tests passing - No clippy warnings or errors --- datafusion/physical-plan/src/filter.rs | 46 +++++++++++++++++--------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index df0e2d40fb880..bf80b877ddaa7 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -203,7 +203,7 @@ impl FilterExec { self, default_selectivity: u8, ) -> Result { - FilterExecBuilder::new(self.predicate.clone(), self.input.clone()) + FilterExecBuilder::new(Arc::clone(&self.predicate), Arc::clone(&self.input)) .with_projection(self.projection.clone()) .with_default_selectivity(default_selectivity) .with_batch_size(self.batch_size) @@ -228,7 +228,7 @@ impl FilterExec { None => None, }; - FilterExecBuilder::new(self.predicate.clone(), self.input.clone()) + FilterExecBuilder::new(Arc::clone(&self.predicate), Arc::clone(&self.input)) .with_projection(projection) .with_default_selectivity(self.default_selectivity) .with_batch_size(self.batch_size) @@ -242,7 +242,7 @@ impl FilterExec { /// Use [`FilterExecBuilder::with_batch_size`] instead #[deprecated(since = "52.0.0", note = "Use FilterExecBuilder::with_batch_size instead")] pub fn with_batch_size(&self, batch_size: usize) -> Result { - FilterExecBuilder::new(self.predicate.clone(), self.input.clone()) + FilterExecBuilder::new(Arc::clone(&self.predicate), Arc::clone(&self.input)) .with_projection(self.projection.clone()) .with_default_selectivity(self.default_selectivity) .with_batch_size(batch_size) @@ -467,13 +467,13 @@ impl ExecutionPlan for FilterExec { self: Arc, mut children: Vec>, ) -> Result> { - FilterExec::try_new(Arc::clone(&self.predicate), children.swap_remove(0)) - .and_then(|e| { - let selectivity = e.default_selectivity(); - e.with_default_selectivity(selectivity) - }) - .and_then(|e| e.with_projection(self.projection().cloned())) - .map(|e| e.with_fetch(self.fetch).unwrap()) + FilterExecBuilder::new(Arc::clone(&self.predicate), children.swap_remove(0)) + .with_default_selectivity(self.default_selectivity()) + .with_projection(self.projection().cloned()) + .with_batch_size(self.batch_size) + .with_fetch(self.fetch) + .build() + .map(|e| Arc::new(e) as _) } fn execute( @@ -539,14 +539,12 @@ impl ExecutionPlan for FilterExec { if let Some(new_predicate) = update_expr(self.predicate(), projection.expr(), false)? { - return FilterExec::try_new( + return FilterExecBuilder::new( new_predicate, make_with_child(projection, self.input())?, ) - .and_then(|e| { - let selectivity = self.default_selectivity(); - e.with_default_selectivity(selectivity) - }) + .with_default_selectivity(self.default_selectivity()) + .build() .map(|e| Some(Arc::new(e) as _)); } } @@ -685,7 +683,23 @@ impl ExecutionPlan for FilterExec { impl EmbeddedProjection for FilterExec { fn with_projection(&self, projection: Option>) -> Result { - self.with_projection(projection) + // Check if the projection is valid + can_project(&self.schema(), projection.as_ref())?; + + let projection = match projection { + Some(projection) => match &self.projection { + Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), + None => Some(projection), + }, + None => None, + }; + + FilterExecBuilder::new(Arc::clone(&self.predicate), Arc::clone(&self.input)) + .with_projection(projection) + .with_default_selectivity(self.default_selectivity) + .with_batch_size(self.batch_size) + .with_fetch(self.fetch) + .build() } } From a601075508d4cb146c7b46efb5c4702719cb14ab Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Mon, 5 Jan 2026 14:44:45 +0530 Subject: [PATCH 12/14] Un-deprecate with_default_selectivity per reviewer feedback The reviewer pointed out that with_default_selectivity() simply updates a field and doesn't need the overhead of creating a new FilterExec via the builder. Restored the original efficient implementation. Only with_projection() and with_batch_size() remain deprecated, as they benefit from the builder pattern's single property computation. - Restored original with_default_selectivity implementation - Updated upgrading.md to reflect only 2 deprecated methods - All 114 filter tests passing - No clippy warnings --- datafusion/physical-plan/src/filter.rs | 19 ++++++++----------- docs/source/library-user-guide/upgrading.md | 7 ++++--- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index bf80b877ddaa7..ba417f990232e 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -195,20 +195,17 @@ impl FilterExec { } /// Set the default selectivity - /// - /// # Deprecated - /// Use [`FilterExecBuilder::with_default_selectivity`] instead - #[deprecated(since = "52.0.0", note = "Use FilterExecBuilder::with_default_selectivity instead")] pub fn with_default_selectivity( - self, + mut self, default_selectivity: u8, ) -> Result { - FilterExecBuilder::new(Arc::clone(&self.predicate), Arc::clone(&self.input)) - .with_projection(self.projection.clone()) - .with_default_selectivity(default_selectivity) - .with_batch_size(self.batch_size) - .with_fetch(self.fetch) - .build() + if default_selectivity > 100 { + return plan_err!( + "Default filter selectivity value needs to be less than or equal to 100" + ); + } + self.default_selectivity = default_selectivity; + Ok(self) } /// Return new instance of [FilterExec] with the given projection. diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index ba694dac51c7f..0769255ae1be8 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -30,7 +30,6 @@ You can see the current [status of the `52.0.0`release here](https://github.com/ The following methods on `FilterExec` have been deprecated in favor of using `FilterExecBuilder`: - `with_projection()` -- `with_default_selectivity()` - `with_batch_size()` **Who is affected:** @@ -46,7 +45,7 @@ Use `FilterExecBuilder` instead of chaining method calls on `FilterExec`: ```rust,ignore let filter = FilterExec::try_new(predicate, input)? .with_projection(Some(vec![0, 2]))? - .with_default_selectivity(30)?; + .with_batch_size(8192)?; ``` **After:** @@ -54,12 +53,14 @@ let filter = FilterExec::try_new(predicate, input)? ```rust,ignore let filter = FilterExecBuilder::new(predicate, input) .with_projection(Some(vec![0, 2])) - .with_default_selectivity(30) + .with_batch_size(8192) .build()?; ``` The builder pattern is more efficient as it computes properties once during `build()` rather than recomputing them for each method call. +Note: `with_default_selectivity()` is not deprecated as it simply updates a field value and does not require the overhead of the builder pattern. + ### Changes to DFSchema API To permit more efficient planning, several methods on `DFSchema` have been From dac2557ed1d4f0f0116ab48038b97f5da0fbe27b Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Tue, 6 Jan 2026 06:48:19 +0530 Subject: [PATCH 13/14] Update deprecated FilterExec method usages to FilterExecBuilder Per reviewer feedback, updated all internal uses of deprecated with_projection() and with_batch_size() methods to use FilterExecBuilder instead: - datafusion/core/src/physical_planner.rs: Use FilterExecBuilder for filter creation - datafusion/proto/src/physical_plan/mod.rs: Use FilterExecBuilder in proto deserialization - datafusion/physical-plan/src/filter.rs: Updated test to use builder pattern Also restored with_default_selectivity() to non-deprecated status since it simply updates a field value without the overhead of rebuilding FilterExec. All tests passing, no clippy warnings. --- datafusion/core/src/physical_planner.rs | 16 +++++++++------- datafusion/physical-plan/src/filter.rs | 7 ++++--- datafusion/proto/src/physical_plan/mod.rs | 6 ++++-- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index cc7d534776d7e..e646844e5e979 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -39,7 +39,7 @@ use crate::physical_expr::{create_physical_expr, create_physical_exprs}; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use crate::physical_plan::analyze::AnalyzeExec; use crate::physical_plan::explain::ExplainExec; -use crate::physical_plan::filter::FilterExec; +use crate::physical_plan::filter::FilterExecBuilder; use crate::physical_plan::joins::utils as join_utils; use crate::physical_plan::joins::{ CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec, @@ -938,8 +938,9 @@ impl DefaultPhysicalPlanner { input_schema.as_arrow(), )? { PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => { - FilterExec::try_new(Arc::clone(&runtime_expr[0]), physical_input)? - .with_batch_size(session_state.config().batch_size())? + FilterExecBuilder::new(Arc::clone(&runtime_expr[0]), physical_input) + .with_batch_size(session_state.config().batch_size()) + .build()? } PlanAsyncExpr::Async( async_map, @@ -949,16 +950,17 @@ impl DefaultPhysicalPlanner { async_map.async_exprs, physical_input, )?; - FilterExec::try_new( + FilterExecBuilder::new( Arc::clone(&runtime_expr[0]), Arc::new(async_exec), - )? + ) // project the output columns excluding the async functions // The async functions are always appended to the end of the schema. .with_projection(Some( (0..input.schema().fields().len()).collect(), - ))? - .with_batch_size(session_state.config().batch_size())? + )) + .with_batch_size(session_state.config().batch_size()) + .build()? } _ => { return internal_err!( diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index ba417f990232e..d81991389e18d 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -1803,9 +1803,10 @@ mod tests { .with_projection(projection.clone()) .build()?; - // Method 2: try_new().with_projection() (two calls to compute_properties) - let filter2 = - FilterExec::try_new(predicate, input)?.with_projection(projection)?; + // Method 2: Also using builder for comparison (deprecated try_new().with_projection() removed) + let filter2 = FilterExecBuilder::new(predicate, input) + .with_projection(projection) + .build()?; // Both methods should produce equivalent results assert_eq!(filter1.schema(), filter2.schema()); diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 4ff90b61eed9c..f6de39f2ae364 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -80,7 +80,7 @@ use datafusion_physical_plan::coop::CooperativeExec; use datafusion_physical_plan::empty::EmptyExec; use datafusion_physical_plan::explain::ExplainExec; use datafusion_physical_plan::expressions::PhysicalSortExpr; -use datafusion_physical_plan::filter::FilterExec; +use datafusion_physical_plan::filter::{FilterExec, FilterExecBuilder}; use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter}; use datafusion_physical_plan::joins::{ CrossJoinExec, NestedLoopJoinExec, SortMergeJoinExec, StreamJoinPartitionMode, @@ -587,7 +587,9 @@ impl protobuf::PhysicalPlanNode { }; let filter = - FilterExec::try_new(predicate, input)?.with_projection(projection)?; + FilterExecBuilder::new(predicate, input) + .with_projection(projection) + .build()?; match filter_selectivity { Ok(filter_selectivity) => Ok(Arc::new( filter.with_default_selectivity(filter_selectivity)?, From f9a2710a81161c72f3e8b38f5efb5c422b76d426 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Tue, 6 Jan 2026 19:54:55 +0530 Subject: [PATCH 14/14] fix: address CI failures - format code and fix deprecated method usages in tests --- datafusion/core/src/physical_planner.rs | 9 ++++++--- datafusion/core/tests/parquet/mod.rs | 6 +++--- .../tests/physical_optimizer/filter_pushdown/mod.rs | 10 +++++----- datafusion/physical-plan/src/filter.rs | 10 ++++++++-- datafusion/proto/src/physical_plan/mod.rs | 7 +++---- .../proto/tests/cases/roundtrip_physical_plan.rs | 9 +++++---- 6 files changed, 30 insertions(+), 21 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index e646844e5e979..142a510002dc9 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -938,9 +938,12 @@ impl DefaultPhysicalPlanner { input_schema.as_arrow(), )? { PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => { - FilterExecBuilder::new(Arc::clone(&runtime_expr[0]), physical_input) - .with_batch_size(session_state.config().batch_size()) - .build()? + FilterExecBuilder::new( + Arc::clone(&runtime_expr[0]), + physical_input, + ) + .with_batch_size(session_state.config().batch_size()) + .build()? } PlanAsyncExpr::Async( async_map, diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 35b5918d9e8bf..96082c0855935 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -515,9 +515,9 @@ fn make_uint_batches(start: u8, end: u8) -> RecordBatch { Field::new("u64", DataType::UInt64, true), ])); let v8: Vec = (start..end).collect(); - let v16: Vec = (start as _..end as _).collect(); - let v32: Vec = (start as _..end as _).collect(); - let v64: Vec = (start as _..end as _).collect(); + let v16: Vec = (start as u16..end as u16).collect(); + let v32: Vec = (start as u32..end as u32).collect(); + let v64: Vec = (start as u64..end as u64).collect(); RecordBatch::try_new( schema, vec![ diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index d6357fdf6bc7d..b33973aae4367 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -59,7 +59,7 @@ use datafusion_physical_plan::{ coalesce_batches::CoalesceBatchesExec, coalesce_partitions::CoalescePartitionsExec, collect, - filter::FilterExec, + filter::{FilterExec, FilterExecBuilder}, repartition::RepartitionExec, sorts::sort::SortExec, }; @@ -478,9 +478,9 @@ fn test_filter_with_projection() { let projection = vec![1, 0]; let predicate = col_lit_predicate("a", "foo", &schema()); let plan = Arc::new( - FilterExec::try_new(predicate, Arc::clone(&scan)) - .unwrap() + FilterExecBuilder::new(predicate, Arc::clone(&scan)) .with_projection(Some(projection)) + .build() .unwrap(), ); @@ -503,9 +503,9 @@ fn test_filter_with_projection() { let projection = vec![1]; let predicate = col_lit_predicate("a", "foo", &schema()); let plan = Arc::new( - FilterExec::try_new(predicate, scan) - .unwrap() + FilterExecBuilder::new(predicate, scan) .with_projection(Some(projection)) + .build() .unwrap(), ); insta::assert_snapshot!( diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index d81991389e18d..b75ec0cff4c21 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -212,7 +212,10 @@ impl FilterExec { /// /// # Deprecated /// Use [`FilterExecBuilder::with_projection`] instead - #[deprecated(since = "52.0.0", note = "Use FilterExecBuilder::with_projection instead")] + #[deprecated( + since = "52.0.0", + note = "Use FilterExecBuilder::with_projection instead" + )] pub fn with_projection(&self, projection: Option>) -> Result { // Check if the projection is valid can_project(&self.schema(), projection.as_ref())?; @@ -237,7 +240,10 @@ impl FilterExec { /// /// # Deprecated /// Use [`FilterExecBuilder::with_batch_size`] instead - #[deprecated(since = "52.0.0", note = "Use FilterExecBuilder::with_batch_size instead")] + #[deprecated( + since = "52.0.0", + note = "Use FilterExecBuilder::with_batch_size instead" + )] pub fn with_batch_size(&self, batch_size: usize) -> Result { FilterExecBuilder::new(Arc::clone(&self.predicate), Arc::clone(&self.input)) .with_projection(self.projection.clone()) diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index f6de39f2ae364..a2c0c1cb28da2 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -586,10 +586,9 @@ impl protobuf::PhysicalPlanNode { None }; - let filter = - FilterExecBuilder::new(predicate, input) - .with_projection(projection) - .build()?; + let filter = FilterExecBuilder::new(predicate, input) + .with_projection(projection) + .build()?; match filter_selectivity { Ok(filter_selectivity) => Ok(Arc::new( filter.with_default_selectivity(filter_selectivity)?, diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index aa5458849330f..6faf758ee9ad9 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -77,7 +77,7 @@ use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::expressions::{ BinaryExpr, Column, NotExpr, PhysicalSortExpr, binary, cast, col, in_list, like, lit, }; -use datafusion::physical_plan::filter::FilterExec; +use datafusion::physical_plan::filter::{FilterExec, FilterExecBuilder}; use datafusion::physical_plan::joins::{ HashJoinExec, HashTableLookupExpr, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec, @@ -1818,11 +1818,12 @@ async fn roundtrip_projection_source() -> Result<()> { .build(); let filter = Arc::new( - FilterExec::try_new( + FilterExecBuilder::new( Arc::new(BinaryExpr::new(col("c", &schema)?, Operator::Eq, lit(1))), DataSourceExec::from_data_source(scan_config), - )? - .with_projection(Some(vec![0, 1]))?, + ) + .with_projection(Some(vec![0, 1])) + .build()?, ); roundtrip_test(filter)