148 changes: 140 additions & 8 deletions docs/source/contributor-guide/adding_a_new_expression.md
@@ -262,19 +262,151 @@ message Add2 {

### Adding the Expression in Rust

With the serialization complete, the next step is to implement the expression in Rust. Comet now uses a modular expression registry pattern that provides better organization and type safety compared to monolithic match statements.

#### File Organization

The expression-related code is organized as follows:

- `native/core/src/execution/planner/expression_registry.rs` - Contains `ExpressionBuilder` trait, `ExpressionType` enum, and `ExpressionRegistry`
- `native/core/src/execution/planner/macros.rs` - Contains shared macros (`extract_expr!`, `binary_expr_builder!`, `unary_expr_builder!`)
- `native/core/src/execution/expressions/` - Individual expression builder implementations organized by category
- `native/spark-expr/src/` - Scalar function implementations organized by category (e.g., `math_funcs/`, `string_funcs/`)
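
For orientation, the `ExpressionBuilder` trait defined in `expression_registry.rs` has roughly the shape sketched below. This is inferred from the builder implementation shown in the next section rather than copied from the source, and the `Send + Sync` bounds are an assumption:

```rust
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion::physical_expr::PhysicalExpr;
use datafusion_comet_proto::spark_expression::Expr;

use crate::execution::{operators::ExecutionError, planner::PhysicalPlanner};

/// Sketch only: converts one protobuf expression into a DataFusion physical expression.
pub trait ExpressionBuilder: Send + Sync {
    fn build(
        &self,
        spark_expr: &Expr,
        input_schema: SchemaRef,
        planner: &PhysicalPlanner,
    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError>;
}
```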

#### Option A: Using the Expression Registry (Recommended for Complex Expressions)

For expressions that need custom protobuf handling or complex logic, use the modular registry pattern.

##### Create an ExpressionBuilder

Create or update a file in `native/core/src/execution/expressions/` (e.g., `comparison.rs`, `arithmetic.rs`):

```rust
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion::physical_expr::PhysicalExpr;
use datafusion_comet_proto::spark_expression::Expr;

use crate::execution::{
    operators::ExecutionError,
    planner::{expression_registry::ExpressionBuilder, PhysicalPlanner},
};
use crate::extract_expr;

/// Builder for YourNewExpression expressions
pub struct YourExpressionBuilder;

impl ExpressionBuilder for YourExpressionBuilder {
    fn build(
        &self,
        spark_expr: &Expr,
        input_schema: SchemaRef,
        planner: &PhysicalPlanner,
    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
        // Use extract_expr! macro for type-safe extraction
        let expr = extract_expr!(spark_expr, YourNewExpression);

        // Convert child expressions
        let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
        let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?;

        // Create and return the DataFusion physical expression
        Ok(Arc::new(YourDataFusionExpr::new(left, right)))
    }
}
```
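
The `extract_expr!` macro used above is defined in `native/core/src/execution/planner/macros.rs`. Conceptually, `extract_expr!(spark_expr, YourNewExpression)` pattern-matches the protobuf variant and returns early with an error on a mismatch, roughly like the sketch below (illustrative only; the real expansion and error construction may differ):

```rust
// Roughly what `extract_expr!(spark_expr, YourNewExpression)` does (sketch, not the real expansion):
let expr = match spark_expr.expr_struct.as_ref() {
    Some(ExprStruct::YourNewExpression(inner)) => inner,
    other => {
        return Err(ExecutionError::GeneralError(format!(
            "Expected YourNewExpression, got {:?}",
            other
        )))
    }
};
```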

For simple binary expressions, you can use the `binary_expr_builder!` macro:

```rust
use datafusion::logical_expr::Operator as DataFusionOperator;
use crate::binary_expr_builder;

// This generates a complete ExpressionBuilder implementation
binary_expr_builder!(YourBinaryExprBuilder, YourBinaryExpr, DataFusionOperator::Plus);
```
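
If you created a new file for your builder, remember to declare it as a module (typically in that directory's `mod.rs`), mirroring the operator guide; `your_category` below is a placeholder for whichever file you added:

```rust
// In the module file for native/core/src/execution/expressions/ (exact layout may differ)
pub mod your_category;
```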

##### Register the ExpressionBuilder

Add the ExpressionType to the enum in `native/core/src/execution/planner/expression_registry.rs`:

```rust
/// Enum to identify different expression types for registry dispatch
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ExpressionType {
    // Arithmetic expressions
    Add,
    Subtract,
    // ... existing expressions ...
    YourNewExpression, // Add your expression type here
}
```

Register your builder in the `ExpressionRegistry` implementation:

```rust
/// Register all expression builders
fn register_all_expressions(&mut self) {
    self.register_arithmetic_expressions();
    self.register_comparison_expressions();
    self.register_your_expressions(); // Add this line
}

/// Register your new expressions
fn register_your_expressions(&mut self) {
    use crate::execution::expressions::your_category::YourExpressionBuilder;

    self.builders
        .insert(ExpressionType::YourNewExpression, Box::new(YourExpressionBuilder));
}
```

Update the `get_expression_type` function to map your protobuf expression:

```rust
fn get_expression_type(spark_expr: &Expr) -> Option<ExpressionType> {
    use datafusion_comet_proto::spark_expression::expr::ExprStruct;

    match spark_expr.expr_struct.as_ref()? {
        ExprStruct::Add(_) => Some(ExpressionType::Add),
        ExprStruct::Subtract(_) => Some(ExpressionType::Subtract),
        // ... existing expressions ...
        ExprStruct::YourNewExpression(_) => Some(ExpressionType::YourNewExpression),
    }
}
```

**Note**: See the existing implementations in `native/core/src/execution/expressions/`, such as `arithmetic.rs` and `comparison.rs`, for working examples.

#### Option B: Using Scalar Functions (Recommended for Simple Functions)

For expressions that map directly to scalar functions, use the existing scalar function infrastructure. This approach is simpler for basic functions but less flexible than the registry pattern.

**When to use the Registry Pattern (Option A):**

- Complex expressions that need custom deserialization logic
- Expressions with multiple variants or complex parameter handling
- Binary/unary expressions that benefit from type-safe extraction
- Expressions that need custom DataFusion physical expression implementations

**When to use Scalar Functions (Option B):**

- Simple functions that map directly to DataFusion scalar functions
- Functions that don't need complex parameter handling
- Functions where the existing `CometScalarFunction` pattern is sufficient

**Benefits of the Registry Pattern:**

- **Better Organization**: Each expression's logic is isolated
- **Type Safety**: The `extract_expr!` macro ensures compile-time correctness
- **Extensibility**: New expressions can be added without modifying core planner logic
- **Code Reuse**: Macros like `binary_expr_builder!` reduce boilerplate
- **Graceful Fallback**: Unregistered expressions automatically fall back to the monolithic match

#### Option C: Fallback to Monolithic Match (Legacy)

Expressions that aren't registered with the registry automatically fall back to the legacy monolithic match statement in `create_expr()`, so you can still add a new expression there if it doesn't fit the registry pattern. However, the registry pattern (Option A) is strongly recommended for new expressions.
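
Conceptually, this fallback means the planner's dispatch can consult the registry first and only then drop into the legacy match. The sketch below is illustrative only; the method and field names (`expression_registry`, `create_expr_legacy`) are assumptions, not the actual Comet code:

```rust
// Conceptual sketch of registry-first dispatch with a legacy fallback (illustrative names).
fn create_expr(
    &self,
    spark_expr: &Expr,
    input_schema: SchemaRef,
) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
    if let Some(expr_type) = get_expression_type(spark_expr) {
        if let Some(builder) = self.expression_registry.get(&expr_type) {
            return builder.build(spark_expr, input_schema, self);
        }
    }
    // Not registered: fall back to the legacy monolithic match statement.
    self.create_expr_legacy(spark_expr, input_schema)
}
```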

#### Adding a New Scalar Function Expression

202 changes: 176 additions & 26 deletions docs/source/contributor-guide/adding_a_new_operator.md
@@ -326,51 +326,201 @@ Run `make` to update the user guide. The new configuration option will be added

### Step 5: Implement the Native Operator in Rust

Comet now uses a modular operator registry pattern that provides better organization and type safety compared to monolithic match statements.

#### File Organization

The operator-related code is organized as follows:

- `native/core/src/execution/planner/operator_registry.rs` - Contains `OperatorBuilder` trait, `OperatorType` enum, and `OperatorRegistry`
- `native/core/src/execution/planner/expression_registry.rs` - Contains `ExpressionBuilder` trait, `ExpressionType` enum, and `ExpressionRegistry`
- `native/core/src/execution/planner/macros.rs` - Contains shared macros (`extract_op!`, `extract_expr!`, etc.)
- `native/core/src/execution/operators/` - Individual operator builder implementations
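
For orientation, the `OperatorBuilder` trait in `operator_registry.rs` has roughly the shape sketched below. As with the expression registry, this is inferred from the builder implementation shown in Option A rather than copied from the source, and the `Send + Sync` bounds are an assumption:

```rust
use std::sync::Arc;

use datafusion_comet_proto::spark_operator::Operator;
use jni::objects::GlobalRef;

use crate::execution::{
    operators::{ExecutionError, ScanExec},
    planner::PhysicalPlanner,
    spark_plan::SparkPlan,
};

/// Sketch only: converts one protobuf operator into a SparkPlan node plus the scans it contains.
pub trait OperatorBuilder: Send + Sync {
    fn build(
        &self,
        spark_plan: &Operator,
        inputs: &mut Vec<Arc<GlobalRef>>,
        partition_count: usize,
        planner: &PhysicalPlanner,
    ) -> Result<(Vec<ScanExec>, Arc<SparkPlan>), ExecutionError>;
}
```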

#### Option A: Using the Operator Registry (Recommended)

The recommended approach is to create an `OperatorBuilder` and register it with the `OperatorRegistry`.

**Note**: See `native/core/src/execution/operators/projection.rs` for a complete working example of `ProjectionBuilder` that follows this pattern.

##### Create an OperatorBuilder

Create a new file in `native/core/src/execution/operators/` (e.g., `your_operator.rs`):

```rust
use std::sync::Arc;

use datafusion::physical_plan::your_operator::YourDataFusionExec;
use datafusion_comet_proto::spark_operator::Operator;
use jni::objects::GlobalRef;

use crate::{
    execution::{
        operators::{ExecutionError, ScanExec},
        planner::{operator_registry::OperatorBuilder, PhysicalPlanner},
        spark_plan::SparkPlan,
    },
    extract_op,
};

/// Builder for YourNewOperator operators
pub struct YourOperatorBuilder;

impl OperatorBuilder for YourOperatorBuilder {
    fn build(
        &self,
        spark_plan: &Operator,
        inputs: &mut Vec<Arc<GlobalRef>>,
        partition_count: usize,
        planner: &PhysicalPlanner,
    ) -> Result<(Vec<ScanExec>, Arc<SparkPlan>), ExecutionError> {
        // Use extract_op! macro for type-safe extraction
        let your_op = extract_op!(spark_plan, YourNewOperator);
        let children = &spark_plan.children;

        assert_eq!(children.len(), 1); // Adjust based on your operator's arity
        let (scans, child) = planner.create_plan(&children[0], inputs, partition_count)?;

        // Convert protobuf fields to DataFusion components
        let expressions: Result<Vec<_>, _> = your_op
            .expressions
            .iter()
            .map(|expr| planner.create_expr(expr, child.schema()))
            .collect();

        // Create DataFusion operator
        let datafusion_exec = Arc::new(YourDataFusionExec::try_new(
            expressions?,
            Arc::clone(&child.native_plan),
        )?);

        Ok((
            scans,
            Arc::new(SparkPlan::new(spark_plan.plan_id, datafusion_exec, vec![child])),
        ))
    }
}
```

##### Register the OperatorBuilder

Add the OperatorType to the enum in `native/core/src/execution/planner/operator_registry.rs`:

```rust
/// Enum to identify different operator types for registry dispatch
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperatorType {
    Scan,
    NativeScan,
    IcebergScan,
    Projection,
    Filter,
    HashAgg,
    Limit,
    Sort,
    ShuffleWriter,
    ParquetWriter,
    Expand,
    SortMergeJoin,
    HashJoin,
    Window,
    YourNewOperator, // Add your operator type here
}
```

Update the `get_operator_type` function in the same file:

```rust
fn get_operator_type(spark_operator: &Operator) -> Option<OperatorType> {
    use datafusion_comet_proto::spark_operator::operator::OpStruct;

    match spark_operator.op_struct.as_ref()? {
        OpStruct::Projection(_) => Some(OperatorType::Projection),
        OpStruct::Filter(_) => Some(OperatorType::Filter),
        // ... existing operators ...
        OpStruct::YourNewOperator(_) => Some(OperatorType::YourNewOperator),
        OpStruct::Explode(_) => None, // Not yet in OperatorType enum
    }
}
```

Register your builder in the `OperatorRegistry` implementation:

```rust
/// Register all operator builders
fn register_all_operators(&mut self) {
    self.register_projection_operators();
    self.register_your_operator(); // Add this line
}

/// Register your new operator
fn register_your_operator(&mut self) {
    use crate::execution::operators::your_operator::YourOperatorBuilder;

    self.builders
        .insert(OperatorType::YourNewOperator, Box::new(YourOperatorBuilder));
}
```

##### Add the module declaration

Add your new module to `native/core/src/execution/operators/mod.rs`:

```rust
pub mod your_operator;
```

#### Option B: Using the Fallback Match (Legacy)

If you prefer not to use the registry pattern (or for complex operators that don't fit the pattern), you can still add a match case in the fallback logic in `native/core/src/execution/planner.rs`:

```rust
// In the create_plan method's fallback match statement:
match spark_plan.op_struct.as_ref().unwrap() {
    OpStruct::Filter(filter) => {
        // ... existing cases ...
    }
    OpStruct::YourNewOperator(your_op) => {
        create_your_operator_exec(your_op, children, partition_count)
    }
    // ... other cases ...
    _ => Err(GeneralError(format!(
        "Unsupported or unregistered operator type: {:?}",
        spark_plan.op_struct
    ))),
}
```

However, the registry approach (Option A) is strongly recommended because it provides:

- **Better organization**: Each operator's logic is isolated
- **Type safety**: The `extract_op!` macro ensures compile-time correctness
- **Extensibility**: New operators can be added without modifying core planner logic
- **Consistency**: Follows the established pattern for expressions

#### Custom Operator Implementation

If your operator doesn't have a direct DataFusion equivalent, you'll need to implement a custom operator. See `native/core/src/execution/operators/expand.rs` for a complete example:

```rust
#[derive(Debug)]
pub struct YourCustomExec {
    // Your operator's fields
}

impl ExecutionPlan for YourCustomExec {
    // Implement required methods
    fn as_any(&self) -> &dyn Any { self }
    fn schema(&self) -> SchemaRef { /* ... */ }
    fn output_partitioning(&self) -> Partitioning { /* ... */ }
    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { /* ... */ }
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { /* ... */ }
    fn with_new_children(self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>>) -> Result<Arc<dyn ExecutionPlan>> { /* ... */ }
    fn execute(&self, partition: usize, context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> { /* ... */ }
}
```

For custom operators, you'll need to implement the `ExecutionPlan` trait. See `native/core/src/execution/operators/expand.rs` or `scan.rs` for examples.

### Step 6: Add Tests

#### Scala Integration Tests