From 6f436d9070ef12429398a578ab4f499b41ca009c Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 10 Dec 2025 15:51:17 -0700
Subject: [PATCH] update contrib guide for native registry framework

---
 .../adding_a_new_expression.md | 148 ++++++++++++-
 .../adding_a_new_operator.md   | 202 +++++++++++++++---
 2 files changed, 316 insertions(+), 34 deletions(-)

diff --git a/docs/source/contributor-guide/adding_a_new_expression.md b/docs/source/contributor-guide/adding_a_new_expression.md
index 74825f4301..33b69c8d34 100644
--- a/docs/source/contributor-guide/adding_a_new_expression.md
+++ b/docs/source/contributor-guide/adding_a_new_expression.md
@@ -262,19 +262,151 @@ message Add2 {
 
 ### Adding the Expression in Rust
 
-With the serialization complete, the next step is to implement the expression in Rust and ensure that the incoming plan can make use of it.
+With the serialization complete, the next step is to implement the expression in Rust. Comet now uses a modular expression registry pattern that provides better organization and type safety than the previous monolithic match statement.
 
-How this works is somewhat dependent on the type of expression you're adding. Expression implementations live in the `native/spark-expr/src/` directory, organized by category (e.g., `math_funcs/`, `string_funcs/`, `array_funcs/`).
+#### File Organization
 
-#### Generally Adding a New Expression
+The expression-related code is organized as follows:
 
-If you're adding a new expression that requires custom protobuf serialization, you may need to:
+- `native/core/src/execution/planner/expression_registry.rs` - Contains the `ExpressionBuilder` trait, the `ExpressionType` enum, and the `ExpressionRegistry`
+- `native/core/src/execution/planner/macros.rs` - Contains shared macros (`extract_expr!`, `binary_expr_builder!`, `unary_expr_builder!`)
+- `native/core/src/execution/expressions/` - Individual expression builder implementations organized by category
+- `native/spark-expr/src/` - Scalar function implementations organized by category (e.g., `math_funcs/`, `string_funcs/`)
 
-1. Add a new message to the protobuf definition in `native/proto/src/proto/expr.proto`
-2. Add a native expression handler in `expression_registry.rs` to deserialize the new protobuf message type and
-   create a native expression
+#### Option A: Using the Expression Registry (Recommended for Complex Expressions)
 
-For most expressions, you can skip this step if you're using the existing scalar function infrastructure.
+For expressions that need custom protobuf handling or complex logic, use the modular registry pattern.
+
+##### Create an ExpressionBuilder
+
+Create or update a file in `native/core/src/execution/expressions/` (e.g., `comparison.rs`, `arithmetic.rs`):
+
+```rust
+use std::sync::Arc;
+
+use arrow::datatypes::SchemaRef;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion_comet_proto::spark_expression::Expr;
+
+use crate::execution::{
+    operators::ExecutionError,
+    planner::{expression_registry::ExpressionBuilder, PhysicalPlanner},
+};
+use crate::extract_expr;
+
+/// Builder for YourNewExpression expressions
+pub struct YourExpressionBuilder;
+
+impl ExpressionBuilder for YourExpressionBuilder {
+    fn build(
+        &self,
+        spark_expr: &Expr,
+        input_schema: SchemaRef,
+        planner: &PhysicalPlanner,
+    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+        // Use the extract_expr! macro for type-safe extraction
+        let expr = extract_expr!(spark_expr, YourNewExpression);
+
+        // Convert child expressions
+        let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
+        let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?;
+
+        // Create and return the DataFusion physical expression
+        Ok(Arc::new(YourDataFusionExpr::new(left, right)))
+    }
+}
+```
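+
+For intuition, `extract_expr!` behaves roughly like the following hand-written match (a simplified sketch only; see `native/core/src/execution/planner/macros.rs` for the real definition):
+
+```rust
+// Hypothetical expansion of `extract_expr!(spark_expr, YourNewExpression)`,
+// shown for illustration; the real macro lives in planner/macros.rs.
+let expr = match spark_expr.expr_struct.as_ref() {
+    Some(ExprStruct::YourNewExpression(inner)) => inner,
+    other => {
+        return Err(ExecutionError::GeneralError(format!(
+            "Expected YourNewExpression, got {:?}",
+            other
+        )))
+    }
+};
+```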
+
+For simple binary expressions, you can use the `binary_expr_builder!` macro instead of writing the builder by hand:
+
+```rust
+use datafusion::logical_expr::Operator as DataFusionOperator;
+
+use crate::binary_expr_builder;
+
+// This generates a complete ExpressionBuilder implementation
+binary_expr_builder!(YourBinaryExprBuilder, YourBinaryExpr, DataFusionOperator::Plus);
+```
+
+##### Register the ExpressionBuilder
+
+Add a variant for your expression to the `ExpressionType` enum in `native/core/src/execution/planner/expression_registry.rs`:
+
+```rust
+/// Enum to identify different expression types for registry dispatch
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum ExpressionType {
+    // Arithmetic expressions
+    Add,
+    Subtract,
+    // ... existing expressions ...
+    YourNewExpression, // Add your expression type here
+}
+```
+
+Register your builder in the `ExpressionRegistry` implementation:
+
+```rust
+/// Register all expression builders
+fn register_all_expressions(&mut self) {
+    self.register_arithmetic_expressions();
+    self.register_comparison_expressions();
+    self.register_your_expressions(); // Add this line
+}
+
+/// Register your new expressions
+fn register_your_expressions(&mut self) {
+    use crate::execution::expressions::your_category::YourExpressionBuilder;
+
+    self.builders
+        .insert(ExpressionType::YourNewExpression, Box::new(YourExpressionBuilder));
+}
+```
+
+Update the `get_expression_type` function to map your protobuf message to the new variant:
+
+```rust
+fn get_expression_type(spark_expr: &Expr) -> Option<ExpressionType> {
+    use datafusion_comet_proto::spark_expression::expr::ExprStruct;
+
+    match spark_expr.expr_struct.as_ref()? {
+        ExprStruct::Add(_) => Some(ExpressionType::Add),
+        ExprStruct::Subtract(_) => Some(ExpressionType::Subtract),
+        // ... existing expressions ...
+        ExprStruct::YourNewExpression(_) => Some(ExpressionType::YourNewExpression),
+        // Expressions without a registered builder fall back to the legacy match
+        _ => None,
+    }
+}
+```
+
+**Note**: See existing implementations in `native/core/src/execution/expressions/`, such as `arithmetic.rs` and `comparison.rs`, for working examples.
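+
+With these pieces in place, no change to `create_expr()` itself should be needed: the planner consults the registry first and only falls back to the legacy match for unregistered expressions. Conceptually, the dispatch behaves like this simplified sketch (the `lookup` and `create_expr_with_fallback` helpers are illustrative names, not the exact Comet API):
+
+```rust
+// Simplified sketch of how the planner dispatches through the registry.
+fn create_expr_via_registry(
+    planner: &PhysicalPlanner,
+    registry: &ExpressionRegistry,
+    spark_expr: &Expr,
+    input_schema: SchemaRef,
+) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+    if let Some(expr_type) = get_expression_type(spark_expr) {
+        if let Some(builder) = registry.lookup(expr_type) {
+            // A registered builder handles the expression
+            return builder.build(spark_expr, input_schema, planner);
+        }
+    }
+    // Everything else falls back to the legacy monolithic match
+    planner.create_expr_with_fallback(spark_expr, input_schema)
+}
+```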
+
+#### Option B: Using Scalar Functions (Recommended for Simple Functions)
+
+For expressions that map directly to scalar functions, use the existing scalar function infrastructure. This approach is simpler for basic functions but less flexible than the registry pattern.
+
+**When to use the Registry Pattern (Option A):**
+
+- Complex expressions that need custom deserialization logic
+- Expressions with multiple variants or complex parameter handling
+- Binary/unary expressions that benefit from type-safe extraction
+- Expressions that need custom DataFusion physical expression implementations
+
+**When to use Scalar Functions (Option B):**
+
+- Simple functions that map directly to DataFusion scalar functions
+- Functions that don't need complex parameter handling
+- Functions where the existing `CometScalarFunction` pattern is sufficient
+
+**Benefits of the Registry Pattern:**
+
+- **Better Organization**: Each expression's logic is isolated
+- **Type Safety**: The `extract_expr!` macro ensures compile-time correctness
+- **Extensibility**: New expressions can be added without modifying core planner logic
+- **Code Reuse**: Macros like `binary_expr_builder!` reduce boilerplate
+- **Graceful Fallback**: Unregistered expressions automatically fall back to the monolithic match
+
+#### Option C: Fallback to Monolithic Match (Legacy)
+
+Expressions that aren't registered with the registry automatically fall back to the legacy monolithic match statement in `create_expr()`, so you can still add a new expression there if the registry pattern doesn't fit. However, the registry pattern (Option A) is strongly recommended for new expressions.
 
 #### Adding a New Scalar Function Expression
diff --git a/docs/source/contributor-guide/adding_a_new_operator.md b/docs/source/contributor-guide/adding_a_new_operator.md
index 4317943aa8..ee5987bece 100644
--- a/docs/source/contributor-guide/adding_a_new_operator.md
+++ b/docs/source/contributor-guide/adding_a_new_operator.md
@@ -326,51 +326,201 @@ Run `make` to update the user guide. The new configuration option will be added
 
 ### Step 5: Implement the Native Operator in Rust
 
-#### Update the Planner
+Comet now uses a modular operator registry pattern that provides better organization and type safety than the previous monolithic match statement.
 
-In `native/core/src/execution/planner.rs`, add a match case in the operator deserialization logic to handle your new protobuf message:
+#### File Organization
+
+The operator-related code is organized as follows:
+
+- `native/core/src/execution/planner/operator_registry.rs` - Contains the `OperatorBuilder` trait, the `OperatorType` enum, and the `OperatorRegistry`
+- `native/core/src/execution/planner/expression_registry.rs` - Contains the `ExpressionBuilder` trait, the `ExpressionType` enum, and the `ExpressionRegistry`
+- `native/core/src/execution/planner/macros.rs` - Contains shared macros (`extract_op!`, `extract_expr!`, etc.)
+- `native/core/src/execution/operators/` - Individual operator builder implementations
+
+#### Option A: Using the Operator Registry (Recommended)
+
+The recommended approach is to create an `OperatorBuilder` and register it with the `OperatorRegistry`.
+
+**Note**: See `native/core/src/execution/operators/projection.rs` for a complete working example, `ProjectionBuilder`, that follows this pattern.
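+
+For reference, the `OperatorBuilder` trait has roughly the following shape (a simplified sketch; the `Send + Sync` bounds are an assumption, so check `operator_registry.rs` for the authoritative definition):
+
+```rust
+// Simplified shape of the OperatorBuilder trait; the build signature
+// mirrors the example implementation shown below.
+pub trait OperatorBuilder: Send + Sync {
+    fn build(
+        &self,
+        spark_plan: &Operator,
+        inputs: &mut Vec<Arc<GlobalRef>>,
+        partition_count: usize,
+        planner: &PhysicalPlanner,
+    ) -> Result<(Vec<ScanExec>, Arc<SparkPlan>), ExecutionError>;
+}
+```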
+ +##### Create an OperatorBuilder + +Create a new file in `native/core/src/execution/operators/` (e.g., `your_operator.rs`): + +```rust +use std::sync::Arc; + +use datafusion::physical_plan::your_operator::YourDataFusionExec; +use datafusion_comet_proto::spark_operator::Operator; +use jni::objects::GlobalRef; + +use crate::{ + execution::{ + operators::{ExecutionError, ScanExec}, + planner::{operator_registry::OperatorBuilder, PhysicalPlanner}, + spark_plan::SparkPlan, + }, + extract_op, +}; + +/// Builder for YourNewOperator operators +pub struct YourOperatorBuilder; + +impl OperatorBuilder for YourOperatorBuilder { + fn build( + &self, + spark_plan: &Operator, + inputs: &mut Vec>, + partition_count: usize, + planner: &PhysicalPlanner, + ) -> Result<(Vec, Arc), ExecutionError> { + // Use extract_op! macro for type-safe extraction + let your_op = extract_op!(spark_plan, YourNewOperator); + let children = &spark_plan.children; + + assert_eq!(children.len(), 1); // Adjust based on your operator's arity + let (scans, child) = planner.create_plan(&children[0], inputs, partition_count)?; + + // Convert protobuf fields to DataFusion components + let expressions: Result, _> = your_op + .expressions + .iter() + .map(|expr| planner.create_expr(expr, child.schema())) + .collect(); + + // Create DataFusion operator + let datafusion_exec = Arc::new(YourDataFusionExec::try_new( + expressions?, + Arc::clone(&child.native_plan), + )?); + + Ok(( + scans, + Arc::new(SparkPlan::new(spark_plan.plan_id, datafusion_exec, vec![child])), + )) + } +} +``` + +##### Register the OperatorBuilder + +Add the OperatorType to the enum in `native/core/src/execution/planner/operator_registry.rs`: ```rust -use datafusion_comet_proto::spark_operator::operator::OpStruct; +/// Enum to identify different operator types for registry dispatch +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum OperatorType { + Scan, + NativeScan, + IcebergScan, + Projection, + Filter, + HashAgg, + Limit, + Sort, + ShuffleWriter, + ParquetWriter, + Expand, + SortMergeJoin, + HashJoin, + Window, + YourNewOperator, // Add your operator type here +} +``` + +Update the `get_operator_type` function in the same file: + +```rust +fn get_operator_type(spark_operator: &Operator) -> Option { + use datafusion_comet_proto::spark_operator::operator::OpStruct; + + match spark_operator.op_struct.as_ref()? { + OpStruct::Projection(_) => Some(OperatorType::Projection), + OpStruct::Filter(_) => Some(OperatorType::Filter), + // ... existing operators ... 
+        OpStruct::YourNewOperator(_) => Some(OperatorType::YourNewOperator),
+        OpStruct::Explode(_) => None, // Not yet in OperatorType enum
+        // Operators without a registered builder fall back to the legacy match
+        _ => None,
+    }
+}
+```
 
-// In the create_plan or similar method:
-match op.op_struct.as_ref() {
-    Some(OpStruct::Scan(scan)) => {
+Register your builder in the `OperatorRegistry` implementation:
+
+```rust
+/// Register all operator builders
+fn register_all_operators(&mut self) {
+    self.register_projection_operators();
+    self.register_your_operator(); // Add this line
+}
+
+/// Register your new operator
+fn register_your_operator(&mut self) {
+    use crate::execution::operators::your_operator::YourOperatorBuilder;
+
+    self.builders
+        .insert(OperatorType::YourNewOperator, Box::new(YourOperatorBuilder));
+}
+```
+
+##### Add the module declaration
+
+Add your new module to `native/core/src/execution/operators/mod.rs`:
+
+```rust
+pub mod your_operator;
+```
+
+#### Option B: Using the Fallback Match (Legacy)
+
+If you prefer not to use the registry pattern, or your operator doesn't fit it, you can still add a match case to the fallback logic in `native/core/src/execution/planner.rs`:
+
+```rust
+// In the create_plan method's fallback match statement:
+match spark_plan.op_struct.as_ref().unwrap() {
+    OpStruct::Filter(filter) => {
         // ... existing cases ...
     }
-    Some(OpStruct::YourNewOperator(your_op)) => {
-        create_your_operator_exec(your_op, children, session_ctx)
+    OpStruct::YourNewOperator(your_op) => {
+        create_your_operator_exec(your_op, children, partition_count)
     }
-    // ... other cases ...
+    _ => Err(GeneralError(format!(
+        "Unsupported or unregistered operator type: {:?}",
+        spark_plan.op_struct
+    ))),
 }
 ```
 
-#### Implement the Operator
+However, the registry approach (Option A) is strongly recommended because it provides:
 
-Create the operator implementation, either in an existing file or a new file in `native/core/src/execution/operators/`:
+- **Better organization**: Each operator's logic is isolated
+- **Type safety**: The `extract_op!` macro ensures compile-time correctness
+- **Extensibility**: New operators can be added without modifying core planner logic
+- **Consistency**: Follows the established pattern for expressions
+
+#### Custom Operator Implementation
+
+If your operator doesn't have a direct DataFusion equivalent, you'll need to implement a custom operator. See `native/core/src/execution/operators/expand.rs` for a complete example:
 
 ```rust
 use datafusion::physical_plan::{ExecutionPlan, ...};
-use datafusion_comet_proto::spark_operator::YourNewOperator;
-
-pub fn create_your_operator_exec(
-    op: &YourNewOperator,
-    children: Vec<Arc<dyn ExecutionPlan>>,
-    session_ctx: &SessionContext,
-) -> Result<Arc<dyn ExecutionPlan>, ExecutionError> {
-    // Deserialize expressions and configuration
-    // Create and return the execution plan
-    // Option 1: Use existing DataFusion operator
-    // Ok(Arc::new(SomeDataFusionExec::try_new(...)?))
+
+#[derive(Debug)]
+pub struct YourCustomExec {
+    // Your operator's fields
+}
 
-    // Option 2: Implement custom operator (see ExpandExec for example)
-    // Ok(Arc::new(YourCustomExec::new(...)))
-}
+impl ExecutionPlan for YourCustomExec {
+    // Implement the required methods
+    fn as_any(&self) -> &dyn Any { self }
+    fn schema(&self) -> SchemaRef { /* ... */ }
+    fn output_partitioning(&self) -> Partitioning { /* ... */ }
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { /* ... */ }
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { /* ... */ }
+    fn with_new_children(self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>>) -> Result<Arc<dyn ExecutionPlan>> { /* ... */ }
+    fn execute(&self, partition: usize, context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> { /* ... */ }
+}
+```
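+
+The heart of a custom operator is `execute`, which typically wraps the child's stream in an adapter that transforms each batch. A minimal sketch (the `child` field and `transform_batch` function are illustrative assumptions, not part of Comet's API):
+
+```rust
+use datafusion::error::Result;
+use datafusion::execution::TaskContext;
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::SendableRecordBatchStream;
+use futures::StreamExt;
+
+// Inside `impl ExecutionPlan for YourCustomExec`:
+fn execute(
+    &self,
+    partition: usize,
+    context: Arc<TaskContext>,
+) -> Result<SendableRecordBatchStream> {
+    // Start executing the single child and stream its batches
+    let input = self.child.execute(partition, context)?;
+    let schema = self.schema();
+    // Apply a per-batch transformation (transform_batch is hypothetical)
+    let stream = input.map(|batch| batch.and_then(transform_batch));
+    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
+}
+```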
 
-For custom operators, you'll need to implement the `ExecutionPlan` trait. See `native/core/src/execution/operators/expand.rs` or `scan.rs` for examples.
-
 ### Step 6: Add Tests
 
 #### Scala Integration Tests