From 4d1f8dd69b426bf84a27431f2ef7d7c4f4a04d70 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 29 Jan 2026 17:50:06 -0500 Subject: [PATCH 1/3] Consoliadate AggregateMode --- datafusion/core/src/physical_planner.rs | 44 +++-- .../enforce_distribution.rs | 7 +- .../partition_statistics.rs | 12 +- .../src/combine_partial_final_agg.rs | 21 ++- .../src/enforce_distribution.rs | 12 +- .../src/limited_distinct_aggregation.rs | 3 +- .../physical-plan/src/aggregates/mod.rs | 169 ++++++++++++------ .../src/aggregates/no_grouping.rs | 16 +- .../physical-plan/src/aggregates/row_hash.rs | 19 +- datafusion/proto/src/physical_plan/mod.rs | 49 +++-- 10 files changed, 215 insertions(+), 137 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index b1aa850284aee..dedd454b14924 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -36,7 +36,9 @@ use crate::logical_expr::{ UserDefinedLogicalNode, }; use crate::physical_expr::{create_physical_expr, create_physical_exprs}; -use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; +use crate::physical_plan::aggregates::{ + AggregateExec, AggregateInputPartitioning, AggregateMode, PhysicalGroupBy, +}; use crate::physical_plan::analyze::AnalyzeExec; use crate::physical_plan::explain::ExplainExec; use crate::physical_plan::filter::FilterExecBuilder; @@ -942,19 +944,19 @@ impl DefaultPhysicalPlanner { // `AggregateFunctionExpr`/`PhysicalSortExpr` objects. 
let updated_aggregates = initial_aggr.aggr_expr().to_vec(); - let next_partition_mode = if can_repartition { - // construct a second aggregation with 'AggregateMode::FinalPartitioned' - AggregateMode::FinalPartitioned + let next_partitioning = if can_repartition { + // construct a second aggregation with hash-partitioned input + AggregateInputPartitioning::HashPartitioned } else { - // construct a second aggregation, keeping the final column name equal to the - // first aggregation and the expressions corresponding to the respective aggregate - AggregateMode::Final + // construct a second aggregation with single-partition input + AggregateInputPartitioning::SinglePartition }; let final_grouping_set = initial_aggr.group_expr().as_final(); - Arc::new(AggregateExec::try_new( - next_partition_mode, + Arc::new(AggregateExec::try_new_with_partitioning( + AggregateMode::Final, + next_partitioning, final_grouping_set, updated_aggregates, filters, @@ -3436,9 +3438,11 @@ mod tests { let execution_plan = plan(&logical_plan).await?; let formatted = format!("{execution_plan:?}"); - // Make sure the plan contains a FinalPartitioned, which means it will not use the Final - // mode in Aggregate (which is slower) - assert!(formatted.contains("FinalPartitioned")); + // Make sure the plan uses hash-partitioned input for the final aggregate. + assert!( + formatted.contains("FinalPartitioned") + || formatted.contains("HashPartitioned") + ); Ok(()) } @@ -3467,9 +3471,11 @@ mod tests { let execution_plan = plan(&logical_plan).await?; let formatted = format!("{execution_plan:?}"); - // Make sure the plan contains a FinalPartitioned, which means it will not use the Final - // mode in Aggregate (which is slower) - assert!(formatted.contains("FinalPartitioned")); + // Make sure the plan uses hash-partitioned input for the final aggregate. 
+ assert!( + formatted.contains("FinalPartitioned") + || formatted.contains("HashPartitioned") + ); Ok(()) } @@ -3488,9 +3494,11 @@ mod tests { let execution_plan = plan(&logical_plan).await?; let formatted = format!("{execution_plan:?}"); - // Make sure the plan contains a FinalPartitioned, which means it will not use the Final - // mode in Aggregate (which is slower) - assert!(formatted.contains("FinalPartitioned")); + // Make sure the plan uses hash-partitioned input for the final aggregate. + assert!( + formatted.contains("FinalPartitioned") + || formatted.contains("HashPartitioned") + ); Ok(()) } diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 94ae82a9ad755..5a417de734c8e 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -54,7 +54,7 @@ use datafusion_physical_optimizer::enforce_distribution::*; use datafusion_physical_optimizer::enforce_sorting::EnforceSorting; use datafusion_physical_optimizer::output_requirements::OutputRequirements; use datafusion_physical_plan::aggregates::{ - AggregateExec, AggregateMode, PhysicalGroupBy, + AggregateExec, AggregateInputPartitioning, AggregateMode, PhysicalGroupBy, }; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -328,8 +328,9 @@ fn aggregate_exec_with_alias( let final_grouping = PhysicalGroupBy::new_single(final_group_by_expr); Arc::new( - AggregateExec::try_new( - AggregateMode::FinalPartitioned, + AggregateExec::try_new_with_partitioning( + AggregateMode::Final, + AggregateInputPartitioning::HashPartitioned, final_grouping, vec![], vec![], diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index b33305c23ede2..fa8b48a0077ae 100644 --- 
a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -39,7 +39,7 @@ mod test { use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::aggregates::{ - AggregateExec, AggregateMode, PhysicalGroupBy, + AggregateExec, AggregateInputPartitioning, AggregateMode, PhysicalGroupBy, }; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::common::compute_record_batch_statistics; @@ -881,8 +881,9 @@ mod test { ) .await?; - let agg_final = Arc::new(AggregateExec::try_new( - AggregateMode::FinalPartitioned, + let agg_final = Arc::new(AggregateExec::try_new_with_partitioning( + AggregateMode::Final, + AggregateInputPartitioning::HashPartitioned, group_by.clone(), aggr_expr.clone(), vec![None], @@ -953,8 +954,9 @@ mod test { scan_schema.clone(), )?); - let agg_final = Arc::new(AggregateExec::try_new( - AggregateMode::FinalPartitioned, + let agg_final = Arc::new(AggregateExec::try_new_with_partitioning( + AggregateMode::Final, + AggregateInputPartitioning::HashPartitioned, group_by.clone(), aggr_expr.clone(), vec![None], diff --git a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs index 6d8e7995c18c2..d4869439a2b2f 100644 --- a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs +++ b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use datafusion_common::error::Result; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::aggregates::{ - AggregateExec, AggregateMode, PhysicalGroupBy, + AggregateExec, AggregateInputPartitioning, AggregateMode, PhysicalGroupBy, }; use crate::PhysicalOptimizerRule; @@ -58,10 +58,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate { 
return Ok(Transformed::no(plan)); }; - if !matches!( - agg_exec.mode(), - AggregateMode::Final | AggregateMode::FinalPartitioned - ) { + if !matches!(agg_exec.mode(), AggregateMode::Final) { return Ok(Transformed::no(plan)); } @@ -85,13 +82,15 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate { input_agg_exec.filter_expr(), ), ) { - let mode = if agg_exec.mode() == &AggregateMode::Final { - AggregateMode::Single - } else { - AggregateMode::SinglePartitioned + let input_partitioning = match agg_exec.input_partitioning() { + AggregateInputPartitioning::HashPartitioned => { + AggregateInputPartitioning::HashPartitioned + } + _ => AggregateInputPartitioning::SinglePartition, }; - AggregateExec::try_new( - mode, + AggregateExec::try_new_with_partitioning( + AggregateMode::Single, + input_partitioning, input_agg_exec.group_expr().clone(), input_agg_exec.aggr_expr().to_vec(), input_agg_exec.filter_expr().to_vec(), diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index acb1c588097ee..b177bce802bf4 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -44,7 +44,7 @@ use datafusion_physical_expr::{ }; use datafusion_physical_plan::ExecutionPlanProperties; use datafusion_physical_plan::aggregates::{ - AggregateExec, AggregateMode, PhysicalGroupBy, + AggregateExec, AggregateInputPartitioning, AggregateMode, PhysicalGroupBy, }; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::execution_plan::EmissionType; @@ -393,7 +393,10 @@ pub fn adjust_input_keys_ordering( .map(Transformed::yes); } else if let Some(aggregate_exec) = plan.as_any().downcast_ref::() { if !requirements.data.is_empty() { - if aggregate_exec.mode() == &AggregateMode::FinalPartitioned { + if aggregate_exec.mode() == &AggregateMode::Final + && aggregate_exec.input_partitioning() + == 
AggregateInputPartitioning::HashPartitioned + { return reorder_aggregate_keys(requirements, aggregate_exec) .map(Transformed::yes); } else { @@ -523,8 +526,9 @@ pub fn reorder_aggregate_keys( .map(|(idx, expr)| (expr, group_exprs[idx].1.clone())) .collect(), ); - let new_final_agg = Arc::new(AggregateExec::try_new( - AggregateMode::FinalPartitioned, + let new_final_agg = Arc::new(AggregateExec::try_new_with_partitioning( + AggregateMode::Final, + AggregateInputPartitioning::HashPartitioned, new_group_by, agg_exec.aggr_expr().to_vec(), agg_exec.filter_expr().to_vec(), diff --git a/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs b/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs index fe9636f67619b..ed067491e760d 100644 --- a/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs +++ b/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs @@ -54,8 +54,9 @@ impl LimitedDistinctAggregation { } // We found what we want: clone, copy the limit down, and return modified node - let new_aggr = AggregateExec::try_new( + let new_aggr = AggregateExec::try_new_with_partitioning( *aggr.mode(), + aggr.input_partitioning(), aggr.group_expr().clone(), aggr.aggr_expr().to_vec(), aggr.filter_expr().to_vec(), diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index d645f5c55d434..3ce2c40435707 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -95,22 +95,19 @@ const AGGREGATION_HASH_SEED: ahash::RandomState = /// aggregation and how these modes are used. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum AggregateMode { - /// One of multiple layers of aggregation, any input partitioning + /// One of multiple layers of aggregation, input values -> accumulator state. /// /// Partial aggregate that can be applied in parallel across input /// partitions. 
/// /// This is the first phase of a multi-phase aggregation. Partial, - /// *Final* of multiple layers of aggregation, in exactly one partition + /// *Final* of multiple layers of aggregation, accumulator state -> final value. /// - /// Final aggregate that produces a single partition of output by combining - /// the output of multiple partial aggregates. + /// Final aggregate that combines the output of multiple partial aggregates. /// /// This is the second phase of a multi-phase aggregation. /// - /// This mode requires that the input is a single partition - /// /// Note: Adjacent `Partial` and `Final` mode aggregation is equivalent to a `Single` /// mode aggregation node. The `Final` mode is required since this is used in an /// intermediate step. The [`CombinePartialFinalAggregate`] physical optimizer rule @@ -118,32 +115,12 @@ pub enum AggregateMode { /// /// [`CombinePartialFinalAggregate`]: https://docs.rs/datafusion/latest/datafusion/physical_optimizer/combine_partial_final_agg/struct.CombinePartialFinalAggregate.html Final, - /// *Final* of multiple layers of aggregation, input is *Partitioned* - /// - /// Final aggregate that works on pre-partitioned data. - /// - /// This mode requires that all rows with a particular grouping key are in - /// the same partitions, such as is the case with Hash repartitioning on the - /// group keys. If a group key is duplicated, duplicate groups would be - /// produced - FinalPartitioned, - /// *Single* layer of Aggregation, input is exactly one partition + /// *Single* layer of aggregation, input values -> final value. /// /// Applies the entire logical aggregation operation in a single operator, /// as opposed to Partial / Final modes which apply the logical aggregation using /// two operators. 
- /// - /// This mode requires that the input is a single partition (like Final) Single, - /// *Single* layer of Aggregation, input is *Partitioned* - /// - /// Applies the entire logical aggregation operation in a single operator, - /// as opposed to Partial / Final modes which apply the logical aggregation - /// using two operators. - /// - /// This mode requires that the input has more than one partition, and is - /// partitioned by group key (like FinalPartitioned). - SinglePartitioned, } impl AggregateMode { @@ -152,10 +129,30 @@ impl AggregateMode { /// `merge_batch` method will not be called for these modes. pub fn is_first_stage(&self) -> bool { match self { - AggregateMode::Partial - | AggregateMode::Single - | AggregateMode::SinglePartitioned => true, - AggregateMode::Final | AggregateMode::FinalPartitioned => false, + AggregateMode::Partial | AggregateMode::Single => true, + AggregateMode::Final => false, + } + } +} + +/// Required input partitioning for an aggregate. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum AggregateInputPartitioning { + /// No specific distribution requirement. + Unspecified, + /// Requires all rows to be in a single partition. + SinglePartition, + /// Requires input to be hash partitioned by group keys. + HashPartitioned, +} + +impl AggregateInputPartitioning { + fn default_for_mode(mode: AggregateMode) -> Self { + match mode { + AggregateMode::Partial => AggregateInputPartitioning::Unspecified, + AggregateMode::Final | AggregateMode::Single => { + AggregateInputPartitioning::SinglePartition + } } } } @@ -543,6 +540,8 @@ impl LimitOptions { pub struct AggregateExec { /// Aggregation mode (full, partial) mode: AggregateMode, + /// Required input partitioning. 
+ input_partitioning: AggregateInputPartitioning, /// Group by expressions group_by: PhysicalGroupBy, /// Aggregate expressions @@ -592,6 +591,7 @@ impl AggregateExec { input_order_mode: self.input_order_mode.clone(), cache: self.cache.clone(), mode: self.mode, + input_partitioning: self.input_partitioning, group_by: self.group_by.clone(), filter_expr: self.filter_expr.clone(), limit_options: self.limit_options, @@ -612,6 +612,7 @@ impl AggregateExec { input_order_mode: self.input_order_mode.clone(), cache: self.cache.clone(), mode: self.mode, + input_partitioning: self.input_partitioning, group_by: self.group_by.clone(), aggr_expr: self.aggr_expr.clone(), filter_expr: self.filter_expr.clone(), @@ -634,12 +635,35 @@ impl AggregateExec { filter_expr: Vec>>, input: Arc, input_schema: SchemaRef, + ) -> Result { + let input_partitioning = AggregateInputPartitioning::default_for_mode(mode); + Self::try_new_with_partitioning( + mode, + input_partitioning, + group_by, + aggr_expr, + filter_expr, + input, + input_schema, + ) + } + + /// Create a new hash aggregate execution plan with explicit input partitioning. + pub fn try_new_with_partitioning( + mode: AggregateMode, + input_partitioning: AggregateInputPartitioning, + group_by: PhysicalGroupBy, + aggr_expr: Vec>, + filter_expr: Vec>>, + input: Arc, + input_schema: SchemaRef, ) -> Result { let schema = create_schema(&input.schema(), &group_by, &aggr_expr, mode)?; let schema = Arc::new(schema); AggregateExec::try_new_with_schema( mode, + input_partitioning, group_by, aggr_expr, filter_expr, @@ -659,6 +683,7 @@ impl AggregateExec { /// the schema in such cases. 
fn try_new_with_schema( mode: AggregateMode, + input_partitioning: AggregateInputPartitioning, group_by: PhysicalGroupBy, mut aggr_expr: Vec>, filter_expr: Vec>>, @@ -666,6 +691,7 @@ impl AggregateExec { input_schema: SchemaRef, schema: SchemaRef, ) -> Result { + validate_aggregate_partitioning(mode, input_partitioning)?; // Make sure arguments are consistent in size assert_eq_or_internal_err!( aggr_expr.len(), @@ -737,6 +763,7 @@ impl AggregateExec { let mut exec = AggregateExec { mode, + input_partitioning, group_by, aggr_expr, filter_expr, @@ -761,6 +788,26 @@ impl AggregateExec { &self.mode } + /// Display-friendly mode label that includes input partitioning. + fn mode_display(&self) -> &'static str { + match (self.mode, self.input_partitioning) { + (AggregateMode::Final, AggregateInputPartitioning::HashPartitioned) => { + "FinalPartitioned" + } + (AggregateMode::Single, AggregateInputPartitioning::HashPartitioned) => { + "SinglePartitioned" + } + (AggregateMode::Final, _) => "Final", + (AggregateMode::Single, _) => "Single", + (AggregateMode::Partial, _) => "Partial", + } + } + + /// Required input partitioning. 
+ pub fn input_partitioning(&self) -> AggregateInputPartitioning { + self.input_partitioning + } + /// Set the limit options for this AggExec pub fn with_limit_options(mut self, limit_options: Option) -> Self { self.limit_options = limit_options; @@ -974,9 +1021,7 @@ impl AggregateExec { column_statistics }; match self.mode { - AggregateMode::Final | AggregateMode::FinalPartitioned - if self.group_by.expr.is_empty() => - { + AggregateMode::Final if self.group_by.expr.is_empty() => { let total_byte_size = Self::calculate_scaled_byte_size(child_statistics, 1); @@ -1124,7 +1169,7 @@ impl DisplayAs for AggregateExec { } }; - write!(f, "AggregateExec: mode={:?}", self.mode)?; + write!(f, "AggregateExec: mode={}", self.mode_display())?; let g: Vec = if self.group_by.is_single() { self.group_by .expr @@ -1216,7 +1261,7 @@ impl DisplayAs for AggregateExec { .iter() .map(|agg| agg.human_display().to_string()) .collect(); - writeln!(f, "mode={:?}", self.mode)?; + writeln!(f, "mode={}", self.mode_display())?; if !g.is_empty() { writeln!(f, "group_by={}", g.join(", "))?; } @@ -1247,14 +1292,14 @@ impl ExecutionPlan for AggregateExec { } fn required_input_distribution(&self) -> Vec { - match &self.mode { - AggregateMode::Partial => { + match self.input_partitioning { + AggregateInputPartitioning::Unspecified => { vec![Distribution::UnspecifiedDistribution] } - AggregateMode::FinalPartitioned | AggregateMode::SinglePartitioned => { + AggregateInputPartitioning::HashPartitioned => { vec![Distribution::HashPartitioned(self.group_by.input_exprs())] } - AggregateMode::Final | AggregateMode::Single => { + AggregateInputPartitioning::SinglePartition => { vec![Distribution::SinglePartition] } } @@ -1287,6 +1332,7 @@ impl ExecutionPlan for AggregateExec { ) -> Result> { let mut me = AggregateExec::try_new_with_schema( self.mode, + self.input_partitioning, self.group_by.clone(), self.aggr_expr.clone(), self.filter_expr.clone(), @@ -1468,6 +1514,26 @@ impl ExecutionPlan for AggregateExec 
{ } } +fn validate_aggregate_partitioning( + mode: AggregateMode, + input_partitioning: AggregateInputPartitioning, +) -> Result<()> { + match mode { + AggregateMode::Partial => match input_partitioning { + AggregateInputPartitioning::Unspecified => Ok(()), + _ => not_impl_err!( + "Partial aggregation requires unspecified input partitioning" + ), + }, + AggregateMode::Final | AggregateMode::Single => match input_partitioning { + AggregateInputPartitioning::Unspecified => not_impl_err!( + "Final or single aggregation requires an explicit input partitioning" + ), + _ => Ok(()), + }, + } +} + fn create_schema( input_schema: &Schema, group_by: &PhysicalGroupBy, @@ -1484,10 +1550,7 @@ fn create_schema( fields.extend(expr.state_fields()?.iter().cloned()); } } - AggregateMode::Final - | AggregateMode::FinalPartitioned - | AggregateMode::Single - | AggregateMode::SinglePartitioned => { + AggregateMode::Final | AggregateMode::Single => { // in final mode, the field with the final result of the accumulator for expr in aggr_expr { fields.push(expr.field()) @@ -1697,9 +1760,7 @@ pub fn aggregate_expressions( col_idx_base: usize, ) -> Result>>> { match mode { - AggregateMode::Partial - | AggregateMode::Single - | AggregateMode::SinglePartitioned => Ok(aggr_expr + AggregateMode::Partial | AggregateMode::Single => Ok(aggr_expr .iter() .map(|agg| { let mut result = agg.expressions(); @@ -1711,7 +1772,7 @@ pub fn aggregate_expressions( }) .collect()), // In this mode, we build the merge expressions of the aggregation. 
- AggregateMode::Final | AggregateMode::FinalPartitioned => { + AggregateMode::Final => { let mut col_idx_base = col_idx_base; aggr_expr .iter() @@ -1754,7 +1815,7 @@ pub fn create_accumulators( } /// returns a vector of ArrayRefs, where each entry corresponds to either the -/// final value (mode = Final, FinalPartitioned and Single) or states (mode = Partial) +/// final value (mode = Final and Single) or states (mode = Partial) pub fn finalize_aggregation( accumulators: &mut [AccumulatorItem], mode: &AggregateMode, @@ -1774,10 +1835,7 @@ pub fn finalize_aggregation( .flatten_ok() .collect() } - AggregateMode::Final - | AggregateMode::FinalPartitioned - | AggregateMode::Single - | AggregateMode::SinglePartitioned => { + AggregateMode::Final | AggregateMode::Single => { // Merge the state to the final value accumulators .iter_mut() @@ -3105,8 +3163,9 @@ mod tests { Arc::::clone(&batch.schema()), None, )?; - let aggregate_exec = Arc::new(AggregateExec::try_new( - AggregateMode::FinalPartitioned, + let aggregate_exec = Arc::new(AggregateExec::try_new_with_partitioning( + AggregateMode::Final, + AggregateInputPartitioning::HashPartitioned, group_by, aggr_expr, vec![None], diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs index a55d70ca6fb27..dc6e2c5206d10 100644 --- a/datafusion/physical-plan/src/aggregates/no_grouping.rs +++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs @@ -283,12 +283,8 @@ impl AggregateStream { let aggregate_expressions = aggregate_expressions(&agg.aggr_expr, &agg.mode, 0)?; let filter_expressions = match agg.mode { - AggregateMode::Partial - | AggregateMode::Single - | AggregateMode::SinglePartitioned => agg_filter_expr, - AggregateMode::Final | AggregateMode::FinalPartitioned => { - vec![None; agg.aggr_expr.len()] - } + AggregateMode::Partial | AggregateMode::Single => agg_filter_expr, + AggregateMode::Final => vec![None; agg.aggr_expr.len()], }; let accumulators 
= create_accumulators(&agg.aggr_expr)?; @@ -456,12 +452,10 @@ fn aggregate_batch( // 1.4 let size_pre = accum.size(); let res = match mode { - AggregateMode::Partial - | AggregateMode::Single - | AggregateMode::SinglePartitioned => accum.update_batch(&values), - AggregateMode::Final | AggregateMode::FinalPartitioned => { - accum.merge_batch(&values) + AggregateMode::Partial | AggregateMode::Single => { + accum.update_batch(&values) } + AggregateMode::Final => accum.merge_batch(&values), }; let size_post = accum.size(); allocated += size_post.saturating_sub(size_pre); diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 49ce125e739b3..fba06b0788bed 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -492,12 +492,8 @@ impl GroupedHashAggregateStream { )?; let filter_expressions = match agg.mode { - AggregateMode::Partial - | AggregateMode::Single - | AggregateMode::SinglePartitioned => agg_filter_expr, - AggregateMode::Final | AggregateMode::FinalPartitioned => { - vec![None; agg.aggr_expr.len()] - } + AggregateMode::Partial | AggregateMode::Single => agg_filter_expr, + AggregateMode::Final => vec![None; agg.aggr_expr.len()], }; // Instantiate the accumulators @@ -983,9 +979,7 @@ impl GroupedHashAggregateStream { // Call the appropriate method on each aggregator with // the entire input row and the relevant group indexes match self.mode { - AggregateMode::Partial - | AggregateMode::Single - | AggregateMode::SinglePartitioned + AggregateMode::Partial | AggregateMode::Single if !self.spill_state.is_stream_merging => { acc.update_batch( @@ -1099,10 +1093,9 @@ impl GroupedHashAggregateStream { // merged and re-evaluated later. output.extend(acc.state(emit_to)?) 
} - AggregateMode::Final - | AggregateMode::FinalPartitioned - | AggregateMode::Single - | AggregateMode::SinglePartitioned => output.push(acc.evaluate(emit_to)?), + AggregateMode::Final | AggregateMode::Single => { + output.push(acc.evaluate(emit_to)?) + } } } drop(timer); diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index ca213bc722a17..0390683aa68ba 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -50,7 +50,8 @@ use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctio use datafusion_physical_expr::async_scalar_function::AsyncFuncExpr; use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef}; use datafusion_physical_plan::aggregates::{ - AggregateExec, AggregateMode, LimitOptions, PhysicalGroupBy, + AggregateExec, AggregateInputPartitioning, AggregateMode, LimitOptions, + PhysicalGroupBy, }; use datafusion_physical_plan::analyze::AnalyzeExec; use datafusion_physical_plan::async_func::AsyncFuncExec; @@ -1078,14 +1079,27 @@ impl protobuf::PhysicalPlanNode { hash_agg.mode )) })?; - let agg_mode: AggregateMode = match mode { - protobuf::AggregateMode::Partial => AggregateMode::Partial, - protobuf::AggregateMode::Final => AggregateMode::Final, - protobuf::AggregateMode::FinalPartitioned => AggregateMode::FinalPartitioned, - protobuf::AggregateMode::Single => AggregateMode::Single, - protobuf::AggregateMode::SinglePartitioned => { - AggregateMode::SinglePartitioned - } + let (agg_mode, input_partitioning) = match mode { + protobuf::AggregateMode::Partial => ( + AggregateMode::Partial, + AggregateInputPartitioning::Unspecified, + ), + protobuf::AggregateMode::Final => ( + AggregateMode::Final, + AggregateInputPartitioning::SinglePartition, + ), + protobuf::AggregateMode::FinalPartitioned => ( + AggregateMode::Final, + AggregateInputPartitioning::HashPartitioned, + ), + protobuf::AggregateMode::Single => ( + 
AggregateMode::Single, + AggregateInputPartitioning::SinglePartition, + ), + protobuf::AggregateMode::SinglePartitioned => ( + AggregateMode::Single, + AggregateInputPartitioning::HashPartitioned, + ), }; let num_expr = hash_agg.group_expr.len(); @@ -1220,8 +1234,9 @@ impl protobuf::PhysicalPlanNode { }) .collect::, _>>()?; - let agg = AggregateExec::try_new( + let agg = AggregateExec::try_new_with_partitioning( agg_mode, + input_partitioning, PhysicalGroupBy::new(group_expr, null_expr, groups, has_grouping_set), physical_aggr_expr, physical_filter_expr, @@ -2669,14 +2684,16 @@ impl protobuf::PhysicalPlanNode { .map(|expr| expr.name().to_string()) .collect::>(); - let agg_mode = match exec.mode() { - AggregateMode::Partial => protobuf::AggregateMode::Partial, - AggregateMode::Final => protobuf::AggregateMode::Final, - AggregateMode::FinalPartitioned => protobuf::AggregateMode::FinalPartitioned, - AggregateMode::Single => protobuf::AggregateMode::Single, - AggregateMode::SinglePartitioned => { + let agg_mode = match (exec.mode(), exec.input_partitioning()) { + (AggregateMode::Partial, _) => protobuf::AggregateMode::Partial, + (AggregateMode::Final, AggregateInputPartitioning::HashPartitioned) => { + protobuf::AggregateMode::FinalPartitioned + } + (AggregateMode::Final, _) => protobuf::AggregateMode::Final, + (AggregateMode::Single, AggregateInputPartitioning::HashPartitioned) => { protobuf::AggregateMode::SinglePartitioned } + (AggregateMode::Single, _) => protobuf::AggregateMode::Single, }; let input_schema = exec.input_schema(); let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter( From 3974bf4a86a181fc72695e4ed230ecb3eff18cd4 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 30 Jan 2026 08:49:34 -0500 Subject: [PATCH 2/3] simplity --- datafusion/core/src/physical_planner.rs | 53 +++--- .../enforce_distribution.rs | 11 +- .../partition_statistics.rs | 8 +- .../src/combine_partial_final_agg.rs | 17 +- .../src/enforce_distribution.rs | 
17 +- .../src/limited_distinct_aggregation.rs | 57 ++++++- .../physical-plan/src/aggregates/mod.rs | 156 +++++++----------- datafusion/proto/proto/datafusion.proto | 1 + datafusion/proto/src/generated/pbjson.rs | 23 +++ datafusion/proto/src/generated/prost.rs | 2 + datafusion/proto/src/physical_plan/mod.rs | 49 ++---- 11 files changed, 206 insertions(+), 188 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index dedd454b14924..8ff6493c6a48d 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -36,9 +36,7 @@ use crate::logical_expr::{ UserDefinedLogicalNode, }; use crate::physical_expr::{create_physical_expr, create_physical_exprs}; -use crate::physical_plan::aggregates::{ - AggregateExec, AggregateInputPartitioning, AggregateMode, PhysicalGroupBy, -}; +use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use crate::physical_plan::analyze::AnalyzeExec; use crate::physical_plan::explain::ExplainExec; use crate::physical_plan::filter::FilterExecBuilder; @@ -924,19 +922,22 @@ impl DefaultPhysicalPlanner { input_exec }; - let initial_aggr = Arc::new(AggregateExec::try_new( - AggregateMode::Partial, - groups.clone(), - aggregates, - filters.clone(), - input_exec, - Arc::clone(&physical_input_schema), - )?); - let can_repartition = !groups.is_empty() && session_state.config().target_partitions() > 1 && session_state.config().repartition_aggregations(); + let initial_aggr = Arc::new( + AggregateExec::try_new( + AggregateMode::Partial, + groups.clone(), + aggregates, + filters.clone(), + input_exec, + Arc::clone(&physical_input_schema), + )? + .with_repartition_aggregations(can_repartition), + ); + // Some aggregators may be modified during initialization for // optimization purposes. For example, a FIRST_VALUE may turn // into a LAST_VALUE with the reverse ordering requirement. 
@@ -944,25 +945,19 @@ impl DefaultPhysicalPlanner { // `AggregateFunctionExpr`/`PhysicalSortExpr` objects. let updated_aggregates = initial_aggr.aggr_expr().to_vec(); - let next_partitioning = if can_repartition { - // construct a second aggregation with hash-partitioned input - AggregateInputPartitioning::HashPartitioned - } else { - // construct a second aggregation with single-partition input - AggregateInputPartitioning::SinglePartition - }; - let final_grouping_set = initial_aggr.group_expr().as_final(); - Arc::new(AggregateExec::try_new_with_partitioning( - AggregateMode::Final, - next_partitioning, - final_grouping_set, - updated_aggregates, - filters, - initial_aggr, - Arc::clone(&physical_input_schema), - )?) + Arc::new( + AggregateExec::try_new( + AggregateMode::Final, + final_grouping_set, + updated_aggregates, + filters, + initial_aggr, + Arc::clone(&physical_input_schema), + )? + .with_repartition_aggregations(can_repartition), + ) } LogicalPlan::Projection(Projection { input, expr, .. 
}) => self .create_project_physical_exec( diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 5a417de734c8e..ef8a36ed3ef12 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -54,7 +54,7 @@ use datafusion_physical_optimizer::enforce_distribution::*; use datafusion_physical_optimizer::enforce_sorting::EnforceSorting; use datafusion_physical_optimizer::output_requirements::OutputRequirements; use datafusion_physical_plan::aggregates::{ - AggregateExec, AggregateInputPartitioning, AggregateMode, PhysicalGroupBy, + AggregateExec, AggregateMode, PhysicalGroupBy, }; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -328,9 +328,8 @@ fn aggregate_exec_with_alias( let final_grouping = PhysicalGroupBy::new_single(final_group_by_expr); Arc::new( - AggregateExec::try_new_with_partitioning( + AggregateExec::try_new( AggregateMode::Final, - AggregateInputPartitioning::HashPartitioned, final_grouping, vec![], vec![], @@ -343,11 +342,13 @@ fn aggregate_exec_with_alias( input, schema.clone(), ) - .unwrap(), + .unwrap() + .with_repartition_aggregations(true), ), schema, ) - .unwrap(), + .unwrap() + .with_repartition_aggregations(true), ) } diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index fa8b48a0077ae..707d6907d9730 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -39,7 +39,7 @@ mod test { use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::aggregates::{ - AggregateExec, AggregateInputPartitioning, AggregateMode, PhysicalGroupBy, + AggregateExec, 
AggregateMode, PhysicalGroupBy, }; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::common::compute_record_batch_statistics; @@ -881,9 +881,8 @@ mod test { ) .await?; - let agg_final = Arc::new(AggregateExec::try_new_with_partitioning( + let agg_final = Arc::new(AggregateExec::try_new( AggregateMode::Final, - AggregateInputPartitioning::HashPartitioned, group_by.clone(), aggr_expr.clone(), vec![None], @@ -954,9 +953,8 @@ mod test { scan_schema.clone(), )?); - let agg_final = Arc::new(AggregateExec::try_new_with_partitioning( + let agg_final = Arc::new(AggregateExec::try_new( AggregateMode::Final, - AggregateInputPartitioning::HashPartitioned, group_by.clone(), aggr_expr.clone(), vec![None], diff --git a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs index d4869439a2b2f..c5383d50f99a3 100644 --- a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs +++ b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use datafusion_common::error::Result; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::aggregates::{ - AggregateExec, AggregateInputPartitioning, AggregateMode, PhysicalGroupBy, + AggregateExec, AggregateMode, PhysicalGroupBy, }; use crate::PhysicalOptimizerRule; @@ -82,15 +82,8 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate { input_agg_exec.filter_expr(), ), ) { - let input_partitioning = match agg_exec.input_partitioning() { - AggregateInputPartitioning::HashPartitioned => { - AggregateInputPartitioning::HashPartitioned - } - _ => AggregateInputPartitioning::SinglePartition, - }; - AggregateExec::try_new_with_partitioning( + AggregateExec::try_new( AggregateMode::Single, - input_partitioning, input_agg_exec.group_expr().clone(), input_agg_exec.aggr_expr().to_vec(), input_agg_exec.filter_expr().to_vec(), @@ -98,7 +91,11 @@ impl 
PhysicalOptimizerRule for CombinePartialFinalAggregate { input_agg_exec.input_schema(), ) .map(|combined_agg| { - combined_agg.with_limit_options(agg_exec.limit_options()) + combined_agg + .with_limit_options(agg_exec.limit_options()) + .with_repartition_aggregations( + agg_exec.repartition_aggregations(), + ) }) .ok() .map(Arc::new) diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index b177bce802bf4..14a2f83d3c218 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -44,7 +44,7 @@ use datafusion_physical_expr::{ }; use datafusion_physical_plan::ExecutionPlanProperties; use datafusion_physical_plan::aggregates::{ - AggregateExec, AggregateInputPartitioning, AggregateMode, PhysicalGroupBy, + AggregateExec, AggregateMode, PhysicalGroupBy, }; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::execution_plan::EmissionType; @@ -394,8 +394,10 @@ pub fn adjust_input_keys_ordering( } else if let Some(aggregate_exec) = plan.as_any().downcast_ref::() { if !requirements.data.is_empty() { if aggregate_exec.mode() == &AggregateMode::Final - && aggregate_exec.input_partitioning() - == AggregateInputPartitioning::HashPartitioned + && matches!( + aggregate_exec.required_input_distribution().first(), + Some(Distribution::HashPartitioned(_)) + ) { return reorder_aggregate_keys(requirements, aggregate_exec) .map(Transformed::yes); @@ -514,7 +516,8 @@ pub fn reorder_aggregate_keys( agg_exec.filter_expr().to_vec(), Arc::clone(agg_exec.input()), Arc::clone(&agg_exec.input_schema), - )?); + )? 
+ .with_repartition_aggregations(agg_exec.repartition_aggregations())); // Build new group expressions that correspond to the output // of the "reordered" aggregator: let group_exprs = partial_agg.group_expr().expr(); @@ -526,15 +529,15 @@ pub fn reorder_aggregate_keys( .map(|(idx, expr)| (expr, group_exprs[idx].1.clone())) .collect(), ); - let new_final_agg = Arc::new(AggregateExec::try_new_with_partitioning( + let new_final_agg = Arc::new(AggregateExec::try_new( AggregateMode::Final, - AggregateInputPartitioning::HashPartitioned, new_group_by, agg_exec.aggr_expr().to_vec(), agg_exec.filter_expr().to_vec(), Arc::clone(&partial_agg) as _, agg_exec.input_schema(), - )?); + )? + .with_repartition_aggregations(agg_exec.repartition_aggregations())); agg_node.plan = Arc::clone(&new_final_agg) as _; agg_node.data.clear(); diff --git a/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs b/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs index ed067491e760d..f4215b6dbf390 100644 --- a/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs +++ b/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs @@ -20,7 +20,8 @@ use std::sync::Arc; -use datafusion_physical_plan::aggregates::{AggregateExec, LimitOptions}; +use datafusion_physical_plan::aggregates::{AggregateExec, AggregateMode, LimitOptions}; +use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; @@ -54,9 +55,8 @@ impl LimitedDistinctAggregation { } // We found what we want: clone, copy the limit down, and return modified node - let new_aggr = AggregateExec::try_new_with_partitioning( + let new_aggr = AggregateExec::try_new( *aggr.mode(), - aggr.input_partitioning(), aggr.group_expr().clone(), aggr.aggr_expr().to_vec(), aggr.filter_expr().to_vec(), @@ -64,7 +64,56 @@ impl LimitedDistinctAggregation { 
aggr.input_schema(), ) .expect("Unable to copy Aggregate!") - .with_limit_options(Some(LimitOptions::new(limit))); + .with_limit_options(Some(LimitOptions::new(limit))) + .with_repartition_aggregations(aggr.repartition_aggregations()); + + if matches!(aggr.mode(), AggregateMode::Final) + && let (child_plan, wrap_coalesce) = if let Some(coalesce) = aggr + .input() + .as_any() + .downcast_ref::() + { + (Arc::clone(coalesce.input()), true) + } else { + (Arc::clone(aggr.input()), false) + } + && let Some(child_agg) = child_plan.as_any().downcast_ref::() + && matches!(child_agg.mode(), AggregateMode::Partial) + && !child_agg.group_expr().has_grouping_set() + { + let new_child = AggregateExec::try_new( + AggregateMode::Partial, + child_agg.group_expr().clone(), + child_agg.aggr_expr().to_vec(), + child_agg.filter_expr().to_vec(), + child_agg.input().to_owned(), + child_agg.input_schema(), + ) + .expect("Unable to copy Aggregate!") + .with_limit_options(Some(LimitOptions::new(limit))) + .with_repartition_aggregations(child_agg.repartition_aggregations()); + + let new_input: Arc = if wrap_coalesce { + Arc::new(CoalescePartitionsExec::new(Arc::new(new_child))) + } else { + Arc::new(new_child) + }; + + let rebuilt_final = AggregateExec::try_new( + AggregateMode::Final, + new_aggr.group_expr().clone(), + new_aggr.aggr_expr().to_vec(), + new_aggr.filter_expr().to_vec(), + new_input, + new_aggr.input_schema(), + ) + .expect("Unable to copy Aggregate!") + .with_limit_options(Some(LimitOptions::new(limit))) + .with_repartition_aggregations(new_aggr.repartition_aggregations()); + + return Some(Arc::new(rebuilt_final)); + } + Some(Arc::new(new_aggr)) } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 3ce2c40435707..af04f1633fb2d 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -135,28 +135,6 @@ impl AggregateMode { } } -/// Required input partitioning 
for an aggregate. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum AggregateInputPartitioning { - /// No specific distribution requirement. - Unspecified, - /// Requires all rows to be in a single partition. - SinglePartition, - /// Requires input to be hash partitioned by group keys. - HashPartitioned, -} - -impl AggregateInputPartitioning { - fn default_for_mode(mode: AggregateMode) -> Self { - match mode { - AggregateMode::Partial => AggregateInputPartitioning::Unspecified, - AggregateMode::Final | AggregateMode::Single => { - AggregateInputPartitioning::SinglePartition - } - } - } -} - /// Represents `GROUP BY` clause in the plan (including the more general GROUPING SET) /// In the case of a simple `GROUP BY a, b` clause, this will contain the expression [a, b] /// and a single group [false, false]. @@ -536,12 +514,10 @@ impl LimitOptions { } /// Hash aggregate execution plan -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct AggregateExec { /// Aggregation mode (full, partial) mode: AggregateMode, - /// Required input partitioning. - input_partitioning: AggregateInputPartitioning, /// Group by expressions group_by: PhysicalGroupBy, /// Aggregate expressions @@ -565,6 +541,8 @@ pub struct AggregateExec { required_input_ordering: Option, /// Describes how the input is ordered relative to the group by columns input_order_mode: InputOrderMode, + /// Whether hash repartitioning is enabled for final/single aggregation + repartition_aggregations: bool, cache: PlanProperties, /// During initialization, if the plan supports dynamic filtering (see [`AggrDynFilter`]), /// it is set to `Some(..)` regardless of whether it can be pushed down to a child node. 
@@ -575,6 +553,27 @@ pub struct AggregateExec { dynamic_filter: Option>, } +impl std::fmt::Debug for AggregateExec { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AggregateExec") + .field("mode", &self.mode_display()) + .field("group_by", &self.group_by) + .field("aggr_expr", &self.aggr_expr) + .field("filter_expr", &self.filter_expr) + .field("limit_options", &self.limit_options) + .field("input", &self.input) + .field("schema", &self.schema) + .field("input_schema", &self.input_schema) + .field("metrics", &self.metrics) + .field("required_input_ordering", &self.required_input_ordering) + .field("input_order_mode", &self.input_order_mode) + .field("repartition_aggregations", &self.repartition_aggregations) + .field("cache", &self.cache) + .field("dynamic_filter", &self.dynamic_filter) + .finish() + } +} + impl AggregateExec { /// Function used in `OptimizeAggregateOrder` optimizer rule, /// where we need parts of the new value, others cloned from the old one @@ -589,9 +588,9 @@ impl AggregateExec { required_input_ordering: self.required_input_ordering.clone(), metrics: ExecutionPlanMetricsSet::new(), input_order_mode: self.input_order_mode.clone(), + repartition_aggregations: self.repartition_aggregations, cache: self.cache.clone(), mode: self.mode, - input_partitioning: self.input_partitioning, group_by: self.group_by.clone(), filter_expr: self.filter_expr.clone(), limit_options: self.limit_options, @@ -610,9 +609,9 @@ impl AggregateExec { required_input_ordering: self.required_input_ordering.clone(), metrics: ExecutionPlanMetricsSet::new(), input_order_mode: self.input_order_mode.clone(), + repartition_aggregations: self.repartition_aggregations, cache: self.cache.clone(), mode: self.mode, - input_partitioning: self.input_partitioning, group_by: self.group_by.clone(), aggr_expr: self.aggr_expr.clone(), filter_expr: self.filter_expr.clone(), @@ -635,35 +634,12 @@ impl AggregateExec { filter_expr: Vec>>, input: Arc, 
input_schema: SchemaRef, - ) -> Result { - let input_partitioning = AggregateInputPartitioning::default_for_mode(mode); - Self::try_new_with_partitioning( - mode, - input_partitioning, - group_by, - aggr_expr, - filter_expr, - input, - input_schema, - ) - } - - /// Create a new hash aggregate execution plan with explicit input partitioning. - pub fn try_new_with_partitioning( - mode: AggregateMode, - input_partitioning: AggregateInputPartitioning, - group_by: PhysicalGroupBy, - aggr_expr: Vec>, - filter_expr: Vec>>, - input: Arc, - input_schema: SchemaRef, ) -> Result { let schema = create_schema(&input.schema(), &group_by, &aggr_expr, mode)?; let schema = Arc::new(schema); AggregateExec::try_new_with_schema( mode, - input_partitioning, group_by, aggr_expr, filter_expr, @@ -683,7 +659,6 @@ impl AggregateExec { /// the schema in such cases. fn try_new_with_schema( mode: AggregateMode, - input_partitioning: AggregateInputPartitioning, group_by: PhysicalGroupBy, mut aggr_expr: Vec>, filter_expr: Vec>>, @@ -691,7 +666,6 @@ impl AggregateExec { input_schema: SchemaRef, schema: SchemaRef, ) -> Result { - validate_aggregate_partitioning(mode, input_partitioning)?; // Make sure arguments are consistent in size assert_eq_or_internal_err!( aggr_expr.len(), @@ -763,7 +737,6 @@ impl AggregateExec { let mut exec = AggregateExec { mode, - input_partitioning, group_by, aggr_expr, filter_expr, @@ -774,6 +747,7 @@ impl AggregateExec { required_input_ordering, limit_options: None, input_order_mode, + repartition_aggregations: false, cache, dynamic_filter: None, }; @@ -790,30 +764,39 @@ impl AggregateExec { /// Display-friendly mode label that includes input partitioning. 
fn mode_display(&self) -> &'static str { - match (self.mode, self.input_partitioning) { - (AggregateMode::Final, AggregateInputPartitioning::HashPartitioned) => { - "FinalPartitioned" - } - (AggregateMode::Single, AggregateInputPartitioning::HashPartitioned) => { - "SinglePartitioned" - } - (AggregateMode::Final, _) => "Final", - (AggregateMode::Single, _) => "Single", + let is_hash_partitioned = matches!( + self.required_input_distribution().first(), + Some(Distribution::HashPartitioned(_)) + ); + match (self.mode, is_hash_partitioned) { + (AggregateMode::Final, true) => "FinalPartitioned", + (AggregateMode::Single, true) => "SinglePartitioned", + (AggregateMode::Final, false) => "Final", + (AggregateMode::Single, false) => "Single", (AggregateMode::Partial, _) => "Partial", } } - /// Required input partitioning. - pub fn input_partitioning(&self) -> AggregateInputPartitioning { - self.input_partitioning - } - /// Set the limit options for this AggExec pub fn with_limit_options(mut self, limit_options: Option) -> Self { self.limit_options = limit_options; self } + /// Configure whether final/single aggregates should require hash repartitioning. + pub fn with_repartition_aggregations( + mut self, + repartition_aggregations: bool, + ) -> Self { + self.repartition_aggregations = repartition_aggregations; + self + } + + /// Returns whether hash repartitioning is enabled for final/single aggregation. 
+ pub fn repartition_aggregations(&self) -> bool { + self.repartition_aggregations + } + /// Get the limit options (if set) pub fn limit_options(&self) -> Option { self.limit_options @@ -1292,16 +1275,16 @@ impl ExecutionPlan for AggregateExec { } fn required_input_distribution(&self) -> Vec { - match self.input_partitioning { - AggregateInputPartitioning::Unspecified => { - vec![Distribution::UnspecifiedDistribution] + match self.mode { + AggregateMode::Partial => vec![Distribution::UnspecifiedDistribution], + AggregateMode::Final | AggregateMode::Single + if !self.repartition_aggregations || self.group_by.expr.is_empty() => + { + vec![Distribution::SinglePartition] } - AggregateInputPartitioning::HashPartitioned => { + AggregateMode::Final | AggregateMode::Single => { vec![Distribution::HashPartitioned(self.group_by.input_exprs())] } - AggregateInputPartitioning::SinglePartition => { - vec![Distribution::SinglePartition] - } } } @@ -1332,7 +1315,6 @@ impl ExecutionPlan for AggregateExec { ) -> Result> { let mut me = AggregateExec::try_new_with_schema( self.mode, - self.input_partitioning, self.group_by.clone(), self.aggr_expr.clone(), self.filter_expr.clone(), @@ -1341,6 +1323,7 @@ impl ExecutionPlan for AggregateExec { Arc::clone(&self.schema), )?; me.limit_options = self.limit_options; + me.repartition_aggregations = self.repartition_aggregations; me.dynamic_filter = self.dynamic_filter.clone(); Ok(Arc::new(me)) @@ -1514,26 +1497,6 @@ impl ExecutionPlan for AggregateExec { } } -fn validate_aggregate_partitioning( - mode: AggregateMode, - input_partitioning: AggregateInputPartitioning, -) -> Result<()> { - match mode { - AggregateMode::Partial => match input_partitioning { - AggregateInputPartitioning::Unspecified => Ok(()), - _ => not_impl_err!( - "Partial aggregation requires unspecified input partitioning" - ), - }, - AggregateMode::Final | AggregateMode::Single => match input_partitioning { - AggregateInputPartitioning::Unspecified => not_impl_err!( - "Final 
or single aggregation requires an explicit input partitioning" - ), - _ => Ok(()), - }, - } -} - fn create_schema( input_schema: &Schema, group_by: &PhysicalGroupBy, @@ -3163,9 +3126,8 @@ mod tests { Arc::::clone(&batch.schema()), None, )?; - let aggregate_exec = Arc::new(AggregateExec::try_new_with_partitioning( + let aggregate_exec = Arc::new(AggregateExec::try_new( AggregateMode::Final, - AggregateInputPartitioning::HashPartitioned, group_by, aggr_expr, vec![None], diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 810ec6d1f17a3..3b986ef740ed4 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1252,6 +1252,7 @@ message AggregateExecNode { repeated MaybeFilter filter_expr = 10; AggLimit limit = 11; bool has_grouping_set = 12; + bool repartition_aggregations = 13; } message GlobalLimitExecNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 7ed20785ab384..421595e41fd27 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -154,6 +154,9 @@ impl serde::Serialize for AggregateExecNode { if self.has_grouping_set { len += 1; } + if self.repartition_aggregations { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.AggregateExecNode", len)?; if !self.group_expr.is_empty() { struct_ser.serialize_field("groupExpr", &self.group_expr)?; @@ -193,6 +196,12 @@ impl serde::Serialize for AggregateExecNode { if self.has_grouping_set { struct_ser.serialize_field("hasGroupingSet", &self.has_grouping_set)?; } + if self.repartition_aggregations { + struct_ser.serialize_field( + "repartitionAggregations", + &self.repartition_aggregations, + )?; + } struct_ser.end() } } @@ -223,6 +232,8 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { "limit", "has_grouping_set", "hasGroupingSet", + "repartition_aggregations", + "repartitionAggregations", ]; 
#[allow(clippy::enum_variant_names)] @@ -239,6 +250,7 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { FilterExpr, Limit, HasGroupingSet, + RepartitionAggregations, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -272,6 +284,7 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { "filterExpr" | "filter_expr" => Ok(GeneratedField::FilterExpr), "limit" => Ok(GeneratedField::Limit), "hasGroupingSet" | "has_grouping_set" => Ok(GeneratedField::HasGroupingSet), + "repartitionAggregations" | "repartition_aggregations" => Ok(GeneratedField::RepartitionAggregations), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -303,6 +316,7 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { let mut filter_expr__ = None; let mut limit__ = None; let mut has_grouping_set__ = None; + let mut repartition_aggregations__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::GroupExpr => { @@ -377,6 +391,14 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { } has_grouping_set__ = Some(map_.next_value()?); } + GeneratedField::RepartitionAggregations => { + if repartition_aggregations__.is_some() { + return Err(serde::de::Error::duplicate_field( + "repartitionAggregations", + )); + } + repartition_aggregations__ = Some(map_.next_value()?); + } } } Ok(AggregateExecNode { @@ -392,6 +414,7 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { filter_expr: filter_expr__.unwrap_or_default(), limit: limit__, has_grouping_set: has_grouping_set__.unwrap_or_default(), + repartition_aggregations: repartition_aggregations__.unwrap_or_default(), }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 0c9320c77892b..ca0a74275d018 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1871,6 +1871,8 @@ pub struct AggregateExecNode { pub limit: 
::core::option::Option, #[prost(bool, tag = "12")] pub has_grouping_set: bool, + #[prost(bool, tag = "13")] + pub repartition_aggregations: bool, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct GlobalLimitExecNode { diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 0390683aa68ba..a4d28aad4420f 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -48,10 +48,9 @@ use datafusion_functions_table::generate_series::{ }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::async_scalar_function::AsyncFuncExpr; -use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef}; +use datafusion_physical_expr::{LexOrdering, LexRequirement, Partitioning, PhysicalExprRef}; use datafusion_physical_plan::aggregates::{ - AggregateExec, AggregateInputPartitioning, AggregateMode, LimitOptions, - PhysicalGroupBy, + AggregateExec, AggregateMode, LimitOptions, PhysicalGroupBy, }; use datafusion_physical_plan::analyze::AnalyzeExec; use datafusion_physical_plan::async_func::AsyncFuncExec; @@ -79,7 +78,9 @@ use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeE use datafusion_physical_plan::union::{InterleaveExec, UnionExec}; use datafusion_physical_plan::unnest::{ListUnnest, UnnestExec}; use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; -use datafusion_physical_plan::{ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr}; +use datafusion_physical_plan::{ + ExecutionPlan, ExecutionPlanProperties, InputOrderMode, PhysicalExpr, WindowExpr, +}; use prost::Message; use prost::bytes::BufMut; @@ -1079,27 +1080,12 @@ impl protobuf::PhysicalPlanNode { hash_agg.mode )) })?; - let (agg_mode, input_partitioning) = match mode { - protobuf::AggregateMode::Partial => ( - AggregateMode::Partial, - AggregateInputPartitioning::Unspecified, - ), - 
protobuf::AggregateMode::Final => ( - AggregateMode::Final, - AggregateInputPartitioning::SinglePartition, - ), - protobuf::AggregateMode::FinalPartitioned => ( - AggregateMode::Final, - AggregateInputPartitioning::HashPartitioned, - ), - protobuf::AggregateMode::Single => ( - AggregateMode::Single, - AggregateInputPartitioning::SinglePartition, - ), - protobuf::AggregateMode::SinglePartitioned => ( - AggregateMode::Single, - AggregateInputPartitioning::HashPartitioned, - ), + let agg_mode = match mode { + protobuf::AggregateMode::Partial => AggregateMode::Partial, + protobuf::AggregateMode::Final + | protobuf::AggregateMode::FinalPartitioned => AggregateMode::Final, + protobuf::AggregateMode::Single + | protobuf::AggregateMode::SinglePartitioned => AggregateMode::Single, }; let num_expr = hash_agg.group_expr.len(); @@ -1234,15 +1220,15 @@ impl protobuf::PhysicalPlanNode { }) .collect::, _>>()?; - let agg = AggregateExec::try_new_with_partitioning( + let agg = AggregateExec::try_new( agg_mode, - input_partitioning, PhysicalGroupBy::new(group_expr, null_expr, groups, has_grouping_set), physical_aggr_expr, physical_filter_expr, input, physical_schema, - )?; + )? 
+ .with_repartition_aggregations(hash_agg.repartition_aggregations); let agg = if let Some(limit_proto) = &hash_agg.limit { let limit = limit_proto.limit as usize; @@ -2684,13 +2670,13 @@ impl protobuf::PhysicalPlanNode { .map(|expr| expr.name().to_string()) .collect::>(); - let agg_mode = match (exec.mode(), exec.input_partitioning()) { + let agg_mode = match (exec.mode(), exec.input().output_partitioning()) { (AggregateMode::Partial, _) => protobuf::AggregateMode::Partial, - (AggregateMode::Final, AggregateInputPartitioning::HashPartitioned) => { + (AggregateMode::Final, Partitioning::Hash(_, _)) => { protobuf::AggregateMode::FinalPartitioned } (AggregateMode::Final, _) => protobuf::AggregateMode::Final, - (AggregateMode::Single, AggregateInputPartitioning::HashPartitioned) => { + (AggregateMode::Single, Partitioning::Hash(_, _)) => { protobuf::AggregateMode::SinglePartitioned } (AggregateMode::Single, _) => protobuf::AggregateMode::Single, }; @@ -2736,6 +2722,7 @@ impl protobuf::PhysicalPlanNode { groups, limit, has_grouping_set: exec.group_expr().has_grouping_set(), + repartition_aggregations: exec.repartition_aggregations(), }, ))), }) From b4b16baf9373ec870bffa6b0c5d6d3d47ca56393 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 30 Jan 2026 17:15:14 -0500 Subject: [PATCH 3/3] cleanup --- datafusion/core/src/physical_planner.rs | 27 ++++--- .../enforce_distribution.rs | 12 +-- .../src/combine_partial_final_agg.rs | 9 +-- .../src/enforce_distribution.rs | 12 +-- .../src/limited_distinct_aggregation.rs | 18 ++--- .../physical-plan/src/aggregates/mod.rs | 78 +++++++++++++++---- datafusion/proto/proto/datafusion.proto | 2 +- datafusion/proto/src/generated/pbjson.rs | 29 ++++--- datafusion/proto/src/generated/prost.rs | 2 +- datafusion/proto/src/physical_plan/mod.rs | 13 ++-- 10 files changed, 127 insertions(+), 75 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 8ff6493c6a48d..6af727777a42c 100644
--- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -925,17 +925,18 @@ impl DefaultPhysicalPlanner { let can_repartition = !groups.is_empty() && session_state.config().target_partitions() > 1 && session_state.config().repartition_aggregations(); + let require_single_output_partition = !can_repartition; let initial_aggr = Arc::new( - AggregateExec::try_new( + AggregateExec::try_new_with_require_single_output_partition( AggregateMode::Partial, groups.clone(), aggregates, filters.clone(), input_exec, Arc::clone(&physical_input_schema), - )? - .with_repartition_aggregations(can_repartition), + require_single_output_partition, + )?, ); // Some aggregators may be modified during initialization for @@ -947,17 +948,15 @@ impl DefaultPhysicalPlanner { let final_grouping_set = initial_aggr.group_expr().as_final(); - Arc::new( - AggregateExec::try_new( - AggregateMode::Final, - final_grouping_set, - updated_aggregates, - filters, - initial_aggr, - Arc::clone(&physical_input_schema), - )? - .with_repartition_aggregations(can_repartition), - ) + Arc::new(AggregateExec::try_new_with_require_single_output_partition( + AggregateMode::Final, + final_grouping_set, + updated_aggregates, + filters, + initial_aggr, + Arc::clone(&physical_input_schema), + require_single_output_partition, + )?) } LogicalPlan::Projection(Projection { input, expr, .. 
}) => self .create_project_physical_exec( diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index ef8a36ed3ef12..97fd1225a387c 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -328,27 +328,27 @@ fn aggregate_exec_with_alias( let final_grouping = PhysicalGroupBy::new_single(final_group_by_expr); Arc::new( - AggregateExec::try_new( + AggregateExec::try_new_with_require_single_output_partition( AggregateMode::Final, final_grouping, vec![], vec![], Arc::new( - AggregateExec::try_new( + AggregateExec::try_new_with_require_single_output_partition( AggregateMode::Partial, group_by, vec![], vec![], input, schema.clone(), + false, ) - .unwrap() - .with_repartition_aggregations(true), + .unwrap(), ), schema, + false, ) - .unwrap() - .with_repartition_aggregations(true), + .unwrap(), ) } diff --git a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs index c5383d50f99a3..821c8addc2bc8 100644 --- a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs +++ b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs @@ -82,7 +82,8 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate { input_agg_exec.filter_expr(), ), ) { - AggregateExec::try_new( + AggregateExec::try_new_with_settings_from( + agg_exec, AggregateMode::Single, input_agg_exec.group_expr().clone(), input_agg_exec.aggr_expr().to_vec(), @@ -91,11 +92,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate { input_agg_exec.input_schema(), ) .map(|combined_agg| { - combined_agg - .with_limit_options(agg_exec.limit_options()) - .with_repartition_aggregations( - agg_exec.repartition_aggregations(), - ) + combined_agg.with_limit_options(agg_exec.limit_options()) }) .ok() .map(Arc::new) diff --git 
a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 14a2f83d3c218..073301f0e84f1 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -509,15 +509,15 @@ pub fn reorder_aggregate_keys( .into_iter() .map(|idx| group_exprs[idx].clone()) .collect(); - let partial_agg = Arc::new(AggregateExec::try_new( + let partial_agg = Arc::new(AggregateExec::try_new_with_settings_from( + agg_exec, AggregateMode::Partial, PhysicalGroupBy::new_single(new_group_exprs), agg_exec.aggr_expr().to_vec(), agg_exec.filter_expr().to_vec(), Arc::clone(agg_exec.input()), Arc::clone(&agg_exec.input_schema), - )? - .with_repartition_aggregations(agg_exec.repartition_aggregations())); + )?); // Build new group expressions that correspond to the output // of the "reordered" aggregator: let group_exprs = partial_agg.group_expr().expr(); @@ -529,15 +529,15 @@ pub fn reorder_aggregate_keys( .map(|(idx, expr)| (expr, group_exprs[idx].1.clone())) .collect(), ); - let new_final_agg = Arc::new(AggregateExec::try_new( + let new_final_agg = Arc::new(AggregateExec::try_new_with_settings_from( + agg_exec, AggregateMode::Final, new_group_by, agg_exec.aggr_expr().to_vec(), agg_exec.filter_expr().to_vec(), Arc::clone(&partial_agg) as _, agg_exec.input_schema(), - )? 
- .with_repartition_aggregations(agg_exec.repartition_aggregations())); + )?); agg_node.plan = Arc::clone(&new_final_agg) as _; agg_node.data.clear(); diff --git a/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs b/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs index f4215b6dbf390..175cbc0a0df07 100644 --- a/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs +++ b/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs @@ -55,7 +55,8 @@ impl LimitedDistinctAggregation { } // We found what we want: clone, copy the limit down, and return modified node - let new_aggr = AggregateExec::try_new( + let new_aggr = AggregateExec::try_new_with_settings_from( + aggr, *aggr.mode(), aggr.group_expr().clone(), aggr.aggr_expr().to_vec(), @@ -64,8 +65,7 @@ impl LimitedDistinctAggregation { aggr.input_schema(), ) .expect("Unable to copy Aggregate!") - .with_limit_options(Some(LimitOptions::new(limit))) - .with_repartition_aggregations(aggr.repartition_aggregations()); + .with_limit_options(Some(LimitOptions::new(limit))); if matches!(aggr.mode(), AggregateMode::Final) && let (child_plan, wrap_coalesce) = if let Some(coalesce) = aggr @@ -81,7 +81,8 @@ impl LimitedDistinctAggregation { && matches!(child_agg.mode(), AggregateMode::Partial) && !child_agg.group_expr().has_grouping_set() { - let new_child = AggregateExec::try_new( + let new_child = AggregateExec::try_new_with_settings_from( + child_agg, AggregateMode::Partial, child_agg.group_expr().clone(), child_agg.aggr_expr().to_vec(), @@ -90,8 +91,7 @@ impl LimitedDistinctAggregation { child_agg.input_schema(), ) .expect("Unable to copy Aggregate!") - .with_limit_options(Some(LimitOptions::new(limit))) - .with_repartition_aggregations(child_agg.repartition_aggregations()); + .with_limit_options(Some(LimitOptions::new(limit))); let new_input: Arc = if wrap_coalesce { Arc::new(CoalescePartitionsExec::new(Arc::new(new_child))) @@ -99,7 +99,8 @@ impl LimitedDistinctAggregation 
{ Arc::new(new_child) }; - let rebuilt_final = AggregateExec::try_new( + let rebuilt_final = AggregateExec::try_new_with_settings_from( + &new_aggr, AggregateMode::Final, new_aggr.group_expr().clone(), new_aggr.aggr_expr().to_vec(), @@ -108,8 +109,7 @@ impl LimitedDistinctAggregation { new_aggr.input_schema(), ) .expect("Unable to copy Aggregate!") - .with_limit_options(Some(LimitOptions::new(limit))) - .with_repartition_aggregations(new_aggr.repartition_aggregations()); + .with_limit_options(Some(LimitOptions::new(limit))); return Some(Arc::new(rebuilt_final)); } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index af04f1633fb2d..d53ff25085ab8 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -541,8 +541,8 @@ pub struct AggregateExec { required_input_ordering: Option, /// Describes how the input is ordered relative to the group by columns input_order_mode: InputOrderMode, - /// Whether hash repartitioning is enabled for final/single aggregation - repartition_aggregations: bool, + /// Whether final/single aggregation should require a single input partition + require_single_output_partition: bool, cache: PlanProperties, /// During initialization, if the plan supports dynamic filtering (see [`AggrDynFilter`]), /// it is set to `Some(..)` regardless of whether it can be pushed down to a child node. 
@@ -567,7 +567,10 @@ impl std::fmt::Debug for AggregateExec { .field("metrics", &self.metrics) .field("required_input_ordering", &self.required_input_ordering) .field("input_order_mode", &self.input_order_mode) - .field("repartition_aggregations", &self.repartition_aggregations) + .field( + "require_single_output_partition", + &self.require_single_output_partition, + ) .field("cache", &self.cache) .field("dynamic_filter", &self.dynamic_filter) .finish() @@ -588,7 +591,7 @@ impl AggregateExec { required_input_ordering: self.required_input_ordering.clone(), metrics: ExecutionPlanMetricsSet::new(), input_order_mode: self.input_order_mode.clone(), - repartition_aggregations: self.repartition_aggregations, + require_single_output_partition: self.require_single_output_partition, cache: self.cache.clone(), mode: self.mode, group_by: self.group_by.clone(), @@ -609,7 +612,7 @@ impl AggregateExec { required_input_ordering: self.required_input_ordering.clone(), metrics: ExecutionPlanMetricsSet::new(), input_order_mode: self.input_order_mode.clone(), - repartition_aggregations: self.repartition_aggregations, + require_single_output_partition: self.require_single_output_partition, cache: self.cache.clone(), mode: self.mode, group_by: self.group_by.clone(), @@ -649,6 +652,50 @@ impl AggregateExec { ) } + /// Create a new [`AggregateExec`] with an explicit input partition requirement. + pub fn try_new_with_require_single_output_partition( + mode: AggregateMode, + group_by: PhysicalGroupBy, + aggr_expr: Vec>, + filter_expr: Vec>>, + input: Arc, + input_schema: SchemaRef, + require_single_output_partition: bool, + ) -> Result { + let mut exec = AggregateExec::try_new( + mode, + group_by, + aggr_expr, + filter_expr, + input, + input_schema, + )?; + exec.require_single_output_partition = require_single_output_partition; + Ok(exec) + } + + /// Create a new [`AggregateExec`] while reusing the input partitioning + /// requirement from an existing aggregate. 
+ pub fn try_new_with_settings_from( + settings: &AggregateExec, + mode: AggregateMode, + group_by: PhysicalGroupBy, + aggr_expr: Vec>, + filter_expr: Vec>>, + input: Arc, + input_schema: SchemaRef, + ) -> Result { + Self::try_new_with_require_single_output_partition( + mode, + group_by, + aggr_expr, + filter_expr, + input, + input_schema, + settings.require_single_output_partition, + ) + } + /// Create a new hash aggregate execution plan with the given schema. /// This constructor isn't part of the public API, it is used internally /// by DataFusion to enforce schema consistency during when re-creating @@ -747,7 +794,7 @@ impl AggregateExec { required_input_ordering, limit_options: None, input_order_mode, - repartition_aggregations: false, + require_single_output_partition: true, cache, dynamic_filter: None, }; @@ -783,18 +830,18 @@ impl AggregateExec { self } - /// Configure whether final/single aggregates should require hash repartitioning. - pub fn with_repartition_aggregations( + /// Configure whether final/single aggregates should require a single input partition. + pub fn with_require_single_output_partition( mut self, - repartition_aggregations: bool, + require_single_output_partition: bool, ) -> Self { - self.repartition_aggregations = repartition_aggregations; + self.require_single_output_partition = require_single_output_partition; self } - /// Returns whether hash repartitioning is enabled for final/single aggregation. - pub fn repartition_aggregations(&self) -> bool { - self.repartition_aggregations + /// Returns whether final/single aggregation requires a single input partition. 
+ pub fn require_single_output_partition(&self) -> bool { + self.require_single_output_partition } /// Get the limit options (if set) @@ -1278,7 +1325,8 @@ impl ExecutionPlan for AggregateExec { match self.mode { AggregateMode::Partial => vec![Distribution::UnspecifiedDistribution], AggregateMode::Final | AggregateMode::Single - if !self.repartition_aggregations || self.group_by.expr.is_empty() => + if self.require_single_output_partition + || self.group_by.expr.is_empty() => { vec![Distribution::SinglePartition] } @@ -1323,7 +1371,7 @@ impl ExecutionPlan for AggregateExec { Arc::clone(&self.schema), )?; me.limit_options = self.limit_options; - me.repartition_aggregations = self.repartition_aggregations; + me.require_single_output_partition = self.require_single_output_partition; me.dynamic_filter = self.dynamic_filter.clone(); Ok(Arc::new(me)) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 3b986ef740ed4..707041d8363c7 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1252,7 +1252,7 @@ message AggregateExecNode { repeated MaybeFilter filter_expr = 10; AggLimit limit = 11; bool has_grouping_set = 12; - bool repartition_aggregations = 13; + bool require_single_output_partition = 13; } message GlobalLimitExecNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 421595e41fd27..6edf0f585827b 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -154,7 +154,7 @@ impl serde::Serialize for AggregateExecNode { if self.has_grouping_set { len += 1; } - if self.repartition_aggregations { + if self.require_single_output_partition { len += 1; } let mut struct_ser = serializer.serialize_struct("datafusion.AggregateExecNode", len)?; @@ -196,10 +196,10 @@ impl serde::Serialize for AggregateExecNode { if self.has_grouping_set { struct_ser.serialize_field("hasGroupingSet", 
&self.has_grouping_set)?; } - if self.repartition_aggregations { + if self.require_single_output_partition { struct_ser.serialize_field( - "repartitionAggregations", - &self.repartition_aggregations, + "requireSingleOutputPartition", + &self.require_single_output_partition, )?; } struct_ser.end() @@ -232,6 +232,8 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { "limit", "has_grouping_set", "hasGroupingSet", + "require_single_output_partition", + "requireSingleOutputPartition", "repartition_aggregations", "repartitionAggregations", ]; @@ -250,7 +252,7 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { FilterExpr, Limit, HasGroupingSet, - RepartitionAggregations, + RequireSingleOutputPartition, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -284,7 +286,8 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { "filterExpr" | "filter_expr" => Ok(GeneratedField::FilterExpr), "limit" => Ok(GeneratedField::Limit), "hasGroupingSet" | "has_grouping_set" => Ok(GeneratedField::HasGroupingSet), - "repartitionAggregations" | "repartition_aggregations" => Ok(GeneratedField::RepartitionAggregations), + "requireSingleOutputPartition" | "require_single_output_partition" => Ok(GeneratedField::RequireSingleOutputPartition), + "repartitionAggregations" | "repartition_aggregations" => Ok(GeneratedField::RequireSingleOutputPartition), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -316,7 +319,7 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { let mut filter_expr__ = None; let mut limit__ = None; let mut has_grouping_set__ = None; - let mut repartition_aggregations__ = None; + let mut require_single_output_partition__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::GroupExpr => { @@ -391,13 +394,14 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { } has_grouping_set__ = Some(map_.next_value()?); } - GeneratedField::RepartitionAggregations => { - if repartition_aggregations__.is_some() { + GeneratedField::RequireSingleOutputPartition => { + if require_single_output_partition__.is_some() { return Err(serde::de::Error::duplicate_field( - "repartitionAggregations", + "requireSingleOutputPartition", )); } - repartition_aggregations__ = Some(map_.next_value()?); + require_single_output_partition__ = + Some(map_.next_value()?); } } } @@ -414,7 +418,8 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { filter_expr: filter_expr__.unwrap_or_default(), limit: limit__, has_grouping_set: has_grouping_set__.unwrap_or_default(), - repartition_aggregations: repartition_aggregations__.unwrap_or_default(), + require_single_output_partition: require_single_output_partition__ + .unwrap_or_default(), }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index ca0a74275d018..caa392d4a050a 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1872,7 +1872,7 @@ pub struct AggregateExecNode { #[prost(bool, tag = "12")] pub has_grouping_set: bool, #[prost(bool, tag = "13")] - pub repartition_aggregations: bool, + pub require_single_output_partition: bool, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct GlobalLimitExecNode { diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index a4d28aad4420f..931e8cb3332f7 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -48,7 +48,9 @@ use datafusion_functions_table::generate_series::{ }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::async_scalar_function::AsyncFuncExpr; -use 
datafusion_physical_expr::{LexOrdering, LexRequirement, Partitioning, PhysicalExprRef}; +use datafusion_physical_expr::{ + LexOrdering, LexRequirement, Partitioning, PhysicalExprRef, +}; use datafusion_physical_plan::aggregates::{ AggregateExec, AggregateMode, LimitOptions, PhysicalGroupBy, }; @@ -1220,15 +1222,15 @@ impl protobuf::PhysicalPlanNode { }) .collect::, _>>()?; - let agg = AggregateExec::try_new( + let agg = AggregateExec::try_new_with_require_single_output_partition( agg_mode, PhysicalGroupBy::new(group_expr, null_expr, groups, has_grouping_set), physical_aggr_expr, physical_filter_expr, input, physical_schema, - )? - .with_repartition_aggregations(hash_agg.repartition_aggregations); + hash_agg.require_single_output_partition, + )?; let agg = if let Some(limit_proto) = &hash_agg.limit { let limit = limit_proto.limit as usize; @@ -2722,7 +2724,8 @@ impl protobuf::PhysicalPlanNode { groups, limit, has_grouping_set: exec.group_expr().has_grouping_set(), - repartition_aggregations: exec.repartition_aggregations(), + require_single_output_partition: exec + .require_single_output_partition(), }, ))), })