Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 7 additions & 39 deletions datafusion-examples/examples/relation_planner/table_sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,12 @@ use std::{
any::Any,
fmt::{self, Debug, Formatter},
hash::{Hash, Hasher},
ops::{Add, Div, Mul, Sub},
pin::Pin,
str::FromStr,
sync::Arc,
task::{Context, Poll},
};

use arrow::datatypes::{Float64Type, Int64Type};
use arrow::{
array::{ArrayRef, Int32Array, RecordBatch, StringArray, UInt32Array},
compute,
Expand All @@ -102,6 +101,7 @@ use futures::{
use rand::{Rng, SeedableRng, rngs::StdRng};
use tonic::async_trait;

use datafusion::optimizer::simplify_expressions::simplify_sql_literal::parse_sql_literal;
use datafusion::{
execution::{
RecordBatchStream, SendableRecordBatchStream, SessionState, SessionStateBuilder,
Expand Down Expand Up @@ -410,11 +410,12 @@ impl RelationPlanner for TableSamplePlanner {
"TABLESAMPLE requires a quantity (percentage, fraction, or row count)"
);
};
let quantity_value_expr = context.sql_to_expr(quantity.value, input.schema())?;

match quantity.unit {
// TABLESAMPLE (N ROWS) - exact row limit
Some(TableSampleUnit::Rows) => {
let rows = parse_quantity::<i64>(&quantity.value)?;
let rows: i64 = parse_sql_literal::<Int64Type>(&quantity_value_expr)?;
if rows < 0 {
return plan_err!("row count must be non-negative, got {}", rows);
}
Expand All @@ -426,15 +427,16 @@ impl RelationPlanner for TableSamplePlanner {

// TABLESAMPLE (N PERCENT) - percentage sampling
Some(TableSampleUnit::Percent) => {
let percent = parse_quantity::<f64>(&quantity.value)?;
let percent: f64 =
parse_sql_literal::<Float64Type>(&quantity_value_expr)?;
let fraction = percent / 100.0;
let plan = TableSamplePlanNode::new(input, fraction, seed).into_plan();
Ok(RelationPlanning::Planned(PlannedRelation::new(plan, alias)))
}

// TABLESAMPLE (N) - fraction if <1.0, row limit if >=1.0
None => {
let value = parse_quantity::<f64>(&quantity.value)?;
let value = parse_sql_literal::<Float64Type>(&quantity_value_expr)?;
if value < 0.0 {
return plan_err!("sample value must be non-negative, got {}", value);
}
Expand All @@ -453,40 +455,6 @@ impl RelationPlanner for TableSamplePlanner {
}
}

/// Parse a SQL expression as a numeric value (supports basic arithmetic).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@geoffreyclaude I wonder what your thoughts on this PR and the approach of parse_sql_literal?

fn parse_quantity<T>(expr: &ast::Expr) -> Result<T>
where
T: FromStr + Add<Output = T> + Sub<Output = T> + Mul<Output = T> + Div<Output = T>,
{
eval_numeric_expr(expr)
.ok_or_else(|| plan_datafusion_err!("invalid numeric expression: {:?}", expr))
}

/// Recursively evaluate numeric SQL expressions.
fn eval_numeric_expr<T>(expr: &ast::Expr) -> Option<T>
where
T: FromStr + Add<Output = T> + Sub<Output = T> + Mul<Output = T> + Div<Output = T>,
{
match expr {
ast::Expr::Value(v) => match &v.value {
ast::Value::Number(n, _) => n.to_string().parse().ok(),
_ => None,
},
ast::Expr::BinaryOp { left, op, right } => {
let l = eval_numeric_expr::<T>(left)?;
let r = eval_numeric_expr::<T>(right)?;
match op {
ast::BinaryOperator::Plus => Some(l + r),
ast::BinaryOperator::Minus => Some(l - r),
ast::BinaryOperator::Multiply => Some(l * r),
ast::BinaryOperator::Divide => Some(l / r),
_ => None,
}
}
_ => None,
}
}

/// Custom logical plan node representing a TABLESAMPLE operation.
///
/// Stores sampling parameters (bounds, seed) and wraps the input plan.
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/simplify_expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod inlist_simplifier;
mod regex;
pub mod simplify_exprs;
mod simplify_predicates;
pub mod simplify_sql_literal;
mod unwrap_cast;
mod utils;

Expand Down
232 changes: 232 additions & 0 deletions datafusion/optimizer/src/simplify_expressions/simplify_sql_literal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Parses and simplifies a SQL expression to a literal of a given type.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here too

//!
//! This module provides functionality to parse and simplify static SQL expressions
//! used in SQL constructs like `FROM TABLE SAMPLE (10 + 50 * 2)`. If they are required
//! in a planning (not an execution) phase, they need to be reduced to literals of a given type.
use crate::simplify_expressions::ExprSimplifier;
use arrow::datatypes::ArrowPrimitiveType;
use datafusion_common::{
DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, plan_datafusion_err,
plan_err,
};
use datafusion_expr::Expr;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::simplify::SimplifyContext;
use std::sync::Arc;

/// Parse and simplifies a SQL expression to a numeric literal,
/// corresponding to an arrow primitive type `T` (for example, Float64Type).
///
/// This function simplifies and coerces the expression, then extracts the underlying
/// native type using `TryFrom<ScalarValue>`.
///
/// # Example
/// ```ignore
/// let value: f64 = parse_sql_literal::<Float64Type>(expr)?;
/// ```
pub fn parse_sql_literal<T>(expr: &Expr) -> Result<T::Native>
Comment on lines +35 to +45
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We just need to fix the documentation and rename this to omit mention of SQL now, since it only operates at Expr level

where
T: ArrowPrimitiveType,
T::Native: TryFrom<ScalarValue, Error = DataFusionError>,
{
// Empty schema is sufficient because it parses only literal expressions
let schema = DFSchemaRef::new(DFSchema::empty());

log::debug!("Parsing expr {:?} to type {}", expr, T::DATA_TYPE);

let execution_props = ExecutionProps::new();
let simplifier = ExprSimplifier::new(
SimplifyContext::new(&execution_props).with_schema(Arc::clone(&schema)),
);

// Simplify and coerce expression in case of constant arithmetic operations (e.g., 10 + 5)
let simplified_expr: Expr = simplifier
.simplify(expr.clone())
.map_err(|err| plan_datafusion_err!("Cannot simplify {expr:?}: {err}"))?;
let coerced_expr: Expr = simplifier.coerce(simplified_expr, schema.as_ref())?;
log::debug!("Coerced expression: {:?}", &coerced_expr);

match coerced_expr {
Expr::Literal(scalar_value, _) => {
// It is a literal - proceed to the underlying value
// Cast to the target type if needed
let casted_scalar = scalar_value.cast_to(&T::DATA_TYPE)?;

// Extract the native type
T::Native::try_from(casted_scalar).map_err(|err| {
plan_datafusion_err!(
"Cannot extract {} from scalar value: {err}",
std::any::type_name::<T>()
)
})
}
actual => {
plan_err!(
"Cannot extract literal from coerced {actual:?} expression given {expr:?} expression"
)
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use arrow::datatypes::{DataType, Float64Type, Int64Type};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{TableReference, not_impl_err};
use datafusion_expr::planner::{ContextProvider, RelationPlannerContext};
use datafusion_expr::sqlparser::parser::Parser;
use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};
use datafusion_sql::planner::{PlannerContext, SqlToRel};
use datafusion_sql::relation::SqlToRelRelationContext;
use datafusion_sql::sqlparser::dialect::GenericDialect;
use std::sync::Arc;

// Simple mock context provider for testing
struct MockContextProvider {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests can probably be simplified to remove the SQL parsing related code and only supply Expr to test cases

options: ConfigOptions,
}

impl ContextProvider for MockContextProvider {
fn get_table_source(&self, _: TableReference) -> Result<Arc<dyn TableSource>> {
not_impl_err!("mock")
}

fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
None
}

fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
None
}

fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
None
}

fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
None
}

fn options(&self) -> &ConfigOptions {
&self.options
}

fn udf_names(&self) -> Vec<String> {
vec![]
}

fn udaf_names(&self) -> Vec<String> {
vec![]
}

fn udwf_names(&self) -> Vec<String> {
vec![]
}
}

#[test]
fn test_parse_sql_float_literal() {
let test_cases = vec![
("0.0", 0.0),
("1.0", 1.0),
("0", 0.0),
("1", 1.0),
("0.5", 0.5),
("100.0", 100.0),
("0.001", 0.001),
("999.999", 999.999),
("1.0 + 2.0", 3.0),
("10.0 * 0.5", 5.0),
("100.0 / 4.0", 25.0),
("(80.0 + 2.0*10.0) / 4.0", 25.0),
("50.0 - 10.0", 40.0),
("1e2", 100.0),
("1.5e1", 15.0),
("2.5e-1", 0.25),
];

let context = MockContextProvider {
options: ConfigOptions::default(),
};
let sql_to_rel = SqlToRel::new(&context);
let mut planner_context = PlannerContext::new();
let mut sql_context =
SqlToRelRelationContext::new(&sql_to_rel, &mut planner_context);
let dialect = GenericDialect {};
let schema = DFSchemaRef::new(DFSchema::empty());

for (sql_expr, expected) in test_cases {
let ast_expr = Parser::new(&dialect)
.try_with_sql(sql_expr)
.unwrap()
.parse_expr()
.unwrap();
let expr = sql_context
.sql_to_expr(ast_expr, &schema)
.expect("sql_to_expr");

let result: Result<f64> = parse_sql_literal::<Float64Type>(&expr);

match result {
Ok(value) => {
assert!(
(value - expected).abs() < 1e-10,
"For expression '{sql_expr}': expected {expected}, got {value}",
);
}
Err(e) => panic!("Failed to parse expression '{sql_expr}': {e}"),
}
}
}

#[test]
fn test_parse_sql_integer_literal() {
let context = MockContextProvider {
options: ConfigOptions::default(),
};
let sql_to_rel = SqlToRel::new(&context);
let mut planner_context = PlannerContext::new();
let mut sql_context =
SqlToRelRelationContext::new(&sql_to_rel, &mut planner_context);
let dialect = GenericDialect {};
let schema = DFSchemaRef::new(DFSchema::empty());

// Integer
let ast_expr = Parser::new(&dialect)
.try_with_sql("2 + 4")
.unwrap()
.parse_expr()
.unwrap();
let expr = sql_context
.sql_to_expr(ast_expr, &schema)
.expect("sql_to_expr");

let result: Result<i64> = parse_sql_literal::<Int64Type>(&expr);

match result {
Ok(value) => {
assert_eq!(6, value);
}
Err(e) => panic!("Failed to parse expression: {e}"),
}
}
}
2 changes: 1 addition & 1 deletion datafusion/sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ mod expr;
pub mod parser;
pub mod planner;
mod query;
mod relation;
pub mod relation;
pub mod resolve;
mod select;
mod set_expr;
Expand Down
20 changes: 14 additions & 6 deletions datafusion/sql/src/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,23 @@ use sqlparser::ast::{FunctionArg, FunctionArgExpr, Spanned, TableFactor};

mod join;

struct SqlToRelRelationContext<'a, 'b, S: ContextProvider> {
pub struct SqlToRelRelationContext<'a, 'b, S: ContextProvider> {
planner: &'a SqlToRel<'b, S>,
planner_context: &'a mut PlannerContext,
}

impl<'a, 'b, S: ContextProvider> SqlToRelRelationContext<'a, 'b, S> {
pub fn new(
planner: &'a SqlToRel<'b, S>,
planner_context: &'a mut PlannerContext,
) -> Self {
Self {
planner,
planner_context,
}
}
}

// Implement RelationPlannerContext
impl<'a, 'b, S: ContextProvider> RelationPlannerContext
for SqlToRelRelationContext<'a, 'b, S>
Expand Down Expand Up @@ -117,11 +129,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {

let mut current_relation = relation;
for planner in planners.iter() {
let mut context = SqlToRelRelationContext {
planner: self,
planner_context,
};

let mut context = SqlToRelRelationContext::new(self, planner_context);
match planner.plan_relation(current_relation, &mut context)? {
RelationPlanning::Planned(planned) => {
return Ok(RelationPlanning::Planned(planned));
Expand Down