From af2e94fcef9d0878824c60b4980f5c8f4b1f638e Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Sat, 3 Jan 2026 13:57:36 +0530 Subject: [PATCH 01/13] fix(accumulators): preserve state in evaluate() for window frame queries This commit fixes issue #19612 where accumulators that don't implement retract_batch exhibit buggy behavior in window frame queries. When aggregate functions are used with window frames like `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, DataFusion uses PlainAggregateWindowExpr which calls evaluate() multiple times on the same accumulator instance. Accumulators that use std::mem::take() in their evaluate() method consume their internal state, causing incorrect results on subsequent calls. 1. **percentile_cont**: Modified evaluate() to use mutable reference instead of consuming the Vec. Added retract_batch() support for both PercentileContAccumulator and DistinctPercentileContAccumulator. 2. **string_agg**: Changed SimpleStringAggAccumulator::evaluate() to clone the accumulated string instead of taking it. 
- datafusion/functions-aggregate/src/percentile_cont.rs: - Changed calculate_percentile() to take &mut [T::Native] instead of Vec - Updated PercentileContAccumulator::evaluate() to pass reference - Updated DistinctPercentileContAccumulator::evaluate() to clone values - Added retract_batch() implementation using HashMap for efficient removal - Updated PercentileContGroupsAccumulator::evaluate() for consistency - datafusion/functions-aggregate/src/string_agg.rs: - Changed evaluate() to use clone() instead of std::mem::take() - datafusion/sqllogictest/test_files/aggregate.slt: - Added test cases for percentile_cont with window frames - Added test comparing median() vs percentile_cont(0.5) behavior - Added test for string_agg cumulative window frame - docs/source/library-user-guide/functions/adding-udfs.md: - Added documentation about window-compatible accumulators - Explained evaluate() state preservation requirements - Documented retract_batch() implementation guidance Closes #19612 --- .../src/percentile_cont.rs | 84 +++++++++-- .../functions-aggregate/src/string_agg.rs | 13 +- .../sqllogictest/test_files/aggregate.slt | 134 ++++++++++++++++++ .../functions/adding-udfs.md | 65 +++++++++ 4 files changed, 277 insertions(+), 19 deletions(-) diff --git a/datafusion/functions-aggregate/src/percentile_cont.rs b/datafusion/functions-aggregate/src/percentile_cont.rs index a4e8332626b00..6280af4035ffd 100644 --- a/datafusion/functions-aggregate/src/percentile_cont.rs +++ b/datafusion/functions-aggregate/src/percentile_cont.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+use std::collections::HashMap; use std::fmt::Debug; use std::mem::{size_of, size_of_val}; use std::sync::Arc; @@ -52,7 +53,7 @@ use datafusion_expr::{ }; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::accumulate; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filtered_null_mask; -use datafusion_functions_aggregate_common::utils::GenericDistinctBuffer; +use datafusion_functions_aggregate_common::utils::{GenericDistinctBuffer, Hashable}; use datafusion_macros::user_doc; use crate::utils::validate_percentile_expr; @@ -427,14 +428,55 @@ impl Accumulator for PercentileContAccumulator { } fn evaluate(&mut self) -> Result { - let d = std::mem::take(&mut self.all_values); - let value = calculate_percentile::(d, self.percentile); + let value = calculate_percentile::(&mut self.all_values, self.percentile); ScalarValue::new_primitive::(value, &T::DATA_TYPE) } fn size(&self) -> usize { size_of_val(self) + self.all_values.capacity() * size_of::() } + + fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + // Cast to target type if needed (e.g., integer to Float64) + let values = if values[0].data_type() != &T::DATA_TYPE { + arrow::compute::cast(&values[0], &T::DATA_TYPE)? 
+ } else { + Arc::clone(&values[0]) + }; + + let mut to_remove: HashMap = HashMap::new(); + for i in 0..values.len() { + let v = ScalarValue::try_from_array(&values, i)?; + if !v.is_null() { + *to_remove.entry(v).or_default() += 1; + } + } + + let mut i = 0; + while i < self.all_values.len() { + let k = + ScalarValue::new_primitive::(Some(self.all_values[i]), &T::DATA_TYPE)?; + if let Some(count) = to_remove.get_mut(&k) + && *count > 0 + { + self.all_values.swap_remove(i); + *count -= 1; + if *count == 0 { + to_remove.remove(&k); + if to_remove.is_empty() { + break; + } + } + } else { + i += 1; + } + } + Ok(()) + } + + fn supports_retract_batch(&self) -> bool { + true + } } /// The percentile_cont groups accumulator accumulates the raw input values @@ -549,13 +591,13 @@ impl GroupsAccumulator fn evaluate(&mut self, emit_to: EmitTo) -> Result { // Emit values - let emit_group_values = emit_to.take_needed(&mut self.group_values); + let mut emit_group_values = emit_to.take_needed(&mut self.group_values); // Calculate percentile for each group let mut evaluate_result_builder = PrimitiveBuilder::::with_capacity(emit_group_values.len()); - for values in emit_group_values { - let value = calculate_percentile::(values, self.percentile); + for values in &mut emit_group_values { + let value = calculate_percentile::(values.as_mut_slice(), self.percentile); evaluate_result_builder.append_option(value); } @@ -652,17 +694,31 @@ impl Accumulator for DistinctPercentileContAccumula } fn evaluate(&mut self) -> Result { - let d = std::mem::take(&mut self.distinct_values.values) - .into_iter() - .map(|v| v.0) - .collect::>(); - let value = calculate_percentile::(d, self.percentile); + let mut values: Vec = + self.distinct_values.values.iter().map(|v| v.0).collect(); + let value = calculate_percentile::(&mut values, self.percentile); ScalarValue::new_primitive::(value, &T::DATA_TYPE) } fn size(&self) -> usize { size_of_val(self) + self.distinct_values.size() } + + fn retract_batch(&mut 
self, values: &[ArrayRef]) -> Result<()> { + if values.is_empty() { + return Ok(()); + } + + let arr = values[0].as_primitive::(); + for value in arr.iter().flatten() { + self.distinct_values.values.remove(&Hashable(value)); + } + Ok(()) + } + + fn supports_retract_batch(&self) -> bool { + true + } } /// Calculate the percentile value for a given set of values. @@ -672,8 +728,12 @@ impl Accumulator for DistinctPercentileContAccumula /// For percentile p and n values: /// - If p * (n-1) is an integer, return the value at that position /// - Otherwise, interpolate between the two closest values +/// +/// Note: This function takes a mutable slice and sorts it in place, but does not +/// consume the data. This is important for window frame queries where evaluate() +/// may be called multiple times on the same accumulator state. fn calculate_percentile( - mut values: Vec, + values: &mut [T::Native], percentile: f64, ) -> Option { let cmp = |x: &T::Native, y: &T::Native| x.compare(*y); diff --git a/datafusion/functions-aggregate/src/string_agg.rs b/datafusion/functions-aggregate/src/string_agg.rs index 77e9f60afd3cf..1c10818c091db 100644 --- a/datafusion/functions-aggregate/src/string_agg.rs +++ b/datafusion/functions-aggregate/src/string_agg.rs @@ -384,14 +384,13 @@ impl Accumulator for SimpleStringAggAccumulator { } fn evaluate(&mut self) -> Result { - let result = if self.has_value { - ScalarValue::LargeUtf8(Some(std::mem::take(&mut self.accumulated_string))) + if self.has_value { + Ok(ScalarValue::LargeUtf8(Some( + self.accumulated_string.clone(), + ))) } else { - ScalarValue::LargeUtf8(None) - }; - - self.has_value = false; - Ok(result) + Ok(ScalarValue::LargeUtf8(None)) + } } fn size(&self) -> usize { diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index f6ce68917e03b..02429bfb81bae 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ 
-8246,3 +8246,137 @@ query R select percentile_cont(null, 0.5); ---- NULL + +########### +# Issue #19612: Test that percentile_cont and median produce identical results +# in window frame queries with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. +# Previously percentile_cont consumed its internal state during evaluate(), +# causing incorrect results when called multiple times in window queries. +########### + +# Test percentile_cont window frame behavior (fix for issue #19612) +statement ok +CREATE TABLE percentile_window_test ( + timestamp INT, + tags VARCHAR, + value DOUBLE +); + +statement ok +INSERT INTO percentile_window_test (timestamp, tags, value) VALUES +(1, 'tag1', 10.0), +(2, 'tag1', 20.0), +(3, 'tag1', 30.0), +(4, 'tag1', 40.0), +(5, 'tag1', 50.0), +(1, 'tag2', 60.0), +(2, 'tag2', 70.0), +(3, 'tag2', 80.0), +(4, 'tag2', 90.0), +(5, 'tag2', 100.0); + +# Test that median and percentile_cont(0.5) produce the same results +# with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW frame. +# Both functions should maintain state correctly across multiple evaluate() calls. 
+query ITRRR +SELECT + timestamp, + tags, + value, + median(value) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS value_median, + percentile_cont(value, 0.5) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS value_percentile_50 +FROM percentile_window_test +ORDER BY tags, timestamp; +---- +1 tag1 10 10 10 +2 tag1 20 15 15 +3 tag1 30 20 20 +4 tag1 40 25 25 +5 tag1 50 30 30 +1 tag2 60 60 60 +2 tag2 70 65 65 +3 tag2 80 70 70 +4 tag2 90 75 75 +5 tag2 100 80 80 + +# Test percentile_cont with different percentile values +query ITRRR +SELECT + timestamp, + tags, + value, + percentile_cont(value, 0.25) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS p25, + percentile_cont(value, 0.75) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS p75 +FROM percentile_window_test +ORDER BY tags, timestamp; +---- +1 tag1 10 10 10 +2 tag1 20 12.5 17.5 +3 tag1 30 15 25 +4 tag1 40 17.5 32.5 +5 tag1 50 20 40 +1 tag2 60 60 60 +2 tag2 70 62.5 67.5 +3 tag2 80 65 75 +4 tag2 90 67.5 82.5 +5 tag2 100 70 90 + +statement ok +DROP TABLE percentile_window_test; + +# Test string_agg window frame behavior (fix for issue #19612) +statement ok +CREATE TABLE string_agg_window_test ( + id INT, + grp VARCHAR, + val VARCHAR +); + +statement ok +INSERT INTO string_agg_window_test (id, grp, val) VALUES +(1, 'A', 'a'), +(2, 'A', 'b'), +(3, 'A', 'c'), +(1, 'B', 'x'), +(2, 'B', 'y'), +(3, 'B', 'z'); + +# Test string_agg with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW +# The function should maintain state correctly across multiple evaluate() calls +query ITT +SELECT + id, + grp, + string_agg(val, ',') OVER ( + PARTITION BY grp + ORDER BY id + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS cumulative_string +FROM string_agg_window_test +ORDER BY grp, id; +---- +1 A a +2 A a,b 
+3 A a,b,c +1 B x +2 B x,y +3 B x,y,z + +statement ok +DROP TABLE string_agg_window_test; diff --git a/docs/source/library-user-guide/functions/adding-udfs.md b/docs/source/library-user-guide/functions/adding-udfs.md index 5d033ae3f9e97..0c16a1fdf9314 100644 --- a/docs/source/library-user-guide/functions/adding-udfs.md +++ b/docs/source/library-user-guide/functions/adding-udfs.md @@ -1350,6 +1350,71 @@ async fn main() -> Result<()> { [`create_udaf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/fn.create_udaf.html [`advanced_udaf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/udf/advanced_udaf.rs +### Window Frame Compatible Accumulators + +When an aggregate function is used in a window context with a sliding frame (e.g., `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`), +DataFusion may call `evaluate()` multiple times on the same accumulator instance to compute results for each row in the window. +This has important implications for how you implement your accumulator: + +#### The `evaluate()` Method Must Not Consume State + +The `evaluate()` method should return the current aggregate value **without modifying or consuming the accumulator's internal state**. +This is critical because: + +1. **Multiple evaluations**: For window queries, `evaluate()` is called once per row in the partition +2. 
**State preservation**: The internal state must remain intact for subsequent `evaluate()` calls + +**Incorrect implementation** (consumes state): + +```rust +fn evaluate(&mut self) -> Result { + // BAD: std::mem::take() consumes the values, leaving an empty Vec + let values = std::mem::take(&mut self.values); + // After this call, self.values is empty and subsequent + // evaluate() calls will return incorrect results + calculate_result(values) +} +``` + +**Correct implementation** (preserves state): + +```rust +fn evaluate(&mut self) -> Result { + // GOOD: Use a reference or clone to preserve state + calculate_result(&mut self.values) + // Or: calculate_result(self.values.clone()) +} +``` + +#### Implementing `retract_batch` for Sliding Windows + +For more efficient sliding window calculations, you can implement the `retract_batch` method. +This allows DataFusion to remove values that have "left" the window frame instead of recalculating from scratch: + +```rust +impl Accumulator for MyAccumulator { + fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + // Remove the given values from the accumulator state + // This is the inverse of update_batch + for value in values[0].iter().flatten() { + self.remove_value(value); + } + Ok(()) + } + + fn supports_retract_batch(&self) -> bool { + true // Enable this optimization + } +} +``` + +If your accumulator does not support `retract_batch` (returns `false` from `supports_retract_batch()`), +DataFusion will use `PlainAggregateWindowExpr` which calls `evaluate()` multiple times on the same +accumulator. In this case, it is **essential** that your `evaluate()` method does not consume the +accumulator's state. + +See [issue #19612](https://github.com/apache/datafusion/issues/19612) for more details on this behavior. + ## Adding a Table UDF A User-Defined Table Function (UDTF) is a function that takes parameters and returns a `TableProvider`. 
From c5fe87bd7ef5e2b3da5a0f29ba7c95a065e9e4a9 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Sat, 3 Jan 2026 16:17:04 +0530 Subject: [PATCH 02/13] fix: format code and mark doc examples as ignore for doctest --- docs/source/library-user-guide/functions/adding-udfs.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/source/library-user-guide/functions/adding-udfs.md b/docs/source/library-user-guide/functions/adding-udfs.md index 0c16a1fdf9314..63ff0d16dde64 100644 --- a/docs/source/library-user-guide/functions/adding-udfs.md +++ b/docs/source/library-user-guide/functions/adding-udfs.md @@ -1366,7 +1366,7 @@ This is critical because: **Incorrect implementation** (consumes state): -```rust +```rust,ignore fn evaluate(&mut self) -> Result { // BAD: std::mem::take() consumes the values, leaving an empty Vec let values = std::mem::take(&mut self.values); @@ -1378,7 +1378,7 @@ fn evaluate(&mut self) -> Result { **Correct implementation** (preserves state): -```rust +```rust,ignore fn evaluate(&mut self) -> Result { // GOOD: Use a reference or clone to preserve state calculate_result(&mut self.values) @@ -1391,7 +1391,7 @@ fn evaluate(&mut self) -> Result { For more efficient sliding window calculations, you can implement the `retract_batch` method. 
This allows DataFusion to remove values that have "left" the window frame instead of recalculating from scratch: -```rust +```rust,ignore impl Accumulator for MyAccumulator { fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { // Remove the given values from the accumulator state From 3d4eeeea318a77d9a646ba91447d7d482317837c Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Tue, 6 Jan 2026 13:08:04 +0530 Subject: [PATCH 03/13] address review feedback: remove cast code, consolidate tests, update docstring --- datafusion/expr-common/src/accumulator.rs | 30 ++- .../src/percentile_cont.rs | 11 +- .../sqllogictest/test_files/aggregate.slt | 209 ++++++++++-------- .../functions/adding-udfs.md | 65 ------ 4 files changed, 143 insertions(+), 172 deletions(-) diff --git a/datafusion/expr-common/src/accumulator.rs b/datafusion/expr-common/src/accumulator.rs index fc4e90114beea..72b0b24e1a40f 100644 --- a/datafusion/expr-common/src/accumulator.rs +++ b/datafusion/expr-common/src/accumulator.rs @@ -58,17 +58,37 @@ pub trait Accumulator: Send + Sync + Debug { /// running sum. fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>; - /// Returns the final aggregate value, consuming the internal state. + /// Returns the final aggregate value. /// /// For example, the `SUM` accumulator maintains a running sum, /// and `evaluate` will produce that running sum as its output. /// - /// This function should not be called twice, otherwise it will - /// result in potentially non-deterministic behavior. - /// /// This function gets `&mut self` to allow for the accumulator to build /// arrow-compatible internal state that can be returned without copying - /// when possible (for example distinct strings) + /// when possible (for example distinct strings). 
+ /// + /// # Window Frame Queries + /// + /// When used in a window context without [`Self::supports_retract_batch`], + /// `evaluate()` may be called multiple times on the same accumulator instance + /// (once per row in the partition). In this case, implementations **must not** + /// consume or modify internal state. Use references or clones to preserve state: + /// + /// ```ignore + /// // GOOD: Preserves state for subsequent calls + /// fn evaluate(&mut self) -> Result { + /// calculate_result(&self.values) // Use reference + /// } + /// + /// // BAD: Consumes state, breaks window queries + /// fn evaluate(&mut self) -> Result { + /// calculate_result(std::mem::take(&mut self.values)) + /// } + /// ``` + /// + /// For efficient sliding window calculations, consider implementing + /// [`Self::retract_batch`] which allows DataFusion to incrementally + /// update state rather than calling `evaluate()` repeatedly. fn evaluate(&mut self) -> Result; /// Returns the allocated size required for this accumulator, in diff --git a/datafusion/functions-aggregate/src/percentile_cont.rs b/datafusion/functions-aggregate/src/percentile_cont.rs index 6280af4035ffd..f60f1aa9f78c2 100644 --- a/datafusion/functions-aggregate/src/percentile_cont.rs +++ b/datafusion/functions-aggregate/src/percentile_cont.rs @@ -437,16 +437,9 @@ impl Accumulator for PercentileContAccumulator { } fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - // Cast to target type if needed (e.g., integer to Float64) - let values = if values[0].data_type() != &T::DATA_TYPE { - arrow::compute::cast(&values[0], &T::DATA_TYPE)? 
- } else { - Arc::clone(&values[0]) - }; - let mut to_remove: HashMap = HashMap::new(); - for i in 0..values.len() { - let v = ScalarValue::try_from_array(&values, i)?; + for i in 0..values[0].len() { + let v = ScalarValue::try_from_array(&values[0], i)?; if !v.is_null() { *to_remove.entry(v).or_default() += 1; } diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 02429bfb81bae..c3af5f209f619 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -1126,6 +1126,102 @@ ORDER BY tags, timestamp; 4 tag2 90 75 80 95 5 tag2 100 80 80 100 +########### +# Issue #19612: Test that percentile_cont produces correct results +# in window frame queries. Previously percentile_cont consumed its internal state +# during evaluate(), causing incorrect results when called multiple times. +########### + +# Test percentile_cont sliding window (same as median) +query ITRR +SELECT + timestamp, + tags, + value, + percentile_cont(value, 0.5) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING + ) AS value_percentile_50 +FROM median_window_test +ORDER BY tags, timestamp; +---- +1 tag1 10 15 +2 tag1 20 20 +3 tag1 30 30 +4 tag1 40 40 +5 tag1 50 45 +1 tag2 60 65 +2 tag2 70 70 +3 tag2 80 80 +4 tag2 90 90 +5 tag2 100 95 + +# Test percentile_cont non-sliding window +query ITRRRR +SELECT + timestamp, + tags, + value, + percentile_cont(value, 0.5) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS value_percentile_unbounded_preceding, + percentile_cont(value, 0.5) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS value_percentile_unbounded_both, + percentile_cont(value, 0.5) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING + ) AS value_percentile_unbounded_following 
+FROM median_window_test +ORDER BY tags, timestamp; +---- +1 tag1 10 10 30 30 +2 tag1 20 15 30 35 +3 tag1 30 20 30 40 +4 tag1 40 25 30 45 +5 tag1 50 30 30 50 +1 tag2 60 60 80 80 +2 tag2 70 65 80 85 +3 tag2 80 70 80 90 +4 tag2 90 75 80 95 +5 tag2 100 80 80 100 + +# Test percentile_cont with different percentile values +query ITRRR +SELECT + timestamp, + tags, + value, + percentile_cont(value, 0.25) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS p25, + percentile_cont(value, 0.75) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS p75 +FROM median_window_test +ORDER BY tags, timestamp; +---- +1 tag1 10 10 10 +2 tag1 20 12.5 17.5 +3 tag1 30 15 25 +4 tag1 40 17.5 32.5 +5 tag1 50 20 40 +1 tag2 60 60 60 +2 tag2 70 62.5 67.5 +3 tag2 80 65 75 +4 tag2 90 67.5 82.5 +5 tag2 100 70 90 + statement ok DROP TABLE median_window_test; @@ -8247,99 +8343,6 @@ select percentile_cont(null, 0.5); ---- NULL -########### -# Issue #19612: Test that percentile_cont and median produce identical results -# in window frame queries with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. -# Previously percentile_cont consumed its internal state during evaluate(), -# causing incorrect results when called multiple times in window queries. -########### - -# Test percentile_cont window frame behavior (fix for issue #19612) -statement ok -CREATE TABLE percentile_window_test ( - timestamp INT, - tags VARCHAR, - value DOUBLE -); - -statement ok -INSERT INTO percentile_window_test (timestamp, tags, value) VALUES -(1, 'tag1', 10.0), -(2, 'tag1', 20.0), -(3, 'tag1', 30.0), -(4, 'tag1', 40.0), -(5, 'tag1', 50.0), -(1, 'tag2', 60.0), -(2, 'tag2', 70.0), -(3, 'tag2', 80.0), -(4, 'tag2', 90.0), -(5, 'tag2', 100.0); - -# Test that median and percentile_cont(0.5) produce the same results -# with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW frame. 
-# Both functions should maintain state correctly across multiple evaluate() calls. -query ITRRR -SELECT - timestamp, - tags, - value, - median(value) OVER ( - PARTITION BY tags - ORDER BY timestamp - ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW - ) AS value_median, - percentile_cont(value, 0.5) OVER ( - PARTITION BY tags - ORDER BY timestamp - ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW - ) AS value_percentile_50 -FROM percentile_window_test -ORDER BY tags, timestamp; ----- -1 tag1 10 10 10 -2 tag1 20 15 15 -3 tag1 30 20 20 -4 tag1 40 25 25 -5 tag1 50 30 30 -1 tag2 60 60 60 -2 tag2 70 65 65 -3 tag2 80 70 70 -4 tag2 90 75 75 -5 tag2 100 80 80 - -# Test percentile_cont with different percentile values -query ITRRR -SELECT - timestamp, - tags, - value, - percentile_cont(value, 0.25) OVER ( - PARTITION BY tags - ORDER BY timestamp - ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW - ) AS p25, - percentile_cont(value, 0.75) OVER ( - PARTITION BY tags - ORDER BY timestamp - ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW - ) AS p75 -FROM percentile_window_test -ORDER BY tags, timestamp; ----- -1 tag1 10 10 10 -2 tag1 20 12.5 17.5 -3 tag1 30 15 25 -4 tag1 40 17.5 32.5 -5 tag1 50 20 40 -1 tag2 60 60 60 -2 tag2 70 62.5 67.5 -3 tag2 80 65 75 -4 tag2 90 67.5 82.5 -5 tag2 100 70 90 - -statement ok -DROP TABLE percentile_window_test; - # Test string_agg window frame behavior (fix for issue #19612) statement ok CREATE TABLE string_agg_window_test ( @@ -8378,5 +8381,25 @@ ORDER BY grp, id; 2 B x,y 3 B x,y,z +# Test string_agg with sliding window (ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) +query ITT +SELECT + id, + grp, + string_agg(val, ',') OVER ( + PARTITION BY grp + ORDER BY id + ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING + ) AS sliding_string +FROM string_agg_window_test +ORDER BY grp, id; +---- +1 A a,b +2 A a,b,c +3 A b,c +1 B x,y +2 B x,y,z +3 B y,z + statement ok DROP TABLE string_agg_window_test; diff --git 
a/docs/source/library-user-guide/functions/adding-udfs.md b/docs/source/library-user-guide/functions/adding-udfs.md index 63ff0d16dde64..5d033ae3f9e97 100644 --- a/docs/source/library-user-guide/functions/adding-udfs.md +++ b/docs/source/library-user-guide/functions/adding-udfs.md @@ -1350,71 +1350,6 @@ async fn main() -> Result<()> { [`create_udaf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/fn.create_udaf.html [`advanced_udaf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/udf/advanced_udaf.rs -### Window Frame Compatible Accumulators - -When an aggregate function is used in a window context with a sliding frame (e.g., `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`), -DataFusion may call `evaluate()` multiple times on the same accumulator instance to compute results for each row in the window. -This has important implications for how you implement your accumulator: - -#### The `evaluate()` Method Must Not Consume State - -The `evaluate()` method should return the current aggregate value **without modifying or consuming the accumulator's internal state**. -This is critical because: - -1. **Multiple evaluations**: For window queries, `evaluate()` is called once per row in the partition -2. 
**State preservation**: The internal state must remain intact for subsequent `evaluate()` calls - -**Incorrect implementation** (consumes state): - -```rust,ignore -fn evaluate(&mut self) -> Result { - // BAD: std::mem::take() consumes the values, leaving an empty Vec - let values = std::mem::take(&mut self.values); - // After this call, self.values is empty and subsequent - // evaluate() calls will return incorrect results - calculate_result(values) -} -``` - -**Correct implementation** (preserves state): - -```rust,ignore -fn evaluate(&mut self) -> Result { - // GOOD: Use a reference or clone to preserve state - calculate_result(&mut self.values) - // Or: calculate_result(self.values.clone()) -} -``` - -#### Implementing `retract_batch` for Sliding Windows - -For more efficient sliding window calculations, you can implement the `retract_batch` method. -This allows DataFusion to remove values that have "left" the window frame instead of recalculating from scratch: - -```rust,ignore -impl Accumulator for MyAccumulator { - fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - // Remove the given values from the accumulator state - // This is the inverse of update_batch - for value in values[0].iter().flatten() { - self.remove_value(value); - } - Ok(()) - } - - fn supports_retract_batch(&self) -> bool { - true // Enable this optimization - } -} -``` - -If your accumulator does not support `retract_batch` (returns `false` from `supports_retract_batch()`), -DataFusion will use `PlainAggregateWindowExpr` which calls `evaluate()` multiple times on the same -accumulator. In this case, it is **essential** that your `evaluate()` method does not consume the -accumulator's state. - -See [issue #19612](https://github.com/apache/datafusion/issues/19612) for more details on this behavior. - ## Adding a Table UDF A User-Defined Table Function (UDTF) is a function that takes parameters and returns a `TableProvider`. 
From 3b1e6715329af1e805f2c6112c88833dbc44bbad Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Wed, 7 Jan 2026 01:36:02 +0530 Subject: [PATCH 04/13] fix: remove sliding window test for string_agg (no retract_batch support) --- .../sqllogictest/test_files/aggregate.slt | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index c3af5f209f619..a72e4f36ae084 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -8381,25 +8381,5 @@ ORDER BY grp, id; 2 B x,y 3 B x,y,z -# Test string_agg with sliding window (ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) -query ITT -SELECT - id, - grp, - string_agg(val, ',') OVER ( - PARTITION BY grp - ORDER BY id - ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING - ) AS sliding_string -FROM string_agg_window_test -ORDER BY grp, id; ----- -1 A a,b -2 A a,b,c -3 A b,c -1 B x,y -2 B x,y,z -3 B y,z - statement ok DROP TABLE string_agg_window_test; From 4d9cbefcbacf1097c27badda00fa6d3ec50aa3c9 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Thu, 8 Jan 2026 01:15:48 +0530 Subject: [PATCH 05/13] Deprecate AggregateUDFImpl::is_nullable in favor of return_field nullability inference This change improves how nullability is computed for aggregate UDF outputs by making it depend on the nullability of input fields, aligning with the pattern used for scalar UDFs. 
Changes: - Mark is_nullable() method as deprecated in AggregateUDFImpl trait - Update udaf_default_return_field() to compute output nullability from input fields: * Output is nullable if ANY input field is nullable * Output is non-nullable only if ALL input fields are non-nullable - Add deprecation migration guide in is_nullable() documentation - Add #[allow(deprecated)] to wrapper method calls in AggregateUDF and AliasedAggregateUDFImpl Testing: - Add 4 new tests validating nullability inference from input fields: * test_return_field_nullability_from_nullable_input * test_return_field_nullability_from_non_nullable_input * test_return_field_nullability_with_mixed_inputs * test_return_field_preserves_return_type - All existing tests continue to pass (test_partial_eq, test_partial_ord) - No regressions in aggregate function execution Documentation: - Create new docs/source/library-user-guide/functions/udf-nullability.md * Explains the nullability change and rationale * Provides migration guide for custom UDAF implementations * Includes examples for default and custom nullability behavior * References scalar UDF patterns - Update docs/source/library-user-guide/functions/adding-udfs.md * Add section on nullability of aggregate functions * Link to new comprehensive nullability documentation Fixes: #19511 (related to #18882) --- datafusion/expr/src/udaf.rs | 107 ++++++- .../functions/adding-udfs.md | 6 + .../functions/udf-nullability.md | 268 ++++++++++++++++++ 3 files changed, 378 insertions(+), 3 deletions(-) create mode 100644 docs/source/library-user-guide/functions/udf-nullability.md diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index a69176e1173a5..395c6aa3ec1ad 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -208,6 +208,7 @@ impl AggregateUDF { self.inner.window_function_display_name(params) } + #[allow(deprecated)] pub fn is_nullable(&self) -> bool { self.inner.is_nullable() } @@ -528,10 +529,32 @@ pub trait 
AggregateUDFImpl: Debug + DynEq + DynHash + Send + Sync { /// Whether the aggregate function is nullable. /// + /// **DEPRECATED**: This method is deprecated and will be removed in a future version. + /// Nullability should instead be specified in [`Self::return_field`] which can provide + /// more context-aware nullability based on input field properties. + /// /// Nullable means that the function could return `null` for any inputs. /// For example, aggregate functions like `COUNT` always return a non null value /// but others like `MIN` will return `NULL` if there is nullable input. /// Note that if the function is declared as *not* nullable, make sure the [`AggregateUDFImpl::default_value`] is `non-null` + /// + /// # Migration Guide + /// + /// If you need to override nullability, implement [`Self::return_field`] instead: + /// + /// ```ignore + /// fn return_field(&self, arg_fields: &[FieldRef]) -> Result { + /// let arg_types: Vec<_> = arg_fields.iter().map(|f| f.data_type()).cloned().collect(); + /// let data_type = self.return_type(&arg_types)?; + /// // Specify nullability based on your function's logic + /// let nullable = arg_fields.iter().any(|f| f.is_nullable()); + /// Ok(Arc::new(Field::new(self.name(), data_type, nullable))) + /// } + /// ``` + #[deprecated( + since = "42.0.0", + note = "Use `return_field` to specify nullability instead of `is_nullable`" + )] fn is_nullable(&self) -> bool { true } @@ -1091,6 +1114,13 @@ pub fn udaf_default_window_function_display_name( } /// Encapsulates default implementation of [`AggregateUDFImpl::return_field`]. +/// +/// This function computes nullability based on input field nullability: +/// - The result is nullable if ANY input field is nullable +/// - The result is non-nullable only if ALL input fields are non-nullable +/// +/// This replaces the previous behavior of always deferring to `is_nullable()`, +/// providing more accurate nullability inference for aggregate functions. 
pub fn udaf_default_return_field( func: &F, arg_fields: &[FieldRef], @@ -1098,10 +1128,13 @@ pub fn udaf_default_return_field( let arg_types: Vec<_> = arg_fields.iter().map(|f| f.data_type()).cloned().collect(); let data_type = func.return_type(&arg_types)?; + // Determine nullability: result is nullable if any input is nullable + let is_nullable = arg_fields.iter().any(|f| f.is_nullable()); + Ok(Arc::new(Field::new( func.name(), data_type, - func.is_nullable(), + is_nullable, ))) } @@ -1247,6 +1280,7 @@ impl AggregateUDFImpl for AliasedAggregateUDFImpl { self.inner.return_field(arg_fields) } + #[allow(deprecated)] fn is_nullable(&self) -> bool { self.inner.is_nullable() } @@ -1343,7 +1377,7 @@ mod test { &self.signature } fn return_type(&self, _args: &[DataType]) -> Result { - unimplemented!() + Ok(DataType::Float64) } fn accumulator( &self, @@ -1383,7 +1417,7 @@ mod test { &self.signature } fn return_type(&self, _args: &[DataType]) -> Result { - unimplemented!() + Ok(DataType::Float64) } fn accumulator( &self, @@ -1424,4 +1458,71 @@ mod test { value.hash(hasher); hasher.finish() } + + #[test] + fn test_return_field_nullability_from_nullable_input() { + // Test that return_field derives nullability from input field nullability + use arrow::datatypes::Field; + use std::sync::Arc; + + let a = AggregateUDF::from(AMeanUdf::new()); + + // Create a nullable input field + let nullable_field = Arc::new(Field::new("col", DataType::Float64, true)); + let return_field = a.return_field(&[nullable_field]).unwrap(); + + // When input is nullable, output should be nullable + assert!(return_field.is_nullable()); + } + + #[test] + fn test_return_field_nullability_from_non_nullable_input() { + // Test that return_field respects non-nullable input fields + use arrow::datatypes::Field; + use std::sync::Arc; + + let a = AggregateUDF::from(AMeanUdf::new()); + + // Create a non-nullable input field + let non_nullable_field = Arc::new(Field::new("col", DataType::Float64, false)); + let 
return_field = a.return_field(&[non_nullable_field]).unwrap(); + + // When input is non-nullable, output should also be non-nullable + assert!(!return_field.is_nullable()); + } + + #[test] + fn test_return_field_nullability_with_mixed_inputs() { + // Test that return_field is nullable if ANY input is nullable + use arrow::datatypes::Field; + use std::sync::Arc; + + let a = AggregateUDF::from(AMeanUdf::new()); + + // With multiple inputs (typical for aggregates in more complex scenarios) + let nullable_field = Arc::new(Field::new("col1", DataType::Float64, true)); + let non_nullable_field = Arc::new(Field::new("col2", DataType::Float64, false)); + + let return_field = a.return_field(&[non_nullable_field, nullable_field]).unwrap(); + + // If ANY input is nullable, result should be nullable + assert!(return_field.is_nullable()); + } + + #[test] + fn test_return_field_preserves_return_type() { + // Test that return_field correctly preserves the return type + use arrow::datatypes::Field; + use std::sync::Arc; + + let a = AggregateUDF::from(AMeanUdf::new()); + + let nullable_field = Arc::new(Field::new("col", DataType::Float64, true)); + let return_field = a.return_field(&[nullable_field]).unwrap(); + + // Verify data type is preserved + assert_eq!(*return_field.data_type(), DataType::Float64); + // Verify name matches function name + assert_eq!(return_field.name(), "a"); + } } diff --git a/docs/source/library-user-guide/functions/adding-udfs.md b/docs/source/library-user-guide/functions/adding-udfs.md index 5d033ae3f9e97..422d498757d77 100644 --- a/docs/source/library-user-guide/functions/adding-udfs.md +++ b/docs/source/library-user-guide/functions/adding-udfs.md @@ -1350,6 +1350,12 @@ async fn main() -> Result<()> { [`create_udaf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/fn.create_udaf.html [`advanced_udaf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/udf/advanced_udaf.rs +### Nullability of Aggregate Functions + 
+By default, aggregate functions return nullable output if any of their input fields are nullable. This behavior is automatically computed by DataFusion and works correctly for most functions like `MIN`, `MAX`, `SUM`, and `AVG`. + +For more advanced control over nullability or to understand how it works, see the [Aggregate UDF Nullability Guide](udf-nullability.md). + ## Adding a Table UDF A User-Defined Table Function (UDTF) is a function that takes parameters and returns a `TableProvider`. diff --git a/docs/source/library-user-guide/functions/udf-nullability.md b/docs/source/library-user-guide/functions/udf-nullability.md new file mode 100644 index 0000000000000..5a87070129ed2 --- /dev/null +++ b/docs/source/library-user-guide/functions/udf-nullability.md @@ -0,0 +1,268 @@ + + +# Aggregate UDF Nullability + +## Overview + +DataFusion distinguishes between the nullability of aggregate function outputs and their input fields. This document explains how nullability is computed for aggregate User Defined Functions (UDAFs) and how to correctly specify it in your custom functions. + +## The Change: From `is_nullable()` to `return_field()` + +### What Changed? + +In earlier versions of DataFusion, aggregate function nullability was controlled by the `is_nullable()` method on `AggregateUDFImpl`. This method returned a simple boolean indicating whether the function could ever return `NULL`, regardless of input characteristics. + +**This approach has been deprecated** in favor of computing nullability more accurately via the `return_field()` method, which has access to the actual field metadata of the inputs. This allows for more precise nullability inference based on whether input fields are nullable. + +### Why the Change? + +1. **Input-aware nullability**: The new approach allows the function's nullability to depend on the nullability of its inputs. For example, `MIN(column)` should only be nullable if `column` is nullable. + +2. 
**Consistency with scalar UDFs**: Scalar UDFs already follow this pattern using `return_field_from_args()`, and aggregate UDFs now align with this design. + +3. **More accurate schema inference**: Query optimizers and executors can now make better decisions about whether intermediate or final results can be null. + +## How Nullability Works Now + +By default, the `return_field()` method in `AggregateUDFImpl` computes the output field using this logic: + +```text +output_is_nullable = ANY input field is nullable +``` + +In other words: +- **If ALL input fields are non-nullable**, the output is **non-nullable** +- **If ANY input field is nullable**, the output is **nullable** + +This default behavior works well for most aggregate functions like `MIN`, `MAX`, `SUM`, and `AVG`. + +## Implementing Custom Aggregate UDFs + +### Default Behavior (Recommended) + +For most aggregate functions, you don't need to override `return_field()`. The default implementation will correctly infer nullability from inputs: + +```rust +use std::sync::Arc; +use std::any::Any; +use arrow::datatypes::{DataType, Field, FieldRef}; +use datafusion_common::Result; +use datafusion_expr::{AggregateUDFImpl, Signature, Volatility}; +use datafusion_functions_aggregate_common::accumulator::AccumulatorArgs; + +#[derive(Debug)] +struct MyAggregateFunction { + signature: Signature, +} + +impl MyAggregateFunction { + fn new() -> Self { + Self { + signature: Signature::uniform( + 1, + vec![DataType::Float64], + Volatility::Immutable, + ), + } + } +} + +impl AggregateUDFImpl for MyAggregateFunction { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "my_agg" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Float64) + } + + // Don't override return_field() - let the default handle nullability + // The default will make the output nullable if any input is nullable + + fn accumulator(&self, 
acc_args: AccumulatorArgs) -> Result> { + // Implementation... + # unimplemented!() + } + + // ... other required methods +} +``` + +### Custom Nullability (Advanced) + +If your function has special nullability semantics, you can override `return_field()`: + +```rust +use std::sync::Arc; +use arrow::datatypes::Field; + +impl AggregateUDFImpl for MyAggregateFunction { + // ... other methods ... + + fn return_field(&self, arg_fields: &[FieldRef]) -> Result { + let arg_types: Vec<_> = arg_fields + .iter() + .map(|f| f.data_type()) + .cloned() + .collect(); + let data_type = self.return_type(&arg_types)?; + + // Example: COUNT always returns non-nullable i64, regardless of input + let is_nullable = false; // COUNT never returns NULL + + Ok(Arc::new(Field::new(self.name(), data_type, is_nullable))) + } +} +``` + +## Migration Guide + +If you have existing code that uses `is_nullable()`, here's how to migrate: + +### Before (Deprecated) + +```rust +impl AggregateUDFImpl for MyFunction { + fn is_nullable(&self) -> bool { + // Only returns true or false, independent of inputs + true + } +} +``` + +### After (Recommended) + +**Option 1: Use the default (simplest)** + +```rust +impl AggregateUDFImpl for MyFunction { + // Remove is_nullable() entirely + // The default return_field() will compute nullability from inputs +} +``` + +**Option 2: Override return_field() for custom logic** + +```rust +impl AggregateUDFImpl for MyFunction { + fn return_field(&self, arg_fields: &[FieldRef]) -> Result { + let arg_types: Vec<_> = arg_fields + .iter() + .map(|f| f.data_type()) + .cloned() + .collect(); + let data_type = self.return_type(&arg_types)?; + + // Your custom nullability logic here + let is_nullable = arg_fields.iter().any(|f| f.is_nullable()); + + Ok(Arc::new(Field::new(self.name(), data_type, is_nullable))) + } +} +``` + +## Examples + +### Example 1: MIN Function + +`MIN` returns the smallest value from a group. 
It should be nullable if and only if its input is nullable: + +```rust +impl AggregateUDFImpl for MinFunction { + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> { + Ok(arg_types[0].clone()) + } + + // No need to override return_field() - the default handles it correctly: + // - If input is nullable, output is nullable + // - If input is non-nullable, output is non-nullable + + fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> { + // Implementation... + } +} +``` + +### Example 2: COUNT Function + +`COUNT` always returns a non-nullable integer, regardless of input nullability: + +```rust +impl AggregateUDFImpl for CountFunction { + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> { + Ok(DataType::Int64) + } + + // Override return_field to always return non-nullable + fn return_field(&self, _arg_fields: &[FieldRef]) -> Result<FieldRef> { + Ok(Arc::new(Field::new(self.name(), DataType::Int64, false))) + } + + fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> { + // Implementation... + } +} +``` + +## Deprecation Timeline + +- **Deprecated in v52.0.0**: The `is_nullable()` method in `AggregateUDFImpl` is now deprecated +- **Will be removed in a future version**: `is_nullable()` will be removed entirely + +During the deprecation period, both `is_nullable()` and the new `return_field()` approach work, but new code should use `return_field()`. + +## Troubleshooting + +### Issue: "Function `X` uses deprecated `is_nullable()`" + +**Solution**: Remove the `is_nullable()` implementation and let the default `return_field()` handle nullability, or override `return_field()` directly. + +### Issue: "Output field nullability is incorrect" + +**Check**: +1. Are your input fields correctly marked as nullable/non-nullable? +2. Does your function need custom nullability logic? If so, override `return_field()`.
+ +### Issue: "Tests fail with null value where non-null expected" + +**Check**: +1. Verify that your function's accumulator actually returns a non-null default value when the input is empty and your function declares non-nullable output +2. Override `return_field()` to adjust the nullability if needed + +## See Also + +- [Adding User Defined Functions](adding-udfs.md) - General guide to implementing UDFs +- Scalar UDF Nullability - Similar concepts for scalar UDFs (which already use `return_field_from_args()`) From 1e487fbdcee8643a7ed396f679de586731669c75 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Fri, 9 Jan 2026 22:00:02 +0530 Subject: [PATCH 06/13] Update datafusion/expr/src/udaf.rs Co-authored-by: Martin Grigorov --- datafusion/expr/src/udaf.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index 395c6aa3ec1ad..eefbc199a4757 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -1515,7 +1515,7 @@ mod test { use arrow::datatypes::Field; use std::sync::Arc; - let a = AggregateUDF::from(AMeanUdf::new()); + let udf = AggregateUDF::from(AMeanUdf::new()); let nullable_field = Arc::new(Field::new("col", DataType::Float64, true)); let return_field = a.return_field(&[nullable_field]).unwrap(); From 26e22611f8f2e7146df5f7b0323df7e294632cd8 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Fri, 9 Jan 2026 22:00:10 +0530 Subject: [PATCH 07/13] Update datafusion/expr/src/udaf.rs Co-authored-by: Martin Grigorov --- datafusion/expr/src/udaf.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index eefbc199a4757..959547799afcf 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -1485,7 +1485,7 @@ mod test { // Create a non-nullable input field let non_nullable_field = Arc::new(Field::new("col",
DataType::Float64, false)); - let return_field = a.return_field(&[non_nullable_field]).unwrap(); + let return_field = udf.return_field(&[non_nullable_field]).unwrap(); // When input is non-nullable, output should also be non-nullable assert!(!return_field.is_nullable()); From fa355e88a40fe17ca63369d2c93ceb5d23202620 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Fri, 9 Jan 2026 22:00:50 +0530 Subject: [PATCH 08/13] Update datafusion/functions-aggregate/src/percentile_cont.rs Co-authored-by: Martin Grigorov --- datafusion/functions-aggregate/src/percentile_cont.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/functions-aggregate/src/percentile_cont.rs b/datafusion/functions-aggregate/src/percentile_cont.rs index f60f1aa9f78c2..a9bd4744d23ec 100644 --- a/datafusion/functions-aggregate/src/percentile_cont.rs +++ b/datafusion/functions-aggregate/src/percentile_cont.rs @@ -437,6 +437,9 @@ impl Accumulator for PercentileContAccumulator { } fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + if values.is_empty() { + return Ok(()); + } let mut to_remove: HashMap = HashMap::new(); for i in 0..values[0].len() { let v = ScalarValue::try_from_array(&values[0], i)?; From 5bfae7a5dc6af8f7474da2544e37d64f1909d319 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Fri, 9 Jan 2026 22:21:11 +0530 Subject: [PATCH 09/13] Update datafusion/expr/src/udaf.rs Co-authored-by: Martin Grigorov --- datafusion/expr/src/udaf.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index 959547799afcf..528e83aebd968 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -1469,7 +1469,7 @@ mod test { // Create a nullable input field let nullable_field = Arc::new(Field::new("col", DataType::Float64, true)); - let return_field = a.return_field(&[nullable_field]).unwrap(); + let return_field = 
udf.return_field(&[nullable_field]).unwrap(); // When input is nullable, output should be nullable assert!(return_field.is_nullable()); From 5a220b6b95cfe29a0636ff071a7c47659beaa3cd Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Fri, 9 Jan 2026 22:21:21 +0530 Subject: [PATCH 10/13] Update datafusion/expr/src/udaf.rs Co-authored-by: Martin Grigorov --- datafusion/expr/src/udaf.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index 528e83aebd968..d528b5c44a3ad 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -1518,7 +1518,7 @@ mod test { let udf = AggregateUDF::from(AMeanUdf::new()); let nullable_field = Arc::new(Field::new("col", DataType::Float64, true)); - let return_field = a.return_field(&[nullable_field]).unwrap(); + let return_field = udf.return_field(&[nullable_field]).unwrap(); // Verify data type is preserved assert_eq!(*return_field.data_type(), DataType::Float64); From e928c231cb3f23fa4ec7d2839a418aed84cb83cd Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Fri, 9 Jan 2026 22:25:59 +0530 Subject: [PATCH 11/13] Update datafusion/expr/src/udaf.rs Co-authored-by: Martin Grigorov --- datafusion/expr/src/udaf.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index d528b5c44a3ad..ddb72b23d8433 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -1481,7 +1481,7 @@ mod test { use arrow::datatypes::Field; use std::sync::Arc; - let a = AggregateUDF::from(AMeanUdf::new()); + let udf = AggregateUDF::from(AMeanUdf::new()); // Create a non-nullable input field let non_nullable_field = Arc::new(Field::new("col", DataType::Float64, false)); From 9590099b79ccd7b799e17d764d6bab5bc457bb01 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Fri, 9 Jan 2026 22:26:13 
+0530 Subject: [PATCH 12/13] Update datafusion/expr/src/udaf.rs Co-authored-by: Martin Grigorov --- datafusion/expr/src/udaf.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index ddb72b23d8433..57f9e9ae0a44f 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -1465,7 +1465,7 @@ mod test { use arrow::datatypes::Field; use std::sync::Arc; - let a = AggregateUDF::from(AMeanUdf::new()); + let udf = AggregateUDF::from(AMeanUdf::new()); // Create a nullable input field let nullable_field = Arc::new(Field::new("col", DataType::Float64, true)); From f263d9133800e394e0ebf206450caaa86a093960 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Fri, 9 Jan 2026 22:26:21 +0530 Subject: [PATCH 13/13] Update datafusion/expr/src/udaf.rs Co-authored-by: Martin Grigorov --- datafusion/expr/src/udaf.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index 57f9e9ae0a44f..599f481a5e986 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -552,7 +552,7 @@ pub trait AggregateUDFImpl: Debug + DynEq + DynHash + Send + Sync { /// } /// ``` #[deprecated( - since = "42.0.0", + since = "52.0.0", note = "Use `return_field` to specify nullability instead of `is_nullable`" )] fn is_nullable(&self) -> bool {