diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 92b7da8e5d..5bd62a8903 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -119,7 +119,3 @@ harness = false [[bench]] name = "parquet_decode" harness = false - -[[bench]] -name = "filter" -harness = false diff --git a/native/core/benches/filter.rs b/native/core/benches/filter.rs deleted file mode 100644 index 82fa4aac66..0000000000 --- a/native/core/benches/filter.rs +++ /dev/null @@ -1,112 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License.use arrow::array::{ArrayRef, BooleanBuilder, Int32Builder, RecordBatch, StringBuilder}; - -use arrow::array::builder::{BooleanBuilder, Int32Builder, StringBuilder}; -use arrow::array::{ArrayRef, RecordBatch}; -use arrow::compute::filter_record_batch; -use arrow::datatypes::{DataType, Field, Schema}; -use comet::execution::operators::comet_filter_record_batch; -use criterion::{criterion_group, criterion_main, Criterion}; -use std::hint::black_box; -use std::sync::Arc; -use std::time::Duration; - -fn criterion_benchmark(c: &mut Criterion) { - let mut group = c.benchmark_group("filter"); - - let num_rows = 8192; - let num_int_cols = 4; - let num_string_cols = 4; - - let batch = create_record_batch(num_rows, num_int_cols, num_string_cols); - - // create some different predicates - let mut predicate_select_few = BooleanBuilder::with_capacity(num_rows); - let mut predicate_select_many = BooleanBuilder::with_capacity(num_rows); - let mut predicate_select_all = BooleanBuilder::with_capacity(num_rows); - for i in 0..num_rows { - predicate_select_few.append_value(i % 10 == 0); - predicate_select_many.append_value(i % 10 > 0); - predicate_select_all.append_value(true); - } - let predicate_select_few = predicate_select_few.finish(); - let predicate_select_many = predicate_select_many.finish(); - let predicate_select_all = predicate_select_all.finish(); - - // baseline uses Arrow's filter_record_batch method - group.bench_function("arrow_filter_record_batch - few rows selected", |b| { - b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_few))) - }); - group.bench_function("arrow_filter_record_batch - many rows selected", |b| { - b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_many))) - }); - group.bench_function("arrow_filter_record_batch - all rows selected", |b| { - b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_all))) - }); - - group.bench_function("comet_filter_record_batch - few rows selected", |b| { - b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_few))) - }); - group.bench_function("comet_filter_record_batch - many rows selected", |b| { - b.iter(|| comet_filter_record_batch(black_box(&batch), 
black_box(&predicate_select_many))) - }); - group.bench_function("comet_filter_record_batch - all rows selected", |b| { - b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_all))) - }); - - group.finish(); -} - -fn create_record_batch(num_rows: usize, num_int_cols: i32, num_string_cols: i32) -> RecordBatch { - let mut int32_builder = Int32Builder::with_capacity(num_rows); - let mut string_builder = StringBuilder::with_capacity(num_rows, num_rows * 32); - for i in 0..num_rows { - int32_builder.append_value(i as i32); - string_builder.append_value(format!("this is string #{i}")); - } - let int32_array = Arc::new(int32_builder.finish()); - let string_array = Arc::new(string_builder.finish()); - - let mut fields = vec![]; - let mut columns: Vec = vec![]; - let mut i = 0; - for _ in 0..num_int_cols { - fields.push(Field::new(format!("c{i}"), DataType::Int32, false)); - columns.push(int32_array.clone()); // note this is just copying a reference to the array - i += 1; - } - for _ in 0..num_string_cols { - fields.push(Field::new(format!("c{i}"), DataType::Utf8, false)); - columns.push(string_array.clone()); // note this is just copying a reference to the array - i += 1; - } - let schema = Schema::new(fields); - RecordBatch::try_new(Arc::new(schema), columns).unwrap() -} - -fn config() -> Criterion { - Criterion::default() - .measurement_time(Duration::from_millis(500)) - .warm_up_time(Duration::from_millis(500)) -} - -criterion_group! { - name = benches; - config = config(); - targets = criterion_benchmark -} -criterion_main!(benches); diff --git a/native/core/src/execution/mod.rs b/native/core/src/execution/mod.rs index c55b96f2a9..9dc1e87b28 100644 --- a/native/core/src/execution/mod.rs +++ b/native/core/src/execution/mod.rs @@ -29,12 +29,3 @@ pub use datafusion_comet_spark_expr::timezone; mod memory_pools; pub(crate) mod tracing; pub(crate) mod utils; - -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - let result = 2 + 2; - assert_eq!(result, 4); - } -} diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs index a809031d5d..14c1b44daa 100644 --- a/native/core/src/execution/operators/copy.rs +++ b/native/core/src/execution/operators/copy.rs @@ -91,6 +91,10 @@ impl CopyExec { mode, } } + + pub fn mode(&self) -> &CopyMode { + &self.mode + } } impl DisplayAs for CopyExec { @@ -237,7 +241,7 @@ impl RecordBatchStream for CopyStream { } /// Copy an Arrow Array -fn copy_array(array: &dyn Array) -> ArrayRef { +pub(crate) fn copy_array(array: &dyn Array) -> ArrayRef { let capacity = array.len(); let data = array.to_data(); diff --git a/native/core/src/execution/operators/filter.rs b/native/core/src/execution/operators/filter.rs deleted file mode 100644 index 272bbd4d89..0000000000 --- a/native/core/src/execution/operators/filter.rs +++ /dev/null @@ -1,574 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::any::Any; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{ready, Context, Poll}; - -use datafusion::physical_plan::{ - metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, - ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, - PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, -}; - -use arrow::array::{ - make_array, Array, ArrayRef, BooleanArray, MutableArrayData, RecordBatchOptions, -}; -use arrow::compute::filter_record_batch; -use arrow::datatypes::{DataType, SchemaRef}; -use arrow::error::ArrowError; -use arrow::record_batch::RecordBatch; -use datafusion::common::cast::as_boolean_array; -use datafusion::common::stats::Precision; -use datafusion::common::{ - internal_err, plan_err, project_schema, DataFusionError, Result, ScalarValue, -}; -use datafusion::execution::TaskContext; -use datafusion::logical_expr::Operator; -use datafusion::physical_expr::equivalence::ProjectionMapping; -use datafusion::physical_expr::expressions::BinaryExpr; -use datafusion::physical_expr::intervals::utils::check_support; -use datafusion::physical_expr::utils::collect_columns; -use datafusion::physical_expr::{ - analyze, split_conjunction, AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, - PhysicalExpr, -}; -use datafusion::physical_plan::common::can_project; -use datafusion::physical_plan::execution_plan::CardinalityEffect; -use futures::stream::{Stream, StreamExt}; -use log::trace; - -/// This is a copy of DataFusion's FilterExec with one modification to ensure that input -/// batches are never passed through unchanged. The changes are between the comments -/// `BEGIN Comet change` and `END Comet change`. -#[derive(Debug, Clone)] -pub struct FilterExec { - /// The expression to filter on. This expression must evaluate to a boolean value. - predicate: Arc, - /// The input plan - input: Arc, - /// Execution metrics - metrics: ExecutionPlanMetricsSet, - /// Selectivity for statistics. 0 = no rows, 100 = all rows - default_selectivity: u8, - /// Properties equivalence properties, partitioning, etc. - cache: PlanProperties, - /// The projection indices of the columns in the output schema of join - projection: Option>, -} - -impl FilterExec { - /// Create a FilterExec on an input - pub fn try_new( - predicate: Arc, - input: Arc, - ) -> Result { - match predicate.data_type(input.schema().as_ref())? 
{ - DataType::Boolean => { - let default_selectivity = 20; - let cache = - Self::compute_properties(&input, &predicate, default_selectivity, None)?; - Ok(Self { - predicate, - input: Arc::clone(&input), - metrics: ExecutionPlanMetricsSet::new(), - default_selectivity, - cache, - projection: None, - }) - } - other => { - plan_err!("Filter predicate must return BOOLEAN values, got {other:?}") - } - } - } - - pub fn with_default_selectivity( - mut self, - default_selectivity: u8, - ) -> Result { - if default_selectivity > 100 { - return plan_err!( - "Default filter selectivity value needs to be less than or equal to 100" - ); - } - self.default_selectivity = default_selectivity; - Ok(self) - } - - /// Return new instance of [FilterExec] with the given projection. - pub fn with_projection(&self, projection: Option>) -> Result { - // Check if the projection is valid - can_project(&self.schema(), projection.as_ref())?; - - let projection = match projection { - Some(projection) => match &self.projection { - Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), - None => Some(projection), - }, - None => None, - }; - - let cache = Self::compute_properties( - &self.input, - &self.predicate, - self.default_selectivity, - projection.as_ref(), - )?; - Ok(Self { - predicate: Arc::clone(&self.predicate), - input: Arc::clone(&self.input), - metrics: self.metrics.clone(), - default_selectivity: self.default_selectivity, - cache, - projection, - }) - } - - /// The expression to filter on. This expression must evaluate to a boolean value. - pub fn predicate(&self) -> &Arc { - &self.predicate - } - - /// The input plan - pub fn input(&self) -> &Arc { - &self.input - } - - /// The default selectivity - pub fn default_selectivity(&self) -> u8 { - self.default_selectivity - } - - /// Projection - pub fn projection(&self) -> Option<&Vec> { - self.projection.as_ref() - } - - /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics. 
- fn statistics_helper( - input: &Arc, - predicate: &Arc, - default_selectivity: u8, - ) -> Result { - let input_stats = input.partition_statistics(None)?; - let schema = input.schema(); - if !check_support(predicate, &schema) { - let selectivity = default_selectivity as f64 / 100.0; - let mut stats = input_stats.to_inexact(); - stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity); - stats.total_byte_size = stats - .total_byte_size - .with_estimated_selectivity(selectivity); - return Ok(stats); - } - - let num_rows = input_stats.num_rows; - let total_byte_size = input_stats.total_byte_size; - let input_analysis_ctx = - AnalysisContext::try_from_statistics(&input.schema(), &input_stats.column_statistics)?; - - let analysis_ctx = analyze(predicate, input_analysis_ctx, &schema)?; - - // Estimate (inexact) selectivity of predicate - let selectivity = analysis_ctx.selectivity.unwrap_or(1.0); - let num_rows = num_rows.with_estimated_selectivity(selectivity); - let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity); - - let column_statistics = - collect_new_statistics(&input_stats.column_statistics, analysis_ctx.boundaries); - Ok(Statistics { - num_rows, - total_byte_size, - column_statistics, - }) - } - - fn extend_constants( - input: &Arc, - predicate: &Arc, - ) -> Vec { - let mut res_constants = Vec::new(); - let input_eqs = input.equivalence_properties(); - - let conjunctions = split_conjunction(predicate); - for conjunction in conjunctions { - if let Some(binary) = conjunction.as_any().downcast_ref::() { - if binary.op() == &Operator::Eq { - // Filter evaluates to single value for all partitions - if input_eqs.is_expr_constant(binary.left()).is_some() { - let across = input_eqs - .is_expr_constant(binary.right()) - .unwrap_or_default(); - res_constants.push(ConstExpr::new(Arc::clone(binary.right()), across)); - } else if input_eqs.is_expr_constant(binary.right()).is_some() { - let across = input_eqs - .is_expr_constant(binary.left()) - .unwrap_or_default(); - res_constants.push(ConstExpr::new(Arc::clone(binary.left()), across)); - } - } - } - } - res_constants - } - /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. - fn compute_properties( - input: &Arc, - predicate: &Arc, - default_selectivity: u8, - projection: Option<&Vec>, - ) -> Result { - // Combine the equal predicates with the input equivalence properties - // to construct the equivalence properties: - let stats = Self::statistics_helper(input, predicate, default_selectivity)?; - let mut eq_properties = input.equivalence_properties().clone(); - let (equal_pairs, _) = collect_columns_from_predicate(predicate); - for (lhs, rhs) in equal_pairs { - eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))? - } - // Add the columns that have only one viable value (singleton) after - // filtering to constants. 
- let constants = collect_columns(predicate) - .into_iter() - .filter(|column| stats.column_statistics[column.index()].is_singleton()) - .map(|column| { - let value = stats.column_statistics[column.index()] - .min_value - .get_value(); - let expr = Arc::new(column) as _; - ConstExpr::new(expr, AcrossPartitions::Uniform(value.cloned())) - }); - // This is for statistics - eq_properties.add_constants(constants)?; - // This is for logical constant (for example: a = '1', then a could be marked as a constant) - // to do: how to deal with a multiple situation to represent = (for example, c1 between 0 and 0) - eq_properties.add_constants(Self::extend_constants(input, predicate))?; - - let mut output_partitioning = input.output_partitioning().clone(); - // If contains projection, update the PlanProperties. - if let Some(projection) = projection { - let schema = eq_properties.schema(); - let projection_mapping = ProjectionMapping::from_indices(projection, schema)?; - let out_schema = project_schema(schema, Some(projection))?; - output_partitioning = output_partitioning.project(&projection_mapping, &eq_properties); - eq_properties = eq_properties.project(&projection_mapping, out_schema); - } - - Ok(PlanProperties::new( - eq_properties, - output_partitioning, - input.pipeline_behavior(), - input.boundedness(), - )) - } -} - -impl DisplayAs for FilterExec { - fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - let display_projections = if let Some(projection) = self.projection.as_ref() { - format!( - ", projection=[{}]", - projection - .iter() - .map(|index| format!( - "{}@{}", - self.input.schema().fields().get(*index).unwrap().name(), - index - )) - .collect::>() - .join(", ") - ) - } else { - "".to_string() - }; - write!( - f, - "CometFilterExec: {}{}", - self.predicate, display_projections - ) - } - DisplayFormatType::TreeRender => unimplemented!(), - } - } -} - -impl ExecutionPlan for FilterExec { - fn name(&self) -> &'static str { - "CometFilterExec" - } - - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - fn properties(&self) -> &PlanProperties { - &self.cache - } - - fn children(&self) -> Vec<&Arc> { - vec![&self.input] - } - - fn maintains_input_order(&self) -> Vec { - // Tell optimizer this operator doesn't reorder its input - vec![true] - } - - fn with_new_children( - self: Arc, - mut children: Vec>, - ) -> Result> { - FilterExec::try_new(Arc::clone(&self.predicate), children.swap_remove(0)) - .and_then(|e| { - let selectivity = e.default_selectivity(); - e.with_default_selectivity(selectivity) - }) - .and_then(|e| e.with_projection(self.projection().cloned())) - .map(|e| Arc::new(e) as _) - } - - fn execute( - &self, - partition: usize, - context: Arc, - ) -> Result { - trace!( - "Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", - partition, - context.session_id(), - context.task_id() - ); - let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); - Ok(Box::pin(FilterExecStream { - schema: self.schema(), - predicate: Arc::clone(&self.predicate), - input: self.input.execute(partition, context)?, - baseline_metrics, - projection: self.projection.clone(), - })) - } - - fn metrics(&self) -> Option { - Some(self.metrics.clone_inner()) - } - - /// The output statistics of a filtering operation can be estimated if the - /// predicate's selectivity value can be determined 
for the incoming data. - fn statistics(&self) -> Result { - let stats = - Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity)?; - Ok(stats.project(self.projection.as_ref())) - } - - fn cardinality_effect(&self) -> CardinalityEffect { - CardinalityEffect::LowerEqual - } -} - -/// This function ensures that all bounds in the `ExprBoundaries` vector are -/// converted to closed bounds. If a lower/upper bound is initially open, it -/// is adjusted by using the next/previous value for its data type to convert -/// it into a closed bound. -fn collect_new_statistics( - input_column_stats: &[ColumnStatistics], - analysis_boundaries: Vec, -) -> Vec { - analysis_boundaries - .into_iter() - .enumerate() - .map( - |( - idx, - ExprBoundaries { - interval, - distinct_count, - .. - }, - )| { - let Some(interval) = interval else { - // If the interval is `None`, we can say that there are no rows: - return ColumnStatistics { - null_count: Precision::Exact(0), - max_value: Precision::Exact(ScalarValue::Null), - min_value: Precision::Exact(ScalarValue::Null), - sum_value: Precision::Exact(ScalarValue::Null), - distinct_count: Precision::Exact(0), - }; - }; - let (lower, upper) = interval.into_bounds(); - let (min_value, max_value) = if lower.eq(&upper) { - (Precision::Exact(lower), Precision::Exact(upper)) - } else { - (Precision::Inexact(lower), Precision::Inexact(upper)) - }; - ColumnStatistics { - null_count: input_column_stats[idx].null_count.to_inexact(), - max_value, - min_value, - sum_value: Precision::Absent, - distinct_count: distinct_count.to_inexact(), - } - }, - ) - .collect() -} - -/// The FilterExec streams wraps the input iterator and applies the predicate expression to -/// determine which rows to include in its output batches -struct FilterExecStream { - /// Output schema after the projection - schema: SchemaRef, - /// The expression to filter on. This expression must evaluate to a boolean value. - predicate: Arc, - /// The input partition to filter. - input: SendableRecordBatchStream, - /// Runtime metrics recording - baseline_metrics: BaselineMetrics, - /// The projection indices of the columns in the input schema - projection: Option>, -} - -fn filter_and_project( - batch: &RecordBatch, - predicate: &Arc, - projection: Option<&Vec>, - output_schema: &SchemaRef, -) -> Result { - predicate - .evaluate(batch) - .and_then(|v| v.into_array(batch.num_rows())) - .and_then(|array| { - Ok(match (as_boolean_array(&array), projection) { - // Apply filter array to record batch - (Ok(filter_array), None) => comet_filter_record_batch(batch, filter_array)?, - (Ok(filter_array), Some(projection)) => { - let projected_columns = projection - .iter() - .map(|i| Arc::clone(batch.column(*i))) - .collect(); - let projected_batch = - RecordBatch::try_new(Arc::clone(output_schema), projected_columns)?; - comet_filter_record_batch(&projected_batch, filter_array)? - } - (Err(_), _) => { - return internal_err!("Cannot create filter_array from non-boolean predicates"); - } - }) - }) -} - -// BEGIN Comet changes -// `FilterExec` could modify input batch or return input batch without change. Instead of always -// adding `CopyExec` on top of it, we only copy input batch for the special case. 
-pub fn comet_filter_record_batch( - record_batch: &RecordBatch, - predicate: &BooleanArray, -) -> std::result::Result { - if predicate.true_count() == record_batch.num_rows() { - // special case where we just make an exact copy - let arrays: Vec = record_batch - .columns() - .iter() - .map(|array| { - let capacity = array.len(); - let data = array.to_data(); - let mut mutable = MutableArrayData::new(vec![&data], false, capacity); - mutable.extend(0, 0, capacity); - make_array(mutable.freeze()) - }) - .collect(); - let options = RecordBatchOptions::new().with_row_count(Some(record_batch.num_rows())); - RecordBatch::try_new_with_options(Arc::clone(&record_batch.schema()), arrays, &options) - } else { - filter_record_batch(record_batch, predicate) - } -} -// END Comet changes - -impl Stream for FilterExecStream { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let poll; - loop { - match ready!(self.input.poll_next_unpin(cx)) { - Some(Ok(batch)) => { - let timer = self.baseline_metrics.elapsed_compute().timer(); - let filtered_batch = filter_and_project( - &batch, - &self.predicate, - self.projection.as_ref(), - &self.schema, - )?; - timer.done(); - // Skip entirely filtered batches - if filtered_batch.num_rows() == 0 { - continue; - } - poll = Poll::Ready(Some(Ok(filtered_batch))); - break; - } - value => { - poll = Poll::Ready(value); - break; - } - } - } - self.baseline_metrics.record_poll(poll) - } - - fn size_hint(&self) -> (usize, Option) { - // Same number of record batches - self.input.size_hint() - } -} - -impl RecordBatchStream for FilterExecStream { - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } -} - -/// Return the equals Column-Pairs and Non-equals Column-Pairs -fn collect_columns_from_predicate(predicate: &Arc) -> EqualAndNonEqual<'_> { - let mut eq_predicate_columns = Vec::::new(); - let mut ne_predicate_columns = Vec::::new(); - - let predicates = split_conjunction(predicate); - predicates.into_iter().for_each(|p| { - if let Some(binary) = p.as_any().downcast_ref::() { - match binary.op() { - Operator::Eq => eq_predicate_columns.push((binary.left(), binary.right())), - Operator::NotEq => ne_predicate_columns.push((binary.left(), binary.right())), - _ => {} - } - } - }); - - (eq_predicate_columns, ne_predicate_columns) -} - -/// Pair of `Arc`s -pub type PhysicalExprPairRef<'a> = (&'a Arc, &'a Arc); - -/// The equals Column-Pairs and Non-equals Column-Pairs in the Predicates -pub type EqualAndNonEqual<'a> = (Vec>, Vec>); diff --git a/native/core/src/execution/operators/mod.rs b/native/core/src/execution/operators/mod.rs index 4e15e4341a..c8cfebd45e 100644 --- a/native/core/src/execution/operators/mod.rs +++ b/native/core/src/execution/operators/mod.rs @@ -22,14 +22,11 @@ use std::fmt::Debug; use jni::objects::GlobalRef; pub use copy::*; -pub use filter::comet_filter_record_batch; -pub use filter::FilterExec; pub use scan::*; mod copy; mod expand; pub use expand::ExpandExec; -mod filter; mod scan; /// Error returned during executing operators. diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index a842efaa30..c4676c0222 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+use crate::execution::operators::copy_array; use crate::{ errors::CometError, execution::{ @@ -63,6 +64,8 @@ pub struct ScanExec { pub input_source: Option<Arc<GlobalRef>>, /// A description of the input source for informational purposes pub input_source_description: String, + /// Whether this is a native_comet scan that reuses buffers + pub has_buffer_reuse: bool, /// The data types of columns of the input batch. Converted from Spark schema. pub data_types: Vec<DataType>, /// Schema of first batch @@ -88,6 +91,7 @@ impl ScanExec { input_source: Option<Arc<GlobalRef>>, input_source_description: &str, data_types: Vec<DataType>, + has_buffer_reuse: bool, ) -> Result<Self, CometError> { let metrics_set = ExecutionPlanMetricsSet::default(); let baseline_metrics = BaselineMetrics::new(&metrics_set, 0); @@ -108,6 +112,7 @@ impl ScanExec { data_types.len(), &jvm_fetch_time, &arrow_ffi_time, + has_buffer_reuse, )?; timer.stop(); batch @@ -130,6 +135,7 @@ impl ScanExec { exec_context_id, input_source, input_source_description: input_source_description.to_string(), + has_buffer_reuse, data_types, batch: Arc::new(Mutex::new(Some(first_batch))), cache, @@ -186,6 +192,7 @@ impl ScanExec { self.data_types.len(), &self.jvm_fetch_time, &self.arrow_ffi_time, + self.has_buffer_reuse, )?; *current_batch = Some(next_batch); } @@ -202,6 +209,7 @@ impl ScanExec { num_cols: usize, jvm_fetch_time: &Time, arrow_ffi_time: &Time, + copy_arrays: bool, ) -> Result<InputBatch, CometError> { if exec_context_id == TEST_EXEC_CONTEXT_ID { // This is a unit test. We don't need to call JNI. @@ -276,8 +284,17 @@ impl ScanExec { let array_data = ArrayData::from_spark((array_ptr, schema_ptr))?; // TODO: validate array input data + // array_data.validate_full()?; - inputs.push(make_array(array_data)); + let array = make_array(array_data); + + // for native_comet scans, copy the array to avoid memory + // corruption issues due to mutable buffer reuse + if copy_arrays { + inputs.push(copy_array(&array)); + } else { + inputs.push(array); + } // Drop the Arcs to avoid memory leak unsafe { @@ -375,6 +392,7 @@ impl DisplayAs for ScanExec { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "ScanExec: source=[{}], ", self.input_source_description)?; + write!(f, "hasBufferReuse={}, ", self.has_buffer_reuse)?; let fields: Vec<String> = self .data_types .iter() diff --git a/native/core/src/execution/planner.rs index 7fa7bfe905..4101f3f16e 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -18,7 +18,6 @@ //! 
Converts Spark physical plan to DataFusion physical plan use crate::execution::operators::CopyMode; -use crate::execution::operators::FilterExec as CometFilterExec; use crate::{ errors::ExpressionError, execution::{ @@ -1146,25 +1145,17 @@ impl PhysicalPlanner { let predicate = self.create_expr(filter.predicate.as_ref().unwrap(), child.schema())?; - let filter: Arc<dyn ExecutionPlan> = - match (filter.wrap_child_in_copy_exec, filter.use_datafusion_filter) { - (true, true) => Arc::new(DataFusionFilterExec::try_new( - predicate, - Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)), - )?), - (true, false) => Arc::new(CometFilterExec::try_new( - predicate, - Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)), - )?), - (false, true) => Arc::new(DataFusionFilterExec::try_new( - predicate, - Arc::clone(&child.native_plan), - )?), - (false, false) => Arc::new(CometFilterExec::try_new( - predicate, - Arc::clone(&child.native_plan), - )?), - }; + let filter: Arc<dyn ExecutionPlan> = if filter.wrap_child_in_copy_exec { + Arc::new(DataFusionFilterExec::try_new( + predicate, + Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)), + )?) + } else { + Arc::new(DataFusionFilterExec::try_new( + predicate, + Arc::clone(&child.native_plan), + )?) + }; Ok(( scans, @@ -1424,8 +1415,14 @@ impl PhysicalPlanner { }; // The `ScanExec` operator will take actual arrays from Spark during execution - let scan = - ScanExec::new(self.exec_context_id, input_source, &scan.source, data_types)?; + let scan = ScanExec::new( + self.exec_context_id, + input_source, + &scan.source, + data_types, + // TODO scan.has_buffer_reuse, + true + )?; Ok(( vec![scan.clone()], @@ -1504,22 +1501,11 @@ impl PhysicalPlanner { .map(|(idx, dt)| Field::new(format!("col_{idx}"), dt.clone(), true)) .collect(); let schema = Arc::new(Schema::new(fields)); - - // `Expand` operator keeps the input batch and expands it to multiple output - // batches. However, `ScanExec` will reuse input arrays for the next - // input batch. Therefore, we need to copy the input batch to avoid - // the data corruption. Note that we only need to copy the input batch - // if the child operator is `ScanExec`, because other operators after `ScanExec` - // will create new arrays for the output batch. - let input = if can_reuse_input_batch(&child.native_plan) { - Arc::new(CopyExec::new( - Arc::clone(&child.native_plan), - CopyMode::UnpackOrDeepCopy, - )) - } else { - Arc::clone(&child.native_plan) - }; - let expand = Arc::new(ExpandExec::new(projections, input, schema)); + let expand = Arc::new(ExpandExec::new( + projections, + Arc::clone(&child.native_plan), + schema, + )); Ok(( scans, Arc::new(SparkPlan::new(spark_plan.plan_id, expand, vec![child])), @@ -1845,14 +1831,9 @@ impl PhysicalPlanner { )) } - /// Wrap an ExecutionPlan in a CopyExec, which will unpack any dictionary-encoded arrays - /// and make a deep copy of other arrays if the plan re-uses batches. + /// Wrap an ExecutionPlan in a CopyExec, which will unpack any dictionary-encoded arrays. fn wrap_in_copy_exec(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> { - if can_reuse_input_batch(&plan) { - Arc::new(CopyExec::new(plan, CopyMode::UnpackOrDeepCopy)) - } else { - Arc::new(CopyExec::new(plan, CopyMode::UnpackOrClone)) - } + Arc::new(CopyExec::new(plan, CopyMode::UnpackOrClone)) } /// Create a DataFusion physical aggregate expression from Spark physical aggregate expression @@ -2553,17 +2534,6 @@ impl From<ExpressionError> for DataFusionError { } } -/// Returns true if given operator can return input array as output array without -/// modification. 
This is used to determine if we need to copy the input batch to avoid -/// data corruption from reusing the input batch. -fn can_reuse_input_batch(op: &Arc<dyn ExecutionPlan>) -> bool { - if op.as_any().is::<ProjectionExec>() || op.as_any().is::<LocalLimitExec>() { - can_reuse_input_batch(op.children()[0]) - } else { - op.as_any().is::<ScanExec>() - } -} - /// Collects the indices of the columns in the input schema that are used in the expression /// and returns them as a pair of vectors, one for the left side and one for the right side. fn expr_to_columns( @@ -2783,6 +2753,7 @@ mod tests { }; use datafusion::error::DataFusionError; use datafusion::logical_expr::ScalarUDF; + use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::ExecutionPlan; use datafusion::{assert_batches_eq, physical_plan::common::collect, prelude::SessionContext}; use tempfile::TempDir; @@ -2791,6 +2762,7 @@ mod tests { use crate::execution::{operators::InputBatch, planner::PhysicalPlanner}; use crate::execution::operators::ExecutionError; + use crate::execution::spark_plan::SparkPlan; use crate::parquet::parquet_support::SparkParquetOptions; use crate::parquet::schema_adapter::SparkSchemaAdapterFactory; use datafusion_comet_proto::spark_expression::expr::ExprStruct; @@ -2814,6 +2786,7 @@ mod tests { type_info: None, }], source: "".to_string(), + has_buffer_reuse: true, })), }; @@ -2887,6 +2860,7 @@ mod tests { type_info: None, }], source: "".to_string(), + has_buffer_reuse: true, })), }; @@ -2965,7 +2939,7 @@ mod tests { #[tokio::test()] #[allow(clippy::field_reassign_with_default)] async fn to_datafusion_filter() { - let op_scan = create_scan(); + let op_scan = create_native_comet_scan(); let op = create_filter(op_scan, 0); let planner = PhysicalPlanner::default(); @@ -2993,6 +2967,73 @@ mod tests { assert_eq!(comet_err.to_string(), "Error from DataFusion: exec error."); } + #[test] + fn add_copy_to_sort_on_native_comet_scan() { + let scan_exec = create_native_comet_scan(); + let sort_exec = create_sort_exec(scan_exec); + let planner = PhysicalPlanner::default(); + let (_scans, datafusion_plan) = planner.create_plan(&sort_exec, &mut vec![], 1).unwrap(); + let plan_str = explain_plan(datafusion_plan); + let expected_str = r"SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false] + CopyExec [UnpackOrClone] + ScanExec: source=[native_comet], hasBufferReuse=true, schema=[col_0: Int32] +"; + assert_eq!(plan_str, expected_str); + } + + #[test] + fn add_copy_to_sort_on_sink_scan() { + let scan_exec = create_comet_sink_scan(); + let sort_exec = create_sort_exec(scan_exec); + let planner = PhysicalPlanner::default(); + let (_scans, datafusion_plan) = planner.create_plan(&sort_exec, &mut vec![], 1).unwrap(); + let plan_str = explain_plan(datafusion_plan); + let expected_str = r"SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false] + CopyExec [UnpackOrClone] + ScanExec: source=[sink], hasBufferReuse=false, schema=[col_0: Int32] +"; + assert_eq!(plan_str, expected_str); + } + + #[test] + fn add_copy_to_sort_on_filtered_native_comet_scan() { + let scan_exec = create_native_comet_scan(); + let filter_exec = create_filter(scan_exec, 1); + let sort_exec = create_sort_exec(filter_exec); + let planner = PhysicalPlanner::default(); + let (_scans, datafusion_plan) = planner.create_plan(&sort_exec, &mut vec![], 1).unwrap(); + let plan_str = explain_plan(datafusion_plan); + let expected_str = r"SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false] + CopyExec [UnpackOrClone] + FilterExec: col_0@0 = 1 + ScanExec: source=[native_comet], 
hasBufferReuse=true, schema=[col_0: Int32] +"; + assert_eq!(plan_str, expected_str); + } + + #[test] + fn add_copy_to_sort_on_filtered_scan() { + let scan_exec = create_comet_sink_scan(); + let filter_exec = create_filter(scan_exec, 1); + let sort_exec = create_sort_exec(filter_exec); + let planner = PhysicalPlanner::default(); + let (_scans, datafusion_plan) = planner.create_plan(&sort_exec, &mut vec![], 1).unwrap(); + let plan_str = explain_plan(datafusion_plan); + let expected_str = r"SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false] + CopyExec [UnpackOrClone] + FilterExec: col_0@0 = 1 + ScanExec: source=[sink], hasBufferReuse=false, schema=[col_0: Int32] +"; + assert_eq!(plan_str, expected_str); + } + + fn explain_plan(datafusion_plan: Arc<SparkPlan>) -> String { + format!( + "{}", + DisplayableExecutionPlan::new(datafusion_plan.native_plan.as_ref()).indent(true) + ) + } + // Creates a filter operator which takes an `Int32Array` and selects rows that are equal to // `value`. fn create_filter(child_op: spark_operator::Operator, value: i32) -> spark_operator::Operator { @@ -3038,28 +3079,58 @@ mod tests { children: vec![child_op], op_struct: Some(OpStruct::Filter(spark_operator::Filter { predicate: Some(expr), - use_datafusion_filter: false, wrap_child_in_copy_exec: false, })), } } + fn create_sort_exec(scan_exec: Operator) -> Operator { + // Create a SortOrder expression that sorts by the first column (index 0) + // in ascending order with nulls first + let sort_order_expr = spark_expression::Expr { + expr_struct: Some(ExprStruct::SortOrder(Box::new( + spark_expression::SortOrder { + child: Some(Box::new(spark_expression::Expr { + expr_struct: Some(ExprStruct::Bound(spark_expression::BoundReference { + index: 0, + datatype: Some(spark_expression::DataType { + type_id: 3, // Int32 + type_info: None, + }), + })), + })), + direction: spark_expression::SortDirection::Ascending as i32, + null_ordering: spark_expression::NullOrdering::NullsFirst as i32, + }, + ))), + }; + + Operator { + plan_id: 0, + children: vec![scan_exec], + op_struct: Some(OpStruct::Sort(spark_operator::Sort { + sort_orders: vec![sort_order_expr], + fetch: None, + })), + } + } + #[test] fn spark_plan_metrics_filter() { - let op_scan = create_scan(); + let op_scan = create_native_comet_scan(); let op = create_filter(op_scan, 0); let planner = PhysicalPlanner::default(); let (_scans, filter_exec) = planner.create_plan(&op, &mut vec![], 1).unwrap(); - assert_eq!("CometFilterExec", filter_exec.native_plan.name()); + assert_eq!("FilterExec", filter_exec.native_plan.name()); assert_eq!(1, filter_exec.children.len()); assert_eq!(0, filter_exec.additional_native_plans.len()); } #[test] fn spark_plan_metrics_hash_join() { - let op_scan = create_scan(); + let op_scan = create_native_comet_scan(); let op_join = Operator { plan_id: 0, children: vec![op_scan.clone(), op_scan.clone()], @@ -3091,13 +3162,28 @@ mod tests { } } - fn create_scan() -> Operator { + /// Create a scan for reading from native_comet Parquet reader + fn create_native_comet_scan() -> Operator { Operator { plan_id: 0, children: vec![], op_struct: Some(OpStruct::Scan(spark_operator::Scan { fields: vec![create_proto_datatype()], - source: "".to_string(), + source: "native_comet".to_string(), + has_buffer_reuse: true, // buffer reuse for native_comet + })), + } + } + + /// Create a scan for reading from a shuffle + fn create_comet_sink_scan() -> Operator { + Operator { + plan_id: 0, + children: vec![], + op_struct: Some(OpStruct::Scan(spark_operator::Scan { + fields: 
vec![create_proto_datatype()], + source: "sink".to_string(), + has_buffer_reuse: false, // no buffer reuse when reading a sink })), } } @@ -3140,6 +3226,7 @@ mod tests { }, ], source: "".to_string(), + has_buffer_reuse: true, })), }; @@ -3254,6 +3341,7 @@ mod tests { }, ], source: "".to_string(), + has_buffer_reuse: true, })), }; diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 3470ba8100..485a1be534 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -76,6 +76,10 @@ message Scan { // is purely for informational purposes when viewing native query plans in // debug mode. string source = 2; + // we need to specialize if the source is native_comet scan (either directly, + // or via Iceberg) because the native code needs to insert CopyExec nodes to + // avoid memory corruption issues due to the reuse of mutable buffers + bool has_buffer_reuse = 3; } message NativeScan { @@ -109,7 +113,6 @@ message Projection { message Filter { spark.spark_expression.Expr predicate = 1; - bool use_datafusion_filter = 2; // Some expressions don't support dictionary arrays, so may need to wrap the child in a CopyExec bool wrap_child_in_copy_exec = 3; } diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 35ebabdacb..360d18827e 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -52,6 +52,7 @@ import org.apache.comet.CometSparkSessionExtensions.{isCometScan, withInfo} import org.apache.comet.DataTypeSupport.isComplexType import org.apache.comet.expressions._ import org.apache.comet.objectstore.NativeConfig +import org.apache.comet.parquet.SupportsComet import org.apache.comet.serde.ExprOuterClass.{AggExpr, DataType => ProtoDataType, Expr, ScalarFunc} import org.apache.comet.serde.ExprOuterClass.DataType._ import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregateMode, BuildSide, JoinType, Operator} @@ -1788,22 +1789,6 @@ object QueryPlanSerde extends Logging with CometExprShim { val cond = exprToProto(condition, child.output) if (cond.isDefined && childOp.nonEmpty) { - // We need to determine whether to use DataFusion's FilterExec or Comet's - // FilterExec. The difference is that DataFusion's implementation will sometimes pass - // batches through whereas the Comet implementation guarantees that a copy is always - // made, which is critical when using `native_comet` scans due to buffer re-use - - // TODO this could be optimized more to stop walking the tree on hitting - // certain operators such as join or aggregate which will copy batches - def containsNativeCometScan(plan: SparkPlan): Boolean = { - plan match { - case w: CometScanWrapper => containsNativeCometScan(w.originalPlan) - case scan: CometScanExec => scan.scanImpl == CometConf.SCAN_NATIVE_COMET - case _: CometNativeScanExec => false - case _ => plan.children.exists(containsNativeCometScan) - } - } - // Some native expressions do not support operating on dictionary-encoded arrays, so // wrap the child in a CopyExec to unpack dictionaries first. 
def wrapChildInCopyExec(condition: Expression): Boolean = { @@ -1816,7 +1801,6 @@ object QueryPlanSerde extends Logging with CometExprShim { val filterBuilder = OperatorOuterClass.Filter .newBuilder() .setPredicate(cond.get) - .setUseDatafusionFilter(!containsNativeCometScan(op)) .setWrapChildInCopyExec(wrapChildInCopyExec(condition)) Some(builder.setFilter(filterBuilder).build()) } else { @@ -2197,17 +2181,39 @@ object QueryPlanSerde extends Logging with CometExprShim { // These operators are source of Comet native execution chain val scanBuilder = OperatorOuterClass.Scan.newBuilder() - val source = op.simpleStringWithNodeId() - if (source.isEmpty) { - scanBuilder.setSource(op.getClass.getSimpleName) - } else { - scanBuilder.setSource(source) + op match { + case scan: CometScanExec => + scanBuilder.setSource( + s"CometScanExec[${scan.scanImpl}]: ${op.simpleStringWithNodeId()}") + case _ => + scanBuilder.setSource( + s"[${op.getClass.getSimpleName}]: ${op.simpleStringWithNodeId()}") } val scanTypes = op.output.flatten { attr => serializeDataType(attr.dataType) } + def hasBufferReuse(op: SparkPlan): Boolean = { + op match { + case batchScan: CometBatchScanExec if batchScan.scan.isInstanceOf[SupportsComet] => + // Iceberg integration + true + case scan: CometScanExec => + // native_comet scan + scan.scanImpl == CometConf.SCAN_NATIVE_COMET + case CometScanWrapper(_, scan) => + hasBufferReuse(scan) + case _ => + // wrapper around scan such as CoalesceExec + op.children.exists(hasBufferReuse) + } + } + + if (hasBufferReuse(op)) { + scanBuilder.setHasBufferReuse(true) + } + if (scanTypes.length == op.output.length) { scanBuilder.addAllFields(scanTypes.asJava) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala index 2fc73bb7c5..fce45cb18d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala @@ -88,7 +88,10 @@ object CometExecUtils { * child partition */ def getLimitNativePlan(outputAttributes: Seq[Attribute], limit: Int): Option[Operator] = { - val scanBuilder = OperatorOuterClass.Scan.newBuilder().setSource("LimitInput") + val scanBuilder = OperatorOuterClass.Scan + .newBuilder() + .setSource("LimitInput") + .setHasBufferReuse(true) // TODO needed? val scanOpBuilder = OperatorOuterClass.Operator.newBuilder() val scanTypes = outputAttributes.flatten { attr => @@ -118,7 +121,10 @@ object CometExecUtils { sortOrder: Seq[SortOrder], child: SparkPlan, limit: Int): Option[Operator] = { - val scanBuilder = OperatorOuterClass.Scan.newBuilder().setSource("TopKInput") + val scanBuilder = OperatorOuterClass.Scan + .newBuilder() + .setSource("TopKInput") + .setHasBufferReuse(true) // TODO needed? 
val scanOpBuilder = OperatorOuterClass.Operator.newBuilder() val scanTypes = outputAttributes.flatten { attr => diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala index 5679a87b30..bba2fe99c3 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala @@ -284,7 +284,7 @@ case class CometScanExec( } override val nodeName: String = - s"CometScan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}" + s"CometScan [$scanImpl] $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}" /** * Create an RDD for bucketed reads. The non-bucketed variant of this function is diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala index 5d772be403..275d976f5f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala @@ -141,7 +141,10 @@ class CometNativeShuffleWriter[K, V]( } private def getNativePlan(dataFile: String, indexFile: String): Operator = { - val scanBuilder = OperatorOuterClass.Scan.newBuilder().setSource("ShuffleWriterInput") + val scanBuilder = OperatorOuterClass.Scan + .newBuilder() + .setSource("ShuffleWriterInput") + .setHasBufferReuse(true) // TODO can be optimized? val opBuilder = OperatorOuterClass.Operator.newBuilder() val scanTypes = outputAttributes.flatten { attr =>