From c5440f236e37dbaae5008645be93760e9403a90e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 18 Dec 2025 11:45:07 -0700 Subject: [PATCH 01/10] initial impl --- docs/source/user-guide/latest/configs.md | 1 + docs/spark_expressions_support.md | 2 +- native/core/src/execution/planner.rs | 19 ++ .../execution/planner/expression_registry.rs | 2 + native/proto/src/proto/expr.proto | 6 + native/spark-expr/src/datetime_funcs/mod.rs | 2 + .../src/datetime_funcs/unix_timestamp.rs | 209 ++++++++++++++++++ native/spark-expr/src/lib.rs | 4 +- .../apache/comet/serde/QueryPlanSerde.scala | 1 + .../org/apache/comet/serde/datetime.scala | 28 ++- .../comet/CometTemporalExpressionSuite.scala | 21 ++ .../CometDatetimeExpressionBenchmark.scala | 44 ++++ 12 files changed, 336 insertions(+), 3 deletions(-) create mode 100644 native/spark-expr/src/datetime_funcs/unix_timestamp.rs diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index db7d2ce32b..67d4de3682 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -331,6 +331,7 @@ These settings can be used to determine which parts of the plan are accelerated | `spark.comet.expression.TruncTimestamp.enabled` | Enable Comet acceleration for `TruncTimestamp` | true | | `spark.comet.expression.UnaryMinus.enabled` | Enable Comet acceleration for `UnaryMinus` | true | | `spark.comet.expression.Unhex.enabled` | Enable Comet acceleration for `Unhex` | true | +| `spark.comet.expression.UnixTimestamp.enabled` | Enable Comet acceleration for `UnixTimestamp` | true | | `spark.comet.expression.UnscaledValue.enabled` | Enable Comet acceleration for `UnscaledValue` | true | | `spark.comet.expression.Upper.enabled` | Enable Comet acceleration for `Upper` | true | | `spark.comet.expression.WeekDay.enabled` | Enable Comet acceleration for `WeekDay` | true | diff --git a/docs/spark_expressions_support.md b/docs/spark_expressions_support.md index 67eb519ea5..fa6b3a43fb 100644 --- a/docs/spark_expressions_support.md +++ b/docs/spark_expressions_support.md @@ -217,7 +217,7 @@ - [ ] unix_micros - [ ] unix_millis - [ ] unix_seconds -- [ ] unix_timestamp +- [x] unix_timestamp - [ ] weekday - [ ] weekofyear - [ ] year diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 56de19d670..3e86c12283 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -71,6 +71,7 @@ use datafusion::{ use datafusion_comet_spark_expr::{ create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, BinaryOutputStyle, BloomFilterAgg, BloomFilterMightContain, EvalMode, SparkHour, SparkMinute, SparkSecond, + SparkUnixTimestamp, }; use iceberg::expr::Bind; @@ -425,6 +426,24 @@ impl PhysicalPlanner { Ok(Arc::new(expr)) } + ExprStruct::UnixTimestamp(expr) => { + let child = + self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; + let timezone = expr.timezone.clone(); + let args = vec![child]; + let comet_unix_timestamp = + Arc::new(ScalarUDF::new_from_impl(SparkUnixTimestamp::new(timezone))); + let field_ref = Arc::new(Field::new("unix_timestamp", DataType::Int64, true)); + let expr: ScalarFunctionExpr = ScalarFunctionExpr::new( + "unix_timestamp", + comet_unix_timestamp, + args, + field_ref, + Arc::new(ConfigOptions::default()), + ); + + Ok(Arc::new(expr)) + } ExprStruct::TruncTimestamp(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; diff --git
a/native/core/src/execution/planner/expression_registry.rs b/native/core/src/execution/planner/expression_registry.rs index 3321f61182..49b70809bc 100644 --- a/native/core/src/execution/planner/expression_registry.rs +++ b/native/core/src/execution/planner/expression_registry.rs @@ -108,6 +108,7 @@ pub enum ExpressionType { Minute, Second, TruncTimestamp, + UnixTimestamp, } /// Registry for expression builders @@ -351,6 +352,7 @@ impl ExpressionRegistry { Some(ExprStruct::Minute(_)) => Ok(ExpressionType::Minute), Some(ExprStruct::Second(_)) => Ok(ExpressionType::Second), Some(ExprStruct::TruncTimestamp(_)) => Ok(ExpressionType::TruncTimestamp), + Some(ExprStruct::UnixTimestamp(_)) => Ok(ExpressionType::UnixTimestamp), Some(other) => Err(ExecutionError::GeneralError(format!( "Unsupported expression type: {:?}", diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto index a7736f561a..7db6f69e83 100644 --- a/native/proto/src/proto/expr.proto +++ b/native/proto/src/proto/expr.proto @@ -85,6 +85,7 @@ message Expr { Rand randn = 62; EmptyExpr spark_partition_id = 63; EmptyExpr monotonically_increasing_id = 64; + UnixTimestamp unix_timestamp = 65; } } @@ -297,6 +298,11 @@ message Second { string timezone = 2; } +message UnixTimestamp { + Expr child = 1; + string timezone = 2; +} + message CheckOverflow { Expr child = 1; DataType datatype = 2; diff --git a/native/spark-expr/src/datetime_funcs/mod.rs b/native/spark-expr/src/datetime_funcs/mod.rs index ef8041e5fe..c1e7e617b0 100644 --- a/native/spark-expr/src/datetime_funcs/mod.rs +++ b/native/spark-expr/src/datetime_funcs/mod.rs @@ -18,9 +18,11 @@ mod date_trunc; mod extract_date_part; mod timestamp_trunc; +mod unix_timestamp; pub use date_trunc::SparkDateTrunc; pub use extract_date_part::SparkHour; pub use extract_date_part::SparkMinute; pub use extract_date_part::SparkSecond; pub use timestamp_trunc::TimestampTruncExpr; +pub use unix_timestamp::SparkUnixTimestamp; diff --git a/native/spark-expr/src/datetime_funcs/unix_timestamp.rs b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs new file mode 100644 index 0000000000..20901a318c --- /dev/null +++ b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::utils::array_with_timezone; +use arrow::array::AsArray; +use arrow::datatypes::{DataType, Field, TimeUnit::Microsecond}; +use datafusion::common::{internal_datafusion_err, DataFusionError}; +use datafusion::config::ConfigOptions; +use datafusion::logical_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use num::integer::div_floor; +use std::{any::Any, fmt::Debug, sync::Arc}; + +const MICROS_PER_SECOND: i64 = 1_000_000; +const SECONDS_PER_DAY: i64 = 86400; + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkUnixTimestamp { + signature: Signature, + aliases: Vec, + timezone: String, +} + +impl SparkUnixTimestamp { + pub fn new(timezone: String) -> Self { + Self { + signature: Signature::user_defined(Volatility::Immutable), + aliases: vec![], + timezone, + } + } +} + +impl ScalarUDFImpl for SparkUnixTimestamp { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "unix_timestamp" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> datafusion::common::Result { + Ok(match &arg_types[0] { + DataType::Dictionary(_, _) => { + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int64)) + } + _ => DataType::Int64, + }) + } + + fn invoke_with_args( + &self, + args: ScalarFunctionArgs, + ) -> datafusion::common::Result { + let args: [ColumnarValue; 1] = args.args.try_into().map_err(|_| { + internal_datafusion_err!("unix_timestamp expects exactly one argument") + })?; + + match args { + [ColumnarValue::Array(array)] => { + match array.data_type() { + DataType::Timestamp(_, _) => { + // Convert timestamp to have timezone information + let array = array_with_timezone( + array, + self.timezone.clone(), + Some(&DataType::Timestamp( + Microsecond, + Some(self.timezone.clone().into()), + )), + )?; + + // Extract timestamp values and convert microseconds to seconds + let timestamp_array = + array.as_primitive::(); + let result: arrow::array::Int64Array = timestamp_array + .iter() + .map(|v| v.map(|micros| div_floor(micros, MICROS_PER_SECOND))) + .collect(); + Ok(ColumnarValue::Array(Arc::new(result))) + } + DataType::Date32 => { + // Date32 is stored as days since Unix epoch + // Convert to seconds by multiplying by seconds per day + let date_array = array.as_primitive::(); + let result: arrow::array::Int64Array = date_array + .iter() + .map(|v| v.map(|days| (days as i64) * SECONDS_PER_DAY)) + .collect(); + Ok(ColumnarValue::Array(Arc::new(result))) + } + _ => Err(DataFusionError::Execution(format!( + "unix_timestamp does not support input type: {:?}", + array.data_type() + ))), + } + } + _ => Err(DataFusionError::Execution( + "unix_timestamp(scalar) should be fold in Spark JVM side.".to_string(), + )), + } + } + + fn aliases(&self) -> &[String] { + &self.aliases + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{Array, Date32Array, TimestampMicrosecondArray}; + use std::sync::Arc; + + #[test] + fn test_unix_timestamp_from_timestamp() { + // Test with known timestamp value + // 2020-01-01 00:00:00 UTC = 1577836800 seconds = 1577836800000000 microseconds + let input = TimestampMicrosecondArray::from(vec![Some(1577836800000000)]); + let udf = SparkUnixTimestamp::new("UTC".to_string()); + + let return_field = Arc::new(Field::new("unix_timestamp", DataType::Int64, true)); + let args = ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::new(input))], + number_rows: 1, + return_field, + config_options: 
Arc::new(ConfigOptions::default()), + arg_fields: vec![], + }; + + let result = udf.invoke_with_args(args).unwrap(); + if let ColumnarValue::Array(result_array) = result { + let int64_array = result_array.as_primitive::(); + assert_eq!(int64_array.value(0), 1577836800); + } else { + panic!("Expected array result"); + } + } + + #[test] + fn test_unix_timestamp_from_date() { + // Test with Date32 + // Date32(18262) = 2020-01-01 = 1577836800 seconds + let input = Date32Array::from(vec![Some(18262)]); + let udf = SparkUnixTimestamp::new("UTC".to_string()); + + let return_field = Arc::new(Field::new("unix_timestamp", DataType::Int64, true)); + let args = ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::new(input))], + number_rows: 1, + return_field, + config_options: Arc::new(ConfigOptions::default()), + arg_fields: vec![], + }; + + let result = udf.invoke_with_args(args).unwrap(); + if let ColumnarValue::Array(result_array) = result { + let int64_array = result_array.as_primitive::(); + assert_eq!(int64_array.value(0), 1577836800); + } else { + panic!("Expected array result"); + } + } + + #[test] + fn test_unix_timestamp_with_nulls() { + // Test null handling + let input = TimestampMicrosecondArray::from(vec![Some(1577836800000000), None]); + let udf = SparkUnixTimestamp::new("UTC".to_string()); + + let return_field = Arc::new(Field::new("unix_timestamp", DataType::Int64, true)); + let args = ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::new(input))], + number_rows: 2, + return_field, + config_options: Arc::new(ConfigOptions::default()), + arg_fields: vec![], + }; + + let result = udf.invoke_with_args(args).unwrap(); + if let ColumnarValue::Array(result_array) = result { + let int64_array = result_array.as_primitive::(); + assert_eq!(int64_array.value(0), 1577836800); + assert!(int64_array.is_null(1)); + } else { + panic!("Expected array result"); + } + } +} diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs index 2903061d60..6204be6780 100644 --- a/native/spark-expr/src/lib.rs +++ b/native/spark-expr/src/lib.rs @@ -68,7 +68,9 @@ pub use comet_scalar_funcs::{ create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, register_all_comet_functions, }; -pub use datetime_funcs::{SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, TimestampTruncExpr}; +pub use datetime_funcs::{ + SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, SparkUnixTimestamp, TimestampTruncExpr, +}; pub use error::{SparkError, SparkResult}; pub use hash_funcs::*; pub use json_funcs::ToJson; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 54df2f1688..e3e8ed7962 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -190,6 +190,7 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[Second] -> CometSecond, classOf[TruncDate] -> CometTruncDate, classOf[TruncTimestamp] -> CometTruncTimestamp, + classOf[UnixTimestamp] -> CometUnixTimestamp, classOf[Year] -> CometYear, classOf[Month] -> CometMonth, classOf[DayOfMonth] -> CometDayOfMonth, diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index ef2b0f793c..69b0f2f0a3 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -21,7 +21,7 @@ package 
org.apache.comet.serde import java.util.Locale -import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, WeekDay, WeekOfYear, Year} +import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, UnixTimestamp, WeekDay, WeekOfYear, Year} import org.apache.spark.sql.types.{DateType, IntegerType} import org.apache.spark.unsafe.types.UTF8String @@ -254,6 +254,32 @@ object CometSecond extends CometExpressionSerde[Second] { } } +object CometUnixTimestamp extends CometExpressionSerde[UnixTimestamp] { + override def convert( + expr: UnixTimestamp, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { + val childExpr = exprToProtoInternal(expr.children.head, inputs, binding) + + if (childExpr.isDefined) { + val builder = ExprOuterClass.UnixTimestamp.newBuilder() + builder.setChild(childExpr.get) + + val timeZone = expr.timeZoneId.getOrElse("UTC") + builder.setTimezone(timeZone) + + Some( + ExprOuterClass.Expr + .newBuilder() + .setUnixTimestamp(builder) + .build()) + } else { + withInfo(expr, expr.children.head) + None + } + } +} + object CometDateAdd extends CometScalarFunction[DateAdd]("date_add") object CometDateSub extends CometScalarFunction[DateSub]("date_sub") diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index 9a23c76d82..fccfa3b43d 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -114,6 +114,27 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH } } + test("unix_timestamp") { + createTimestampTestData.createOrReplaceTempView("tbl") + + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") { + checkSparkAnswerAndOperator("SELECT c0, unix_timestamp(c0) from tbl order by c0") + } + + // Test with Date type + val r = new Random(42) + val dateSchema = StructType(Seq(StructField("d", DataTypes.DateType, true))) + val dateDF = FuzzDataGenerator.generateDataFrame(r, spark, dateSchema, 100, DataGenOptions()) + dateDF.createOrReplaceTempView("date_tbl") + checkSparkAnswerAndOperator("SELECT d, unix_timestamp(d) from date_tbl order by d") + + // Test with different timezones + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") { + createTimestampTestData.createOrReplaceTempView("tbl2") + checkSparkAnswerAndOperator("SELECT c0, unix_timestamp(c0) from tbl2 order by c0") + } + } + private def createTimestampTestData = { val r = new Random(42) val schema = StructType( diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala index 0af1ecade5..5dbbfeb0c6 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala @@ -76,6 +76,35 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase { } } + def unixTimestampBenchmark(values: Int, useDictionary: Boolean): Unit = { + withTempPath { dir => + withTempTable("parquetV1Table") { + 
prepareTable( + dir, + spark.sql(s"select timestamp_micros(cast(value/100000 as integer)) as ts FROM $tbl")) + val isDictionary = if (useDictionary) "(Dictionary)" else "" + runWithComet(s"Unix Timestamp from Timestamp $isDictionary", values) { + spark.sql(s"select unix_timestamp(ts) from parquetV1Table").noop() + } + } + } + } + + def unixTimestampFromDateBenchmark(values: Int, useDictionary: Boolean): Unit = { + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql( + s"select cast(timestamp_micros(cast(value/100000 as integer)) as date) as dt FROM $tbl")) + val isDictionary = if (useDictionary) "(Dictionary)" else "" + runWithComet(s"Unix Timestamp from Date $isDictionary", values) { + spark.sql(s"select unix_timestamp(dt) from parquetV1Table").noop() + } + } + } + } + override def runCometBenchmark(mainArgs: Array[String]): Unit = { val values = 1024 * 1024; @@ -96,6 +125,21 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase { runBenchmarkWithTable("TimestampTrunc (Dictionary)", values, useDictionary = true) { v => timestampTruncExprBenchmark(v, useDictionary = true) } + runBenchmarkWithTable("UnixTimestamp", values) { v => + unixTimestampBenchmark(v, useDictionary = false) + } + runBenchmarkWithTable("UnixTimestamp (Dictionary)", values, useDictionary = true) { v => + unixTimestampBenchmark(v, useDictionary = true) + } + runBenchmarkWithTable("UnixTimestamp from Date", values) { v => + unixTimestampFromDateBenchmark(v, useDictionary = false) + } + runBenchmarkWithTable( + "UnixTimestamp from Date (Dictionary)", + values, + useDictionary = true) { v => + unixTimestampFromDateBenchmark(v, useDictionary = true) + } } } } From 4838df1026b948bce03ef359c2b64ac966d1aa9a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 18 Dec 2025 11:46:55 -0700 Subject: [PATCH 02/10] Clean up comments --- native/spark-expr/src/datetime_funcs/unix_timestamp.rs | 5 ----- .../org/apache/comet/CometTemporalExpressionSuite.scala | 2 -- 2 files changed, 7 deletions(-) diff --git a/native/spark-expr/src/datetime_funcs/unix_timestamp.rs b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs index 20901a318c..f65e04f3b3 100644 --- a/native/spark-expr/src/datetime_funcs/unix_timestamp.rs +++ b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs @@ -80,7 +80,6 @@ impl ScalarUDFImpl for SparkUnixTimestamp { [ColumnarValue::Array(array)] => { match array.data_type() { DataType::Timestamp(_, _) => { - // Convert timestamp to have timezone information let array = array_with_timezone( array, self.timezone.clone(), @@ -90,7 +89,6 @@ impl ScalarUDFImpl for SparkUnixTimestamp { )), )?; - // Extract timestamp values and convert microseconds to seconds let timestamp_array = array.as_primitive::(); let result: arrow::array::Int64Array = timestamp_array @@ -100,8 +98,6 @@ impl ScalarUDFImpl for SparkUnixTimestamp { Ok(ColumnarValue::Array(Arc::new(result))) } DataType::Date32 => { - // Date32 is stored as days since Unix epoch - // Convert to seconds by multiplying by seconds per day let date_array = array.as_primitive::(); let result: arrow::array::Int64Array = date_array .iter() @@ -184,7 +180,6 @@ mod tests { #[test] fn test_unix_timestamp_with_nulls() { - // Test null handling let input = TimestampMicrosecondArray::from(vec![Some(1577836800000000), None]); let udf = SparkUnixTimestamp::new("UTC".to_string()); diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala 
b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index fccfa3b43d..64cf7e81b2 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -121,14 +121,12 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH checkSparkAnswerAndOperator("SELECT c0, unix_timestamp(c0) from tbl order by c0") } - // Test with Date type val r = new Random(42) val dateSchema = StructType(Seq(StructField("d", DataTypes.DateType, true))) val dateDF = FuzzDataGenerator.generateDataFrame(r, spark, dateSchema, 100, DataGenOptions()) dateDF.createOrReplaceTempView("date_tbl") checkSparkAnswerAndOperator("SELECT d, unix_timestamp(d) from date_tbl order by d") - // Test with different timezones withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") { createTimestampTestData.createOrReplaceTempView("tbl2") checkSparkAnswerAndOperator("SELECT c0, unix_timestamp(c0) from tbl2 order by c0") From 88e960579c47a8d56d9fade1e83032127df2fe5b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 18 Dec 2025 11:53:14 -0700 Subject: [PATCH 03/10] split tests --- .../org/apache/comet/CometTemporalExpressionSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index 64cf7e81b2..f4d1e880da 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -114,13 +114,14 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH } } - test("unix_timestamp") { + test("unix_timestamp - UTC") { createTimestampTestData.createOrReplaceTempView("tbl") - withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") { checkSparkAnswerAndOperator("SELECT c0, unix_timestamp(c0) from tbl order by c0") } + } + test("unix_timestamp - non-UTC") { val r = new Random(42) val dateSchema = StructType(Seq(StructField("d", DataTypes.DateType, true))) val dateDF = FuzzDataGenerator.generateDataFrame(r, spark, dateSchema, 100, DataGenOptions()) From a65496426271b06759fb95f77596672702d1bca2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 18 Dec 2025 12:05:18 -0700 Subject: [PATCH 04/10] fix --- .../src/datetime_funcs/unix_timestamp.rs | 2 +- .../comet/CometTemporalExpressionSuite.scala | 20 ++++++++++--------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/native/spark-expr/src/datetime_funcs/unix_timestamp.rs b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs index f65e04f3b3..4a532fa0cb 100644 --- a/native/spark-expr/src/datetime_funcs/unix_timestamp.rs +++ b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs @@ -85,7 +85,7 @@ impl ScalarUDFImpl for SparkUnixTimestamp { self.timezone.clone(), Some(&DataType::Timestamp( Microsecond, - Some(self.timezone.clone().into()), + Some("UTC".into()), )), )?; diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index f4d1e880da..7e179b29e9 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -114,26 +114,28 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH } } - 
test("unix_timestamp - UTC") { + test("unix_timestamp - timestamp input") { createTimestampTestData.createOrReplaceTempView("tbl") - withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") { - checkSparkAnswerAndOperator("SELECT c0, unix_timestamp(c0) from tbl order by c0") + for (timezone <- Seq("UTC", "America/Los_Angeles")) { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timezone) { + checkSparkAnswerAndOperator("SELECT c0, unix_timestamp(c0) from tbl order by c0") + } } } - test("unix_timestamp - non-UTC") { + test("unix_timestamp - date input") { val r = new Random(42) val dateSchema = StructType(Seq(StructField("d", DataTypes.DateType, true))) val dateDF = FuzzDataGenerator.generateDataFrame(r, spark, dateSchema, 100, DataGenOptions()) dateDF.createOrReplaceTempView("date_tbl") - checkSparkAnswerAndOperator("SELECT d, unix_timestamp(d) from date_tbl order by d") - - withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") { - createTimestampTestData.createOrReplaceTempView("tbl2") - checkSparkAnswerAndOperator("SELECT c0, unix_timestamp(c0) from tbl2 order by c0") + for (timezone <- Seq("UTC", "America/Los_Angeles")) { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timezone) { + checkSparkAnswerAndOperator("SELECT d, unix_timestamp(d) from date_tbl order by d") + } } } + private def createTimestampTestData = { val r = new Random(42) val schema = StructType( From 79807d56d023c156ea91f7926f446fa991a55311 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 18 Dec 2025 12:14:18 -0700 Subject: [PATCH 05/10] fix --- .../src/datetime_funcs/unix_timestamp.rs | 79 ++++++++++--------- .../comet/CometTemporalExpressionSuite.scala | 1 - .../CometDatetimeExpressionBenchmark.scala | 4 +- 3 files changed, 44 insertions(+), 40 deletions(-) diff --git a/native/spark-expr/src/datetime_funcs/unix_timestamp.rs b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs index 4a532fa0cb..742a6f3fd8 100644 --- a/native/spark-expr/src/datetime_funcs/unix_timestamp.rs +++ b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs @@ -17,6 +17,7 @@ use crate::utils::array_with_timezone; use arrow::array::AsArray; +use arrow::compute::cast; use arrow::datatypes::{DataType, Field, TimeUnit::Microsecond}; use datafusion::common::{internal_datafusion_err, DataFusionError}; use datafusion::config::ConfigOptions; @@ -27,7 +28,6 @@ use num::integer::div_floor; use std::{any::Any, fmt::Debug, sync::Arc}; const MICROS_PER_SECOND: i64 = 1_000_000; -const SECONDS_PER_DAY: i64 = 86400; #[derive(Debug, PartialEq, Eq, Hash)] pub struct SparkUnixTimestamp { @@ -72,45 +72,50 @@ impl ScalarUDFImpl for SparkUnixTimestamp { &self, args: ScalarFunctionArgs, ) -> datafusion::common::Result { - let args: [ColumnarValue; 1] = args.args.try_into().map_err(|_| { - internal_datafusion_err!("unix_timestamp expects exactly one argument") - })?; + let args: [ColumnarValue; 1] = args + .args + .try_into() + .map_err(|_| internal_datafusion_err!("unix_timestamp expects exactly one argument"))?; match args { - [ColumnarValue::Array(array)] => { - match array.data_type() { - DataType::Timestamp(_, _) => { - let array = array_with_timezone( - array, - self.timezone.clone(), - Some(&DataType::Timestamp( - Microsecond, - Some("UTC".into()), - )), - )?; - - let timestamp_array = - array.as_primitive::(); - let result: arrow::array::Int64Array = timestamp_array - .iter() - .map(|v| v.map(|micros| div_floor(micros, MICROS_PER_SECOND))) - .collect(); - Ok(ColumnarValue::Array(Arc::new(result))) - } - DataType::Date32 => { - 
let date_array = array.as_primitive::(); - let result: arrow::array::Int64Array = date_array - .iter() - .map(|v| v.map(|days| (days as i64) * SECONDS_PER_DAY)) - .collect(); - Ok(ColumnarValue::Array(Arc::new(result))) - } - _ => Err(DataFusionError::Execution(format!( - "unix_timestamp does not support input type: {:?}", - array.data_type() - ))), + [ColumnarValue::Array(array)] => match array.data_type() { + DataType::Timestamp(_, _) => { + let array = array_with_timezone( + array, + self.timezone.clone(), + Some(&DataType::Timestamp(Microsecond, Some("UTC".into()))), + )?; + + let timestamp_array = + array.as_primitive::(); + let result: arrow::array::Int64Array = timestamp_array + .iter() + .map(|v| v.map(|micros| div_floor(micros, MICROS_PER_SECOND))) + .collect(); + Ok(ColumnarValue::Array(Arc::new(result))) } - } + DataType::Date32 => { + let timestamp_array = cast(&array, &DataType::Timestamp(Microsecond, None))?; + + let array = array_with_timezone( + timestamp_array, + self.timezone.clone(), + Some(&DataType::Timestamp(Microsecond, Some("UTC".into()))), + )?; + + let timestamp_array = + array.as_primitive::(); + let result: arrow::array::Int64Array = timestamp_array + .iter() + .map(|v| v.map(|micros| div_floor(micros, MICROS_PER_SECOND))) + .collect(); + Ok(ColumnarValue::Array(Arc::new(result))) + } + _ => Err(DataFusionError::Execution(format!( + "unix_timestamp does not support input type: {:?}", + array.data_type() + ))), + }, _ => Err(DataFusionError::Execution( "unix_timestamp(scalar) should be fold in Spark JVM side.".to_string(), )), diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index 7e179b29e9..0dbbab5dc0 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -135,7 +135,6 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH } } - private def createTimestampTestData = { val r = new Random(42) val schema = StructType( diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala index 5dbbfeb0c6..0dfabe947c 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala @@ -84,7 +84,7 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase { spark.sql(s"select timestamp_micros(cast(value/100000 as integer)) as ts FROM $tbl")) val isDictionary = if (useDictionary) "(Dictionary)" else "" runWithComet(s"Unix Timestamp from Timestamp $isDictionary", values) { - spark.sql(s"select unix_timestamp(ts) from parquetV1Table").noop() + spark.sql("select unix_timestamp(ts) from parquetV1Table").noop() } } } @@ -99,7 +99,7 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase { s"select cast(timestamp_micros(cast(value/100000 as integer)) as date) as dt FROM $tbl")) val isDictionary = if (useDictionary) "(Dictionary)" else "" runWithComet(s"Unix Timestamp from Date $isDictionary", values) { - spark.sql(s"select unix_timestamp(dt) from parquetV1Table").noop() + spark.sql("select unix_timestamp(dt) from parquetV1Table").noop() } } } From d0ef8bfac4f3cbf658774a246ec8f5dd72ee94a8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 18 
Dec 2025 12:19:57 -0700 Subject: [PATCH 06/10] save --- .../src/datetime_funcs/unix_timestamp.rs | 3 +- .../CometDatetimeExpressionBenchmark.scala | 34 +++++++------------ 2 files changed, 13 insertions(+), 24 deletions(-) diff --git a/native/spark-expr/src/datetime_funcs/unix_timestamp.rs b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs index 742a6f3fd8..e345ef4622 100644 --- a/native/spark-expr/src/datetime_funcs/unix_timestamp.rs +++ b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs @@ -18,9 +18,8 @@ use crate::utils::array_with_timezone; use arrow::array::AsArray; use arrow::compute::cast; -use arrow::datatypes::{DataType, Field, TimeUnit::Microsecond}; +use arrow::datatypes::{DataType, TimeUnit::Microsecond}; use datafusion::common::{internal_datafusion_err, DataFusionError}; -use datafusion::config::ConfigOptions; use datafusion::logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala index 0dfabe947c..07bdb90dfc 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala @@ -22,12 +22,13 @@ package org.apache.spark.sql.benchmark import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{withDefaultTimeZone, LA} import org.apache.spark.sql.internal.SQLConf +// spotless:off /** * Benchmark to measure Comet execution performance. To run this benchmark: - * `SPARK_GENERATE_BENCHMARK_FILES=1 make - * benchmark-org.apache.spark.sql.benchmark.CometDatetimeExpressionBenchmark` Results will be - * written to "spark/benchmarks/CometDatetimeExpressionBenchmark-**results.txt". + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometDatetimeExpressionBenchmark` + * Results will be written to "spark/benchmarks/CometDatetimeExpressionBenchmark-**results.txt". 
*/ +// spotless:on object CometDatetimeExpressionBenchmark extends CometBenchmarkBase { def dateTruncExprBenchmark(values: Int, useDictionary: Boolean): Unit = { @@ -76,29 +77,27 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase { } } - def unixTimestampBenchmark(values: Int, useDictionary: Boolean): Unit = { + def unixTimestampBenchmark(values: Int): Unit = { withTempPath { dir => withTempTable("parquetV1Table") { prepareTable( dir, spark.sql(s"select timestamp_micros(cast(value/100000 as integer)) as ts FROM $tbl")) - val isDictionary = if (useDictionary) "(Dictionary)" else "" - runWithComet(s"Unix Timestamp from Timestamp $isDictionary", values) { + runWithComet(s"Unix Timestamp from Timestamp", values) { spark.sql("select unix_timestamp(ts) from parquetV1Table").noop() } } } } - def unixTimestampFromDateBenchmark(values: Int, useDictionary: Boolean): Unit = { + def unixTimestampFromDateBenchmark(values: Int): Unit = { withTempPath { dir => withTempTable("parquetV1Table") { prepareTable( dir, spark.sql( s"select cast(timestamp_micros(cast(value/100000 as integer)) as date) as dt FROM $tbl")) - val isDictionary = if (useDictionary) "(Dictionary)" else "" - runWithComet(s"Unix Timestamp from Date $isDictionary", values) { + runWithComet(s"Unix Timestamp from Date", values) { spark.sql("select unix_timestamp(dt) from parquetV1Table").noop() } } @@ -125,20 +124,11 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase { runBenchmarkWithTable("TimestampTrunc (Dictionary)", values, useDictionary = true) { v => timestampTruncExprBenchmark(v, useDictionary = true) } - runBenchmarkWithTable("UnixTimestamp", values) { v => - unixTimestampBenchmark(v, useDictionary = false) + runBenchmarkWithTable("UnixTimestamp(timestamp)", values) { v => + unixTimestampBenchmark(v) } - runBenchmarkWithTable("UnixTimestamp (Dictionary)", values, useDictionary = true) { v => - unixTimestampBenchmark(v, useDictionary = true) - } - runBenchmarkWithTable("UnixTimestamp from Date", values) { v => - unixTimestampFromDateBenchmark(v, useDictionary = false) - } - runBenchmarkWithTable( - "UnixTimestamp from Date (Dictionary)", - values, - useDictionary = true) { v => - unixTimestampFromDateBenchmark(v, useDictionary = true) + runBenchmarkWithTable("UnixTimestamp(date))", values) { v => + unixTimestampFromDateBenchmark(v) } } } From c183eadf8fb3c84da374d8a76841b47fa7f65a46 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 18 Dec 2025 13:03:19 -0700 Subject: [PATCH 07/10] improve benchmarks --- .../src/datetime_funcs/unix_timestamp.rs | 74 ++++++++++++++----- .../CometDatetimeExpressionBenchmark.scala | 33 ++++++--- 2 files changed, 75 insertions(+), 32 deletions(-) diff --git a/native/spark-expr/src/datetime_funcs/unix_timestamp.rs b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs index e345ef4622..c4f1576293 100644 --- a/native/spark-expr/src/datetime_funcs/unix_timestamp.rs +++ b/native/spark-expr/src/datetime_funcs/unix_timestamp.rs @@ -16,9 +16,9 @@ // under the License. 
use crate::utils::array_with_timezone; -use arrow::array::AsArray; +use arrow::array::{Array, AsArray, PrimitiveArray}; use arrow::compute::cast; -use arrow::datatypes::{DataType, TimeUnit::Microsecond}; +use arrow::datatypes::{DataType, Int64Type, TimeUnit::Microsecond}; use datafusion::common::{internal_datafusion_err, DataFusionError}; use datafusion::logical_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, @@ -79,35 +79,67 @@ impl ScalarUDFImpl for SparkUnixTimestamp { match args { [ColumnarValue::Array(array)] => match array.data_type() { DataType::Timestamp(_, _) => { - let array = array_with_timezone( - array, - self.timezone.clone(), - Some(&DataType::Timestamp(Microsecond, Some("UTC".into()))), - )?; + let is_utc = self.timezone == "UTC"; + let array = if is_utc + && matches!(array.data_type(), DataType::Timestamp(Microsecond, Some(tz)) if tz.as_ref() == "UTC") + { + array + } else { + array_with_timezone( + array, + self.timezone.clone(), + Some(&DataType::Timestamp(Microsecond, Some("UTC".into()))), + )? + }; let timestamp_array = array.as_primitive::(); - let result: arrow::array::Int64Array = timestamp_array - .iter() - .map(|v| v.map(|micros| div_floor(micros, MICROS_PER_SECOND))) - .collect(); + + let result: PrimitiveArray = if timestamp_array.null_count() == 0 { + timestamp_array + .values() + .iter() + .map(|µs| micros / MICROS_PER_SECOND) + .collect() + } else { + timestamp_array + .iter() + .map(|v| v.map(|micros| div_floor(micros, MICROS_PER_SECOND))) + .collect() + }; + Ok(ColumnarValue::Array(Arc::new(result))) } DataType::Date32 => { let timestamp_array = cast(&array, &DataType::Timestamp(Microsecond, None))?; - let array = array_with_timezone( - timestamp_array, - self.timezone.clone(), - Some(&DataType::Timestamp(Microsecond, Some("UTC".into()))), - )?; + let is_utc = self.timezone == "UTC"; + let array = if is_utc { + timestamp_array + } else { + array_with_timezone( + timestamp_array, + self.timezone.clone(), + Some(&DataType::Timestamp(Microsecond, Some("UTC".into()))), + )? 
+ }; let timestamp_array = array.as_primitive::(); - let result: arrow::array::Int64Array = timestamp_array - .iter() - .map(|v| v.map(|micros| div_floor(micros, MICROS_PER_SECOND))) - .collect(); + + let result: PrimitiveArray = if timestamp_array.null_count() == 0 { + timestamp_array + .values() + .iter() + .map(|µs| micros / MICROS_PER_SECOND) + .collect() + } else { + timestamp_array + .iter() + .map(|v| v.map(|micros| div_floor(micros, MICROS_PER_SECOND))) + .collect() + }; + Ok(ColumnarValue::Array(Arc::new(result))) } _ => Err(DataFusionError::Execution(format!( @@ -130,6 +162,8 @@ impl ScalarUDFImpl for SparkUnixTimestamp { mod tests { use super::*; use arrow::array::{Array, Date32Array, TimestampMicrosecondArray}; + use arrow::datatypes::Field; + use datafusion::config::ConfigOptions; use std::sync::Arc; #[test] diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala index 07bdb90dfc..f9559e5ea0 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala @@ -77,28 +77,32 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase { } } - def unixTimestampBenchmark(values: Int): Unit = { + def unixTimestampBenchmark(values: Int, timeZone: String): Unit = { withTempPath { dir => withTempTable("parquetV1Table") { prepareTable( dir, spark.sql(s"select timestamp_micros(cast(value/100000 as integer)) as ts FROM $tbl")) - runWithComet(s"Unix Timestamp from Timestamp", values) { - spark.sql("select unix_timestamp(ts) from parquetV1Table").noop() + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { + runWithComet(s"Unix Timestamp from Timestamp ($timeZone)", values) { + spark.sql("select unix_timestamp(ts) from parquetV1Table").noop() + } } } } } - def unixTimestampFromDateBenchmark(values: Int): Unit = { + def unixTimestampFromDateBenchmark(values: Int, timeZone: String): Unit = { withTempPath { dir => withTempTable("parquetV1Table") { prepareTable( dir, spark.sql( s"select cast(timestamp_micros(cast(value/100000 as integer)) as date) as dt FROM $tbl")) - runWithComet(s"Unix Timestamp from Date", values) { - spark.sql("select unix_timestamp(dt) from parquetV1Table").noop() + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) { + runWithComet(s"Unix Timestamp from Date ($timeZone)", values) { + spark.sql("select unix_timestamp(dt) from parquetV1Table").noop() + } } } } @@ -107,6 +111,17 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase { override def runCometBenchmark(mainArgs: Array[String]): Unit = { val values = 1024 * 1024; + for (timeZone <- Seq("UTC", "America/Los_Angeles")) { + withSQLConf("spark.sql.parquet.datetimeRebaseModeInWrite" -> "CORRECTED") { + runBenchmarkWithTable(s"UnixTimestamp(timestamp) - $timeZone", values) { v => + unixTimestampBenchmark(v, timeZone) + } + runBenchmarkWithTable(s"UnixTimestamp(date) - $timeZone", values) { v => + unixTimestampFromDateBenchmark(v, timeZone) + } + } + } + withDefaultTimeZone(LA) { withSQLConf( SQLConf.SESSION_LOCAL_TIMEZONE.key -> LA.getId, @@ -124,12 +139,6 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase { runBenchmarkWithTable("TimestampTrunc (Dictionary)", values, useDictionary = true) { v => timestampTruncExprBenchmark(v, useDictionary = true) } - 
runBenchmarkWithTable("UnixTimestamp(timestamp)", values) { v => - unixTimestampBenchmark(v) - } - runBenchmarkWithTable("UnixTimestamp(date))", values) { v => - unixTimestampFromDateBenchmark(v) - } } } } From 6af833bc247e25b1f6ef66cccb5c4c53f3bbab78 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 20 Dec 2025 09:13:55 -0700 Subject: [PATCH 08/10] fix --- native/core/src/execution/expressions/mod.rs | 1 + native/core/src/execution/planner.rs | 83 +------------------ .../execution/planner/expression_registry.rs | 25 +++++- 3 files changed, 25 insertions(+), 84 deletions(-) diff --git a/native/core/src/execution/expressions/mod.rs b/native/core/src/execution/expressions/mod.rs index a06b41b2c0..563d62e91b 100644 --- a/native/core/src/execution/expressions/mod.rs +++ b/native/core/src/execution/expressions/mod.rs @@ -24,6 +24,7 @@ pub mod logical; pub mod nullcheck; pub mod strings; pub mod subquery; +pub mod temporal; pub use datafusion_comet_spark_expr::EvalMode; diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 3e86c12283..2dadcbd911 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -70,8 +70,7 @@ use datafusion::{ }; use datafusion_comet_spark_expr::{ create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, BinaryOutputStyle, - BloomFilterAgg, BloomFilterMightContain, EvalMode, SparkHour, SparkMinute, SparkSecond, - SparkUnixTimestamp, + BloomFilterAgg, BloomFilterMightContain, EvalMode, }; use iceberg::expr::Bind; @@ -126,8 +125,7 @@ use datafusion_comet_spark_expr::monotonically_increasing_id::MonotonicallyIncre use datafusion_comet_spark_expr::{ ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Correlation, Covariance, CreateNamedStruct, GetArrayStructFields, GetStructField, IfExpr, ListExtract, NormalizeNaNAndZero, RandExpr, - RandnExpr, SparkCastOptions, Stddev, SumDecimal, TimestampTruncExpr, ToJson, UnboundColumn, - Variance, + RandnExpr, SparkCastOptions, Stddev, SumDecimal, ToJson, UnboundColumn, Variance, }; use itertools::Itertools; use jni::objects::GlobalRef; @@ -375,83 +373,6 @@ impl PhysicalPlanner { SparkCastOptions::new(eval_mode, &expr.timezone, expr.allow_incompat), ))) } - ExprStruct::Hour(expr) => { - let child = - self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; - let timezone = expr.timezone.clone(); - let args = vec![child]; - let comet_hour = Arc::new(ScalarUDF::new_from_impl(SparkHour::new(timezone))); - let field_ref = Arc::new(Field::new("hour", DataType::Int32, true)); - let expr: ScalarFunctionExpr = ScalarFunctionExpr::new( - "hour", - comet_hour, - args, - field_ref, - Arc::new(ConfigOptions::default()), - ); - - Ok(Arc::new(expr)) - } - ExprStruct::Minute(expr) => { - let child = - self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; - let timezone = expr.timezone.clone(); - let args = vec![child]; - let comet_minute = Arc::new(ScalarUDF::new_from_impl(SparkMinute::new(timezone))); - let field_ref = Arc::new(Field::new("minute", DataType::Int32, true)); - let expr: ScalarFunctionExpr = ScalarFunctionExpr::new( - "minute", - comet_minute, - args, - field_ref, - Arc::new(ConfigOptions::default()), - ); - - Ok(Arc::new(expr)) - } - ExprStruct::Second(expr) => { - let child = - self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; - let timezone = expr.timezone.clone(); - let args = vec![child]; - let comet_second = Arc::new(ScalarUDF::new_from_impl(SparkSecond::new(timezone))); - 
let field_ref = Arc::new(Field::new("second", DataType::Int32, true)); - let expr: ScalarFunctionExpr = ScalarFunctionExpr::new( - "second", - comet_second, - args, - field_ref, - Arc::new(ConfigOptions::default()), - ); - - Ok(Arc::new(expr)) - } - ExprStruct::UnixTimestamp(expr) => { - let child = - self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; - let timezone = expr.timezone.clone(); - let args = vec![child]; - let comet_unix_timestamp = - Arc::new(ScalarUDF::new_from_impl(SparkUnixTimestamp::new(timezone))); - let field_ref = Arc::new(Field::new("unix_timestamp", DataType::Int64, true)); - let expr: ScalarFunctionExpr = ScalarFunctionExpr::new( - "unix_timestamp", - comet_unix_timestamp, - args, - field_ref, - Arc::new(ConfigOptions::default()), - ); - - Ok(Arc::new(expr)) - } - ExprStruct::TruncTimestamp(expr) => { - let child = - self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; - let format = self.create_expr(expr.format.as_ref().unwrap(), input_schema)?; - let timezone = expr.timezone.clone(); - - Ok(Arc::new(TimestampTruncExpr::new(child, format, timezone))) - } ExprStruct::CheckOverflow(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?; let data_type = to_arrow_datatype(expr.datatype.as_ref().unwrap()); diff --git a/native/core/src/execution/planner/expression_registry.rs b/native/core/src/execution/planner/expression_registry.rs index 36677e6961..34aa3de179 100644 --- a/native/core/src/execution/planner/expression_registry.rs +++ b/native/core/src/execution/planner/expression_registry.rs @@ -182,9 +182,8 @@ impl ExpressionRegistry { // Register string expressions self.register_string_expressions(); - // TODO: Register other expression categories in future phases - // self.register_temporal_expressions(); - // etc. + // Register temporal expressions + self.register_temporal_expressions(); } /// Register arithmetic expression builders @@ -287,6 +286,26 @@ impl ExpressionRegistry { .insert(ExpressionType::FromJson, Box::new(FromJsonBuilder)); } + /// Register temporal expression builders + fn register_temporal_expressions(&mut self) { + use crate::execution::expressions::temporal::*; + + self.builders + .insert(ExpressionType::Hour, Box::new(HourBuilder)); + self.builders + .insert(ExpressionType::Minute, Box::new(MinuteBuilder)); + self.builders + .insert(ExpressionType::Second, Box::new(SecondBuilder)); + self.builders.insert( + ExpressionType::UnixTimestamp, + Box::new(UnixTimestampBuilder), + ); + self.builders.insert( + ExpressionType::TruncTimestamp, + Box::new(TruncTimestampBuilder), + ); + } + /// Extract expression type from Spark protobuf expression fn get_expression_type(spark_expr: &Expr) -> Result { match spark_expr.expr_struct.as_ref() { From bc2ac6043a85dc328e2d8fdcf54fa743b05724f0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 20 Dec 2025 09:16:00 -0700 Subject: [PATCH 09/10] Add --- .../src/execution/expressions/temporal.rs | 162 ++++++++++++++++++ 1 file changed, 162 insertions(+) create mode 100644 native/core/src/execution/expressions/temporal.rs diff --git a/native/core/src/execution/expressions/temporal.rs b/native/core/src/execution/expressions/temporal.rs new file mode 100644 index 0000000000..ae57cd3b2b --- /dev/null +++ b/native/core/src/execution/expressions/temporal.rs @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Temporal expression builders + +use std::sync::Arc; + +use arrow::datatypes::{DataType, Field, SchemaRef}; +use datafusion::config::ConfigOptions; +use datafusion::logical_expr::ScalarUDF; +use datafusion::physical_expr::{PhysicalExpr, ScalarFunctionExpr}; +use datafusion_comet_proto::spark_expression::Expr; +use datafusion_comet_spark_expr::{ + SparkHour, SparkMinute, SparkSecond, SparkUnixTimestamp, TimestampTruncExpr, +}; + +use crate::execution::{ + expressions::extract_expr, + operators::ExecutionError, + planner::{expression_registry::ExpressionBuilder, PhysicalPlanner}, +}; + +pub struct HourBuilder; + +impl ExpressionBuilder for HourBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &PhysicalPlanner, + ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> { + let expr = extract_expr!(spark_expr, Hour); + let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; + let timezone = expr.timezone.clone(); + let args = vec![child]; + let comet_hour = Arc::new(ScalarUDF::new_from_impl(SparkHour::new(timezone))); + let field_ref = Arc::new(Field::new("hour", DataType::Int32, true)); + let expr: ScalarFunctionExpr = ScalarFunctionExpr::new( + "hour", + comet_hour, + args, + field_ref, + Arc::new(ConfigOptions::default()), + ); + + Ok(Arc::new(expr)) + } +} + +pub struct MinuteBuilder; + +impl ExpressionBuilder for MinuteBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &PhysicalPlanner, + ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> { + let expr = extract_expr!(spark_expr, Minute); + let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; + let timezone = expr.timezone.clone(); + let args = vec![child]; + let comet_minute = Arc::new(ScalarUDF::new_from_impl(SparkMinute::new(timezone))); + let field_ref = Arc::new(Field::new("minute", DataType::Int32, true)); + let expr: ScalarFunctionExpr = ScalarFunctionExpr::new( + "minute", + comet_minute, + args, + field_ref, + Arc::new(ConfigOptions::default()), + ); + + Ok(Arc::new(expr)) + } +} + +pub struct SecondBuilder; + +impl ExpressionBuilder for SecondBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &PhysicalPlanner, + ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> { + let expr = extract_expr!(spark_expr, Second); + let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; + let timezone = expr.timezone.clone(); + let args = vec![child]; + let comet_second = Arc::new(ScalarUDF::new_from_impl(SparkSecond::new(timezone))); + let field_ref = Arc::new(Field::new("second", DataType::Int32, true)); + let expr: ScalarFunctionExpr = ScalarFunctionExpr::new( + "second", + comet_second, + args, + field_ref, + Arc::new(ConfigOptions::default()), + ); + + Ok(Arc::new(expr)) + } +} + +pub struct
UnixTimestampBuilder; + +impl ExpressionBuilder for UnixTimestampBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &PhysicalPlanner, + ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> { + let expr = extract_expr!(spark_expr, UnixTimestamp); + let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; + let timezone = expr.timezone.clone(); + let args = vec![child]; + let comet_unix_timestamp = + Arc::new(ScalarUDF::new_from_impl(SparkUnixTimestamp::new(timezone))); + let field_ref = Arc::new(Field::new("unix_timestamp", DataType::Int64, true)); + let expr: ScalarFunctionExpr = ScalarFunctionExpr::new( + "unix_timestamp", + comet_unix_timestamp, + args, + field_ref, + Arc::new(ConfigOptions::default()), + ); + + Ok(Arc::new(expr)) + } +} + +pub struct TruncTimestampBuilder; + +impl ExpressionBuilder for TruncTimestampBuilder { + fn build( + &self, + spark_expr: &Expr, + input_schema: SchemaRef, + planner: &PhysicalPlanner, + ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> { + let expr = extract_expr!(spark_expr, TruncTimestamp); + let child = planner.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?; + let format = planner.create_expr(expr.format.as_ref().unwrap(), input_schema)?; + let timezone = expr.timezone.clone(); + + Ok(Arc::new(TimestampTruncExpr::new(child, format, timezone))) + } +} From a886f03be8d8d10ad4c9d02c9c69cc58cb48437 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 20 Dec 2025 14:53:27 -0700 Subject: [PATCH 10/10] skip hive-1 test for spark 4 --- .github/workflows/spark_sql_test.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml index d143ef83a0..3d7aa2e2f9 100644 --- a/.github/workflows/spark_sql_test.yml +++ b/.github/workflows/spark_sql_test.yml @@ -59,6 +59,10 @@ jobs: - {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"} - {name: "sql_hive-2", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.ExtendedHiveTest"} - {name: "sql_hive-3", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest"} + # Skip sql_hive-1 for Spark 4.0 due to https://github.com/apache/datafusion-comet/issues/2946 + exclude: + - spark-version: {short: '4.0', full: '4.0.1', java: 17} + module: {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"} fail-fast: false name: spark-sql-${{ matrix.module.name }}/${{ matrix.os }}/spark-${{ matrix.spark-version.full }}/java-${{ matrix.spark-version.java }} runs-on: ${{ matrix.os }}