From af2e94fcef9d0878824c60b4980f5c8f4b1f638e Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Sat, 3 Jan 2026 13:57:36 +0530 Subject: [PATCH 1/5] fix(accumulators): preserve state in evaluate() for window frame queries This commit fixes issue #19612 where accumulators that don't implement retract_batch exhibit buggy behavior in window frame queries. When aggregate functions are used with window frames like `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, DataFusion uses PlainAggregateWindowExpr which calls evaluate() multiple times on the same accumulator instance. Accumulators that use std::mem::take() in their evaluate() method consume their internal state, causing incorrect results on subsequent calls. 1. **percentile_cont**: Modified evaluate() to use mutable reference instead of consuming the Vec. Added retract_batch() support for both PercentileContAccumulator and DistinctPercentileContAccumulator. 2. **string_agg**: Changed SimpleStringAggAccumulator::evaluate() to clone the accumulated string instead of taking it. - datafusion/functions-aggregate/src/percentile_cont.rs: - Changed calculate_percentile() to take &mut [T::Native] instead of Vec - Updated PercentileContAccumulator::evaluate() to pass reference - Updated DistinctPercentileContAccumulator::evaluate() to clone values - Added retract_batch() implementation using HashMap for efficient removal - Updated PercentileContGroupsAccumulator::evaluate() for consistency - datafusion/functions-aggregate/src/string_agg.rs: - Changed evaluate() to use clone() instead of std::mem::take() - datafusion/sqllogictest/test_files/aggregate.slt: - Added test cases for percentile_cont with window frames - Added test comparing median() vs percentile_cont(0.5) behavior - Added test for string_agg cumulative window frame - docs/source/library-user-guide/functions/adding-udfs.md: - Added documentation about window-compatible accumulators - Explained evaluate() state preservation requirements - Documented retract_batch() implementation guidance Closes #19612 --- .../src/percentile_cont.rs | 84 +++++++++-- .../functions-aggregate/src/string_agg.rs | 13 +- .../sqllogictest/test_files/aggregate.slt | 134 ++++++++++++++++++ .../functions/adding-udfs.md | 65 +++++++++ 4 files changed, 277 insertions(+), 19 deletions(-) diff --git a/datafusion/functions-aggregate/src/percentile_cont.rs b/datafusion/functions-aggregate/src/percentile_cont.rs index a4e8332626b00..6280af4035ffd 100644 --- a/datafusion/functions-aggregate/src/percentile_cont.rs +++ b/datafusion/functions-aggregate/src/percentile_cont.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; use std::fmt::Debug; use std::mem::{size_of, size_of_val}; use std::sync::Arc; @@ -52,7 +53,7 @@ use datafusion_expr::{ }; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::accumulate; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filtered_null_mask; -use datafusion_functions_aggregate_common::utils::GenericDistinctBuffer; +use datafusion_functions_aggregate_common::utils::{GenericDistinctBuffer, Hashable}; use datafusion_macros::user_doc; use crate::utils::validate_percentile_expr; @@ -427,14 +428,55 @@ impl Accumulator for PercentileContAccumulator { } fn evaluate(&mut self) -> Result { - let d = std::mem::take(&mut self.all_values); - let value = calculate_percentile::(d, self.percentile); + let value = calculate_percentile::(&mut self.all_values, self.percentile); ScalarValue::new_primitive::(value, &T::DATA_TYPE) } fn size(&self) -> usize { size_of_val(self) + self.all_values.capacity() * size_of::() } + + fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + // Cast to target type if needed (e.g., integer to Float64) + let values = if values[0].data_type() != &T::DATA_TYPE { + arrow::compute::cast(&values[0], &T::DATA_TYPE)? + } else { + Arc::clone(&values[0]) + }; + + let mut to_remove: HashMap = HashMap::new(); + for i in 0..values.len() { + let v = ScalarValue::try_from_array(&values, i)?; + if !v.is_null() { + *to_remove.entry(v).or_default() += 1; + } + } + + let mut i = 0; + while i < self.all_values.len() { + let k = + ScalarValue::new_primitive::(Some(self.all_values[i]), &T::DATA_TYPE)?; + if let Some(count) = to_remove.get_mut(&k) + && *count > 0 + { + self.all_values.swap_remove(i); + *count -= 1; + if *count == 0 { + to_remove.remove(&k); + if to_remove.is_empty() { + break; + } + } + } else { + i += 1; + } + } + Ok(()) + } + + fn supports_retract_batch(&self) -> bool { + true + } } /// The percentile_cont groups accumulator accumulates the raw input values @@ -549,13 +591,13 @@ impl GroupsAccumulator fn evaluate(&mut self, emit_to: EmitTo) -> Result { // Emit values - let emit_group_values = emit_to.take_needed(&mut self.group_values); + let mut emit_group_values = emit_to.take_needed(&mut self.group_values); // Calculate percentile for each group let mut evaluate_result_builder = PrimitiveBuilder::::with_capacity(emit_group_values.len()); - for values in emit_group_values { - let value = calculate_percentile::(values, self.percentile); + for values in &mut emit_group_values { + let value = calculate_percentile::(values.as_mut_slice(), self.percentile); evaluate_result_builder.append_option(value); } @@ -652,17 +694,31 @@ impl Accumulator for DistinctPercentileContAccumula } fn evaluate(&mut self) -> Result { - let d = std::mem::take(&mut self.distinct_values.values) - .into_iter() - .map(|v| v.0) - .collect::>(); - let value = calculate_percentile::(d, self.percentile); + let mut values: Vec = + self.distinct_values.values.iter().map(|v| v.0).collect(); + let value = calculate_percentile::(&mut values, self.percentile); ScalarValue::new_primitive::(value, &T::DATA_TYPE) } fn size(&self) -> usize { size_of_val(self) + self.distinct_values.size() } + + fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + if values.is_empty() { + return Ok(()); + } + + let arr = values[0].as_primitive::(); + for value in arr.iter().flatten() { + self.distinct_values.values.remove(&Hashable(value)); + } + Ok(()) + } + + fn supports_retract_batch(&self) -> bool { + true + } } /// Calculate the percentile value for a given set of values. @@ -672,8 +728,12 @@ impl Accumulator for DistinctPercentileContAccumula /// For percentile p and n values: /// - If p * (n-1) is an integer, return the value at that position /// - Otherwise, interpolate between the two closest values +/// +/// Note: This function takes a mutable slice and sorts it in place, but does not +/// consume the data. This is important for window frame queries where evaluate() +/// may be called multiple times on the same accumulator state. fn calculate_percentile( - mut values: Vec, + values: &mut [T::Native], percentile: f64, ) -> Option { let cmp = |x: &T::Native, y: &T::Native| x.compare(*y); diff --git a/datafusion/functions-aggregate/src/string_agg.rs b/datafusion/functions-aggregate/src/string_agg.rs index 77e9f60afd3cf..1c10818c091db 100644 --- a/datafusion/functions-aggregate/src/string_agg.rs +++ b/datafusion/functions-aggregate/src/string_agg.rs @@ -384,14 +384,13 @@ impl Accumulator for SimpleStringAggAccumulator { } fn evaluate(&mut self) -> Result { - let result = if self.has_value { - ScalarValue::LargeUtf8(Some(std::mem::take(&mut self.accumulated_string))) + if self.has_value { + Ok(ScalarValue::LargeUtf8(Some( + self.accumulated_string.clone(), + ))) } else { - ScalarValue::LargeUtf8(None) - }; - - self.has_value = false; - Ok(result) + Ok(ScalarValue::LargeUtf8(None)) + } } fn size(&self) -> usize { diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index f6ce68917e03b..02429bfb81bae 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -8246,3 +8246,137 @@ query R select percentile_cont(null, 0.5); ---- NULL + +########### +# Issue #19612: Test that percentile_cont and median produce identical results +# in window frame queries with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. +# Previously percentile_cont consumed its internal state during evaluate(), +# causing incorrect results when called multiple times in window queries. +########### + +# Test percentile_cont window frame behavior (fix for issue #19612) +statement ok +CREATE TABLE percentile_window_test ( + timestamp INT, + tags VARCHAR, + value DOUBLE +); + +statement ok +INSERT INTO percentile_window_test (timestamp, tags, value) VALUES +(1, 'tag1', 10.0), +(2, 'tag1', 20.0), +(3, 'tag1', 30.0), +(4, 'tag1', 40.0), +(5, 'tag1', 50.0), +(1, 'tag2', 60.0), +(2, 'tag2', 70.0), +(3, 'tag2', 80.0), +(4, 'tag2', 90.0), +(5, 'tag2', 100.0); + +# Test that median and percentile_cont(0.5) produce the same results +# with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW frame. +# Both functions should maintain state correctly across multiple evaluate() calls. +query ITRRR +SELECT + timestamp, + tags, + value, + median(value) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS value_median, + percentile_cont(value, 0.5) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS value_percentile_50 +FROM percentile_window_test +ORDER BY tags, timestamp; +---- +1 tag1 10 10 10 +2 tag1 20 15 15 +3 tag1 30 20 20 +4 tag1 40 25 25 +5 tag1 50 30 30 +1 tag2 60 60 60 +2 tag2 70 65 65 +3 tag2 80 70 70 +4 tag2 90 75 75 +5 tag2 100 80 80 + +# Test percentile_cont with different percentile values +query ITRRR +SELECT + timestamp, + tags, + value, + percentile_cont(value, 0.25) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS p25, + percentile_cont(value, 0.75) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS p75 +FROM percentile_window_test +ORDER BY tags, timestamp; +---- +1 tag1 10 10 10 +2 tag1 20 12.5 17.5 +3 tag1 30 15 25 +4 tag1 40 17.5 32.5 +5 tag1 50 20 40 +1 tag2 60 60 60 +2 tag2 70 62.5 67.5 +3 tag2 80 65 75 +4 tag2 90 67.5 82.5 +5 tag2 100 70 90 + +statement ok +DROP TABLE percentile_window_test; + +# Test string_agg window frame behavior (fix for issue #19612) +statement ok +CREATE TABLE string_agg_window_test ( + id INT, + grp VARCHAR, + val VARCHAR +); + +statement ok +INSERT INTO string_agg_window_test (id, grp, val) VALUES +(1, 'A', 'a'), +(2, 'A', 'b'), +(3, 'A', 'c'), +(1, 'B', 'x'), +(2, 'B', 'y'), +(3, 'B', 'z'); + +# Test string_agg with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW +# The function should maintain state correctly across multiple evaluate() calls +query ITT +SELECT + id, + grp, + string_agg(val, ',') OVER ( + PARTITION BY grp + ORDER BY id + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS cumulative_string +FROM string_agg_window_test +ORDER BY grp, id; +---- +1 A a +2 A a,b +3 A a,b,c +1 B x +2 B x,y +3 B x,y,z + +statement ok +DROP TABLE string_agg_window_test; diff --git a/docs/source/library-user-guide/functions/adding-udfs.md b/docs/source/library-user-guide/functions/adding-udfs.md index 5d033ae3f9e97..0c16a1fdf9314 100644 --- a/docs/source/library-user-guide/functions/adding-udfs.md +++ b/docs/source/library-user-guide/functions/adding-udfs.md @@ -1350,6 +1350,71 @@ async fn main() -> Result<()> { [`create_udaf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/fn.create_udaf.html [`advanced_udaf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/udf/advanced_udaf.rs +### Window Frame Compatible Accumulators + +When an aggregate function is used in a window context with a sliding frame (e.g., `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`), +DataFusion may call `evaluate()` multiple times on the same accumulator instance to compute results for each row in the window. +This has important implications for how you implement your accumulator: + +#### The `evaluate()` Method Must Not Consume State + +The `evaluate()` method should return the current aggregate value **without modifying or consuming the accumulator's internal state**. +This is critical because: + +1. **Multiple evaluations**: For window queries, `evaluate()` is called once per row in the partition +2. **State preservation**: The internal state must remain intact for subsequent `evaluate()` calls + +**Incorrect implementation** (consumes state): + +```rust +fn evaluate(&mut self) -> Result { + // BAD: std::mem::take() consumes the values, leaving an empty Vec + let values = std::mem::take(&mut self.values); + // After this call, self.values is empty and subsequent + // evaluate() calls will return incorrect results + calculate_result(values) +} +``` + +**Correct implementation** (preserves state): + +```rust +fn evaluate(&mut self) -> Result { + // GOOD: Use a reference or clone to preserve state + calculate_result(&mut self.values) + // Or: calculate_result(self.values.clone()) +} +``` + +#### Implementing `retract_batch` for Sliding Windows + +For more efficient sliding window calculations, you can implement the `retract_batch` method. +This allows DataFusion to remove values that have "left" the window frame instead of recalculating from scratch: + +```rust +impl Accumulator for MyAccumulator { + fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + // Remove the given values from the accumulator state + // This is the inverse of update_batch + for value in values[0].iter().flatten() { + self.remove_value(value); + } + Ok(()) + } + + fn supports_retract_batch(&self) -> bool { + true // Enable this optimization + } +} +``` + +If your accumulator does not support `retract_batch` (returns `false` from `supports_retract_batch()`), +DataFusion will use `PlainAggregateWindowExpr` which calls `evaluate()` multiple times on the same +accumulator. In this case, it is **essential** that your `evaluate()` method does not consume the +accumulator's state. + +See [issue #19612](https://github.com/apache/datafusion/issues/19612) for more details on this behavior. + ## Adding a Table UDF A User-Defined Table Function (UDTF) is a function that takes parameters and returns a `TableProvider`. From c5fe87bd7ef5e2b3da5a0f29ba7c95a065e9e4a9 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Sat, 3 Jan 2026 16:17:04 +0530 Subject: [PATCH 2/5] fix: format code and mark doc examples as ignore for doctest --- docs/source/library-user-guide/functions/adding-udfs.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/source/library-user-guide/functions/adding-udfs.md b/docs/source/library-user-guide/functions/adding-udfs.md index 0c16a1fdf9314..63ff0d16dde64 100644 --- a/docs/source/library-user-guide/functions/adding-udfs.md +++ b/docs/source/library-user-guide/functions/adding-udfs.md @@ -1366,7 +1366,7 @@ This is critical because: **Incorrect implementation** (consumes state): -```rust +```rust,ignore fn evaluate(&mut self) -> Result { // BAD: std::mem::take() consumes the values, leaving an empty Vec let values = std::mem::take(&mut self.values); @@ -1378,7 +1378,7 @@ fn evaluate(&mut self) -> Result { **Correct implementation** (preserves state): -```rust +```rust,ignore fn evaluate(&mut self) -> Result { // GOOD: Use a reference or clone to preserve state calculate_result(&mut self.values) @@ -1391,7 +1391,7 @@ fn evaluate(&mut self) -> Result { For more efficient sliding window calculations, you can implement the `retract_batch` method. This allows DataFusion to remove values that have "left" the window frame instead of recalculating from scratch: -```rust +```rust,ignore impl Accumulator for MyAccumulator { fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { // Remove the given values from the accumulator state From 3d4eeeea318a77d9a646ba91447d7d482317837c Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Tue, 6 Jan 2026 13:08:04 +0530 Subject: [PATCH 3/5] address review feedback: remove cast code, consolidate tests, update docstring --- datafusion/expr-common/src/accumulator.rs | 30 ++- .../src/percentile_cont.rs | 11 +- .../sqllogictest/test_files/aggregate.slt | 209 ++++++++++-------- .../functions/adding-udfs.md | 65 ------ 4 files changed, 143 insertions(+), 172 deletions(-) diff --git a/datafusion/expr-common/src/accumulator.rs b/datafusion/expr-common/src/accumulator.rs index fc4e90114beea..72b0b24e1a40f 100644 --- a/datafusion/expr-common/src/accumulator.rs +++ b/datafusion/expr-common/src/accumulator.rs @@ -58,17 +58,37 @@ pub trait Accumulator: Send + Sync + Debug { /// running sum. fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>; - /// Returns the final aggregate value, consuming the internal state. + /// Returns the final aggregate value. /// /// For example, the `SUM` accumulator maintains a running sum, /// and `evaluate` will produce that running sum as its output. /// - /// This function should not be called twice, otherwise it will - /// result in potentially non-deterministic behavior. - /// /// This function gets `&mut self` to allow for the accumulator to build /// arrow-compatible internal state that can be returned without copying - /// when possible (for example distinct strings) + /// when possible (for example distinct strings). + /// + /// # Window Frame Queries + /// + /// When used in a window context without [`Self::supports_retract_batch`], + /// `evaluate()` may be called multiple times on the same accumulator instance + /// (once per row in the partition). In this case, implementations **must not** + /// consume or modify internal state. Use references or clones to preserve state: + /// + /// ```ignore + /// // GOOD: Preserves state for subsequent calls + /// fn evaluate(&mut self) -> Result { + /// calculate_result(&self.values) // Use reference + /// } + /// + /// // BAD: Consumes state, breaks window queries + /// fn evaluate(&mut self) -> Result { + /// calculate_result(std::mem::take(&mut self.values)) + /// } + /// ``` + /// + /// For efficient sliding window calculations, consider implementing + /// [`Self::retract_batch`] which allows DataFusion to incrementally + /// update state rather than calling `evaluate()` repeatedly. fn evaluate(&mut self) -> Result; /// Returns the allocated size required for this accumulator, in diff --git a/datafusion/functions-aggregate/src/percentile_cont.rs b/datafusion/functions-aggregate/src/percentile_cont.rs index 6280af4035ffd..f60f1aa9f78c2 100644 --- a/datafusion/functions-aggregate/src/percentile_cont.rs +++ b/datafusion/functions-aggregate/src/percentile_cont.rs @@ -437,16 +437,9 @@ impl Accumulator for PercentileContAccumulator { } fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - // Cast to target type if needed (e.g., integer to Float64) - let values = if values[0].data_type() != &T::DATA_TYPE { - arrow::compute::cast(&values[0], &T::DATA_TYPE)? - } else { - Arc::clone(&values[0]) - }; - let mut to_remove: HashMap = HashMap::new(); - for i in 0..values.len() { - let v = ScalarValue::try_from_array(&values, i)?; + for i in 0..values[0].len() { + let v = ScalarValue::try_from_array(&values[0], i)?; if !v.is_null() { *to_remove.entry(v).or_default() += 1; } diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 02429bfb81bae..c3af5f209f619 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -1126,6 +1126,102 @@ ORDER BY tags, timestamp; 4 tag2 90 75 80 95 5 tag2 100 80 80 100 +########### +# Issue #19612: Test that percentile_cont produces correct results +# in window frame queries. Previously percentile_cont consumed its internal state +# during evaluate(), causing incorrect results when called multiple times. +########### + +# Test percentile_cont sliding window (same as median) +query ITRR +SELECT + timestamp, + tags, + value, + percentile_cont(value, 0.5) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING + ) AS value_percentile_50 +FROM median_window_test +ORDER BY tags, timestamp; +---- +1 tag1 10 15 +2 tag1 20 20 +3 tag1 30 30 +4 tag1 40 40 +5 tag1 50 45 +1 tag2 60 65 +2 tag2 70 70 +3 tag2 80 80 +4 tag2 90 90 +5 tag2 100 95 + +# Test percentile_cont non-sliding window +query ITRRRR +SELECT + timestamp, + tags, + value, + percentile_cont(value, 0.5) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS value_percentile_unbounded_preceding, + percentile_cont(value, 0.5) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS value_percentile_unbounded_both, + percentile_cont(value, 0.5) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING + ) AS value_percentile_unbounded_following +FROM median_window_test +ORDER BY tags, timestamp; +---- +1 tag1 10 10 30 30 +2 tag1 20 15 30 35 +3 tag1 30 20 30 40 +4 tag1 40 25 30 45 +5 tag1 50 30 30 50 +1 tag2 60 60 80 80 +2 tag2 70 65 80 85 +3 tag2 80 70 80 90 +4 tag2 90 75 80 95 +5 tag2 100 80 80 100 + +# Test percentile_cont with different percentile values +query ITRRR +SELECT + timestamp, + tags, + value, + percentile_cont(value, 0.25) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS p25, + percentile_cont(value, 0.75) OVER ( + PARTITION BY tags + ORDER BY timestamp + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS p75 +FROM median_window_test +ORDER BY tags, timestamp; +---- +1 tag1 10 10 10 +2 tag1 20 12.5 17.5 +3 tag1 30 15 25 +4 tag1 40 17.5 32.5 +5 tag1 50 20 40 +1 tag2 60 60 60 +2 tag2 70 62.5 67.5 +3 tag2 80 65 75 +4 tag2 90 67.5 82.5 +5 tag2 100 70 90 + statement ok DROP TABLE median_window_test; @@ -8247,99 +8343,6 @@ select percentile_cont(null, 0.5); ---- NULL -########### -# Issue #19612: Test that percentile_cont and median produce identical results -# in window frame queries with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. -# Previously percentile_cont consumed its internal state during evaluate(), -# causing incorrect results when called multiple times in window queries. -########### - -# Test percentile_cont window frame behavior (fix for issue #19612) -statement ok -CREATE TABLE percentile_window_test ( - timestamp INT, - tags VARCHAR, - value DOUBLE -); - -statement ok -INSERT INTO percentile_window_test (timestamp, tags, value) VALUES -(1, 'tag1', 10.0), -(2, 'tag1', 20.0), -(3, 'tag1', 30.0), -(4, 'tag1', 40.0), -(5, 'tag1', 50.0), -(1, 'tag2', 60.0), -(2, 'tag2', 70.0), -(3, 'tag2', 80.0), -(4, 'tag2', 90.0), -(5, 'tag2', 100.0); - -# Test that median and percentile_cont(0.5) produce the same results -# with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW frame. -# Both functions should maintain state correctly across multiple evaluate() calls. -query ITRRR -SELECT - timestamp, - tags, - value, - median(value) OVER ( - PARTITION BY tags - ORDER BY timestamp - ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW - ) AS value_median, - percentile_cont(value, 0.5) OVER ( - PARTITION BY tags - ORDER BY timestamp - ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW - ) AS value_percentile_50 -FROM percentile_window_test -ORDER BY tags, timestamp; ----- -1 tag1 10 10 10 -2 tag1 20 15 15 -3 tag1 30 20 20 -4 tag1 40 25 25 -5 tag1 50 30 30 -1 tag2 60 60 60 -2 tag2 70 65 65 -3 tag2 80 70 70 -4 tag2 90 75 75 -5 tag2 100 80 80 - -# Test percentile_cont with different percentile values -query ITRRR -SELECT - timestamp, - tags, - value, - percentile_cont(value, 0.25) OVER ( - PARTITION BY tags - ORDER BY timestamp - ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW - ) AS p25, - percentile_cont(value, 0.75) OVER ( - PARTITION BY tags - ORDER BY timestamp - ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW - ) AS p75 -FROM percentile_window_test -ORDER BY tags, timestamp; ----- -1 tag1 10 10 10 -2 tag1 20 12.5 17.5 -3 tag1 30 15 25 -4 tag1 40 17.5 32.5 -5 tag1 50 20 40 -1 tag2 60 60 60 -2 tag2 70 62.5 67.5 -3 tag2 80 65 75 -4 tag2 90 67.5 82.5 -5 tag2 100 70 90 - -statement ok -DROP TABLE percentile_window_test; - # Test string_agg window frame behavior (fix for issue #19612) statement ok CREATE TABLE string_agg_window_test ( @@ -8378,5 +8381,25 @@ ORDER BY grp, id; 2 B x,y 3 B x,y,z +# Test string_agg with sliding window (ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) +query ITT +SELECT + id, + grp, + string_agg(val, ',') OVER ( + PARTITION BY grp + ORDER BY id + ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING + ) AS sliding_string +FROM string_agg_window_test +ORDER BY grp, id; +---- +1 A a,b +2 A a,b,c +3 A b,c +1 B x,y +2 B x,y,z +3 B y,z + statement ok DROP TABLE string_agg_window_test; diff --git a/docs/source/library-user-guide/functions/adding-udfs.md b/docs/source/library-user-guide/functions/adding-udfs.md index 63ff0d16dde64..5d033ae3f9e97 100644 --- a/docs/source/library-user-guide/functions/adding-udfs.md +++ b/docs/source/library-user-guide/functions/adding-udfs.md @@ -1350,71 +1350,6 @@ async fn main() -> Result<()> { [`create_udaf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/fn.create_udaf.html [`advanced_udaf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/udf/advanced_udaf.rs -### Window Frame Compatible Accumulators - -When an aggregate function is used in a window context with a sliding frame (e.g., `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`), -DataFusion may call `evaluate()` multiple times on the same accumulator instance to compute results for each row in the window. -This has important implications for how you implement your accumulator: - -#### The `evaluate()` Method Must Not Consume State - -The `evaluate()` method should return the current aggregate value **without modifying or consuming the accumulator's internal state**. -This is critical because: - -1. **Multiple evaluations**: For window queries, `evaluate()` is called once per row in the partition -2. **State preservation**: The internal state must remain intact for subsequent `evaluate()` calls - -**Incorrect implementation** (consumes state): - -```rust,ignore -fn evaluate(&mut self) -> Result { - // BAD: std::mem::take() consumes the values, leaving an empty Vec - let values = std::mem::take(&mut self.values); - // After this call, self.values is empty and subsequent - // evaluate() calls will return incorrect results - calculate_result(values) -} -``` - -**Correct implementation** (preserves state): - -```rust,ignore -fn evaluate(&mut self) -> Result { - // GOOD: Use a reference or clone to preserve state - calculate_result(&mut self.values) - // Or: calculate_result(self.values.clone()) -} -``` - -#### Implementing `retract_batch` for Sliding Windows - -For more efficient sliding window calculations, you can implement the `retract_batch` method. -This allows DataFusion to remove values that have "left" the window frame instead of recalculating from scratch: - -```rust,ignore -impl Accumulator for MyAccumulator { - fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - // Remove the given values from the accumulator state - // This is the inverse of update_batch - for value in values[0].iter().flatten() { - self.remove_value(value); - } - Ok(()) - } - - fn supports_retract_batch(&self) -> bool { - true // Enable this optimization - } -} -``` - -If your accumulator does not support `retract_batch` (returns `false` from `supports_retract_batch()`), -DataFusion will use `PlainAggregateWindowExpr` which calls `evaluate()` multiple times on the same -accumulator. In this case, it is **essential** that your `evaluate()` method does not consume the -accumulator's state. - -See [issue #19612](https://github.com/apache/datafusion/issues/19612) for more details on this behavior. - ## Adding a Table UDF A User-Defined Table Function (UDTF) is a function that takes parameters and returns a `TableProvider`. From 3b1e6715329af1e805f2c6112c88833dbc44bbad Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Wed, 7 Jan 2026 01:36:02 +0530 Subject: [PATCH 4/5] fix: remove sliding window test for string_agg (no retract_batch support) --- .../sqllogictest/test_files/aggregate.slt | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index c3af5f209f619..a72e4f36ae084 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -8381,25 +8381,5 @@ ORDER BY grp, id; 2 B x,y 3 B x,y,z -# Test string_agg with sliding window (ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) -query ITT -SELECT - id, - grp, - string_agg(val, ',') OVER ( - PARTITION BY grp - ORDER BY id - ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING - ) AS sliding_string -FROM string_agg_window_test -ORDER BY grp, id; ----- -1 A a,b -2 A a,b,c -3 A b,c -1 B x,y -2 B x,y,z -3 B y,z - statement ok DROP TABLE string_agg_window_test; From a73d509535a07cccdd61a03bacbd4daf4af9a729 Mon Sep 17 00:00:00 2001 From: Ganesh Patil <7030871503ganeshpatil@gmail.com> Date: Fri, 9 Jan 2026 16:54:41 +0530 Subject: [PATCH 5/5] docs: update evaluate() docstring per reviewer feedback - Clarify that consuming internal state is the issue, not modifying it - Note that modifying state (e.g. reordering) is fine if it doesn't affect correctness - Clarify that retract_batch is for supporting more window frame types, not efficiency --- datafusion/expr-common/src/accumulator.rs | 35 +++++++++-------------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/datafusion/expr-common/src/accumulator.rs b/datafusion/expr-common/src/accumulator.rs index 72b0b24e1a40f..3acf110a0bfc7 100644 --- a/datafusion/expr-common/src/accumulator.rs +++ b/datafusion/expr-common/src/accumulator.rs @@ -67,28 +67,21 @@ pub trait Accumulator: Send + Sync + Debug { /// arrow-compatible internal state that can be returned without copying /// when possible (for example distinct strings). /// - /// # Window Frame Queries - /// - /// When used in a window context without [`Self::supports_retract_batch`], - /// `evaluate()` may be called multiple times on the same accumulator instance - /// (once per row in the partition). In this case, implementations **must not** - /// consume or modify internal state. Use references or clones to preserve state: - /// - /// ```ignore - /// // GOOD: Preserves state for subsequent calls - /// fn evaluate(&mut self) -> Result { - /// calculate_result(&self.values) // Use reference - /// } - /// - /// // BAD: Consumes state, breaks window queries - /// fn evaluate(&mut self) -> Result { - /// calculate_result(std::mem::take(&mut self.values)) - /// } - /// ``` + /// ## Correctness + /// + /// This function must not consume the internal state, as it is also used in window + /// aggregate functions where it can be executed multiple times depending on the + /// current window frame. Consuming the internal state can cause the next invocation + /// to have incorrect results. + /// + /// - Even if this accumulator doesn't implement [`retract_batch`] it may still be used + /// in window aggregate functions where the window frame is + /// `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW` + /// + /// It is fine to modify the state (e.g. re-order elements within internal state vec) so long + /// as this doesn't cause an incorrect computation on the next call of evaluate. /// - /// For efficient sliding window calculations, consider implementing - /// [`Self::retract_batch`] which allows DataFusion to incrementally - /// update state rather than calling `evaluate()` repeatedly. + /// [`retract_batch`]: Self::retract_batch fn evaluate(&mut self) -> Result; /// Returns the allocated size required for this accumulator, in