From 9de59563c2b2facf52557bb1f74cb4504fc252d2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 12 Aug 2025 13:14:59 -0600 Subject: [PATCH 01/19] quick fix --- native/core/src/execution/planner.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 7fa7bfe905..1d568aa19c 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1279,10 +1279,16 @@ impl PhysicalPlanner { // SortExec fails in some cases if we do not unpack dictionary-encoded arrays, and // it would be more efficient if we could avoid that. // https://github.com/apache/datafusion-comet/issues/963 - let child_copied = Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)); + + // TODO optimize this so that we only add the CopyExec if needed + // https://github.com/apache/datafusion-comet/issues/2131 + let child_copied = Arc::new(CopyExec::new( + Arc::clone(&child.native_plan), + CopyMode::UnpackOrDeepCopy, + )); let sort = Arc::new( - SortExec::new(LexOrdering::new(exprs?).unwrap(), Arc::clone(&child_copied)) + SortExec::new(LexOrdering::new(exprs?).unwrap(), child_copied) .with_fetch(fetch), ); From e5b5454f1a18a9bb50a8d05518e28295c22a9b44 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 12 Aug 2025 16:28:36 -0600 Subject: [PATCH 02/19] test --- native/core/src/execution/planner.rs | 37 ++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 7fa7bfe905..fab341fcbd 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -2785,6 +2785,7 @@ mod tests { use datafusion::logical_expr::ScalarUDF; use datafusion::physical_plan::ExecutionPlan; use datafusion::{assert_batches_eq, physical_plan::common::collect, prelude::SessionContext}; + use datafusion::physical_plan::display::DisplayableExecutionPlan; use tempfile::TempDir; use tokio::sync::mpsc; @@ -2803,6 +2804,42 @@ mod tests { }; use datafusion_comet_spark_expr::EvalMode; + #[test] + fn copy_exec() { + let scan_exec = create_scan(); + + // Create a SortOrder expression that sorts by the first column (index 0) + // in ascending order with nulls first + let sort_order_expr = spark_expression::Expr { + expr_struct: Some(ExprStruct::SortOrder(Box::new(spark_expression::SortOrder { + child: Some(Box::new(spark_expression::Expr { + expr_struct: Some(ExprStruct::Bound(spark_expression::BoundReference { + index: 0, + datatype: Some(spark_expression::DataType { + type_id: 3, // Int32 + type_info: None, + }), + })), + })), + direction: spark_expression::SortDirection::Ascending as i32, + null_ordering: spark_expression::NullOrdering::NullsFirst as i32, + }))), + }; + + let sort_exec = Operator { + plan_id: 0, + children: vec![scan_exec], + op_struct: Some(OpStruct::Sort(spark_operator::Sort { + sort_orders: vec![sort_order_expr], + fetch: None, + })) + }; + let planner = PhysicalPlanner::default(); + let (_scans, datafusion_plan) = planner.create_plan(&sort_exec, &mut vec![], 1).unwrap(); + let plan_str = format!("{}", DisplayableExecutionPlan::new(datafusion_plan.native_plan.as_ref()).indent(true)); + println!("{plan_str}"); + } + #[test] fn test_unpack_dictionary_primitive() { let op_scan = Operator { From 8bc85051c6c4f75201492a494402f0d4930f7b89 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 12 Aug 2025 16:45:41 -0600 Subject: [PATCH 03/19] tests --- native/core/src/execution/planner.rs | 
106 ++++++++++++------ .../apache/comet/serde/QueryPlanSerde.scala | 1 + 2 files changed, 70 insertions(+), 37 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 9f391a2468..ba765f7063 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -2789,9 +2789,9 @@ mod tests { }; use datafusion::error::DataFusionError; use datafusion::logical_expr::ScalarUDF; + use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::ExecutionPlan; use datafusion::{assert_batches_eq, physical_plan::common::collect, prelude::SessionContext}; - use datafusion::physical_plan::display::DisplayableExecutionPlan; use tempfile::TempDir; use tokio::sync::mpsc; @@ -2809,42 +2809,7 @@ mod tests { spark_operator::{operator::OpStruct, Operator}, }; use datafusion_comet_spark_expr::EvalMode; - - #[test] - fn copy_exec() { - let scan_exec = create_scan(); - - // Create a SortOrder expression that sorts by the first column (index 0) - // in ascending order with nulls first - let sort_order_expr = spark_expression::Expr { - expr_struct: Some(ExprStruct::SortOrder(Box::new(spark_expression::SortOrder { - child: Some(Box::new(spark_expression::Expr { - expr_struct: Some(ExprStruct::Bound(spark_expression::BoundReference { - index: 0, - datatype: Some(spark_expression::DataType { - type_id: 3, // Int32 - type_info: None, - }), - })), - })), - direction: spark_expression::SortDirection::Ascending as i32, - null_ordering: spark_expression::NullOrdering::NullsFirst as i32, - }))), - }; - - let sort_exec = Operator { - plan_id: 0, - children: vec![scan_exec], - op_struct: Some(OpStruct::Sort(spark_operator::Sort { - sort_orders: vec![sort_order_expr], - fetch: None, - })) - }; - let planner = PhysicalPlanner::default(); - let (_scans, datafusion_plan) = planner.create_plan(&sort_exec, &mut vec![], 1).unwrap(); - let plan_str = format!("{}", DisplayableExecutionPlan::new(datafusion_plan.native_plan.as_ref()).indent(true)); - println!("{plan_str}"); - } + use crate::execution::spark_plan::SparkPlan; #[test] fn test_unpack_dictionary_primitive() { @@ -3036,6 +3001,41 @@ mod tests { assert_eq!(comet_err.to_string(), "Error from DataFusion: exec error."); } + #[test] + fn add_copy_to_scan() { + let scan_exec = create_scan(); + let sort_exec = create_sort_exec(scan_exec); + let planner = PhysicalPlanner::default(); + let (_scans, datafusion_plan) = planner.create_plan(&sort_exec, &mut vec![], 1).unwrap(); + let plan_str = explain_plan(datafusion_plan); + let expected_str = r"SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false] + CopyExec [UnpackOrDeepCopy] + ScanExec: source=[], schema=[col_0: Int32]"; + assert_eq!(plan_str, expected_str); + } + + #[test] + fn add_copy_to_filter_scan() { + let scan_exec = create_scan(); + let filter_exec = create_filter(scan_exec, 1); + let sort_exec = create_sort_exec(filter_exec); + let planner = PhysicalPlanner::default(); + let (_scans, datafusion_plan) = planner.create_plan(&sort_exec, &mut vec![], 1).unwrap(); + let plan_str = explain_plan(datafusion_plan); + let expected_str = r"SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false] + CopyExec [UnpackOrDeepCopy] + CometFilterExec: col_0@0 = 1 + ScanExec: source=[], schema=[col_0: Int32]"; + assert_eq!(plan_str, expected_str); + } + + fn explain_plan(datafusion_plan: Arc) -> String { + format!( + "{}", + DisplayableExecutionPlan::new(datafusion_plan.native_plan.as_ref()).indent(true) + ) + } + // Creates 
a filter operator which takes an `Int32Array` and selects rows that are equal to // `value`. fn create_filter(child_op: spark_operator::Operator, value: i32) -> spark_operator::Operator { @@ -3087,6 +3087,38 @@ mod tests { } } + fn create_sort_exec(scan_exec: Operator) -> Operator { + // Create a SortOrder expression that sorts by the first column (index 0) + // in ascending order with nulls first + let sort_order_expr = spark_expression::Expr { + expr_struct: Some(ExprStruct::SortOrder(Box::new( + spark_expression::SortOrder { + child: Some(Box::new(spark_expression::Expr { + expr_struct: Some(ExprStruct::Bound(spark_expression::BoundReference { + index: 0, + datatype: Some(spark_expression::DataType { + type_id: 3, // Int32 + type_info: None, + }), + })), + })), + direction: spark_expression::SortDirection::Ascending as i32, + null_ordering: spark_expression::NullOrdering::NullsFirst as i32, + }, + ))), + }; + + let sort_exec = Operator { + plan_id: 0, + children: vec![scan_exec], + op_struct: Some(OpStruct::Sort(spark_operator::Sort { + sort_orders: vec![sort_order_expr], + fetch: None, + })), + }; + sort_exec + } + #[test] fn spark_plan_metrics_filter() { let op_scan = create_scan(); diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 35ebabdacb..eb008d4a19 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1796,6 +1796,7 @@ object QueryPlanSerde extends Logging with CometExprShim { // TODO this could be optimized more to stop walking the tree on hitting // certain operators such as join or aggregate which will copy batches def containsNativeCometScan(plan: SparkPlan): Boolean = { + // TODO this needs updating to support Iceberg invoking native_comet scan plan match { case w: CometScanWrapper => containsNativeCometScan(w.originalPlan) case scan: CometScanExec => scan.scanImpl == CometConf.SCAN_NATIVE_COMET From f86dc9db98ceae02b04b2f0d176b2a9c1146986f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 12 Aug 2025 16:51:25 -0600 Subject: [PATCH 04/19] format --- native/core/src/execution/planner.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index ba765f7063..95ce0a7acf 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -3002,7 +3002,7 @@ mod tests { } #[test] - fn add_copy_to_scan() { + fn add_copy_to_sort_on_scan() { let scan_exec = create_scan(); let sort_exec = create_sort_exec(scan_exec); let planner = PhysicalPlanner::default(); @@ -3010,12 +3010,13 @@ mod tests { let plan_str = explain_plan(datafusion_plan); let expected_str = r"SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false] CopyExec [UnpackOrDeepCopy] - ScanExec: source=[], schema=[col_0: Int32]"; + ScanExec: source=[], schema=[col_0: Int32] +"; assert_eq!(plan_str, expected_str); } #[test] - fn add_copy_to_filter_scan() { + fn add_copy_to_sort_on_filtered_scan() { let scan_exec = create_scan(); let filter_exec = create_filter(scan_exec, 1); let sort_exec = create_sort_exec(filter_exec); @@ -3025,7 +3026,8 @@ mod tests { let expected_str = r"SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false] CopyExec [UnpackOrDeepCopy] CometFilterExec: col_0@0 = 1 - ScanExec: source=[], schema=[col_0: Int32]"; + ScanExec: source=[], schema=[col_0: Int32] +"; 
assert_eq!(plan_str, expected_str); } From 3f2b9c79d73aeda50d84a464785f61d9a45ba117 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 12 Aug 2025 17:42:39 -0600 Subject: [PATCH 05/19] add has_buffer_reuse tag to scan proto --- native/core/src/execution/operators/scan.rs | 4 ++++ native/core/src/execution/planner.rs | 16 +++++++++++++--- native/proto/src/proto/operator.proto | 4 ++++ .../org/apache/comet/serde/QueryPlanSerde.scala | 8 ++++++++ 4 files changed, 29 insertions(+), 3 deletions(-) diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index a842efaa30..12c199dad6 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -63,6 +63,8 @@ pub struct ScanExec { pub input_source: Option>, /// A description of the input source for informational purposes pub input_source_description: String, + /// Whether this is a native_comet scan that reuses buffers + pub has_buffer_reuse: bool, /// The data types of columns of the input batch. Converted from Spark schema. pub data_types: Vec, /// Schema of first batch @@ -88,6 +90,7 @@ impl ScanExec { input_source: Option>, input_source_description: &str, data_types: Vec, + has_buffer_reuse: bool, ) -> Result { let metrics_set = ExecutionPlanMetricsSet::default(); let baseline_metrics = BaselineMetrics::new(&metrics_set, 0); @@ -130,6 +133,7 @@ impl ScanExec { exec_context_id, input_source, input_source_description: input_source_description.to_string(), + has_buffer_reuse, data_types, batch: Arc::new(Mutex::new(Some(first_batch))), cache, diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 95ce0a7acf..1dbe683a23 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1430,8 +1430,13 @@ impl PhysicalPlanner { }; // The `ScanExec` operator will take actual arrays from Spark during execution - let scan = - ScanExec::new(self.exec_context_id, input_source, &scan.source, data_types)?; + let scan = ScanExec::new( + self.exec_context_id, + input_source, + &scan.source, + data_types, + scan.has_buffer_reuse, + )?; Ok(( vec![scan.clone()], @@ -2798,6 +2803,7 @@ mod tests { use crate::execution::{operators::InputBatch, planner::PhysicalPlanner}; use crate::execution::operators::ExecutionError; + use crate::execution::spark_plan::SparkPlan; use crate::parquet::parquet_support::SparkParquetOptions; use crate::parquet::schema_adapter::SparkSchemaAdapterFactory; use datafusion_comet_proto::spark_expression::expr::ExprStruct; @@ -2809,7 +2815,6 @@ mod tests { spark_operator::{operator::OpStruct, Operator}, }; use datafusion_comet_spark_expr::EvalMode; - use crate::execution::spark_plan::SparkPlan; #[test] fn test_unpack_dictionary_primitive() { @@ -2822,6 +2827,7 @@ mod tests { type_info: None, }], source: "".to_string(), + has_buffer_reuse: true, })), }; @@ -2895,6 +2901,7 @@ mod tests { type_info: None, }], source: "".to_string(), + has_buffer_reuse: true, })), }; @@ -3175,6 +3182,7 @@ mod tests { op_struct: Some(OpStruct::Scan(spark_operator::Scan { fields: vec![create_proto_datatype()], source: "".to_string(), + has_buffer_reuse: true, })), } } @@ -3217,6 +3225,7 @@ mod tests { }, ], source: "".to_string(), + has_buffer_reuse: true, })), }; @@ -3331,6 +3340,7 @@ mod tests { }, ], source: "".to_string(), + has_buffer_reuse: true, })), }; diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 3470ba8100..b086bc9480 100644 --- 
a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -76,6 +76,10 @@ message Scan { // is purely for informational purposes when viewing native query plans in // debug mode. string source = 2; + // we need to specialize if the source is native_comet scan (either directly, + // or via Iceberg) because the native code needs to insert CopyExec nodes to + // avoid memory corruption issues due to the reuse of mutable buffers + bool has_buffer_reuse = 3; } message NativeScan { diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index eb008d4a19..b80de3c007 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -52,6 +52,7 @@ import org.apache.comet.CometSparkSessionExtensions.{isCometScan, withInfo} import org.apache.comet.DataTypeSupport.isComplexType import org.apache.comet.expressions._ import org.apache.comet.objectstore.NativeConfig +import org.apache.comet.parquet.SupportsComet import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => ProtoDataType, Expr, ScalarFunc} import org.apache.comet.serde.ExprOuterClass.DataType._ import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, BuildSide, JoinType, Operator} @@ -2209,6 +2210,13 @@ object QueryPlanSerde extends Logging with CometExprShim { serializeDataType(attr.dataType) } + // iceberg + op match { + case batchScan: CometBatchScanExec if batchScan.scan.isInstanceOf[SupportsComet] => + scanBuilder.setHasBufferReuse(true) + case _ => + } + if (scanTypes.length == op.output.length) { scanBuilder.addAllFields(scanTypes.asJava) From cb26d0accb958612b740ced9b37285029b472a88 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 12 Aug 2025 18:20:38 -0600 Subject: [PATCH 06/19] Fix memory corruption issue with Iceberg scans --- native/core/src/execution/operators/copy.rs | 6 +- native/core/src/execution/planner.rs | 121 +++++++++++++----- native/proto/src/proto/operator.proto | 1 - .../apache/comet/serde/QueryPlanSerde.scala | 13 -- 4 files changed, 94 insertions(+), 47 deletions(-) diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs index a809031d5d..842db13f9c 100644 --- a/native/core/src/execution/operators/copy.rs +++ b/native/core/src/execution/operators/copy.rs @@ -49,7 +49,7 @@ pub struct CopyExec { mode: CopyMode, } -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone)] pub enum CopyMode { /// Perform a deep copy and also unpack dictionaries UnpackOrDeepCopy, @@ -91,6 +91,10 @@ impl CopyExec { mode, } } + + pub fn mode(&self) -> &CopyMode { + &self.mode + } } impl DisplayAs for CopyExec { diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 1dbe683a23..57a965f991 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1146,8 +1146,10 @@ impl PhysicalPlanner { let predicate = self.create_expr(filter.predicate.as_ref().unwrap(), child.schema())?; + let use_datafusion_filter = !backed_by_mutable_buffers(&child.native_plan); + let filter: Arc = - match (filter.wrap_child_in_copy_exec, filter.use_datafusion_filter) { + match (filter.wrap_child_in_copy_exec, use_datafusion_filter) { (true, true) => Arc::new(DataFusionFilterExec::try_new( predicate, Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)), @@ -1279,16 +1281,10 @@ impl 
PhysicalPlanner { // SortExec fails in some cases if we do not unpack dictionary-encoded arrays, and // it would be more efficient if we could avoid that. // https://github.com/apache/datafusion-comet/issues/963 - - // TODO optimize this so that we only add the CopyExec if needed - // https://github.com/apache/datafusion-comet/issues/2131 - let child_copied = Arc::new(CopyExec::new( - Arc::clone(&child.native_plan), - CopyMode::UnpackOrDeepCopy, - )); + let child_copied = Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)); let sort = Arc::new( - SortExec::new(LexOrdering::new(exprs?).unwrap(), child_copied) + SortExec::new(LexOrdering::new(exprs?).unwrap(), Arc::clone(&child_copied)) .with_fetch(fetch), ); @@ -1522,7 +1518,7 @@ impl PhysicalPlanner { // the data corruption. Note that we only need to copy the input batch // if the child operator is `ScanExec`, because other operators after `ScanExec` // will create new arrays for the output batch. - let input = if can_reuse_input_batch(&child.native_plan) { + let input = if backed_by_mutable_buffers(&child.native_plan) { Arc::new(CopyExec::new( Arc::clone(&child.native_plan), CopyMode::UnpackOrDeepCopy, @@ -1859,7 +1855,7 @@ impl PhysicalPlanner { /// Wrap an ExecutionPlan in a CopyExec, which will unpack any dictionary-encoded arrays /// and make a deep copy of other arrays if the plan re-uses batches. fn wrap_in_copy_exec(plan: Arc) -> Arc { - if can_reuse_input_batch(&plan) { + if backed_by_mutable_buffers(&plan) { Arc::new(CopyExec::new(plan, CopyMode::UnpackOrDeepCopy)) } else { Arc::new(CopyExec::new(plan, CopyMode::UnpackOrClone)) @@ -2564,14 +2560,32 @@ impl From for DataFusionError { } } -/// Returns true if given operator can return input array as output array without -/// modification. This is used to determine if we need to copy the input batch to avoid -/// data corruption from reusing the input batch. 
-fn can_reuse_input_batch(op: &Arc) -> bool { - if op.as_any().is::() || op.as_any().is::() { - can_reuse_input_batch(op.children()[0]) +/// Returns true if given operator can produce arrays that are backed +/// by mutable buffers from a native_comet scan that has not been wrapped +/// in CopyMode::UnpackOrDeepCopy +fn backed_by_mutable_buffers(op: &Arc) -> bool { + if op.as_any().is::() { + // check for native_comet scan (either direct or via Iceberg) + op.as_any() + .downcast_ref::() + .unwrap() + .has_buffer_reuse + } else if op.as_any().is::() { + let copy_exec = op.as_any().downcast_ref::().unwrap(); + match copy_exec.mode() { + CopyMode::UnpackOrDeepCopy => false, + CopyMode::UnpackOrClone => backed_by_mutable_buffers(copy_exec.children()[0]), + } + } else if op.as_any().is::() { + // CometFilterExec guarantees that copies are made + false } else { - op.as_any().is::() + for child in op.children() { + if backed_by_mutable_buffers(child) { + return true; + } + } + false } } @@ -2980,7 +2994,7 @@ mod tests { #[tokio::test()] #[allow(clippy::field_reassign_with_default)] async fn to_datafusion_filter() { - let op_scan = create_scan(); + let op_scan = create_native_comet_scan(); let op = create_filter(op_scan, 0); let planner = PhysicalPlanner::default(); @@ -3009,31 +3023,61 @@ mod tests { } #[test] - fn add_copy_to_sort_on_scan() { - let scan_exec = create_scan(); + fn add_copy_to_sort_on_native_comet_scan() { + let scan_exec = create_native_comet_scan(); let sort_exec = create_sort_exec(scan_exec); let planner = PhysicalPlanner::default(); let (_scans, datafusion_plan) = planner.create_plan(&sort_exec, &mut vec![], 1).unwrap(); let plan_str = explain_plan(datafusion_plan); let expected_str = r"SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false] CopyExec [UnpackOrDeepCopy] - ScanExec: source=[], schema=[col_0: Int32] + ScanExec: source=[native_comet], schema=[col_0: Int32] "; assert_eq!(plan_str, expected_str); } #[test] - fn add_copy_to_sort_on_filtered_scan() { - let scan_exec = create_scan(); + fn add_copy_to_sort_on_sink_scan() { + let scan_exec = create_comet_sink_scan(); + let sort_exec = create_sort_exec(scan_exec); + let planner = PhysicalPlanner::default(); + let (_scans, datafusion_plan) = planner.create_plan(&sort_exec, &mut vec![], 1).unwrap(); + let plan_str = explain_plan(datafusion_plan); + let expected_str = r"SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false] + CopyExec [UnpackOrClone] + ScanExec: source=[sink], schema=[col_0: Int32] +"; + assert_eq!(plan_str, expected_str); + } + + #[test] + fn add_copy_to_sort_on_filtered_native_comet_scan() { + let scan_exec = create_native_comet_scan(); let filter_exec = create_filter(scan_exec, 1); let sort_exec = create_sort_exec(filter_exec); let planner = PhysicalPlanner::default(); let (_scans, datafusion_plan) = planner.create_plan(&sort_exec, &mut vec![], 1).unwrap(); let plan_str = explain_plan(datafusion_plan); let expected_str = r"SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false] - CopyExec [UnpackOrDeepCopy] + CopyExec [UnpackOrClone] CometFilterExec: col_0@0 = 1 - ScanExec: source=[], schema=[col_0: Int32] + ScanExec: source=[native_comet], schema=[col_0: Int32] +"; + assert_eq!(plan_str, expected_str); + } + + #[test] + fn add_copy_to_sort_on_filtered_scan() { + let scan_exec = create_comet_sink_scan(); + let filter_exec = create_filter(scan_exec, 1); + let sort_exec = create_sort_exec(filter_exec); + let planner = PhysicalPlanner::default(); + let (_scans, datafusion_plan) = 
planner.create_plan(&sort_exec, &mut vec![], 1).unwrap(); + let plan_str = explain_plan(datafusion_plan); + let expected_str = r"SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false] + CopyExec [UnpackOrClone] + FilterExec: col_0@0 = 1 + ScanExec: source=[sink], schema=[col_0: Int32] "; assert_eq!(plan_str, expected_str); } @@ -3090,7 +3134,6 @@ mod tests { children: vec![child_op], op_struct: Some(OpStruct::Filter(spark_operator::Filter { predicate: Some(expr), - use_datafusion_filter: false, wrap_child_in_copy_exec: false, })), } @@ -3130,7 +3173,7 @@ mod tests { #[test] fn spark_plan_metrics_filter() { - let op_scan = create_scan(); + let op_scan = create_native_comet_scan(); let op = create_filter(op_scan, 0); let planner = PhysicalPlanner::default(); @@ -3143,7 +3186,7 @@ mod tests { #[test] fn spark_plan_metrics_hash_join() { - let op_scan = create_scan(); + let op_scan = create_native_comet_scan(); let op_join = Operator { plan_id: 0, children: vec![op_scan.clone(), op_scan.clone()], @@ -3175,14 +3218,28 @@ mod tests { } } - fn create_scan() -> Operator { + /// Create a scan for reading from native_comet Parquet reader + fn create_native_comet_scan() -> Operator { Operator { plan_id: 0, children: vec![], op_struct: Some(OpStruct::Scan(spark_operator::Scan { fields: vec![create_proto_datatype()], - source: "".to_string(), - has_buffer_reuse: true, + source: "native_comet".to_string(), + has_buffer_reuse: true, // buffer reuse for native_comet + })), + } + } + + /// Create a scan for reading from a shuffle + fn create_comet_sink_scan() -> Operator { + Operator { + plan_id: 0, + children: vec![], + op_struct: Some(OpStruct::Scan(spark_operator::Scan { + fields: vec![create_proto_datatype()], + source: "sink".to_string(), + has_buffer_reuse: false, // no buffer reuse when reading a sink })), } } diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index b086bc9480..485a1be534 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -113,7 +113,6 @@ message Projection { message Filter { spark.spark_expression.Expr predicate = 1; - bool use_datafusion_filter = 2; // Some expressions don't support dictionary arrays, so may need to wrap the child in a CopyExec bool wrap_child_in_copy_exec = 3; } diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index b80de3c007..02d74e4d18 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1794,18 +1794,6 @@ object QueryPlanSerde extends Logging with CometExprShim { // batches through whereas the Comet implementation guarantees that a copy is always // made, which is critical when using `native_comet` scans due to buffer re-use - // TODO this could be optimized more to stop walking the tree on hitting - // certain operators such as join or aggregate which will copy batches - def containsNativeCometScan(plan: SparkPlan): Boolean = { - // TODO this needs updating to support Iceberg invoking native_comet scan - plan match { - case w: CometScanWrapper => containsNativeCometScan(w.originalPlan) - case scan: CometScanExec => scan.scanImpl == CometConf.SCAN_NATIVE_COMET - case _: CometNativeScanExec => false - case _ => plan.children.exists(containsNativeCometScan) - } - } - // Some native expressions do not support operating on dictionary-encoded arrays, so // wrap the child in a 
CopyExec to unpack dictionaries first. def wrapChildInCopyExec(condition: Expression): Boolean = { @@ -1818,7 +1806,6 @@ object QueryPlanSerde extends Logging with CometExprShim { val filterBuilder = OperatorOuterClass.Filter .newBuilder() .setPredicate(cond.get) - .setUseDatafusionFilter(!containsNativeCometScan(op)) .setWrapChildInCopyExec(wrapChildInCopyExec(condition)) Some(builder.setFilter(filterBuilder).build()) } else { From ddf7517fc683b60c1f2500470a5cb388e8fd7001 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 12 Aug 2025 18:47:03 -0600 Subject: [PATCH 07/19] clippy --- native/core/src/execution/planner.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 57a965f991..854e1c3753 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -3160,15 +3160,14 @@ mod tests { ))), }; - let sort_exec = Operator { + Operator { plan_id: 0, children: vec![scan_exec], op_struct: Some(OpStruct::Sort(spark_operator::Sort { sort_orders: vec![sort_order_expr], fetch: None, })), - }; - sort_exec + } } #[test] From 2a4a4adc7af53d1792ef105d6d9941dff3a72b12 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 12 Aug 2025 18:49:16 -0600 Subject: [PATCH 08/19] prep for review --- .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 5 ----- 1 file changed, 5 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 02d74e4d18..129909504f 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1789,11 +1789,6 @@ object QueryPlanSerde extends Logging with CometExprShim { val cond = exprToProto(condition, child.output) if (cond.isDefined && childOp.nonEmpty) { - // We need to determine whether to use DataFusion's FilterExec or Comet's - // FilterExec. The difference is that DataFusion's implementation will sometimes pass - // batches through whereas the Comet implementation guarantees that a copy is always - // made, which is critical when using `native_comet` scans due to buffer re-use - // Some native expressions do not support operating on dictionary-encoded arrays, so // wrap the child in a CopyExec to unpack dictionaries first. 
def wrapChildInCopyExec(condition: Expression): Boolean = { From 2f5463bcda47da177d89060ab5e8f8c03e1b7443 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 12 Aug 2025 18:50:22 -0600 Subject: [PATCH 09/19] prep for review --- native/core/src/execution/operators/copy.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs index 842db13f9c..ff1a18e673 100644 --- a/native/core/src/execution/operators/copy.rs +++ b/native/core/src/execution/operators/copy.rs @@ -49,7 +49,7 @@ pub struct CopyExec { mode: CopyMode, } -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, PartialEq, Clone)] pub enum CopyMode { /// Perform a deep copy and also unpack dictionaries UnpackOrDeepCopy, From 8e5fa4f9235d46f001cdb06cca88e5974f7042b9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 13 Aug 2025 03:58:57 -0600 Subject: [PATCH 10/19] fix --- .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 129909504f..3b8949aa39 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2195,6 +2195,11 @@ object QueryPlanSerde extends Logging with CometExprShim { // iceberg op match { case batchScan: CometBatchScanExec if batchScan.scan.isInstanceOf[SupportsComet] => + // Iceberg integration + scanBuilder.setHasBufferReuse(true) + case _: CoalesceExec => + // CoalesceExec could be wrapping a native_comet scan on the Scala side + // See CometCastSuite for examples scanBuilder.setHasBufferReuse(true) case _ => } From 844c4f0a16b0ced8c0120d879bddaaaa99aadefb Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 13 Aug 2025 03:59:15 -0600 Subject: [PATCH 11/19] fix --- spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 3b8949aa39..9ac6e8c5de 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2192,7 +2192,6 @@ object QueryPlanSerde extends Logging with CometExprShim { serializeDataType(attr.dataType) } - // iceberg op match { case batchScan: CometBatchScanExec if batchScan.scan.isInstanceOf[SupportsComet] => // Iceberg integration From 2a01623a47af770b4b91f3e5e22b49245d5c17b0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 13 Aug 2025 04:09:02 -0600 Subject: [PATCH 12/19] fix --- .../apache/comet/serde/QueryPlanSerde.scala | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 9ac6e8c5de..3e818aec8c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2192,15 +2192,24 @@ object QueryPlanSerde extends Logging with CometExprShim { serializeDataType(attr.dataType) } - op match { - case batchScan: CometBatchScanExec if batchScan.scan.isInstanceOf[SupportsComet] => - // Iceberg integration - scanBuilder.setHasBufferReuse(true) - case _: CoalesceExec => - // 
CoalesceExec could be wrapping a native_comet scan on the Scala side - // See CometCastSuite for examples - scanBuilder.setHasBufferReuse(true) - case _ => + def hasBufferReuse(op: SparkPlan): Boolean = { + op match { + case batchScan: CometBatchScanExec if batchScan.scan.isInstanceOf[SupportsComet] => + // Iceberg integration + true + case scan: CometScanExec => + // native_comet scan + scan.scanImpl == CometConf.SCAN_NATIVE_COMET + case CometScanWrapper(_, scan) => + hasBufferReuse(scan) + case _ => + // wrapper around scan such as CoalesceExec + op.children.exists(hasBufferReuse) + } + } + + if (hasBufferReuse(op)) { + scanBuilder.setHasBufferReuse(true) } if (scanTypes.length == op.output.length) { From 47841fd73f33f8e4572c1ec87e382a6425c06100 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 13 Aug 2025 04:42:18 -0600 Subject: [PATCH 13/19] fix --- .../org/apache/spark/sql/comet/CometExecUtils.scala | 10 ++++++++-- .../execution/shuffle/CometNativeShuffleWriter.scala | 5 ++++- .../scala/org/apache/spark/sql/CometTestBase.scala | 1 + 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala index 2fc73bb7c5..fce45cb18d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala @@ -88,7 +88,10 @@ object CometExecUtils { * child partition */ def getLimitNativePlan(outputAttributes: Seq[Attribute], limit: Int): Option[Operator] = { - val scanBuilder = OperatorOuterClass.Scan.newBuilder().setSource("LimitInput") + val scanBuilder = OperatorOuterClass.Scan + .newBuilder() + .setSource("LimitInput") + .setHasBufferReuse(true) // TODO needed? val scanOpBuilder = OperatorOuterClass.Operator.newBuilder() val scanTypes = outputAttributes.flatten { attr => @@ -118,7 +121,10 @@ object CometExecUtils { sortOrder: Seq[SortOrder], child: SparkPlan, limit: Int): Option[Operator] = { - val scanBuilder = OperatorOuterClass.Scan.newBuilder().setSource("TopKInput") + val scanBuilder = OperatorOuterClass.Scan + .newBuilder() + .setSource("TopKInput") + .setHasBufferReuse(true) // TODO needed? val scanOpBuilder = OperatorOuterClass.Operator.newBuilder() val scanTypes = outputAttributes.flatten { attr => diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala index 5d772be403..275d976f5f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala @@ -141,7 +141,10 @@ class CometNativeShuffleWriter[K, V]( } private def getNativePlan(dataFile: String, indexFile: String): Operator = { - val scanBuilder = OperatorOuterClass.Scan.newBuilder().setSource("ShuffleWriterInput") + val scanBuilder = OperatorOuterClass.Scan + .newBuilder() + .setSource("ShuffleWriterInput") + .setHasBufferReuse(true) // TODO can be optimized? 
val opBuilder = OperatorOuterClass.Operator.newBuilder() val scanTypes = outputAttributes.flatten { attr => diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index cf11bdf590..dcedebd01b 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -89,6 +89,7 @@ abstract class CometTestBase conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true") conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g") conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true") + conf.set(CometConf.COMET_EXPLAIN_NATIVE_ENABLED.key, "true") conf } From 003af675e90869f6f51f3537e2548eb0aa362d87 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 13 Aug 2025 04:46:31 -0600 Subject: [PATCH 14/19] revert config change --- spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index dcedebd01b..cf11bdf590 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -89,7 +89,6 @@ abstract class CometTestBase conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true") conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g") conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true") - conf.set(CometConf.COMET_EXPLAIN_NATIVE_ENABLED.key, "true") conf } From 530ba4729ca7e468d8d6133b45514173ecad4f38 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 13 Aug 2025 08:25:05 -0600 Subject: [PATCH 15/19] simplify --- native/core/src/execution/mod.rs | 9 - native/core/src/execution/operators/copy.rs | 2 +- native/core/src/execution/operators/filter.rs | 574 ------------------ native/core/src/execution/operators/mod.rs | 3 - native/core/src/execution/operators/scan.rs | 15 +- native/core/src/execution/planner.rs | 98 +-- 6 files changed, 36 insertions(+), 665 deletions(-) delete mode 100644 native/core/src/execution/operators/filter.rs diff --git a/native/core/src/execution/mod.rs b/native/core/src/execution/mod.rs index c55b96f2a9..9dc1e87b28 100644 --- a/native/core/src/execution/mod.rs +++ b/native/core/src/execution/mod.rs @@ -29,12 +29,3 @@ pub use datafusion_comet_spark_expr::timezone; mod memory_pools; pub(crate) mod tracing; pub(crate) mod utils; - -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - let result = 2 + 2; - assert_eq!(result, 4); - } -} diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs index ff1a18e673..14c1b44daa 100644 --- a/native/core/src/execution/operators/copy.rs +++ b/native/core/src/execution/operators/copy.rs @@ -241,7 +241,7 @@ impl RecordBatchStream for CopyStream { } /// Copy an Arrow Array -fn copy_array(array: &dyn Array) -> ArrayRef { +pub(crate) fn copy_array(array: &dyn Array) -> ArrayRef { let capacity = array.len(); let data = array.to_data(); diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs deleted file mode 100644 index 272bbd4d89..0000000000 --- a/native/core/src/execution/operators/filter.rs +++ /dev/null @@ -1,574 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::any::Any; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{ready, Context, Poll}; - -use datafusion::physical_plan::{ - metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, - ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, - PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, -}; - -use arrow::array::{ - make_array, Array, ArrayRef, BooleanArray, MutableArrayData, RecordBatchOptions, -}; -use arrow::compute::filter_record_batch; -use arrow::datatypes::{DataType, SchemaRef}; -use arrow::error::ArrowError; -use arrow::record_batch::RecordBatch; -use datafusion::common::cast::as_boolean_array; -use datafusion::common::stats::Precision; -use datafusion::common::{ - internal_err, plan_err, project_schema, DataFusionError, Result, ScalarValue, -}; -use datafusion::execution::TaskContext; -use datafusion::logical_expr::Operator; -use datafusion::physical_expr::equivalence::ProjectionMapping; -use datafusion::physical_expr::expressions::BinaryExpr; -use datafusion::physical_expr::intervals::utils::check_support; -use datafusion::physical_expr::utils::collect_columns; -use datafusion::physical_expr::{ - analyze, split_conjunction, AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, - PhysicalExpr, -}; -use datafusion::physical_plan::common::can_project; -use datafusion::physical_plan::execution_plan::CardinalityEffect; -use futures::stream::{Stream, StreamExt}; -use log::trace; - -/// This is a copy of DataFusion's FilterExec with one modification to ensure that input -/// batches are never passed through unchanged. The changes are between the comments -/// `BEGIN Comet change` and `END Comet change`. -#[derive(Debug, Clone)] -pub struct FilterExec { - /// The expression to filter on. This expression must evaluate to a boolean value. - predicate: Arc, - /// The input plan - input: Arc, - /// Execution metrics - metrics: ExecutionPlanMetricsSet, - /// Selectivity for statistics. 0 = no rows, 100 = all rows - default_selectivity: u8, - /// Properties equivalence properties, partitioning, etc. - cache: PlanProperties, - /// The projection indices of the columns in the output schema of join - projection: Option>, -} - -impl FilterExec { - /// Create a FilterExec on an input - pub fn try_new( - predicate: Arc, - input: Arc, - ) -> Result { - match predicate.data_type(input.schema().as_ref())? 
{ - DataType::Boolean => { - let default_selectivity = 20; - let cache = - Self::compute_properties(&input, &predicate, default_selectivity, None)?; - Ok(Self { - predicate, - input: Arc::clone(&input), - metrics: ExecutionPlanMetricsSet::new(), - default_selectivity, - cache, - projection: None, - }) - } - other => { - plan_err!("Filter predicate must return BOOLEAN values, got {other:?}") - } - } - } - - pub fn with_default_selectivity( - mut self, - default_selectivity: u8, - ) -> Result { - if default_selectivity > 100 { - return plan_err!( - "Default filter selectivity value needs to be less than or equal to 100" - ); - } - self.default_selectivity = default_selectivity; - Ok(self) - } - - /// Return new instance of [FilterExec] with the given projection. - pub fn with_projection(&self, projection: Option>) -> Result { - // Check if the projection is valid - can_project(&self.schema(), projection.as_ref())?; - - let projection = match projection { - Some(projection) => match &self.projection { - Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), - None => Some(projection), - }, - None => None, - }; - - let cache = Self::compute_properties( - &self.input, - &self.predicate, - self.default_selectivity, - projection.as_ref(), - )?; - Ok(Self { - predicate: Arc::clone(&self.predicate), - input: Arc::clone(&self.input), - metrics: self.metrics.clone(), - default_selectivity: self.default_selectivity, - cache, - projection, - }) - } - - /// The expression to filter on. This expression must evaluate to a boolean value. - pub fn predicate(&self) -> &Arc { - &self.predicate - } - - /// The input plan - pub fn input(&self) -> &Arc { - &self.input - } - - /// The default selectivity - pub fn default_selectivity(&self) -> u8 { - self.default_selectivity - } - - /// Projection - pub fn projection(&self) -> Option<&Vec> { - self.projection.as_ref() - } - - /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics. 
- fn statistics_helper( - input: &Arc, - predicate: &Arc, - default_selectivity: u8, - ) -> Result { - let input_stats = input.partition_statistics(None)?; - let schema = input.schema(); - if !check_support(predicate, &schema) { - let selectivity = default_selectivity as f64 / 100.0; - let mut stats = input_stats.to_inexact(); - stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity); - stats.total_byte_size = stats - .total_byte_size - .with_estimated_selectivity(selectivity); - return Ok(stats); - } - - let num_rows = input_stats.num_rows; - let total_byte_size = input_stats.total_byte_size; - let input_analysis_ctx = - AnalysisContext::try_from_statistics(&input.schema(), &input_stats.column_statistics)?; - - let analysis_ctx = analyze(predicate, input_analysis_ctx, &schema)?; - - // Estimate (inexact) selectivity of predicate - let selectivity = analysis_ctx.selectivity.unwrap_or(1.0); - let num_rows = num_rows.with_estimated_selectivity(selectivity); - let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity); - - let column_statistics = - collect_new_statistics(&input_stats.column_statistics, analysis_ctx.boundaries); - Ok(Statistics { - num_rows, - total_byte_size, - column_statistics, - }) - } - - fn extend_constants( - input: &Arc, - predicate: &Arc, - ) -> Vec { - let mut res_constants = Vec::new(); - let input_eqs = input.equivalence_properties(); - - let conjunctions = split_conjunction(predicate); - for conjunction in conjunctions { - if let Some(binary) = conjunction.as_any().downcast_ref::() { - if binary.op() == &Operator::Eq { - // Filter evaluates to single value for all partitions - if input_eqs.is_expr_constant(binary.left()).is_some() { - let across = input_eqs - .is_expr_constant(binary.right()) - .unwrap_or_default(); - res_constants.push(ConstExpr::new(Arc::clone(binary.right()), across)); - } else if input_eqs.is_expr_constant(binary.right()).is_some() { - let across = input_eqs - .is_expr_constant(binary.left()) - .unwrap_or_default(); - res_constants.push(ConstExpr::new(Arc::clone(binary.left()), across)); - } - } - } - } - res_constants - } - /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. - fn compute_properties( - input: &Arc, - predicate: &Arc, - default_selectivity: u8, - projection: Option<&Vec>, - ) -> Result { - // Combine the equal predicates with the input equivalence properties - // to construct the equivalence properties: - let stats = Self::statistics_helper(input, predicate, default_selectivity)?; - let mut eq_properties = input.equivalence_properties().clone(); - let (equal_pairs, _) = collect_columns_from_predicate(predicate); - for (lhs, rhs) in equal_pairs { - eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))? - } - // Add the columns that have only one viable value (singleton) after - // filtering to constants. 
- let constants = collect_columns(predicate) - .into_iter() - .filter(|column| stats.column_statistics[column.index()].is_singleton()) - .map(|column| { - let value = stats.column_statistics[column.index()] - .min_value - .get_value(); - let expr = Arc::new(column) as _; - ConstExpr::new(expr, AcrossPartitions::Uniform(value.cloned())) - }); - // This is for statistics - eq_properties.add_constants(constants)?; - // This is for logical constant (for example: a = '1', then a could be marked as a constant) - // to do: how to deal with a multiple situation to represent = (for example, c1 between 0 and 0) - eq_properties.add_constants(Self::extend_constants(input, predicate))?; - - let mut output_partitioning = input.output_partitioning().clone(); - // If contains projection, update the PlanProperties. - if let Some(projection) = projection { - let schema = eq_properties.schema(); - let projection_mapping = ProjectionMapping::from_indices(projection, schema)?; - let out_schema = project_schema(schema, Some(projection))?; - output_partitioning = output_partitioning.project(&projection_mapping, &eq_properties); - eq_properties = eq_properties.project(&projection_mapping, out_schema); - } - - Ok(PlanProperties::new( - eq_properties, - output_partitioning, - input.pipeline_behavior(), - input.boundedness(), - )) - } -} - -impl DisplayAs for FilterExec { - fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - let display_projections = if let Some(projection) = self.projection.as_ref() { - format!( - ", projection=[{}]", - projection - .iter() - .map(|index| format!( - "{}@{}", - self.input.schema().fields().get(*index).unwrap().name(), - index - )) - .collect::>() - .join(", ") - ) - } else { - "".to_string() - }; - write!( - f, - "CometFilterExec: {}{}", - self.predicate, display_projections - ) - } - DisplayFormatType::TreeRender => unimplemented!(), - } - } -} - -impl ExecutionPlan for FilterExec { - fn name(&self) -> &'static str { - "CometFilterExec" - } - - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn properties(&self) -> &PlanProperties { - &self.cache - } - - fn children(&self) -> Vec<&Arc> { - vec![&self.input] - } - - fn maintains_input_order(&self) -> Vec { - // Tell optimizer this operator doesn't reorder its input - vec![true] - } - - fn with_new_children( - self: Arc, - mut children: Vec>, - ) -> Result> { - FilterExec::try_new(Arc::clone(&self.predicate), children.swap_remove(0)) - .and_then(|e| { - let selectivity = e.default_selectivity(); - e.with_default_selectivity(selectivity) - }) - .and_then(|e| e.with_projection(self.projection().cloned())) - .map(|e| Arc::new(e) as _) - } - - fn execute( - &self, - partition: usize, - context: Arc, - ) -> Result { - trace!( - "Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", - partition, - context.session_id(), - context.task_id() - ); - let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); - Ok(Box::pin(FilterExecStream { - schema: self.schema(), - predicate: Arc::clone(&self.predicate), - input: self.input.execute(partition, context)?, - baseline_metrics, - projection: self.projection.clone(), - })) - } - - fn metrics(&self) -> Option { - Some(self.metrics.clone_inner()) - } - - /// The output statistics of a filtering operation can be estimated if the - /// predicate's selectivity value can be determined 
for the incoming data. - fn statistics(&self) -> Result { - let stats = - Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity)?; - Ok(stats.project(self.projection.as_ref())) - } - - fn cardinality_effect(&self) -> CardinalityEffect { - CardinalityEffect::LowerEqual - } -} - -/// This function ensures that all bounds in the `ExprBoundaries` vector are -/// converted to closed bounds. If a lower/upper bound is initially open, it -/// is adjusted by using the next/previous value for its data type to convert -/// it into a closed bound. -fn collect_new_statistics( - input_column_stats: &[ColumnStatistics], - analysis_boundaries: Vec, -) -> Vec { - analysis_boundaries - .into_iter() - .enumerate() - .map( - |( - idx, - ExprBoundaries { - interval, - distinct_count, - .. - }, - )| { - let Some(interval) = interval else { - // If the interval is `None`, we can say that there are no rows: - return ColumnStatistics { - null_count: Precision::Exact(0), - max_value: Precision::Exact(ScalarValue::Null), - min_value: Precision::Exact(ScalarValue::Null), - sum_value: Precision::Exact(ScalarValue::Null), - distinct_count: Precision::Exact(0), - }; - }; - let (lower, upper) = interval.into_bounds(); - let (min_value, max_value) = if lower.eq(&upper) { - (Precision::Exact(lower), Precision::Exact(upper)) - } else { - (Precision::Inexact(lower), Precision::Inexact(upper)) - }; - ColumnStatistics { - null_count: input_column_stats[idx].null_count.to_inexact(), - max_value, - min_value, - sum_value: Precision::Absent, - distinct_count: distinct_count.to_inexact(), - } - }, - ) - .collect() -} - -/// The FilterExec streams wraps the input iterator and applies the predicate expression to -/// determine which rows to include in its output batches -struct FilterExecStream { - /// Output schema after the projection - schema: SchemaRef, - /// The expression to filter on. This expression must evaluate to a boolean value. - predicate: Arc, - /// The input partition to filter. - input: SendableRecordBatchStream, - /// Runtime metrics recording - baseline_metrics: BaselineMetrics, - /// The projection indices of the columns in the input schema - projection: Option>, -} - -fn filter_and_project( - batch: &RecordBatch, - predicate: &Arc, - projection: Option<&Vec>, - output_schema: &SchemaRef, -) -> Result { - predicate - .evaluate(batch) - .and_then(|v| v.into_array(batch.num_rows())) - .and_then(|array| { - Ok(match (as_boolean_array(&array), projection) { - // Apply filter array to record batch - (Ok(filter_array), None) => comet_filter_record_batch(batch, filter_array)?, - (Ok(filter_array), Some(projection)) => { - let projected_columns = projection - .iter() - .map(|i| Arc::clone(batch.column(*i))) - .collect(); - let projected_batch = - RecordBatch::try_new(Arc::clone(output_schema), projected_columns)?; - comet_filter_record_batch(&projected_batch, filter_array)? - } - (Err(_), _) => { - return internal_err!("Cannot create filter_array from non-boolean predicates"); - } - }) - }) -} - -// BEGIN Comet changes -// `FilterExec` could modify input batch or return input batch without change. Instead of always -// adding `CopyExec` on top of it, we only copy input batch for the special case. 
-pub fn comet_filter_record_batch( - record_batch: &RecordBatch, - predicate: &BooleanArray, -) -> std::result::Result { - if predicate.true_count() == record_batch.num_rows() { - // special case where we just make an exact copy - let arrays: Vec = record_batch - .columns() - .iter() - .map(|array| { - let capacity = array.len(); - let data = array.to_data(); - let mut mutable = MutableArrayData::new(vec![&data], false, capacity); - mutable.extend(0, 0, capacity); - make_array(mutable.freeze()) - }) - .collect(); - let options = RecordBatchOptions::new().with_row_count(Some(record_batch.num_rows())); - RecordBatch::try_new_with_options(Arc::clone(&record_batch.schema()), arrays, &options) - } else { - filter_record_batch(record_batch, predicate) - } -} -// END Comet changes - -impl Stream for FilterExecStream { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let poll; - loop { - match ready!(self.input.poll_next_unpin(cx)) { - Some(Ok(batch)) => { - let timer = self.baseline_metrics.elapsed_compute().timer(); - let filtered_batch = filter_and_project( - &batch, - &self.predicate, - self.projection.as_ref(), - &self.schema, - )?; - timer.done(); - // Skip entirely filtered batches - if filtered_batch.num_rows() == 0 { - continue; - } - poll = Poll::Ready(Some(Ok(filtered_batch))); - break; - } - value => { - poll = Poll::Ready(value); - break; - } - } - } - self.baseline_metrics.record_poll(poll) - } - - fn size_hint(&self) -> (usize, Option) { - // Same number of record batches - self.input.size_hint() - } -} - -impl RecordBatchStream for FilterExecStream { - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } -} - -/// Return the equals Column-Pairs and Non-equals Column-Pairs -fn collect_columns_from_predicate(predicate: &Arc) -> EqualAndNonEqual<'_> { - let mut eq_predicate_columns = Vec::::new(); - let mut ne_predicate_columns = Vec::::new(); - - let predicates = split_conjunction(predicate); - predicates.into_iter().for_each(|p| { - if let Some(binary) = p.as_any().downcast_ref::() { - match binary.op() { - Operator::Eq => eq_predicate_columns.push((binary.left(), binary.right())), - Operator::NotEq => ne_predicate_columns.push((binary.left(), binary.right())), - _ => {} - } - } - }); - - (eq_predicate_columns, ne_predicate_columns) -} - -/// Pair of `Arc`s -pub type PhysicalExprPairRef<'a> = (&'a Arc, &'a Arc); - -/// The equals Column-Pairs and Non-equals Column-Pairs in the Predicates -pub type EqualAndNonEqual<'a> = (Vec>, Vec>); diff --git a/native/core/src/execution/operators/mod.rs b/native/core/src/execution/operators/mod.rs index 4e15e4341a..c8cfebd45e 100644 --- a/native/core/src/execution/operators/mod.rs +++ b/native/core/src/execution/operators/mod.rs @@ -22,14 +22,11 @@ use std::fmt::Debug; use jni::objects::GlobalRef; pub use copy::*; -pub use filter::comet_filter_record_batch; -pub use filter::FilterExec; pub use scan::*; mod copy; mod expand; pub use expand::ExpandExec; -mod filter; mod scan; /// Error returned during executing operators. diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index 12c199dad6..32d2fee1da 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+use crate::execution::operators::copy_array; use crate::{ errors::CometError, execution::{ @@ -111,6 +112,7 @@ impl ScanExec { data_types.len(), &jvm_fetch_time, &arrow_ffi_time, + has_buffer_reuse, )?; timer.stop(); batch @@ -190,6 +192,7 @@ impl ScanExec { self.data_types.len(), &self.jvm_fetch_time, &self.arrow_ffi_time, + self.has_buffer_reuse, )?; *current_batch = Some(next_batch); } @@ -206,6 +209,7 @@ impl ScanExec { num_cols: usize, jvm_fetch_time: &Time, arrow_ffi_time: &Time, + copy_arrays: bool, ) -> Result { if exec_context_id == TEST_EXEC_CONTEXT_ID { // This is a unit test. We don't need to call JNI. @@ -280,8 +284,17 @@ impl ScanExec { let array_data = ArrayData::from_spark((array_ptr, schema_ptr))?; // TODO: validate array input data + // array_data.validate_full()?; - inputs.push(make_array(array_data)); + let array = make_array(array_data); + + // for native_comet scans, copy the array to avoid memory + // corruption issues due to mutable buffer reuse + if copy_arrays { + inputs.push(copy_array(&array)); + } else { + inputs.push(array); + } // Drop the Arcs to avoid memory leak unsafe { diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 854e1c3753..1b2409bafa 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -18,7 +18,6 @@ //! Converts Spark physical plan to DataFusion physical plan use crate::execution::operators::CopyMode; -use crate::execution::operators::FilterExec as CometFilterExec; use crate::{ errors::ExpressionError, execution::{ @@ -1146,27 +1145,17 @@ impl PhysicalPlanner { let predicate = self.create_expr(filter.predicate.as_ref().unwrap(), child.schema())?; - let use_datafusion_filter = !backed_by_mutable_buffers(&child.native_plan); - - let filter: Arc = - match (filter.wrap_child_in_copy_exec, use_datafusion_filter) { - (true, true) => Arc::new(DataFusionFilterExec::try_new( - predicate, - Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)), - )?), - (true, false) => Arc::new(CometFilterExec::try_new( - predicate, - Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)), - )?), - (false, true) => Arc::new(DataFusionFilterExec::try_new( - predicate, - Arc::clone(&child.native_plan), - )?), - (false, false) => Arc::new(CometFilterExec::try_new( - predicate, - Arc::clone(&child.native_plan), - )?), - }; + let filter: Arc = if filter.wrap_child_in_copy_exec { + Arc::new(DataFusionFilterExec::try_new( + predicate, + Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)), + )?) + } else { + Arc::new(DataFusionFilterExec::try_new( + predicate, + Arc::clone(&child.native_plan), + )?) + }; Ok(( scans, @@ -1511,22 +1500,11 @@ impl PhysicalPlanner { .map(|(idx, dt)| Field::new(format!("col_{idx}"), dt.clone(), true)) .collect(); let schema = Arc::new(Schema::new(fields)); - - // `Expand` operator keeps the input batch and expands it to multiple output - // batches. However, `ScanExec` will reuse input arrays for the next - // input batch. Therefore, we need to copy the input batch to avoid - // the data corruption. Note that we only need to copy the input batch - // if the child operator is `ScanExec`, because other operators after `ScanExec` - // will create new arrays for the output batch. 
- let input = if backed_by_mutable_buffers(&child.native_plan) { - Arc::new(CopyExec::new( - Arc::clone(&child.native_plan), - CopyMode::UnpackOrDeepCopy, - )) - } else { - Arc::clone(&child.native_plan) - }; - let expand = Arc::new(ExpandExec::new(projections, input, schema)); + let expand = Arc::new(ExpandExec::new( + projections, + Arc::clone(&child.native_plan), + schema, + )); Ok(( scans, Arc::new(SparkPlan::new(spark_plan.plan_id, expand, vec![child])), @@ -1852,14 +1830,9 @@ impl PhysicalPlanner { )) } - /// Wrap an ExecutionPlan in a CopyExec, which will unpack any dictionary-encoded arrays - /// and make a deep copy of other arrays if the plan re-uses batches. + /// Wrap an ExecutionPlan in a CopyExec, which will unpack any dictionary-encoded arrays. fn wrap_in_copy_exec(plan: Arc) -> Arc { - if backed_by_mutable_buffers(&plan) { - Arc::new(CopyExec::new(plan, CopyMode::UnpackOrDeepCopy)) - } else { - Arc::new(CopyExec::new(plan, CopyMode::UnpackOrClone)) - } + Arc::new(CopyExec::new(plan, CopyMode::UnpackOrClone)) } /// Create a DataFusion physical aggregate expression from Spark physical aggregate expression @@ -2560,35 +2533,6 @@ impl From for DataFusionError { } } -/// Returns true if given operator can produce arrays that are backed -/// by mutable buffers from a native_comet scan that has not been wrapped -/// in CopyMode::UnpackOrDeepCopy -fn backed_by_mutable_buffers(op: &Arc) -> bool { - if op.as_any().is::() { - // check for native_comet scan (either direct or via Iceberg) - op.as_any() - .downcast_ref::() - .unwrap() - .has_buffer_reuse - } else if op.as_any().is::() { - let copy_exec = op.as_any().downcast_ref::().unwrap(); - match copy_exec.mode() { - CopyMode::UnpackOrDeepCopy => false, - CopyMode::UnpackOrClone => backed_by_mutable_buffers(copy_exec.children()[0]), - } - } else if op.as_any().is::() { - // CometFilterExec guarantees that copies are made - false - } else { - for child in op.children() { - if backed_by_mutable_buffers(child) { - return true; - } - } - false - } -} - /// Collects the indices of the columns in the input schema that are used in the expression /// and returns them as a pair of vectors, one for the left side and one for the right side. 
fn expr_to_columns( @@ -3030,7 +2974,7 @@ mod tests { let (_scans, datafusion_plan) = planner.create_plan(&sort_exec, &mut vec![], 1).unwrap(); let plan_str = explain_plan(datafusion_plan); let expected_str = r"SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false] - CopyExec [UnpackOrDeepCopy] + CopyExec [UnpackOrClone] ScanExec: source=[native_comet], schema=[col_0: Int32] "; assert_eq!(plan_str, expected_str); @@ -3060,7 +3004,7 @@ mod tests { let plan_str = explain_plan(datafusion_plan); let expected_str = r"SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false] CopyExec [UnpackOrClone] - CometFilterExec: col_0@0 = 1 + FilterExec: col_0@0 = 1 ScanExec: source=[native_comet], schema=[col_0: Int32] "; assert_eq!(plan_str, expected_str); @@ -3178,7 +3122,7 @@ mod tests { let (_scans, filter_exec) = planner.create_plan(&op, &mut vec![], 1).unwrap(); - assert_eq!("CometFilterExec", filter_exec.native_plan.name()); + assert_eq!("FilterExec", filter_exec.native_plan.name()); assert_eq!(1, filter_exec.children.len()); assert_eq!(0, filter_exec.additional_native_plans.len()); } From bf697a2e219b1dc9fbec34d742c83fbcef548607 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 13 Aug 2025 08:31:33 -0600 Subject: [PATCH 16/19] delete bench --- native/core/benches/filter.rs | 112 ---------------------------------- 1 file changed, 112 deletions(-) delete mode 100644 native/core/benches/filter.rs diff --git a/native/core/benches/filter.rs b/native/core/benches/filter.rs deleted file mode 100644 index 82fa4aac66..0000000000 --- a/native/core/benches/filter.rs +++ /dev/null @@ -1,112 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License.use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, RecordBatch, StringBuilder}; - -use arrow::array::builder::{BooleanBuilder, Int32Builder, StringBuilder}; -use arrow::array::{ArrayRef, RecordBatch}; -use arrow::compute::filter_record_batch; -use arrow::datatypes::{DataType, Field, Schema}; -use comet::execution::operators::comet_filter_record_batch; -use criterion::{criterion_group, criterion_main, Criterion}; -use std::hint::black_box; -use std::sync::Arc; -use std::time::Duration; - -fn criterion_benchmark(c: &mut Criterion) { - let mut group = c.benchmark_group("filter"); - - let num_rows = 8192; - let num_int_cols = 4; - let num_string_cols = 4; - - let batch = create_record_batch(num_rows, num_int_cols, num_string_cols); - - // create some different predicates - let mut predicate_select_few = BooleanBuilder::with_capacity(num_rows); - let mut predicate_select_many = BooleanBuilder::with_capacity(num_rows); - let mut predicate_select_all = BooleanBuilder::with_capacity(num_rows); - for i in 0..num_rows { - predicate_select_few.append_value(i % 10 == 0); - predicate_select_many.append_value(i % 10 > 0); - predicate_select_all.append_value(true); - } - let predicate_select_few = predicate_select_few.finish(); - let predicate_select_many = predicate_select_many.finish(); - let predicate_select_all = predicate_select_all.finish(); - - // baseline uses Arrow's filter_record_batch method - group.bench_function("arrow_filter_record_batch - few rows selected", |b| { - b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_few))) - }); - group.bench_function("arrow_filter_record_batch - many rows selected", |b| { - b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_many))) - }); - group.bench_function("arrow_filter_record_batch - all rows selected", |b| { - b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_all))) - }); - - group.bench_function("comet_filter_record_batch - few rows selected", |b| { - b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_few))) - }); - group.bench_function("comet_filter_record_batch - many rows selected", |b| { - b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_many))) - }); - group.bench_function("comet_filter_record_batch - all rows selected", |b| { - b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_all))) - }); - - group.finish(); -} - -fn create_record_batch(num_rows: usize, num_int_cols: i32, num_string_cols: i32) -> RecordBatch { - let mut int32_builder = Int32Builder::with_capacity(num_rows); - let mut string_builder = StringBuilder::with_capacity(num_rows, num_rows * 32); - for i in 0..num_rows { - int32_builder.append_value(i as i32); - string_builder.append_value(format!("this is string #{i}")); - } - let int32_array = Arc::new(int32_builder.finish()); - let string_array = Arc::new(string_builder.finish()); - - let mut fields = vec![]; - let mut columns: Vec = vec![]; - let mut i = 0; - for _ in 0..num_int_cols { - fields.push(Field::new(format!("c{i}"), DataType::Int32, false)); - columns.push(int32_array.clone()); // note this is just copying a reference to the array - i += 1; - } - for _ in 0..num_string_cols { - fields.push(Field::new(format!("c{i}"), DataType::Utf8, false)); - columns.push(string_array.clone()); // note this is just copying a reference to the array 
- i += 1; - } - let schema = Schema::new(fields); - RecordBatch::try_new(Arc::new(schema), columns).unwrap() -} - -fn config() -> Criterion { - Criterion::default() - .measurement_time(Duration::from_millis(500)) - .warm_up_time(Duration::from_millis(500)) -} - -criterion_group! { - name = benches; - config = config(); - targets = criterion_benchmark -} -criterion_main!(benches); From 310ef0c53c5c47229d63581b2db60b69cc23cc14 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 13 Aug 2025 08:31:50 -0600 Subject: [PATCH 17/19] delete bench --- native/core/Cargo.toml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 92b7da8e5d..5bd62a8903 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -119,7 +119,3 @@ harness = false [[bench]] name = "parquet_decode" harness = false - -[[bench]] -name = "filter" -harness = false From 904882263f4c5895a197cc9814f0febdf13e24cf Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 13 Aug 2025 09:28:32 -0600 Subject: [PATCH 18/19] improve explain output --- native/core/src/execution/operators/scan.rs | 1 + native/core/src/execution/planner.rs | 8 ++++---- .../org/apache/comet/serde/QueryPlanSerde.scala | 12 +++++++----- .../org/apache/spark/sql/comet/CometScanExec.scala | 2 +- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index 32d2fee1da..c4676c0222 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -392,6 +392,7 @@ impl DisplayAs for ScanExec { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "ScanExec: source=[{}], ", self.input_source_description)?; + write!(f, "hasBufferReuse={}, ", self.has_buffer_reuse)?; let fields: Vec = self .data_types .iter() diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 1b2409bafa..b21d4d68da 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -2975,7 +2975,7 @@ mod tests { let plan_str = explain_plan(datafusion_plan); let expected_str = r"SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false] CopyExec [UnpackOrClone] - ScanExec: source=[native_comet], schema=[col_0: Int32] + ScanExec: source=[native_comet], hasBufferReuse=true, schema=[col_0: Int32] "; assert_eq!(plan_str, expected_str); } @@ -2989,7 +2989,7 @@ mod tests { let plan_str = explain_plan(datafusion_plan); let expected_str = r"SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false] CopyExec [UnpackOrClone] - ScanExec: source=[sink], schema=[col_0: Int32] + ScanExec: source=[sink], hasBufferReuse=false, schema=[col_0: Int32] "; assert_eq!(plan_str, expected_str); } @@ -3005,7 +3005,7 @@ mod tests { let expected_str = r"SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false] CopyExec [UnpackOrClone] FilterExec: col_0@0 = 1 - ScanExec: source=[native_comet], schema=[col_0: Int32] + ScanExec: source=[native_comet], hasBufferReuse=true, schema=[col_0: Int32] "; assert_eq!(plan_str, expected_str); } @@ -3021,7 +3021,7 @@ mod tests { let expected_str = r"SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false] CopyExec [UnpackOrClone] FilterExec: col_0@0 = 1 - ScanExec: source=[sink], schema=[col_0: Int32] + ScanExec: source=[sink], hasBufferReuse=false, schema=[col_0: Int32] "; assert_eq!(plan_str, expected_str); } diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala 
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 3e818aec8c..360d18827e 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2181,11 +2181,13 @@ object QueryPlanSerde extends Logging with CometExprShim { // These operators are source of Comet native execution chain val scanBuilder = OperatorOuterClass.Scan.newBuilder() - val source = op.simpleStringWithNodeId() - if (source.isEmpty) { - scanBuilder.setSource(op.getClass.getSimpleName) - } else { - scanBuilder.setSource(source) + op match { + case scan: CometScanExec => + scanBuilder.setSource( + s"CometScanExec[${scan.scanImpl}]: ${op.simpleStringWithNodeId()}") + case _ => + scanBuilder.setSource( + s"[${op.getClass.getSimpleName}]: ${op.simpleStringWithNodeId()}") } val scanTypes = op.output.flatten { attr => diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala index 5679a87b30..bba2fe99c3 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala @@ -284,7 +284,7 @@ case class CometScanExec( } override val nodeName: String = - s"CometScan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}" + s"CometScan [$scanImpl] $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}" /** * Create an RDD for bucketed reads. The non-bucketed variant of this function is From a86ca1298b537b6bbdea341f01f074442ad10a75 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 13 Aug 2025 09:34:05 -0600 Subject: [PATCH 19/19] always deep copy in scan --- native/core/src/execution/planner.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index b21d4d68da..4101f3f16e 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1420,7 +1420,8 @@ impl PhysicalPlanner { input_source, &scan.source, data_types, - scan.has_buffer_reuse, + // TODO scan.has_buffer_reuse, + true )?; Ok((
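
The final patch makes the native_comet scan copy incoming arrays unconditionally instead of consulting `scan.has_buffer_reuse`. The sketch below is not taken from the patches; the name `deep_copy_array` is illustrative, and it only approximates what the `copy_array` helper imported in scan.rs is assumed to do: rebuild an array from freshly allocated buffers with `MutableArrayData` (the same pattern used by the removed `comet_filter_record_batch`), so later reuse of the JVM-side buffers cannot corrupt batches already handed downstream.

use std::sync::Arc;

use arrow::array::{make_array, Array, ArrayRef, Int32Array, MutableArrayData};

// Illustrative only: rebuild `array` from newly allocated buffers so the
// original (possibly reused) buffers can be overwritten safely afterwards.
fn deep_copy_array(array: &ArrayRef) -> ArrayRef {
    let data = array.to_data();
    let mut mutable = MutableArrayData::new(vec![&data], false, array.len());
    mutable.extend(0, 0, array.len());
    make_array(mutable.freeze())
}

fn main() {
    let original: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let copied = deep_copy_array(&original);
    // The copy has equal contents but is backed by its own buffers.
    assert_eq!(original.to_data(), copied.to_data());
}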