diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml index d143ef83a0..3d7aa2e2f9 100644 --- a/.github/workflows/spark_sql_test.yml +++ b/.github/workflows/spark_sql_test.yml @@ -59,6 +59,10 @@ jobs: - {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"} - {name: "sql_hive-2", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.ExtendedHiveTest"} - {name: "sql_hive-3", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest"} + # Skip sql_hive-1 for Spark 4.0 due to https://github.com/apache/datafusion-comet/issues/2946 + exclude: + - spark-version: {short: '4.0', full: '4.0.1', java: 17} + module: {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"} fail-fast: false name: spark-sql-${{ matrix.module.name }}/${{ matrix.os }}/spark-${{ matrix.spark-version.full }}/java-${{ matrix.spark-version.java }} runs-on: ${{ matrix.os }} diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index 13a9c752e3..e5c7f696e0 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -332,6 +332,7 @@ These settings can be used to determine which parts of the plan are accelerated | `spark.comet.expression.TruncTimestamp.enabled` | Enable Comet acceleration for `TruncTimestamp` | true | | `spark.comet.expression.UnaryMinus.enabled` | Enable Comet acceleration for `UnaryMinus` | true | | `spark.comet.expression.Unhex.enabled` | Enable Comet acceleration for `Unhex` | true | +| `spark.comet.expression.UnixTimestamp.enabled` | Enable Comet acceleration for `UnixTimestamp` | true | | `spark.comet.expression.UnscaledValue.enabled` | Enable Comet acceleration for `UnscaledValue` | true | | `spark.comet.expression.Upper.enabled` | Enable Comet acceleration for `Upper` | true | | `spark.comet.expression.WeekDay.enabled` | Enable Comet acceleration for `WeekDay` | true | diff --git a/docs/spark_expressions_support.md b/docs/spark_expressions_support.md index 67eb519ea5..fa6b3a43fb 100644 --- a/docs/spark_expressions_support.md +++ b/docs/spark_expressions_support.md @@ -217,7 +217,7 @@ - [ ] unix_micros - [ ] unix_millis - [ ] unix_seconds -- [ ] unix_timestamp +- [x] unix_timestamp - [ ] weekday - [ ] weekofyear - [ ] year diff --git a/native/core/src/execution/expressions/mod.rs b/native/core/src/execution/expressions/mod.rs index a06b41b2c0..563d62e91b 100644 --- a/native/core/src/execution/expressions/mod.rs +++ b/native/core/src/execution/expressions/mod.rs @@ -24,6 +24,7 @@ pub mod logical; pub mod nullcheck; pub mod strings; pub mod subquery; +pub mod temporal; pub use datafusion_comet_spark_expr::EvalMode; diff --git a/native/core/src/execution/expressions/temporal.rs b/native/core/src/execution/expressions/temporal.rs new file mode 100644 index 0000000000..ae57cd3b2b --- /dev/null +++ b/native/core/src/execution/expressions/temporal.rs @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Temporal expression builders
+
+use std::sync::Arc;
+
+use arrow::datatypes::{DataType, Field, SchemaRef};
+use datafusion::config::ConfigOptions;
+use datafusion::logical_expr::ScalarUDF;
+use datafusion::physical_expr::{PhysicalExpr, ScalarFunctionExpr};
+use datafusion_comet_proto::spark_expression::Expr;
+use datafusion_comet_spark_expr::{
+    SparkHour, SparkMinute, SparkSecond, SparkUnixTimestamp, TimestampTruncExpr,
+};
+
+use crate::execution::{
+    expressions::extract_expr,
+    operators::ExecutionError,
+    planner::{expression_registry::ExpressionBuilder, PhysicalPlanner},
+};
+
+pub struct HourBuilder;
+
+impl ExpressionBuilder for HourBuilder {
+    fn build(
+        &self,
+        spark_expr: &Expr,
+        input_schema: SchemaRef,
+        planner: &PhysicalPlanner,
+    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+        let expr = extract_expr!(spark_expr, Hour);
+        let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
+        let timezone = expr.timezone.clone();
+        let args = vec![child];
+        let comet_hour = Arc::new(ScalarUDF::new_from_impl(SparkHour::new(timezone)));
+        let field_ref = Arc::new(Field::new("hour", DataType::Int32, true));
+        let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
+            "hour",
+            comet_hour,
+            args,
+            field_ref,
+            Arc::new(ConfigOptions::default()),
+        );
+
+        Ok(Arc::new(expr))
+    }
+}
+
+pub struct MinuteBuilder;
+
+impl ExpressionBuilder for MinuteBuilder {
+    fn build(
+        &self,
+        spark_expr: &Expr,
+        input_schema: SchemaRef,
+        planner: &PhysicalPlanner,
+    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+        let expr = extract_expr!(spark_expr, Minute);
+        let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
+        let timezone = expr.timezone.clone();
+        let args = vec![child];
+        let comet_minute = Arc::new(ScalarUDF::new_from_impl(SparkMinute::new(timezone)));
+        let field_ref = Arc::new(Field::new("minute", DataType::Int32, true));
+        let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
+            "minute",
+            comet_minute,
+            args,
+            field_ref,
+            Arc::new(ConfigOptions::default()),
+        );
+
+        Ok(Arc::new(expr))
+    }
+}
+
+pub struct SecondBuilder;
+
+impl ExpressionBuilder for SecondBuilder {
+    fn build(
+        &self,
+        spark_expr: &Expr,
+        input_schema: SchemaRef,
+        planner: &PhysicalPlanner,
+    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+        let expr = extract_expr!(spark_expr, Second);
+        let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
+        let timezone = expr.timezone.clone();
+        let args = vec![child];
+        let comet_second = Arc::new(ScalarUDF::new_from_impl(SparkSecond::new(timezone)));
+        let field_ref = Arc::new(Field::new("second", DataType::Int32, true));
+        let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
+            "second",
+            comet_second,
+            args,
+            field_ref,
+            Arc::new(ConfigOptions::default()),
+        );
+
+        Ok(Arc::new(expr))
+    }
+}
+
+pub struct UnixTimestampBuilder;
+
+impl ExpressionBuilder for UnixTimestampBuilder {
+    fn build(
+        &self,
+        spark_expr: &Expr,
+        input_schema: SchemaRef,
+        planner: &PhysicalPlanner,
+    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+        let expr = extract_expr!(spark_expr, UnixTimestamp);
+        let child =
+            planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
+        let timezone = expr.timezone.clone();
+        let args = vec![child];
+        let comet_unix_timestamp =
+            Arc::new(ScalarUDF::new_from_impl(SparkUnixTimestamp::new(timezone)));
+        let field_ref = Arc::new(Field::new("unix_timestamp", DataType::Int64, true));
+        let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
+            "unix_timestamp",
+            comet_unix_timestamp,
+            args,
+            field_ref,
+            Arc::new(ConfigOptions::default()),
+        );
+
+        Ok(Arc::new(expr))
+    }
+}
+
+pub struct TruncTimestampBuilder;
+
+impl ExpressionBuilder for TruncTimestampBuilder {
+    fn build(
+        &self,
+        spark_expr: &Expr,
+        input_schema: SchemaRef,
+        planner: &PhysicalPlanner,
+    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+        let expr = extract_expr!(spark_expr, TruncTimestamp);
+        let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
+        let format = planner.create_expr(expr.format.as_ref().unwrap(), input_schema)?;
+        let timezone = expr.timezone.clone();
+
+        Ok(Arc::new(TimestampTruncExpr::new(child, format, timezone)))
+    }
+}
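These five builders are consumed through the `ExpressionRegistry` shown further down in `expression_registry.rs`: each protobuf variant maps to a boxed builder, and the planner looks the builder up instead of matching on the variant inline. The sketch below illustrates only that dispatch idea; `ExprKind` and `Builder` are simplified stand-ins, not Comet's actual `ExpressionType`/`ExpressionBuilder` signatures.

```rust
use std::collections::HashMap;

// Simplified stand-ins for Comet's ExpressionType and ExpressionBuilder.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum ExprKind {
    Hour,
    UnixTimestamp,
}

trait Builder {
    // Each builder turns one protobuf variant into a planned expression
    // (represented here as a plain String for illustration).
    fn build(&self, timezone: &str) -> String;
}

struct HourBuilder;
impl Builder for HourBuilder {
    fn build(&self, timezone: &str) -> String {
        format!("hour(child) in {timezone}")
    }
}

struct UnixTimestampBuilder;
impl Builder for UnixTimestampBuilder {
    fn build(&self, timezone: &str) -> String {
        format!("unix_timestamp(child) in {timezone}")
    }
}

fn main() {
    // Registration mirrors register_temporal_expressions(): one boxed builder per variant.
    let mut builders: HashMap<ExprKind, Box<dyn Builder>> = HashMap::new();
    builders.insert(ExprKind::Hour, Box::new(HourBuilder));
    builders.insert(ExprKind::UnixTimestamp, Box::new(UnixTimestampBuilder));

    // The planner resolves the variant to a builder instead of matching inline,
    // which is what replaces the per-expression match arms removed from planner.rs below.
    let planned = builders
        .get(&ExprKind::UnixTimestamp)
        .map(|b| b.build("UTC"))
        .expect("no builder registered for this expression kind");
    assert_eq!(planned, "unix_timestamp(child) in UTC");
}
```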
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 56de19d670..2dadcbd911 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -70,7 +70,7 @@ use datafusion::{
 };
 use datafusion_comet_spark_expr::{
     create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, BinaryOutputStyle,
-    BloomFilterAgg, BloomFilterMightContain, EvalMode, SparkHour, SparkMinute, SparkSecond,
+    BloomFilterAgg, BloomFilterMightContain, EvalMode,
 };
 use iceberg::expr::Bind;
@@ -125,8 +125,7 @@ use datafusion_comet_spark_expr::monotonically_increasing_id::MonotonicallyIncre
 use datafusion_comet_spark_expr::{
     ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Correlation, Covariance, CreateNamedStruct,
     GetArrayStructFields, GetStructField, IfExpr, ListExtract, NormalizeNaNAndZero, RandExpr,
-    RandnExpr, SparkCastOptions, Stddev, SumDecimal, TimestampTruncExpr, ToJson, UnboundColumn,
-    Variance,
+    RandnExpr, SparkCastOptions, Stddev, SumDecimal, ToJson, UnboundColumn, Variance,
 };
 use itertools::Itertools;
 use jni::objects::GlobalRef;
@@ -374,65 +373,6 @@ impl PhysicalPlanner {
                     SparkCastOptions::new(eval_mode, &expr.timezone, expr.allow_incompat),
                 )))
             }
-            ExprStruct::Hour(expr) => {
-                let child =
-                    self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
-                let timezone = expr.timezone.clone();
-                let args = vec![child];
-                let comet_hour = Arc::new(ScalarUDF::new_from_impl(SparkHour::new(timezone)));
-                let field_ref = Arc::new(Field::new("hour", DataType::Int32, true));
-                let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
-                    "hour",
-                    comet_hour,
-                    args,
-                    field_ref,
-                    Arc::new(ConfigOptions::default()),
-                );
-
-                Ok(Arc::new(expr))
-            }
-            ExprStruct::Minute(expr) => {
-                let child =
-                    self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
-                let timezone = expr.timezone.clone();
-                let args = vec![child];
-                let comet_minute = Arc::new(ScalarUDF::new_from_impl(SparkMinute::new(timezone)));
-                let field_ref = Arc::new(Field::new("minute", DataType::Int32, true));
-                let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
-                    "minute",
-                    comet_minute,
-                    args,
-                    field_ref,
-                    Arc::new(ConfigOptions::default()),
-                );
-
-                Ok(Arc::new(expr))
-            }
-            ExprStruct::Second(expr) => {
-                let child =
-                    self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
-                let timezone = expr.timezone.clone();
-                let args = vec![child];
-                let comet_second = Arc::new(ScalarUDF::new_from_impl(SparkSecond::new(timezone)));
-                let field_ref = Arc::new(Field::new("second", DataType::Int32, true));
-                let expr: ScalarFunctionExpr = ScalarFunctionExpr::new(
-                    "second",
-                    comet_second,
-                    args,
-                    field_ref,
-                    Arc::new(ConfigOptions::default()),
-                );
-
-                Ok(Arc::new(expr))
-            }
-            ExprStruct::TruncTimestamp(expr) => {
-                let child =
-                    self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
-                let format = self.create_expr(expr.format.as_ref().unwrap(), input_schema)?;
-                let timezone = expr.timezone.clone();
-
-                Ok(Arc::new(TimestampTruncExpr::new(child, format, timezone)))
-            }
             ExprStruct::CheckOverflow(expr) => {
                 let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?;
                 let data_type = to_arrow_datatype(expr.datatype.as_ref().unwrap());
diff --git a/native/core/src/execution/planner/expression_registry.rs b/native/core/src/execution/planner/expression_registry.rs
index e85fbe5104..34aa3de179 100644
--- a/native/core/src/execution/planner/expression_registry.rs
+++ b/native/core/src/execution/planner/expression_registry.rs
@@ -109,6 +109,7 @@ pub enum ExpressionType {
     Minute,
     Second,
     TruncTimestamp,
+    UnixTimestamp,
 }
 
 /// Registry for expression builders
@@ -181,9 +182,8 @@ impl ExpressionRegistry {
         // Register string expressions
         self.register_string_expressions();
 
-        // TODO: Register other expression categories in future phases
-        // self.register_temporal_expressions();
-        // etc.
+        // Register temporal expressions
+        self.register_temporal_expressions();
     }
 
     /// Register arithmetic expression builders
@@ -286,6 +286,26 @@ impl ExpressionRegistry {
             .insert(ExpressionType::FromJson, Box::new(FromJsonBuilder));
     }
 
+    /// Register temporal expression builders
+    fn register_temporal_expressions(&mut self) {
+        use crate::execution::expressions::temporal::*;
+
+        self.builders
+            .insert(ExpressionType::Hour, Box::new(HourBuilder));
+        self.builders
+            .insert(ExpressionType::Minute, Box::new(MinuteBuilder));
+        self.builders
+            .insert(ExpressionType::Second, Box::new(SecondBuilder));
+        self.builders.insert(
+            ExpressionType::UnixTimestamp,
+            Box::new(UnixTimestampBuilder),
+        );
+        self.builders.insert(
+            ExpressionType::TruncTimestamp,
+            Box::new(TruncTimestampBuilder),
+        );
+    }
+
     /// Extract expression type from Spark protobuf expression
     fn get_expression_type(spark_expr: &Expr) -> Result<ExpressionType, ExecutionError> {
         match spark_expr.expr_struct.as_ref() {
@@ -355,6 +375,7 @@ impl ExpressionRegistry {
             Some(ExprStruct::Minute(_)) => Ok(ExpressionType::Minute),
             Some(ExprStruct::Second(_)) => Ok(ExpressionType::Second),
             Some(ExprStruct::TruncTimestamp(_)) => Ok(ExpressionType::TruncTimestamp),
+            Some(ExprStruct::UnixTimestamp(_)) => Ok(ExpressionType::UnixTimestamp),
 
             Some(other) => Err(ExecutionError::GeneralError(format!(
                 "Unsupported expression type: {:?}",
diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto
index 1c453b6336..94f8558a04 100644
--- a/native/proto/src/proto/expr.proto
+++ b/native/proto/src/proto/expr.proto
@@ -85,7 +85,8 @@ message Expr {
     Rand randn = 62;
     EmptyExpr spark_partition_id = 63;
     EmptyExpr monotonically_increasing_id = 64;
-    FromJson from_json = 89;
+    UnixTimestamp unix_timestamp = 65;
+    FromJson from_json = 66;
   }
 }
 
@@ -304,6 +305,11 @@ message Second {
   string timezone = 2;
 }
 
+message UnixTimestamp {
+  Expr child = 1;
+  string timezone = 2;
+}
+
 message CheckOverflow {
   Expr child = 1;
   DataType datatype = 2;
diff --git a/native/spark-expr/src/datetime_funcs/mod.rs b/native/spark-expr/src/datetime_funcs/mod.rs
index ef8041e5fe..c1e7e617b0 100644
--- a/native/spark-expr/src/datetime_funcs/mod.rs
+++ b/native/spark-expr/src/datetime_funcs/mod.rs
@@ -18,9 +18,11 @@
 mod date_trunc;
 mod extract_date_part;
 mod timestamp_trunc;
+mod unix_timestamp;
 
 pub use date_trunc::SparkDateTrunc;
 pub use extract_date_part::SparkHour;
 pub use extract_date_part::SparkMinute;
 pub use extract_date_part::SparkSecond;
 pub use timestamp_trunc::TimestampTruncExpr;
+pub use unix_timestamp::SparkUnixTimestamp;
diff --git a/native/spark-expr/src/datetime_funcs/unix_timestamp.rs b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs
new file mode 100644
index 0000000000..c4f1576293
--- /dev/null
+++ b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs
@@ -0,0 +1,242 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::utils::array_with_timezone;
+use arrow::array::{Array, AsArray, PrimitiveArray};
+use arrow::compute::cast;
+use arrow::datatypes::{DataType, Int64Type, TimestampMicrosecondType, TimeUnit::Microsecond};
+use datafusion::common::{internal_datafusion_err, DataFusionError};
+use datafusion::logical_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
+};
+use num::integer::div_floor;
+use std::{any::Any, fmt::Debug, sync::Arc};
+
+const MICROS_PER_SECOND: i64 = 1_000_000;
+
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkUnixTimestamp {
+    signature: Signature,
+    aliases: Vec<String>,
+    timezone: String,
+}
+
+impl SparkUnixTimestamp {
+    pub fn new(timezone: String) -> Self {
+        Self {
+            signature: Signature::user_defined(Volatility::Immutable),
+            aliases: vec![],
+            timezone,
+        }
+    }
+}
+
+impl ScalarUDFImpl for SparkUnixTimestamp {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "unix_timestamp"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> datafusion::common::Result<DataType> {
+        Ok(match &arg_types[0] {
+            DataType::Dictionary(_, _) => {
+                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int64))
+            }
+            _ => DataType::Int64,
+        })
+    }
+
+    fn invoke_with_args(
+        &self,
+        args: ScalarFunctionArgs,
+    ) -> datafusion::common::Result<ColumnarValue> {
+        let args: [ColumnarValue; 1] = args
+            .args
+            .try_into()
+            .map_err(|_| internal_datafusion_err!("unix_timestamp expects exactly one argument"))?;
+
+        match args {
+            [ColumnarValue::Array(array)] => match array.data_type() {
+                DataType::Timestamp(_, _) => {
+                    let is_utc = self.timezone == "UTC";
+                    let array = if is_utc
+                        && matches!(array.data_type(), DataType::Timestamp(Microsecond, Some(tz)) if tz.as_ref() == "UTC")
+                    {
+                        array
+                    } else {
+                        array_with_timezone(
+                            array,
+                            self.timezone.clone(),
+                            Some(&DataType::Timestamp(Microsecond, Some("UTC".into()))),
+                        )?
+                    };
+
+                    let timestamp_array = array.as_primitive::<TimestampMicrosecondType>();
+
+                    let result: PrimitiveArray<Int64Type> = if timestamp_array.null_count() == 0 {
+                        timestamp_array
+                            .values()
+                            .iter()
+                            .map(|&micros| micros / MICROS_PER_SECOND)
+                            .collect()
+                    } else {
+                        timestamp_array
+                            .iter()
+                            .map(|v| v.map(|micros| div_floor(micros, MICROS_PER_SECOND)))
+                            .collect()
+                    };
+
+                    Ok(ColumnarValue::Array(Arc::new(result)))
+                }
+                DataType::Date32 => {
+                    let timestamp_array = cast(&array, &DataType::Timestamp(Microsecond, None))?;
+
+                    let is_utc = self.timezone == "UTC";
+                    let array = if is_utc {
+                        timestamp_array
+                    } else {
+                        array_with_timezone(
+                            timestamp_array,
+                            self.timezone.clone(),
+                            Some(&DataType::Timestamp(Microsecond, Some("UTC".into()))),
+                        )?
+                    };
+
+                    let timestamp_array = array.as_primitive::<TimestampMicrosecondType>();
+
+                    let result: PrimitiveArray<Int64Type> = if timestamp_array.null_count() == 0 {
+                        timestamp_array
+                            .values()
+                            .iter()
+                            .map(|&micros| micros / MICROS_PER_SECOND)
+                            .collect()
+                    } else {
+                        timestamp_array
+                            .iter()
+                            .map(|v| v.map(|micros| div_floor(micros, MICROS_PER_SECOND)))
+                            .collect()
+                    };
+
+                    Ok(ColumnarValue::Array(Arc::new(result)))
+                }
+                _ => Err(DataFusionError::Execution(format!(
+                    "unix_timestamp does not support input type: {:?}",
+                    array.data_type()
+                ))),
+            },
+            _ => Err(DataFusionError::Execution(
+                "unix_timestamp(scalar) should be folded on the Spark JVM side.".to_string(),
+            )),
+        }
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.aliases
+    }
+}
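One detail worth noting in `invoke_with_args`: the dense (no-null) path divides with `/`, while the null-handling path uses `num::integer::div_floor`. The two agree for timestamps at or after the Unix epoch but diverge for pre-epoch instants, where `/` truncates toward zero and `div_floor` rounds toward negative infinity. A standalone check of that difference, assuming only the same `num` crate the kernel already imports:

```rust
use num::integer::div_floor;

const MICROS_PER_SECOND: i64 = 1_000_000;

fn main() {
    // One microsecond before the epoch: 1969-12-31T23:59:59.999999Z.
    let pre_epoch_micros: i64 = -1;

    // Truncating division reports epoch second 0 for this instant...
    assert_eq!(pre_epoch_micros / MICROS_PER_SECOND, 0);
    // ...while floor division reports -1.
    assert_eq!(div_floor(pre_epoch_micros, MICROS_PER_SECOND), -1);

    // At or after the epoch the two conventions agree: 2020-01-01T00:00:00Z.
    let micros_2020: i64 = 1_577_836_800_000_000;
    assert_eq!(micros_2020 / MICROS_PER_SECOND, 1_577_836_800);
    assert_eq!(div_floor(micros_2020, MICROS_PER_SECOND), 1_577_836_800);
}
```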
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{Array, Date32Array, TimestampMicrosecondArray};
+    use arrow::datatypes::Field;
+    use datafusion::config::ConfigOptions;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_unix_timestamp_from_timestamp() {
+        // Test with known timestamp value
+        // 2020-01-01 00:00:00 UTC = 1577836800 seconds = 1577836800000000 microseconds
+        let input = TimestampMicrosecondArray::from(vec![Some(1577836800000000)]);
+        let udf = SparkUnixTimestamp::new("UTC".to_string());
+
+        let return_field = Arc::new(Field::new("unix_timestamp", DataType::Int64, true));
+        let args = ScalarFunctionArgs {
+            args: vec![ColumnarValue::Array(Arc::new(input))],
+            number_rows: 1,
+            return_field,
+            config_options: Arc::new(ConfigOptions::default()),
+            arg_fields: vec![],
+        };
+
+        let result = udf.invoke_with_args(args).unwrap();
+        if let ColumnarValue::Array(result_array) = result {
+            let int64_array = result_array.as_primitive::<Int64Type>();
+            assert_eq!(int64_array.value(0), 1577836800);
+        } else {
+            panic!("Expected array result");
+        }
+    }
+
+    #[test]
+    fn test_unix_timestamp_from_date() {
+        // Test with Date32
+        // Date32(18262) = 2020-01-01 = 1577836800 seconds
+        let input = Date32Array::from(vec![Some(18262)]);
+        let udf = SparkUnixTimestamp::new("UTC".to_string());
+
+        let return_field = Arc::new(Field::new("unix_timestamp", DataType::Int64, true));
+        let args = ScalarFunctionArgs {
+            args: vec![ColumnarValue::Array(Arc::new(input))],
+            number_rows: 1,
+            return_field,
+            config_options: Arc::new(ConfigOptions::default()),
+            arg_fields: vec![],
+        };
+
+        let result = udf.invoke_with_args(args).unwrap();
+        if let ColumnarValue::Array(result_array) = result {
+            let int64_array = result_array.as_primitive::<Int64Type>();
+            assert_eq!(int64_array.value(0), 1577836800);
+        } else {
+            panic!("Expected array result");
+        }
+    }
+
+    #[test]
+    fn test_unix_timestamp_with_nulls() {
+        let input = TimestampMicrosecondArray::from(vec![Some(1577836800000000), None]);
+        let udf = SparkUnixTimestamp::new("UTC".to_string());
+
+        let return_field = Arc::new(Field::new("unix_timestamp", DataType::Int64, true));
+        let args = ScalarFunctionArgs {
+            args: vec![ColumnarValue::Array(Arc::new(input))],
+            number_rows: 2,
+            return_field,
+            config_options: Arc::new(ConfigOptions::default()),
+            arg_fields: vec![],
+        };
+
+        let result = udf.invoke_with_args(args).unwrap();
+        if let ColumnarValue::Array(result_array) = result {
+            let int64_array = result_array.as_primitive::<Int64Type>();
+            assert_eq!(int64_array.value(0), 1577836800);
+            assert!(int64_array.is_null(1));
+        } else {
+            panic!("Expected array result");
+        }
+    }
+}
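For `Date32` input the kernel first casts days-since-epoch to a microsecond timestamp and then applies the session timezone, so the resulting epoch seconds depend on where local midnight falls. A small arithmetic sketch of that dependency; the `America/Los_Angeles` offset (UTC-8 on this date) is stated here as an illustration, not taken from the diff:

```rust
const SECONDS_PER_DAY: i64 = 86_400;

fn main() {
    // Date32 stores days since the Unix epoch; 18262 days is 2020-01-01,
    // the same value used by test_unix_timestamp_from_date above.
    let days: i64 = 18_262;

    // Interpreted as midnight UTC, the date maps to 1_577_836_800 epoch seconds.
    let utc_midnight = days * SECONDS_PER_DAY;
    assert_eq!(utc_midnight, 1_577_836_800);

    // Local midnight in a zone west of UTC is a later instant; for UTC-8
    // (America/Los_Angeles in winter) it falls 8 hours after midnight UTC.
    let la_midnight = utc_midnight + 8 * 3_600;
    assert_eq!(la_midnight, 1_577_865_600);
}
```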
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index 96e727ae55..4612e249ee 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -68,7 +68,9 @@ pub use comet_scalar_funcs::{
     create_comet_physical_fun, create_comet_physical_fun_with_eval_mode,
     register_all_comet_functions,
 };
-pub use datetime_funcs::{SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, TimestampTruncExpr};
+pub use datetime_funcs::{
+    SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, SparkUnixTimestamp, TimestampTruncExpr,
+};
 pub use error::{SparkError, SparkResult};
 pub use hash_funcs::*;
 pub use json_funcs::{FromJson, ToJson};
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 83917d33fc..7c3e2adee8 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -191,6 +191,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
     classOf[Second] -> CometSecond,
     classOf[TruncDate] -> CometTruncDate,
     classOf[TruncTimestamp] -> CometTruncTimestamp,
+    classOf[UnixTimestamp] -> CometUnixTimestamp,
     classOf[Year] -> CometYear,
     classOf[Month] -> CometMonth,
     classOf[DayOfMonth] -> CometDayOfMonth,
diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
index ef2b0f793c..69b0f2f0a3 100644
--- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
@@ -21,7 +21,7 @@ package org.apache.comet.serde
 
 import java.util.Locale
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, WeekDay, WeekOfYear, Year}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixTimestamp, WeekDay, WeekOfYear, Year}
 import org.apache.spark.sql.types.{DateType, IntegerType}
 import org.apache.spark.unsafe.types.UTF8String
@@ -254,6 +254,32 @@ object CometSecond extends CometExpressionSerde[Second] {
   }
 }
 
+object CometUnixTimestamp extends CometExpressionSerde[UnixTimestamp] {
+  override def convert(
+      expr: UnixTimestamp,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val childExpr = exprToProtoInternal(expr.children.head, inputs, binding)
+
+    if (childExpr.isDefined) {
+      val builder = ExprOuterClass.UnixTimestamp.newBuilder()
+      builder.setChild(childExpr.get)
+
+      val timeZone = expr.timeZoneId.getOrElse("UTC")
+      builder.setTimezone(timeZone)
+
+      Some(
+        ExprOuterClass.Expr
+          .newBuilder()
+          .setUnixTimestamp(builder)
+          .build())
+    } else {
+      withInfo(expr, expr.children.head)
+      None
+    }
+  }
+}
+
 object CometDateAdd extends
CometScalarFunction[DateAdd]("date_add") object CometDateSub extends CometScalarFunction[DateSub]("date_sub") diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index 9a23c76d82..0dbbab5dc0 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -114,6 +114,27 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH } } + test("unix_timestamp - timestamp input") { + createTimestampTestData.createOrReplaceTempView("tbl") + for (timezone <- Seq("UTC", "America/Los_Angeles")) { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timezone) { + checkSparkAnswerAndOperator("SELECT c0, unix_timestamp(c0) from tbl order by c0") + } + } + } + + test("unix_timestamp - date input") { + val r = new Random(42) + val dateSchema = StructType(Seq(StructField("d", DataTypes.DateType, true))) + val dateDF = FuzzDataGenerator.generateDataFrame(r, spark, dateSchema, 100, DataGenOptions()) + dateDF.createOrReplaceTempView("date_tbl") + for (timezone <- Seq("UTC", "America/Los_Angeles")) { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timezone) { + checkSparkAnswerAndOperator("SELECT d, unix_timestamp(d) from date_tbl order by d") + } + } + } + private def createTimestampTestData = { val r = new Random(42) val schema = StructType( diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala index 0af1ecade5..f9559e5ea0 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala @@ -22,12 +22,13 @@ package org.apache.spark.sql.benchmark import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{withDefaultTimeZone, LA} import org.apache.spark.sql.internal.SQLConf +// spotless:off /** * Benchmark to measure Comet execution performance. To run this benchmark: - * `SPARK_GENERATE_BENCHMARK_FILES=1 make - * benchmark-org.apache.spark.sql.benchmark.CometDatetimeExpressionBenchmark` Results will be - * written to "spark/benchmarks/CometDatetimeExpressionBenchmark-**results.txt". + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometDatetimeExpressionBenchmark` + * Results will be written to "spark/benchmarks/CometDatetimeExpressionBenchmark-**results.txt". 
*/ +// spotless:on object CometDatetimeExpressionBenchmark extends CometBenchmarkBase { def dateTruncExprBenchmark(values: Int, useDictionary: Boolean): Unit = { @@ -76,9 +77,51 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase { } } + def unixTimestampBenchmark(values: Int, timeZone: String): Unit = { + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s"select timestamp_micros(cast(value/100000 as integer)) as ts FROM $tbl")) + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { + runWithComet(s"Unix Timestamp from Timestamp ($timeZone)", values) { + spark.sql("select unix_timestamp(ts) from parquetV1Table").noop() + } + } + } + } + } + + def unixTimestampFromDateBenchmark(values: Int, timeZone: String): Unit = { + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql( + s"select cast(timestamp_micros(cast(value/100000 as integer)) as date) as dt FROM $tbl")) + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { + runWithComet(s"Unix Timestamp from Date ($timeZone)", values) { + spark.sql("select unix_timestamp(dt) from parquetV1Table").noop() + } + } + } + } + } + override def runCometBenchmark(mainArgs: Array[String]): Unit = { val values = 1024 * 1024; + for (timeZone <- Seq("UTC", "America/Los_Angeles")) { + withSQLConf("spark.sql.parquet.datetimeRebaseModeInWrite" -> "CORRECTED") { + runBenchmarkWithTable(s"UnixTimestamp(timestamp) - $timeZone", values) { v => + unixTimestampBenchmark(v, timeZone) + } + runBenchmarkWithTable(s"UnixTimestamp(date) - $timeZone", values) { v => + unixTimestampFromDateBenchmark(v, timeZone) + } + } + } + withDefaultTimeZone(LA) { withSQLConf( SQLConf.SESSION_LOCAL_TIMEZONE.key -> LA.getId,