From 5c785ecbd13ad271f10f92484f70f86c2637abe6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 23 Dec 2025 13:02:54 -0700 Subject: [PATCH 01/12] Add microbenchmark for casting string to numeric --- .../CometCastStringToNumericBenchmark.scala | 125 ++++++++++++++++++ 1 file changed, 125 insertions(+) create mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala new file mode 100644 index 0000000000..1dc258d54c --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +import org.apache.spark.sql.internal.SQLConf + +case class CastStringToNumericConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +/** + * Benchmark to measure performance of Comet cast from String to numeric types. To run this + * benchmark: `SPARK_GENERATE_BENCHMARK_FILES=1 make + * benchmark-org.apache.spark.sql.benchmark.CometCastStringToNumericBenchmark` Results will be + * written to "spark/benchmarks/CometCastStringToNumericBenchmark-**results.txt". + */ +object CometCastStringToNumericBenchmark extends CometBenchmarkBase { + + /** + * Generic method to run a cast benchmark with the given configuration. + */ + def runCastBenchmark(config: CastStringToNumericConfig, values: Int): Unit = { + withTempPath { dir => + withTempTable("parquetV1Table") { + // Generate numeric strings with decimal points: "123.45", "-456.78", etc. 
+ prepareTable( + dir, + spark.sql(s"SELECT CAST((value - 500000) / 100.0 AS STRING) AS c1 FROM $tbl")) + + runExpressionBenchmark(config.name, values, config.query, config.extraCometConfigs) + } + } + } + + // Configuration for all String to numeric cast benchmarks + private val castConfigs = List( + // Boolean + CastStringToNumericConfig( + "Cast String to Boolean (LEGACY)", + "SELECT CAST(c1 AS BOOLEAN) FROM parquetV1Table", + Map(SQLConf.ANSI_ENABLED.key -> "false")), + CastStringToNumericConfig( + "Cast String to Boolean (ANSI)", + "SELECT CAST(c1 AS BOOLEAN) FROM parquetV1Table", + Map(SQLConf.ANSI_ENABLED.key -> "true")), + // Byte + CastStringToNumericConfig( + "Cast String to Byte (LEGACY)", + "SELECT CAST(c1 AS BYTE) FROM parquetV1Table", + Map(SQLConf.ANSI_ENABLED.key -> "false")), + CastStringToNumericConfig( + "Cast String to Byte (ANSI)", + "SELECT CAST(c1 AS BYTE) FROM parquetV1Table", + Map(SQLConf.ANSI_ENABLED.key -> "true")), + // Short + CastStringToNumericConfig( + "Cast String to Short (LEGACY)", + "SELECT CAST(c1 AS SHORT) FROM parquetV1Table", + Map(SQLConf.ANSI_ENABLED.key -> "false")), + CastStringToNumericConfig( + "Cast String to Short (ANSI)", + "SELECT CAST(c1 AS SHORT) FROM parquetV1Table", + Map(SQLConf.ANSI_ENABLED.key -> "true")), + // Integer + CastStringToNumericConfig( + "Cast String to Integer (LEGACY)", + "SELECT CAST(c1 AS INT) FROM parquetV1Table", + Map(SQLConf.ANSI_ENABLED.key -> "false")), + CastStringToNumericConfig( + "Cast String to Integer (ANSI)", + "SELECT CAST(c1 AS INT) FROM parquetV1Table", + Map(SQLConf.ANSI_ENABLED.key -> "true")), + // Long + CastStringToNumericConfig( + "Cast String to Long (LEGACY)", + "SELECT CAST(c1 AS LONG) FROM parquetV1Table", + Map(SQLConf.ANSI_ENABLED.key -> "false")), + CastStringToNumericConfig( + "Cast String to Long (ANSI)", + "SELECT CAST(c1 AS LONG) FROM parquetV1Table", + Map(SQLConf.ANSI_ENABLED.key -> "true")), + // Float (Incompatible - but still useful to benchmark) + CastStringToNumericConfig( + "Cast String to Float (LEGACY)", + "SELECT CAST(c1 AS FLOAT) FROM parquetV1Table", + Map(SQLConf.ANSI_ENABLED.key -> "false")), + // Double (Incompatible - but still useful to benchmark) + CastStringToNumericConfig( + "Cast String to Double (LEGACY)", + "SELECT CAST(c1 AS DOUBLE) FROM parquetV1Table", + Map(SQLConf.ANSI_ENABLED.key -> "false")), + // Decimal (Incompatible - but still useful to benchmark) + CastStringToNumericConfig( + "Cast String to Decimal(10,2) (LEGACY)", + "SELECT CAST(c1 AS DECIMAL(10,2)) FROM parquetV1Table", + Map(SQLConf.ANSI_ENABLED.key -> "false"))) + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = 1024 * 1024 * 10 // 10M rows + + castConfigs.foreach { config => + runBenchmarkWithTable(config.name, values) { v => + runCastBenchmark(config, v) + } + } + } +} From df8084312bad64c5a7e3cef2a2f1b0a1b66acf0f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 23 Dec 2025 13:04:16 -0700 Subject: [PATCH 02/12] Add microbenchmark for casting string to temporal types --- .../CometCastStringToTemporalBenchmark.scala | 112 ++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToTemporalBenchmark.scala diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToTemporalBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToTemporalBenchmark.scala new file mode 100644 index 0000000000..595f661a21 --- /dev/null +++ 
b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToTemporalBenchmark.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.benchmark
+
+import org.apache.spark.sql.internal.SQLConf
+
+case class CastStringToTemporalConfig(
+    name: String,
+    query: String,
+    extraCometConfigs: Map[String, String] = Map.empty)
+
+/**
+ * Benchmark to measure performance of Comet cast from String to temporal types. To run this
+ * benchmark: `SPARK_GENERATE_BENCHMARK_FILES=1 make
+ * benchmark-org.apache.spark.sql.benchmark.CometCastStringToTemporalBenchmark` Results will be
+ * written to "spark/benchmarks/CometCastStringToTemporalBenchmark-**results.txt".
+ */
+object CometCastStringToTemporalBenchmark extends CometBenchmarkBase {
+
+  /**
+   * Generic method to run a cast benchmark with the given configuration.
+   */
+  def runCastBenchmark(config: CastStringToTemporalConfig, values: Int): Unit = {
+    withTempPath { dir =>
+      withTempTable("parquetV1Table") {
+        // Generate date strings like "2020-01-01", "2020-01-02", etc.
+        // This covers roughly a ten-year range of dates
+        prepareTable(
+          dir,
+          spark.sql(
+            s"SELECT CAST(DATE_ADD('2020-01-01', CAST(value % 3650 AS INT)) AS STRING) AS c1 FROM $tbl"))
+
+        runExpressionBenchmark(config.name, values, config.query, config.extraCometConfigs)
+      }
+    }
+  }
+
+  /**
+   * Benchmark for String to Timestamp with timestamp-formatted strings.
+   */
+  def runTimestampCastBenchmark(config: CastStringToTemporalConfig, values: Int): Unit = {
+    withTempPath { dir =>
+      withTempTable("parquetV1Table") {
+        // Generate timestamp strings near the Unix epoch, e.g. "1970-01-01 00:00:01.234567"
+ prepareTable( + dir, + spark.sql( + s"SELECT CAST(TIMESTAMP_MICROS(value % 9999999999) AS STRING) AS c1 FROM $tbl")) + + runExpressionBenchmark(config.name, values, config.query, config.extraCometConfigs) + } + } + } + + // Configuration for String to temporal cast benchmarks + private val castConfigs = List( + // Date + CastStringToTemporalConfig( + "Cast String to Date (LEGACY)", + "SELECT CAST(c1 AS DATE) FROM parquetV1Table", + Map(SQLConf.ANSI_ENABLED.key -> "false")), + CastStringToTemporalConfig( + "Cast String to Date (ANSI)", + "SELECT CAST(c1 AS DATE) FROM parquetV1Table", + Map(SQLConf.ANSI_ENABLED.key -> "true"))) + + private val timestampCastConfigs = List( + // Timestamp (only UTC timezone is compatible) + CastStringToTemporalConfig( + "Cast String to Timestamp (LEGACY, UTC)", + "SELECT CAST(c1 AS TIMESTAMP) FROM parquetV1Table", + Map(SQLConf.ANSI_ENABLED.key -> "false")), + CastStringToTemporalConfig( + "Cast String to Timestamp (ANSI, UTC)", + "SELECT CAST(c1 AS TIMESTAMP) FROM parquetV1Table", + Map(SQLConf.ANSI_ENABLED.key -> "true"))) + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = 1024 * 1024 * 10 // 10M rows + + // Run date casts + castConfigs.foreach { config => + runBenchmarkWithTable(config.name, values) { v => + runCastBenchmark(config, v) + } + } + + // Run timestamp casts + timestampCastConfigs.foreach { config => + runBenchmarkWithTable(config.name, values) { v => + runTimestampCastBenchmark(config, v) + } + } + } +} From 41ad69ab4f5e661dbb87a9f2056274e74a2344c3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 23 Dec 2025 13:11:10 -0700 Subject: [PATCH 03/12] improve --- .../CometCastStringToNumericBenchmark.scala | 41 ++++++++++++++----- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala index 1dc258d54c..afcef9e506 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala @@ -19,19 +19,24 @@ package org.apache.spark.sql.benchmark +import org.apache.spark.sql.catalyst.expressions.Cast import org.apache.spark.sql.internal.SQLConf +import org.apache.comet.CometConf + case class CastStringToNumericConfig( name: String, query: String, extraCometConfigs: Map[String, String] = Map.empty) +// spotless:off /** * Benchmark to measure performance of Comet cast from String to numeric types. To run this - * benchmark: `SPARK_GENERATE_BENCHMARK_FILES=1 make - * benchmark-org.apache.spark.sql.benchmark.CometCastStringToNumericBenchmark` Results will be - * written to "spark/benchmarks/CometCastStringToNumericBenchmark-**results.txt". + * benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometCastStringToNumericBenchmark` + * Results will be written to "spark/benchmarks/CometCastStringToNumericBenchmark-**results.txt". */ +// spotless:on object CometCastStringToNumericBenchmark extends CometBenchmarkBase { /** @@ -41,9 +46,19 @@ object CometCastStringToNumericBenchmark extends CometBenchmarkBase { withTempPath { dir => withTempTable("parquetV1Table") { // Generate numeric strings with decimal points: "123.45", "-456.78", etc. 
+ // Also include some special values: nulls (~2%), NaN (~2%), Infinity (~2%) prepareTable( dir, - spark.sql(s"SELECT CAST((value - 500000) / 100.0 AS STRING) AS c1 FROM $tbl")) + spark.sql(s""" + SELECT CASE + WHEN value % 50 = 0 THEN NULL + WHEN value % 50 = 1 THEN 'NaN' + WHEN value % 50 = 2 THEN 'Infinity' + WHEN value % 50 = 3 THEN '-Infinity' + ELSE CAST((value - 500000) / 100.0 AS STRING) + END AS c1 + FROM $tbl + """)) runExpressionBenchmark(config.name, values, config.query, config.extraCometConfigs) } @@ -97,21 +112,27 @@ object CometCastStringToNumericBenchmark extends CometBenchmarkBase { "Cast String to Long (ANSI)", "SELECT CAST(c1 AS LONG) FROM parquetV1Table", Map(SQLConf.ANSI_ENABLED.key -> "true")), - // Float (Incompatible - but still useful to benchmark) + // Float (Incompatible - requires allowIncompat config) CastStringToNumericConfig( "Cast String to Float (LEGACY)", "SELECT CAST(c1 AS FLOAT) FROM parquetV1Table", - Map(SQLConf.ANSI_ENABLED.key -> "false")), - // Double (Incompatible - but still useful to benchmark) + Map( + SQLConf.ANSI_ENABLED.key -> "false", + CometConf.getExprAllowIncompatConfigKey(classOf[Cast]) -> "true")), + // Double (Incompatible - requires allowIncompat config) CastStringToNumericConfig( "Cast String to Double (LEGACY)", "SELECT CAST(c1 AS DOUBLE) FROM parquetV1Table", - Map(SQLConf.ANSI_ENABLED.key -> "false")), - // Decimal (Incompatible - but still useful to benchmark) + Map( + SQLConf.ANSI_ENABLED.key -> "false", + CometConf.getExprAllowIncompatConfigKey(classOf[Cast]) -> "true")), + // Decimal (Incompatible - requires allowIncompat config) CastStringToNumericConfig( "Cast String to Decimal(10,2) (LEGACY)", "SELECT CAST(c1 AS DECIMAL(10,2)) FROM parquetV1Table", - Map(SQLConf.ANSI_ENABLED.key -> "false"))) + Map( + SQLConf.ANSI_ENABLED.key -> "false", + CometConf.getExprAllowIncompatConfigKey(classOf[Cast]) -> "true"))) override def runCometBenchmark(mainArgs: Array[String]): Unit = { val values = 1024 * 1024 * 10 // 10M rows From 399a04761238b64acd80c3c8d14548e489cbe29f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 23 Dec 2025 13:18:27 -0700 Subject: [PATCH 04/12] simplify --- .../CometCastStringToNumericBenchmark.scala | 80 +++---------------- 1 file changed, 13 insertions(+), 67 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala index afcef9e506..815c303796 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala @@ -66,73 +66,19 @@ object CometCastStringToNumericBenchmark extends CometBenchmarkBase { } // Configuration for all String to numeric cast benchmarks - private val castConfigs = List( - // Boolean - CastStringToNumericConfig( - "Cast String to Boolean (LEGACY)", - "SELECT CAST(c1 AS BOOLEAN) FROM parquetV1Table", - Map(SQLConf.ANSI_ENABLED.key -> "false")), - CastStringToNumericConfig( - "Cast String to Boolean (ANSI)", - "SELECT CAST(c1 AS BOOLEAN) FROM parquetV1Table", - Map(SQLConf.ANSI_ENABLED.key -> "true")), - // Byte - CastStringToNumericConfig( - "Cast String to Byte (LEGACY)", - "SELECT CAST(c1 AS BYTE) FROM parquetV1Table", - Map(SQLConf.ANSI_ENABLED.key -> "false")), - CastStringToNumericConfig( - "Cast String to Byte (ANSI)", - "SELECT CAST(c1 AS BYTE) FROM parquetV1Table", - Map(SQLConf.ANSI_ENABLED.key 
-> "true")), - // Short - CastStringToNumericConfig( - "Cast String to Short (LEGACY)", - "SELECT CAST(c1 AS SHORT) FROM parquetV1Table", - Map(SQLConf.ANSI_ENABLED.key -> "false")), - CastStringToNumericConfig( - "Cast String to Short (ANSI)", - "SELECT CAST(c1 AS SHORT) FROM parquetV1Table", - Map(SQLConf.ANSI_ENABLED.key -> "true")), - // Integer - CastStringToNumericConfig( - "Cast String to Integer (LEGACY)", - "SELECT CAST(c1 AS INT) FROM parquetV1Table", - Map(SQLConf.ANSI_ENABLED.key -> "false")), - CastStringToNumericConfig( - "Cast String to Integer (ANSI)", - "SELECT CAST(c1 AS INT) FROM parquetV1Table", - Map(SQLConf.ANSI_ENABLED.key -> "true")), - // Long - CastStringToNumericConfig( - "Cast String to Long (LEGACY)", - "SELECT CAST(c1 AS LONG) FROM parquetV1Table", - Map(SQLConf.ANSI_ENABLED.key -> "false")), - CastStringToNumericConfig( - "Cast String to Long (ANSI)", - "SELECT CAST(c1 AS LONG) FROM parquetV1Table", - Map(SQLConf.ANSI_ENABLED.key -> "true")), - // Float (Incompatible - requires allowIncompat config) - CastStringToNumericConfig( - "Cast String to Float (LEGACY)", - "SELECT CAST(c1 AS FLOAT) FROM parquetV1Table", - Map( - SQLConf.ANSI_ENABLED.key -> "false", - CometConf.getExprAllowIncompatConfigKey(classOf[Cast]) -> "true")), - // Double (Incompatible - requires allowIncompat config) - CastStringToNumericConfig( - "Cast String to Double (LEGACY)", - "SELECT CAST(c1 AS DOUBLE) FROM parquetV1Table", - Map( - SQLConf.ANSI_ENABLED.key -> "false", - CometConf.getExprAllowIncompatConfigKey(classOf[Cast]) -> "true")), - // Decimal (Incompatible - requires allowIncompat config) - CastStringToNumericConfig( - "Cast String to Decimal(10,2) (LEGACY)", - "SELECT CAST(c1 AS DECIMAL(10,2)) FROM parquetV1Table", - Map( - SQLConf.ANSI_ENABLED.key -> "false", - CometConf.getExprAllowIncompatConfigKey(classOf[Cast]) -> "true"))) + private val castFunctions = Seq("CAST", "TRY_CAST") + private val targetTypes = + Seq("BOOLEAN", "BYTE", "SHORT", "INT", "LONG", "FLOAT", "DOUBLE", "DECIMAL(10,2)") + + private val castConfigs = for { + castFunc <- castFunctions + targetType <- targetTypes + } yield CastStringToNumericConfig( + s"$castFunc String to $targetType", + s"SELECT $castFunc(c1 AS $targetType) FROM parquetV1Table", + Map( + SQLConf.ANSI_ENABLED.key -> "false", + CometConf.getExprAllowIncompatConfigKey(classOf[Cast]) -> "true")) override def runCometBenchmark(mainArgs: Array[String]): Unit = { val values = 1024 * 1024 * 10 // 10M rows From 480abcfecd5155c86cbc2387010966fc2dc728b4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 23 Dec 2025 13:28:59 -0700 Subject: [PATCH 05/12] Save --- .../spark/sql/benchmark/CometCastStringToNumericBenchmark.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala index 815c303796..e2b8bdf327 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala @@ -81,7 +81,7 @@ object CometCastStringToNumericBenchmark extends CometBenchmarkBase { CometConf.getExprAllowIncompatConfigKey(classOf[Cast]) -> "true")) override def runCometBenchmark(mainArgs: Array[String]): Unit = { - val values = 1024 * 1024 * 10 // 10M rows + val values = 1024 * 1024 // 1M rows castConfigs.foreach { config => 
runBenchmarkWithTable(config.name, values) { v => From ef5baaad11eb14d5f31ed685730f89e4de62d711 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 23 Dec 2025 13:32:51 -0700 Subject: [PATCH 06/12] generate data once --- .../CometCastStringToNumericBenchmark.scala | 53 +++++++++---------- 1 file changed, 24 insertions(+), 29 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala index e2b8bdf327..01d7ea9f2e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala @@ -39,32 +39,6 @@ case class CastStringToNumericConfig( // spotless:on object CometCastStringToNumericBenchmark extends CometBenchmarkBase { - /** - * Generic method to run a cast benchmark with the given configuration. - */ - def runCastBenchmark(config: CastStringToNumericConfig, values: Int): Unit = { - withTempPath { dir => - withTempTable("parquetV1Table") { - // Generate numeric strings with decimal points: "123.45", "-456.78", etc. - // Also include some special values: nulls (~2%), NaN (~2%), Infinity (~2%) - prepareTable( - dir, - spark.sql(s""" - SELECT CASE - WHEN value % 50 = 0 THEN NULL - WHEN value % 50 = 1 THEN 'NaN' - WHEN value % 50 = 2 THEN 'Infinity' - WHEN value % 50 = 3 THEN '-Infinity' - ELSE CAST((value - 500000) / 100.0 AS STRING) - END AS c1 - FROM $tbl - """)) - - runExpressionBenchmark(config.name, values, config.query, config.extraCometConfigs) - } - } - } - // Configuration for all String to numeric cast benchmarks private val castFunctions = Seq("CAST", "TRY_CAST") private val targetTypes = @@ -83,9 +57,30 @@ object CometCastStringToNumericBenchmark extends CometBenchmarkBase { override def runCometBenchmark(mainArgs: Array[String]): Unit = { val values = 1024 * 1024 // 1M rows - castConfigs.foreach { config => - runBenchmarkWithTable(config.name, values) { v => - runCastBenchmark(config, v) + // Generate input data once for all benchmarks + runBenchmarkWithTable("String to numeric casts", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + // Generate numeric strings with decimal points: "123.45", "-456.78", etc. 
+ // Also include some special values: nulls (~2%), NaN (~2%), Infinity (~2%) + prepareTable( + dir, + spark.sql(s""" + SELECT CASE + WHEN value % 50 = 0 THEN NULL + WHEN value % 50 = 1 THEN 'NaN' + WHEN value % 50 = 2 THEN 'Infinity' + WHEN value % 50 = 3 THEN '-Infinity' + ELSE CAST((value - 500000) / 100.0 AS STRING) + END AS c1 + FROM $tbl + """)) + + // Run all benchmarks on the same input data + castConfigs.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } } } } From 01b077282ab95b52351eb556806f66c335b4023c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 23 Dec 2025 13:53:42 -0700 Subject: [PATCH 07/12] improve --- .../sql/benchmark/CometCastStringToNumericBenchmark.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala index 01d7ea9f2e..403e72f154 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala @@ -39,7 +39,6 @@ case class CastStringToNumericConfig( // spotless:on object CometCastStringToNumericBenchmark extends CometBenchmarkBase { - // Configuration for all String to numeric cast benchmarks private val castFunctions = Seq("CAST", "TRY_CAST") private val targetTypes = Seq("BOOLEAN", "BYTE", "SHORT", "INT", "LONG", "FLOAT", "DOUBLE", "DECIMAL(10,2)") @@ -61,7 +60,7 @@ object CometCastStringToNumericBenchmark extends CometBenchmarkBase { runBenchmarkWithTable("String to numeric casts", values) { v => withTempPath { dir => withTempTable("parquetV1Table") { - // Generate numeric strings with decimal points: "123.45", "-456.78", etc. + // Generate numeric strings with both integer and decimal values // Also include some special values: nulls (~2%), NaN (~2%), Infinity (~2%) prepareTable( dir, @@ -71,12 +70,13 @@ object CometCastStringToNumericBenchmark extends CometBenchmarkBase { WHEN value % 50 = 1 THEN 'NaN' WHEN value % 50 = 2 THEN 'Infinity' WHEN value % 50 = 3 THEN '-Infinity' + WHEN value % 50 < 10 THEN CAST(value % 99 AS STRING) + WHEN value % 50 < 30 THEN CAST(value % 999999 AS STRING) ELSE CAST((value - 500000) / 100.0 AS STRING) END AS c1 FROM $tbl """)) - // Run all benchmarks on the same input data castConfigs.foreach { config => runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) } From 360edb8e36a400b61ee84a1a081a6d92be12fff5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 23 Dec 2025 14:38:44 -0700 Subject: [PATCH 08/12] fix --- .../CometCastStringToTemporalBenchmark.scala | 113 ++++++++---------- 1 file changed, 52 insertions(+), 61 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToTemporalBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToTemporalBenchmark.scala index 595f661a21..3924805884 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToTemporalBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToTemporalBenchmark.scala @@ -26,86 +26,77 @@ case class CastStringToTemporalConfig( query: String, extraCometConfigs: Map[String, String] = Map.empty) +// spotless:off /** * Benchmark to measure performance of Comet cast from String to temporal types. 
To run this
- * benchmark: `SPARK_GENERATE_BENCHMARK_FILES=1 make
- * benchmark-org.apache.spark.sql.benchmark.CometCastStringToTemporalBenchmark` Results will be
- * written to "spark/benchmarks/CometCastStringToTemporalBenchmark-**results.txt".
+ * benchmark:
+ * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometCastStringToTemporalBenchmark`
+ * Results will be written to "spark/benchmarks/CometCastStringToTemporalBenchmark-**results.txt".
  */
+// spotless:on
 object CometCastStringToTemporalBenchmark extends CometBenchmarkBase {

-  /**
-   * Generic method to run a cast benchmark with the given configuration.
-   */
-  def runCastBenchmark(config: CastStringToTemporalConfig, values: Int): Unit = {
-    withTempPath { dir =>
-      withTempTable("parquetV1Table") {
-        // Generate date strings like "2020-01-01", "2020-01-02", etc.
-        // This covers roughly a ten-year range of dates
-        prepareTable(
-          dir,
-          spark.sql(
-            s"SELECT CAST(DATE_ADD('2020-01-01', CAST(value % 3650 AS INT)) AS STRING) AS c1 FROM $tbl"))
-
-        runExpressionBenchmark(config.name, values, config.query, config.extraCometConfigs)
-      }
-    }
-  }
-
-  /**
-   * Benchmark for String to Timestamp with timestamp-formatted strings.
-   */
-  def runTimestampCastBenchmark(config: CastStringToTemporalConfig, values: Int): Unit = {
-    withTempPath { dir =>
-      withTempTable("parquetV1Table") {
-        // Generate timestamp strings near the Unix epoch, e.g. "1970-01-01 00:00:01.234567"
-        prepareTable(
-          dir,
-          spark.sql(
-            s"SELECT CAST(TIMESTAMP_MICROS(value % 9999999999) AS STRING) AS c1 FROM $tbl"))
-
-        runExpressionBenchmark(config.name, values, config.query, config.extraCometConfigs)
-      }
-    }
-  }
-
-  // Configuration for String to temporal cast benchmarks
-  private val castConfigs = List(
-    // Date
+  private val dateCastConfigs = List(
     CastStringToTemporalConfig(
-      "Cast String to Date (LEGACY)",
-      "SELECT CAST(c1 AS DATE) FROM parquetV1Table",
-      Map(SQLConf.ANSI_ENABLED.key -> "false")),
+      "Cast String to Date",
+      "SELECT CAST(c1 AS DATE) FROM parquetV1Table"),
     CastStringToTemporalConfig(
-      "Cast String to Date (ANSI)",
-      "SELECT CAST(c1 AS DATE) FROM parquetV1Table",
-      Map(SQLConf.ANSI_ENABLED.key -> "true")))
+      "Try_Cast String to Date",
+      "SELECT TRY_CAST(c1 AS DATE) FROM parquetV1Table"))

   private val timestampCastConfigs = List(
-    // Timestamp (only UTC timezone is compatible)
     CastStringToTemporalConfig(
-      "Cast String to Timestamp (LEGACY, UTC)",
-      "SELECT CAST(c1 AS TIMESTAMP) FROM parquetV1Table",
-      Map(SQLConf.ANSI_ENABLED.key -> "false")),
+      "Cast String to Timestamp",
+      "SELECT CAST(c1 AS TIMESTAMP) FROM parquetV1Table"),
     CastStringToTemporalConfig(
-      "Cast String to Timestamp (ANSI, UTC)",
-      "SELECT CAST(c1 AS TIMESTAMP) FROM parquetV1Table",
-      Map(SQLConf.ANSI_ENABLED.key -> "true")))
+      "Try_Cast String to Timestamp",
+      "SELECT TRY_CAST(c1 AS TIMESTAMP) FROM parquetV1Table"))

   override def runCometBenchmark(mainArgs: Array[String]): Unit = {
     val values = 1024 * 1024 * 10 // 10M rows

-    // Run date casts
-    castConfigs.foreach { config =>
-      runBenchmarkWithTable(config.name, values) { v =>
-        runCastBenchmark(config, v)
+    // Generate date data once with ~10% invalid values
+    runBenchmarkWithTable("date data generation", values) { v =>
+      withTempPath { dateDir =>
+        withTempTable("parquetV1Table") {
+          prepareTable(
+            dateDir,
+            spark.sql(s"""
+              SELECT CASE
+                WHEN value % 10 = 0 THEN 'invalid-date'
+                ELSE CAST(DATE_ADD('2020-01-01', CAST(value % 3650 AS INT)) AS STRING)
+              END AS c1
+              FROM $tbl
+            """))
+
+          // Run date cast benchmarks with the same
data + dateCastConfigs.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } } } - // Run timestamp casts - timestampCastConfigs.foreach { config => - runBenchmarkWithTable(config.name, values) { v => - runTimestampCastBenchmark(config, v) + // Generate timestamp data once with ~10% invalid values + runBenchmarkWithTable("timestamp data generation", values) { v => + withTempPath { timestampDir => + withTempTable("parquetV1Table") { + prepareTable( + timestampDir, + spark.sql(s""" + SELECT CASE + WHEN value % 10 = 0 THEN 'not-a-timestamp' + ELSE CAST(TIMESTAMP_MICROS(value % 9999999999) AS STRING) + END AS c1 + FROM $tbl + """)) + + // Run timestamp cast benchmarks with the same data + timestampCastConfigs.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } } } } From 6bc01dabdcd58be717f3977f6033f10d34d2bef6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 23 Dec 2025 15:27:06 -0700 Subject: [PATCH 09/12] remove unused import --- .../sql/benchmark/CometCastStringToTemporalBenchmark.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToTemporalBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToTemporalBenchmark.scala index 3924805884..39337be5c8 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToTemporalBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToTemporalBenchmark.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.benchmark -import org.apache.spark.sql.internal.SQLConf - case class CastStringToTemporalConfig( name: String, query: String, From bb9b56b606d9df6ec77a0e94bd26ce08c92b9440 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 24 Dec 2025 09:31:45 -0700 Subject: [PATCH 10/12] more benchmarks --- .../CometAggregateFunctionBenchmark.scala | 93 ++++++++++++ .../CometArrayExpressionBenchmark.scala | 109 +++++++++++++ .../CometBitwiseExpressionBenchmark.scala | 71 +++++++++ .../sql/benchmark/CometCastBenchmark.scala | 96 ------------ .../benchmark/CometCastBooleanBenchmark.scala | 121 +++++++++++++++ .../CometCastNumericToNumericBenchmark.scala | 143 ++++++++++++++++++ .../CometCastNumericToStringBenchmark.scala | 91 +++++++++++ .../CometCastNumericToTemporalBenchmark.scala | 102 +++++++++++++ .../CometCastTemporalToNumericBenchmark.scala | 102 +++++++++++++ .../CometCastTemporalToStringBenchmark.scala | 96 ++++++++++++ .../CometComparisonExpressionBenchmark.scala | 86 +++++++++++ .../CometDatetimeExpressionBenchmark.scala | 88 +++++++++++ .../CometHashExpressionBenchmark.scala | 70 +++++++++ .../CometMapExpressionBenchmark.scala | 80 ++++++++++ .../CometMathExpressionBenchmark.scala | 102 +++++++++++++ .../CometStringExpressionBenchmark.scala | 21 ++- 16 files changed, 1370 insertions(+), 101 deletions(-) create mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateFunctionBenchmark.scala create mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometArrayExpressionBenchmark.scala create mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometBitwiseExpressionBenchmark.scala delete mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBenchmark.scala create mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBooleanBenchmark.scala create mode 100644 
spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToNumericBenchmark.scala create mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToStringBenchmark.scala create mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToTemporalBenchmark.scala create mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastTemporalToNumericBenchmark.scala create mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastTemporalToStringBenchmark.scala create mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometComparisonExpressionBenchmark.scala create mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometHashExpressionBenchmark.scala create mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometMapExpressionBenchmark.scala create mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometMathExpressionBenchmark.scala diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateFunctionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateFunctionBenchmark.scala new file mode 100644 index 0000000000..158a9bd7d6 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateFunctionBenchmark.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class AggExprConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Comprehensive benchmark for Comet aggregate functions. To run this benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometAggregateFunctionBenchmark` + * Results will be written to "spark/benchmarks/CometAggregateFunctionBenchmark-**results.txt". 
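+ *
+ * Each benchmark case pairs a name with the SQL to execute against the generated table, for
+ * example:
+ * {{{
+ *   AggExprConfig("sum_int", "SELECT SUM(c_int) FROM parquetV1Table GROUP BY grp")
+ * }}}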
+ */ +// spotless:on +object CometAggregateFunctionBenchmark extends CometBenchmarkBase { + + private val basicAggregates = List( + AggExprConfig("count", "SELECT COUNT(*) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("count_col", "SELECT COUNT(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("count_distinct", "SELECT COUNT(DISTINCT c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("min_int", "SELECT MIN(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("max_int", "SELECT MAX(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("min_double", "SELECT MIN(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("max_double", "SELECT MAX(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("sum_int", "SELECT SUM(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("sum_long", "SELECT SUM(c_long) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("sum_double", "SELECT SUM(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("avg_int", "SELECT AVG(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("avg_double", "SELECT AVG(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("first", "SELECT FIRST(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("first_ignore_nulls", "SELECT FIRST(c_int, true) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("last", "SELECT LAST(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("last_ignore_nulls", "SELECT LAST(c_int, true) FROM parquetV1Table GROUP BY grp")) + + private val statisticalAggregates = List( + AggExprConfig("var_samp", "SELECT VAR_SAMP(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("var_pop", "SELECT VAR_POP(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("stddev_samp", "SELECT STDDEV_SAMP(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("stddev_pop", "SELECT STDDEV_POP(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("covar_samp", "SELECT COVAR_SAMP(c_double, c_double2) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("covar_pop", "SELECT COVAR_POP(c_double, c_double2) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("corr", "SELECT CORR(c_double, c_double2) FROM parquetV1Table GROUP BY grp")) + + private val bitwiseAggregates = List( + AggExprConfig("bit_and", "SELECT BIT_AND(c_long) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("bit_or", "SELECT BIT_OR(c_long) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("bit_xor", "SELECT BIT_XOR(c_long) FROM parquetV1Table GROUP BY grp")) + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = 1024 * 1024 * 10 // 10M rows for aggregates + + runBenchmarkWithTable("Aggregate function benchmarks", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT + CAST(value % 1000 AS INT) AS grp, + CASE WHEN value % 100 = 0 THEN NULL ELSE CAST((value % 10000) - 5000 AS INT) END AS c_int, + CASE WHEN value % 100 = 1 THEN NULL ELSE CAST(value * 1000 AS LONG) END AS c_long, + CASE WHEN value % 100 = 2 THEN NULL ELSE CAST((value % 10000) / 100.0 AS DOUBLE) END AS c_double, + CASE WHEN value % 100 = 3 THEN NULL ELSE CAST((value % 5000) / 50.0 AS DOUBLE) END AS c_double2 + FROM $tbl + """)) + + (basicAggregates ++ statisticalAggregates ++ bitwiseAggregates).foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git 
a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArrayExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArrayExpressionBenchmark.scala new file mode 100644 index 0000000000..cfa29043a6 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArrayExpressionBenchmark.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class ArrayExprConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Comprehensive benchmark for Comet array expressions. To run this benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometArrayExpressionBenchmark` + * Results will be written to "spark/benchmarks/CometArrayExpressionBenchmark-**results.txt". + */ +// spotless:on +object CometArrayExpressionBenchmark extends CometBenchmarkBase { + + private val arrayExpressions = List( + ArrayExprConfig("array_contains", "SELECT array_contains(arr_int, 5) FROM parquetV1Table"), + ArrayExprConfig("array_max", "SELECT array_max(arr_int) FROM parquetV1Table"), + ArrayExprConfig("array_min", "SELECT array_min(arr_int) FROM parquetV1Table"), + ArrayExprConfig("array_distinct", "SELECT array_distinct(arr_int) FROM parquetV1Table"), + ArrayExprConfig("array_remove", "SELECT array_remove(arr_int, 5) FROM parquetV1Table"), + ArrayExprConfig("array_append", "SELECT array_append(arr_int, 100) FROM parquetV1Table"), + ArrayExprConfig("array_compact", "SELECT array_compact(arr_nullable) FROM parquetV1Table"), + ArrayExprConfig("array_intersect", "SELECT array_intersect(arr_int, arr_int2) FROM parquetV1Table"), + ArrayExprConfig("array_except", "SELECT array_except(arr_int, arr_int2) FROM parquetV1Table"), + ArrayExprConfig("array_union", "SELECT array_union(arr_int, arr_int2) FROM parquetV1Table"), + ArrayExprConfig("arrays_overlap", "SELECT arrays_overlap(arr_int, arr_int2) FROM parquetV1Table"), + ArrayExprConfig("array_insert", "SELECT array_insert(arr_int, 2, 999) FROM parquetV1Table"), + ArrayExprConfig("array_join", "SELECT array_join(arr_str, ',') FROM parquetV1Table"), + ArrayExprConfig("array_repeat", "SELECT array_repeat(elem, 5) FROM parquetV1Table"), + ArrayExprConfig("get_array_item", "SELECT arr_int[0] FROM parquetV1Table"), + ArrayExprConfig("element_at", "SELECT element_at(arr_int, 1) FROM parquetV1Table"), + ArrayExprConfig("reverse", "SELECT reverse(arr_int) FROM parquetV1Table"), + ArrayExprConfig("flatten", "SELECT flatten(nested_arr) FROM parquetV1Table"), + ArrayExprConfig("size", "SELECT size(arr_int) FROM parquetV1Table")) + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = 1024 * 
1024 * 2 // 2M rows (arrays are larger) + + runBenchmarkWithTable("Array expression benchmarks", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT + CASE WHEN value % 50 = 0 THEN NULL + ELSE array( + cast((value % 10) as int), + cast((value % 20) as int), + cast((value % 30) as int), + cast((value % 10) as int), + 5, 10, 15) + END AS arr_int, + CASE WHEN value % 50 = 1 THEN NULL + ELSE array( + cast((value % 15) as int), + cast((value % 25) as int), + cast((value % 35) as int)) + END AS arr_int2, + CASE WHEN value % 50 = 2 THEN NULL + ELSE array( + CASE WHEN value % 7 = 0 THEN NULL ELSE cast((value % 10) as int) END, + CASE WHEN value % 7 = 1 THEN NULL ELSE cast((value % 20) as int) END, + CASE WHEN value % 7 = 2 THEN NULL ELSE cast((value % 30) as int) END) + END AS arr_nullable, + CASE WHEN value % 50 = 3 THEN NULL + ELSE array( + concat('str_', cast(value % 10 as string)), + concat('val_', cast(value % 20 as string)), + concat('item_', cast(value % 5 as string))) + END AS arr_str, + CASE WHEN value % 50 = 4 THEN NULL + ELSE array( + array(cast((value % 5) as int), cast((value % 10) as int)), + array(cast((value % 15) as int), cast((value % 20) as int))) + END AS nested_arr, + CASE WHEN value % 50 = 5 THEN NULL ELSE cast((value % 100) as int) END AS elem + FROM $tbl + """)) + + arrayExpressions.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBitwiseExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBitwiseExpressionBenchmark.scala new file mode 100644 index 0000000000..3ca693f24d --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBitwiseExpressionBenchmark.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class BitwiseExprConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Comprehensive benchmark for Comet bitwise expressions. To run this benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometBitwiseExpressionBenchmark` + * Results will be written to "spark/benchmarks/CometBitwiseExpressionBenchmark-**results.txt". 
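+ *
+ * Each case pairs a benchmark name with the SQL to run, for example:
+ * {{{
+ *   BitwiseExprConfig("shift_left", "SELECT SHIFTLEFT(c_int, 3) FROM parquetV1Table")
+ * }}}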
+ */ +// spotless:on +object CometBitwiseExpressionBenchmark extends CometBenchmarkBase { + + private val bitwiseExpressions = List( + BitwiseExprConfig("bitwise_and", "SELECT c_int & c_int2 FROM parquetV1Table"), + BitwiseExprConfig("bitwise_or", "SELECT c_int | c_int2 FROM parquetV1Table"), + BitwiseExprConfig("bitwise_xor", "SELECT c_int ^ c_int2 FROM parquetV1Table"), + BitwiseExprConfig("bitwise_not", "SELECT ~c_int FROM parquetV1Table"), + BitwiseExprConfig("shift_left", "SELECT SHIFTLEFT(c_int, 3) FROM parquetV1Table"), + BitwiseExprConfig("shift_right", "SELECT SHIFTRIGHT(c_int, 3) FROM parquetV1Table"), + BitwiseExprConfig("shift_right_unsigned", "SELECT SHIFTRIGHTUNSIGNED(c_int, 3) FROM parquetV1Table"), + BitwiseExprConfig("bit_count", "SELECT BIT_COUNT(c_long) FROM parquetV1Table"), + BitwiseExprConfig("bit_get", "SELECT BIT_GET(c_long, c_pos) FROM parquetV1Table")) + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = 1024 * 1024 * 5 // 5M rows + + runBenchmarkWithTable("Bitwise expression benchmarks", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT + CASE WHEN value % 100 = 0 THEN NULL ELSE CAST(value % 1000000 AS INT) END AS c_int, + CASE WHEN value % 100 = 1 THEN NULL ELSE CAST((value * 7) % 1000000 AS INT) END AS c_int2, + CASE WHEN value % 100 = 2 THEN NULL ELSE CAST(value * 1000 AS LONG) END AS c_long, + CASE WHEN value % 100 = 3 THEN NULL ELSE CAST(value % 64 AS INT) END AS c_pos + FROM $tbl + """)) + + bitwiseExpressions.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBenchmark.scala deleted file mode 100644 index 975abd632f..0000000000 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBenchmark.scala +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.spark.sql.benchmark - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{DataType, LongType} - -import org.apache.comet.expressions.{CometCast, CometEvalMode} -import org.apache.comet.serde.{Compatible, Incompatible, Unsupported} - -/** - * Benchmark to measure Comet execution performance. To run this benchmark: - * {{{ - * SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometCastBenchmark - * }}} - * - * Results will be written to "spark/benchmarks/CometCastBenchmark-**results.txt". 
- */ - -object CometCastBenchmark extends CometBenchmarkBase { - - override def getSparkSession: SparkSession = { - val session = super.getSparkSession - session.conf.set("parquet.enable.dictionary", "false") - session.conf.set("spark.sql.shuffle.partitions", "2") - session - } - - def castExprSQL(toDataType: DataType, input: String): String = { - s"CAST ($input AS ${toDataType.sql})" - } - - override def runCometBenchmark(args: Array[String]): Unit = { - - // TODO : Create all possible input datatypes. We only have Long inputs for now - CometCast.supportedTypes.foreach { toDataType => - Seq(false, true).foreach { ansiMode => - CometCast.isSupported( - LongType, - toDataType, - None, - if (ansiMode) CometEvalMode.ANSI else CometEvalMode.LEGACY) match { - case Compatible(notes) => - runBenchmarkWithTable( - s"Running benchmark cast operation from : $LongType to : $toDataType", - 1024 * 1024 * 10) { v => - castBenchmark(v, LongType, toDataType, isAnsiMode = ansiMode) - } - case Incompatible(notes) => None - case Unsupported(notes) => None - } - } - } - } - - def castBenchmark( - values: Int, - fromDataType: DataType, - toDataType: DataType, - isAnsiMode: Boolean): Unit = { - - withTempPath { dir => - withTempTable("parquetV1Table") { - prepareTable(dir, spark.sql(s"SELECT value FROM $tbl")) - - val functionSQL = castExprSQL(toDataType, "value") - val query = s"SELECT $functionSQL FROM parquetV1Table" - val name = - s"Cast function to : ${toDataType} , ansi mode enabled : ${isAnsiMode}" - - val extraConfigs = Map(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) - - runExpressionBenchmark(name, values, query, extraConfigs) - } - } - } - -} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBooleanBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBooleanBenchmark.scala new file mode 100644 index 0000000000..d104f40ffb --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBooleanBenchmark.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class CastBooleanConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Benchmark to measure performance of Comet cast operations involving Boolean type. To run this + * benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometCastBooleanBenchmark` + * Results will be written to "spark/benchmarks/CometCastBooleanBenchmark-**results.txt". 
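+ *
+ * Cases are generated for both CAST and TRY_CAST in each direction, producing queries such as:
+ * {{{
+ *   SELECT TRY_CAST(c_bool AS STRING) FROM parquetV1Table
+ *   SELECT CAST(c_int AS BOOLEAN) FROM parquetV1Table
+ * }}}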
+ */ +// spotless:on +object CometCastBooleanBenchmark extends CometBenchmarkBase { + + private val castFunctions = Seq("CAST", "TRY_CAST") + + // Boolean to String + private val boolToStringConfigs = for { + castFunc <- castFunctions + } yield CastBooleanConfig( + s"$castFunc Boolean to String", + s"SELECT $castFunc(c_bool AS STRING) FROM parquetV1Table") + + // Boolean to numeric types + private val boolToNumericTypes = Seq("BYTE", "SHORT", "INT", "LONG", "FLOAT", "DOUBLE", "DECIMAL(10,2)") + private val boolToNumericConfigs = for { + castFunc <- castFunctions + targetType <- boolToNumericTypes + } yield CastBooleanConfig( + s"$castFunc Boolean to $targetType", + s"SELECT $castFunc(c_bool AS $targetType) FROM parquetV1Table") + + // Numeric to Boolean + private val numericTypes = Seq( + ("BYTE", "c_byte"), + ("SHORT", "c_short"), + ("INT", "c_int"), + ("LONG", "c_long"), + ("FLOAT", "c_float"), + ("DOUBLE", "c_double"), + ("DECIMAL(10,2)", "c_decimal")) + + private val numericToBoolConfigs = for { + castFunc <- castFunctions + (sourceType, colName) <- numericTypes + } yield CastBooleanConfig( + s"$castFunc $sourceType to Boolean", + s"SELECT $castFunc($colName AS BOOLEAN) FROM parquetV1Table") + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = 1024 * 1024 * 5 // 5M rows + + // Generate boolean data for boolean-to-other casts + runBenchmarkWithTable("Boolean to other types casts", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT CASE + WHEN value % 100 = 0 THEN NULL + ELSE (value % 2 = 0) + END AS c_bool + FROM $tbl + """)) + + (boolToStringConfigs ++ boolToNumericConfigs).foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + + // Generate numeric data for numeric-to-boolean casts + runBenchmarkWithTable("Numeric to Boolean casts", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT + CASE WHEN value % 100 = 0 THEN NULL ELSE CAST((value % 3) - 1 AS BYTE) END AS c_byte, + CASE WHEN value % 100 = 1 THEN NULL ELSE CAST((value % 3) - 1 AS SHORT) END AS c_short, + CASE WHEN value % 100 = 2 THEN NULL ELSE CAST((value % 3) - 1 AS INT) END AS c_int, + CASE WHEN value % 100 = 3 THEN NULL ELSE CAST((value % 3) - 1 AS LONG) END AS c_long, + CASE WHEN value % 100 = 4 THEN NULL ELSE CAST((value % 3) - 1 AS FLOAT) END AS c_float, + CASE WHEN value % 100 = 5 THEN NULL ELSE CAST((value % 3) - 1 AS DOUBLE) END AS c_double, + CASE WHEN value % 100 = 6 THEN NULL ELSE CAST((value % 3) - 1 AS DECIMAL(10,2)) END AS c_decimal + FROM $tbl + """)) + + numericToBoolConfigs.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToNumericBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToNumericBenchmark.scala new file mode 100644 index 0000000000..a923136999 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToNumericBenchmark.scala @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class CastNumericToNumericConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Benchmark to measure performance of Comet cast between numeric types. To run this + * benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometCastNumericToNumericBenchmark` + * Results will be written to "spark/benchmarks/CometCastNumericToNumericBenchmark-**results.txt". + */ +// spotless:on +object CometCastNumericToNumericBenchmark extends CometBenchmarkBase { + + private val castFunctions = Seq("CAST", "TRY_CAST") + + // Integer widening conversions + private val integerWideningPairs = Seq( + ("BYTE", "c_byte", "SHORT"), + ("BYTE", "c_byte", "INT"), + ("BYTE", "c_byte", "LONG"), + ("SHORT", "c_short", "INT"), + ("SHORT", "c_short", "LONG"), + ("INT", "c_int", "LONG")) + + // Integer narrowing conversions + private val integerNarrowingPairs = Seq( + ("LONG", "c_long", "INT"), + ("LONG", "c_long", "SHORT"), + ("LONG", "c_long", "BYTE"), + ("INT", "c_int", "SHORT"), + ("INT", "c_int", "BYTE"), + ("SHORT", "c_short", "BYTE")) + + // Floating point conversions + private val floatPairs = Seq( + ("FLOAT", "c_float", "DOUBLE"), + ("DOUBLE", "c_double", "FLOAT")) + + // Integer to floating point conversions + private val intToFloatPairs = Seq( + ("BYTE", "c_byte", "FLOAT"), + ("SHORT", "c_short", "FLOAT"), + ("INT", "c_int", "FLOAT"), + ("LONG", "c_long", "FLOAT"), + ("INT", "c_int", "DOUBLE"), + ("LONG", "c_long", "DOUBLE")) + + // Floating point to integer conversions + private val floatToIntPairs = Seq( + ("FLOAT", "c_float", "INT"), + ("FLOAT", "c_float", "LONG"), + ("DOUBLE", "c_double", "INT"), + ("DOUBLE", "c_double", "LONG")) + + // Decimal conversions + private val decimalPairs = Seq( + ("INT", "c_int", "DECIMAL(10,2)"), + ("LONG", "c_long", "DECIMAL(20,4)"), + ("DOUBLE", "c_double", "DECIMAL(15,5)"), + ("DECIMAL(10,2)", "c_decimal", "INT"), + ("DECIMAL(10,2)", "c_decimal", "LONG"), + ("DECIMAL(10,2)", "c_decimal", "DOUBLE")) + + private def generateConfigs(pairs: Seq[(String, String, String)]): Seq[CastNumericToNumericConfig] = { + for { + castFunc <- castFunctions + (sourceType, colName, targetType) <- pairs + } yield CastNumericToNumericConfig( + s"$castFunc $sourceType to $targetType", + s"SELECT $castFunc($colName AS $targetType) FROM parquetV1Table") + } + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = 1024 * 1024 * 5 // 5M rows + + // Generate input data once with all numeric types + runBenchmarkWithTable("Numeric to Numeric casts", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + // Generate varied numeric data including edge cases + prepareTable( + dir, + spark.sql(s""" + SELECT + CASE WHEN value % 100 = 0 THEN NULL ELSE CAST((value % 128) - 64 AS BYTE) END AS c_byte, + CASE WHEN value % 100 
= 1 THEN NULL ELSE CAST((value % 32768) - 16384 AS SHORT) END AS c_short, + CASE WHEN value % 100 = 2 THEN NULL ELSE CAST(value - 2500000 AS INT) END AS c_int, + CASE WHEN value % 100 = 3 THEN NULL ELSE CAST(value * 1000 AS LONG) END AS c_long, + CASE + WHEN value % 100 = 4 THEN NULL + WHEN value % 100 = 5 THEN CAST('NaN' AS FLOAT) + WHEN value % 100 = 6 THEN CAST('Infinity' AS FLOAT) + WHEN value % 100 = 7 THEN CAST('-Infinity' AS FLOAT) + ELSE CAST((value - 2500000) / 100.0 AS FLOAT) + END AS c_float, + CASE + WHEN value % 100 = 8 THEN NULL + WHEN value % 100 = 9 THEN CAST('NaN' AS DOUBLE) + WHEN value % 100 = 10 THEN CAST('Infinity' AS DOUBLE) + WHEN value % 100 = 11 THEN CAST('-Infinity' AS DOUBLE) + ELSE CAST((value - 2500000) / 100.0 AS DOUBLE) + END AS c_double, + CASE WHEN value % 100 = 12 THEN NULL ELSE CAST((value - 2500000) / 100.0 AS DECIMAL(10,2)) END AS c_decimal + FROM $tbl + """)) + + // Run all benchmark categories + (generateConfigs(integerWideningPairs) ++ + generateConfigs(integerNarrowingPairs) ++ + generateConfigs(floatPairs) ++ + generateConfigs(intToFloatPairs) ++ + generateConfigs(floatToIntPairs) ++ + generateConfigs(decimalPairs)).foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToStringBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToStringBenchmark.scala new file mode 100644 index 0000000000..41331b9a47 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToStringBenchmark.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class CastNumericToStringConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Benchmark to measure performance of Comet cast from numeric types to String. To run this + * benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometCastNumericToStringBenchmark` + * Results will be written to "spark/benchmarks/CometCastNumericToStringBenchmark-**results.txt". 
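+ * + * For example, the configurations below expand to queries such as + * {{{ + * SELECT CAST(c_float AS STRING) FROM parquetV1Table + * SELECT TRY_CAST(c_decimal AS STRING) FROM parquetV1Table + * }}}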
+ */ +// spotless:on +object CometCastNumericToStringBenchmark extends CometBenchmarkBase { + + private val castFunctions = Seq("CAST", "TRY_CAST") + private val sourceTypes = + Seq(("BOOLEAN", "c_bool"), ("BYTE", "c_byte"), ("SHORT", "c_short"), ("INT", "c_int"), + ("LONG", "c_long"), ("FLOAT", "c_float"), ("DOUBLE", "c_double"), ("DECIMAL(10,2)", "c_decimal")) + + private val castConfigs = for { + castFunc <- castFunctions + (sourceType, colName) <- sourceTypes + } yield CastNumericToStringConfig( + s"$castFunc $sourceType to String", + s"SELECT $castFunc($colName AS STRING) FROM parquetV1Table") + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = 1024 * 1024 * 5 // 5M rows + + // Generate input data once with all numeric types + runBenchmarkWithTable("Numeric to String casts", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + // Generate varied numeric data including edge cases + prepareTable( + dir, + spark.sql(s""" + SELECT + CASE WHEN value % 100 = 0 THEN NULL ELSE (value % 2 = 0) END AS c_bool, + CASE WHEN value % 100 = 1 THEN NULL ELSE CAST((value % 128) - 64 AS BYTE) END AS c_byte, + CASE WHEN value % 100 = 2 THEN NULL ELSE CAST((value % 32768) - 16384 AS SHORT) END AS c_short, + CASE WHEN value % 100 = 3 THEN NULL ELSE CAST(value - 2500000 AS INT) END AS c_int, + CASE WHEN value % 100 = 4 THEN NULL ELSE CAST(value * 1000000 AS LONG) END AS c_long, + CASE + WHEN value % 100 = 5 THEN NULL + WHEN value % 100 = 6 THEN CAST('NaN' AS FLOAT) + WHEN value % 100 = 7 THEN CAST('Infinity' AS FLOAT) + WHEN value % 100 = 8 THEN CAST('-Infinity' AS FLOAT) + ELSE CAST((value - 2500000) / 1000.0 AS FLOAT) + END AS c_float, + CASE + WHEN value % 100 = 9 THEN NULL + WHEN value % 100 = 10 THEN CAST('NaN' AS DOUBLE) + WHEN value % 100 = 11 THEN CAST('Infinity' AS DOUBLE) + WHEN value % 100 = 12 THEN CAST('-Infinity' AS DOUBLE) + ELSE CAST((value - 2500000) / 100.0 AS DOUBLE) + END AS c_double, + CASE WHEN value % 100 = 13 THEN NULL ELSE CAST((value - 2500000) / 100.0 AS DECIMAL(10,2)) END AS c_decimal + FROM $tbl + """)) + + castConfigs.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToTemporalBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToTemporalBenchmark.scala new file mode 100644 index 0000000000..e0079b0b6b --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToTemporalBenchmark.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.benchmark + +case class CastNumericToTemporalConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Benchmark to measure performance of Comet cast from numeric types to temporal types. To run + * this benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometCastNumericToTemporalBenchmark` + * Results will be written to "spark/benchmarks/CometCastNumericToTemporalBenchmark-**results.txt". + */ +// spotless:on +object CometCastNumericToTemporalBenchmark extends CometBenchmarkBase { + + private val castFunctions = Seq("CAST", "TRY_CAST") + + // INT to DATE (days since epoch) + private val intToDateConfigs = for { + castFunc <- castFunctions + } yield CastNumericToTemporalConfig( + s"$castFunc Int to Date", + s"SELECT $castFunc(c_int AS DATE) FROM parquetV1Table") + + // LONG to TIMESTAMP (microseconds since epoch) + private val longToTimestampConfigs = for { + castFunc <- castFunctions + } yield CastNumericToTemporalConfig( + s"$castFunc Long to Timestamp", + s"SELECT $castFunc(c_long AS TIMESTAMP) FROM parquetV1Table") + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = 1024 * 1024 * 5 // 5M rows + + // Generate data once for INT to DATE conversions + runBenchmarkWithTable("Int to Date casts", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + // Generate INT values representing days since epoch (1970-01-01) + // Range: ~-18000 to +18500 days (roughly 1920 to 2020) + prepareTable( + dir, + spark.sql(s""" + SELECT CASE + WHEN value % 100 = 0 THEN NULL + ELSE CAST((value % 36500) - 18000 AS INT) + END AS c_int + FROM $tbl + """)) + + intToDateConfigs.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + + // Generate data once for LONG to TIMESTAMP conversions + runBenchmarkWithTable("Long to Timestamp casts", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + // Generate LONG values representing microseconds since epoch + // Range: 2020-2021 timestamps + prepareTable( + dir, + spark.sql(s""" + SELECT CASE + WHEN value % 100 = 0 THEN NULL + ELSE 1577836800000000 + (value * 6000000) % 31536000000000 + END AS c_long + FROM $tbl + """)) + + longToTimestampConfigs.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastTemporalToNumericBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastTemporalToNumericBenchmark.scala new file mode 100644 index 0000000000..68ea0bbbcf --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastTemporalToNumericBenchmark.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class CastTemporalToNumericConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Benchmark to measure performance of Comet cast from temporal types to numeric types. To run + * this benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometCastTemporalToNumericBenchmark` + * Results will be written to "spark/benchmarks/CometCastTemporalToNumericBenchmark-**results.txt". + */ +// spotless:on +object CometCastTemporalToNumericBenchmark extends CometBenchmarkBase { + + private val castFunctions = Seq("CAST", "TRY_CAST") + + // DATE to numeric types + private val dateToNumericTypes = Seq("BYTE", "SHORT", "INT", "LONG") + private val dateToNumericConfigs = for { + castFunc <- castFunctions + targetType <- dateToNumericTypes + } yield CastTemporalToNumericConfig( + s"$castFunc Date to $targetType", + s"SELECT $castFunc(c_date AS $targetType) FROM parquetV1Table") + + // TIMESTAMP to numeric types + private val timestampToNumericTypes = Seq("BYTE", "SHORT", "INT", "LONG") + private val timestampToNumericConfigs = for { + castFunc <- castFunctions + targetType <- timestampToNumericTypes + } yield CastTemporalToNumericConfig( + s"$castFunc Timestamp to $targetType", + s"SELECT $castFunc(c_timestamp AS $targetType) FROM parquetV1Table") + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = 1024 * 1024 * 5 // 5M rows + + // Generate DATE data once for all date-to-numeric benchmarks + runBenchmarkWithTable("Date to Numeric casts", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT CASE + WHEN value % 100 = 0 THEN NULL + ELSE DATE_ADD('2020-01-01', CAST(value % 3650 AS INT)) + END AS c_date + FROM $tbl + """)) + + dateToNumericConfigs.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + + // Generate TIMESTAMP data once for all timestamp-to-numeric benchmarks + runBenchmarkWithTable("Timestamp to Numeric casts", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT CASE + WHEN value % 100 = 0 THEN NULL + ELSE TIMESTAMP_MICROS(1577836800000000 + (value * 6000000) % 31536000000000) + END AS c_timestamp + FROM $tbl + """)) + + timestampToNumericConfigs.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastTemporalToStringBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastTemporalToStringBenchmark.scala new file mode 100644 index 0000000000..8d091e86fd --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastTemporalToStringBenchmark.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class CastTemporalToStringConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Benchmark to measure performance of Comet cast from temporal types to String. To run this + * benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometCastTemporalToStringBenchmark` + * Results will be written to "spark/benchmarks/CometCastTemporalToStringBenchmark-**results.txt". + */ +// spotless:on +object CometCastTemporalToStringBenchmark extends CometBenchmarkBase { + + private val castFunctions = Seq("CAST", "TRY_CAST") + + private val dateCastConfigs = for { + castFunc <- castFunctions + } yield CastTemporalToStringConfig( + s"$castFunc Date to String", + s"SELECT $castFunc(c_date AS STRING) FROM parquetV1Table") + + private val timestampCastConfigs = for { + castFunc <- castFunctions + } yield CastTemporalToStringConfig( + s"$castFunc Timestamp to String", + s"SELECT $castFunc(c_timestamp AS STRING) FROM parquetV1Table") + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = 1024 * 1024 * 5 // 5M rows + + // Generate temporal data once for date benchmarks + runBenchmarkWithTable("Date to String casts", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT CASE + WHEN value % 100 = 0 THEN NULL + ELSE DATE_ADD('2020-01-01', CAST(value % 3650 AS INT)) + END AS c_date + FROM $tbl + """)) + + dateCastConfigs.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + + // Generate temporal data once for timestamp benchmarks + runBenchmarkWithTable("Timestamp to String casts", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT CASE + WHEN value % 100 = 0 THEN NULL + ELSE TIMESTAMP_MICROS(1577836800000000 + (value * 6000000) % 31536000000000) + END AS c_timestamp + FROM $tbl + """)) + + timestampCastConfigs.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometComparisonExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometComparisonExpressionBenchmark.scala new file mode 100644 index 0000000000..7d127044e3 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometComparisonExpressionBenchmark.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class ComparisonExprConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Comprehensive benchmark for Comet comparison and predicate expressions. To run this benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometComparisonExpressionBenchmark` + * Results will be written to "spark/benchmarks/CometComparisonExpressionBenchmark-**results.txt". + */ +// spotless:on +object CometComparisonExpressionBenchmark extends CometBenchmarkBase { + + private val comparisonExpressions = List( + ComparisonExprConfig("equal_to", "SELECT c_int = c_int2 FROM parquetV1Table"), + ComparisonExprConfig("not_equal_to", "SELECT c_int != c_int2 FROM parquetV1Table"), + ComparisonExprConfig("less_than", "SELECT c_int < c_int2 FROM parquetV1Table"), + ComparisonExprConfig("less_than_or_equal", "SELECT c_int <= c_int2 FROM parquetV1Table"), + ComparisonExprConfig("greater_than", "SELECT c_int > c_int2 FROM parquetV1Table"), + ComparisonExprConfig("greater_than_or_equal", "SELECT c_int >= c_int2 FROM parquetV1Table"), + ComparisonExprConfig("equal_null_safe", "SELECT c_int <=> c_int2 FROM parquetV1Table"), + ComparisonExprConfig("is_null", "SELECT c_int IS NULL FROM parquetV1Table"), + ComparisonExprConfig("is_not_null", "SELECT c_int IS NOT NULL FROM parquetV1Table"), + ComparisonExprConfig("is_nan_float", "SELECT isnan(c_float) FROM parquetV1Table"), + ComparisonExprConfig("is_nan_double", "SELECT isnan(c_double) FROM parquetV1Table"), + ComparisonExprConfig("and", "SELECT (c_int > 0) AND (c_int2 < 100) FROM parquetV1Table"), + ComparisonExprConfig("or", "SELECT (c_int > 0) OR (c_int2 < 100) FROM parquetV1Table"), + ComparisonExprConfig("not", "SELECT NOT (c_int > 0) FROM parquetV1Table"), + ComparisonExprConfig("in_list", "SELECT c_int IN (1, 10, 100, 1000, 10000) FROM parquetV1Table"), + ComparisonExprConfig("not_in_list", "SELECT c_int NOT IN (1, 10, 100, 1000, 10000) FROM parquetV1Table")) + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = 1024 * 1024 * 5 // 5M rows + + runBenchmarkWithTable("Comparison expression benchmarks", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT + CASE WHEN value % 10 = 0 THEN NULL ELSE CAST((value % 100000) - 50000 AS INT) END AS c_int, + CASE WHEN value % 10 = 1 THEN NULL ELSE CAST((value % 1000) AS INT) END AS c_int2, + CASE + WHEN value % 50 = 2 THEN NULL + WHEN value % 50 = 3 THEN CAST('NaN' AS FLOAT) + ELSE CAST((value % 10000) / 100.0 AS FLOAT) + END AS c_float, + CASE + WHEN value % 50 = 4 THEN NULL + WHEN value % 50 = 5 THEN CAST('NaN' AS DOUBLE) + ELSE CAST((value % 10000) / 100.0 AS DOUBLE) + END AS c_double + FROM $tbl + """)) + + comparisonExpressions.foreach { config => + runExpressionBenchmark(config.name, v, 
config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala index 47eff41bbd..e4ceee0093 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala @@ -76,6 +76,76 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase { } } + def datePartExprBenchmark(values: Int, useDictionary: Boolean): Unit = { + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql( + s"select cast(timestamp_micros(cast(value/100000 as integer)) as date) as dt FROM $tbl")) + Seq( + ("year", "YEAR(dt)"), + ("month", "MONTH(dt)"), + ("day_of_month", "DAYOFMONTH(dt)"), + ("day_of_week", "DAYOFWEEK(dt)"), + ("weekday", "WEEKDAY(dt)"), + ("day_of_year", "DAYOFYEAR(dt)"), + ("week_of_year", "WEEKOFYEAR(dt)"), + ("quarter", "QUARTER(dt)")).foreach { + case (name, expr) => + val isDictionary = if (useDictionary) "(Dictionary)" else "" + val benchName = s"Date Extract $isDictionary - $name" + val query = s"select $expr from parquetV1Table" + runExpressionBenchmark(benchName, values, query) + } + } + } + } + + def timestampPartExprBenchmark(values: Int, useDictionary: Boolean): Unit = { + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s"select timestamp_micros(cast(value/100000 as integer)) as ts FROM $tbl")) + Seq( + ("year", "YEAR(ts)"), + ("month", "MONTH(ts)"), + ("day_of_month", "DAYOFMONTH(ts)"), + ("hour", "HOUR(ts)"), + ("minute", "MINUTE(ts)"), + ("second", "SECOND(ts)"), + ("quarter", "QUARTER(ts)")).foreach { + case (name, expr) => + val isDictionary = if (useDictionary) "(Dictionary)" else "" + val benchName = s"Timestamp Extract $isDictionary - $name" + val query = s"select $expr from parquetV1Table" + runExpressionBenchmark(benchName, values, query) + } + } + } + } + + def dateArithmeticExprBenchmark(values: Int, useDictionary: Boolean): Unit = { + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql( + s"select cast(timestamp_micros(cast(value/100000 as integer)) as date) as dt, cast(value % 365 as int) as days FROM $tbl")) + Seq( + ("date_add", "DATE_ADD(dt, days)"), + ("date_sub", "DATE_SUB(dt, days)")).foreach { + case (name, expr) => + val isDictionary = if (useDictionary) "(Dictionary)" else "" + val benchName = s"Date Arithmetic $isDictionary - $name" + val query = s"select $expr from parquetV1Table" + runExpressionBenchmark(benchName, values, query) + } + } + } + } + override def runCometBenchmark(mainArgs: Array[String]): Unit = { val values = 1024 * 1024; @@ -96,6 +166,24 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase { runBenchmarkWithTable("TimestampTrunc (Dictionary)", values, useDictionary = true) { v => timestampTruncExprBenchmark(v, useDictionary = true) } + runBenchmarkWithTable("DatePart", values) { v => + datePartExprBenchmark(v, useDictionary = false) + } + runBenchmarkWithTable("DatePart (Dictionary)", values, useDictionary = true) { v => + datePartExprBenchmark(v, useDictionary = true) + } + runBenchmarkWithTable("TimestampPart", values) { v => + timestampPartExprBenchmark(v, useDictionary = false) + } + runBenchmarkWithTable("TimestampPart (Dictionary)", values, useDictionary = true) { v => + 
timestampPartExprBenchmark(v, useDictionary = true) + } + runBenchmarkWithTable("DateArithmetic", values) { v => + dateArithmeticExprBenchmark(v, useDictionary = false) + } + runBenchmarkWithTable("DateArithmetic (Dictionary)", values, useDictionary = true) { v => + dateArithmeticExprBenchmark(v, useDictionary = true) + } } } } diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometHashExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometHashExpressionBenchmark.scala new file mode 100644 index 0000000000..a91f3c2b96 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometHashExpressionBenchmark.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class HashExprConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Comprehensive benchmark for Comet hash expressions. To run this benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometHashExpressionBenchmark` + * Results will be written to "spark/benchmarks/CometHashExpressionBenchmark-**results.txt". 
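+ * + * For example, the configurations below run queries such as + * {{{ + * SELECT xxhash64(c_str, c_int, c_long) FROM parquetV1Table + * SELECT sha2(c_str, 256) FROM parquetV1Table + * }}}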
+ */ +// spotless:on +object CometHashExpressionBenchmark extends CometBenchmarkBase { + + private val hashExpressions = List( + HashExprConfig("xxhash64_single", "SELECT xxhash64(c_str) FROM parquetV1Table"), + HashExprConfig("xxhash64_multi", "SELECT xxhash64(c_str, c_int, c_long) FROM parquetV1Table"), + HashExprConfig("murmur3_hash_single", "SELECT hash(c_str) FROM parquetV1Table"), + HashExprConfig("murmur3_hash_multi", "SELECT hash(c_str, c_int, c_long) FROM parquetV1Table"), + HashExprConfig("sha1", "SELECT sha1(c_str) FROM parquetV1Table"), + HashExprConfig("sha2_224", "SELECT sha2(c_str, 224) FROM parquetV1Table"), + HashExprConfig("sha2_256", "SELECT sha2(c_str, 256) FROM parquetV1Table"), + HashExprConfig("sha2_384", "SELECT sha2(c_str, 384) FROM parquetV1Table"), + HashExprConfig("sha2_512", "SELECT sha2(c_str, 512) FROM parquetV1Table")) + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = 1024 * 1024 * 5 // 5M rows + + runBenchmarkWithTable("Hash expression benchmarks", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT + CASE WHEN value % 100 = 0 THEN NULL ELSE CONCAT('string_', CAST(value AS STRING)) END AS c_str, + CASE WHEN value % 100 = 1 THEN NULL ELSE CAST(value % 1000000 AS INT) END AS c_int, + CASE WHEN value % 100 = 2 THEN NULL ELSE CAST(value * 1000 AS LONG) END AS c_long + FROM $tbl + """)) + + hashExpressions.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMapExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMapExpressionBenchmark.scala new file mode 100644 index 0000000000..f41eac4e47 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMapExpressionBenchmark.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class MapExprConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Comprehensive benchmark for Comet map expressions. To run this benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometMapExpressionBenchmark` + * Results will be written to "spark/benchmarks/CometMapExpressionBenchmark-**results.txt". 
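+ * + * For example, the configurations below run queries such as + * {{{ + * SELECT map_keys(m) FROM parquetV1Table + * SELECT element_at(m, 'key1') FROM parquetV1Table + * }}}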
+ */ +// spotless:on +object CometMapExpressionBenchmark extends CometBenchmarkBase { + + private val mapExpressions = List( + MapExprConfig("map_keys", "SELECT map_keys(m) FROM parquetV1Table"), + MapExprConfig("map_values", "SELECT map_values(m) FROM parquetV1Table"), + MapExprConfig("map_entries", "SELECT map_entries(m) FROM parquetV1Table"), + MapExprConfig("map_from_arrays", "SELECT map_from_arrays(keys, values) FROM parquetV1Table"), + MapExprConfig("element_at_map", "SELECT element_at(m, 'key1') FROM parquetV1Table"), + MapExprConfig("map_subscript", "SELECT m['key2'] FROM parquetV1Table")) + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = 1024 * 1024 * 2 // 2M rows (maps are larger) + + runBenchmarkWithTable("Map expression benchmarks", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT + CASE WHEN value % 50 = 0 THEN NULL + ELSE map( + 'key1', cast((value % 100) as int), + 'key2', cast((value % 200) as int), + 'key3', cast((value % 300) as int)) + END AS m, + CASE WHEN value % 50 = 1 THEN NULL + ELSE array('key1', 'key2', 'key3', 'key4') + END AS keys, + CASE WHEN value % 50 = 2 THEN NULL + ELSE array( + cast((value % 100) as int), + cast((value % 200) as int), + cast((value % 300) as int), + cast((value % 400) as int)) + END AS values + FROM $tbl + """)) + + mapExpressions.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMathExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMathExpressionBenchmark.scala new file mode 100644 index 0000000000..f83b9570b1 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMathExpressionBenchmark.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class MathExprConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Comprehensive benchmark for Comet math expressions. To run this benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometMathExpressionBenchmark` + * Results will be written to "spark/benchmarks/CometMathExpressionBenchmark-**results.txt". 
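+ * + * For example, the function lists below expand to queries such as + * {{{ + * SELECT ROUND(c_double, 2) FROM parquetV1Table + * SELECT ATAN2(c_double, c_double2) FROM parquetV1Table + * SELECT LN(c_positive_double) FROM parquetV1Table + * }}}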
+ */ +// spotless:on +object CometMathExpressionBenchmark extends CometBenchmarkBase { + + // Unary math functions + private val unaryMathFunctions = List( + "abs" -> "ABS(c_double)", + "ceil" -> "CEIL(c_double)", + "floor" -> "FLOOR(c_double)", + "round" -> "ROUND(c_double)", + "round_scale" -> "ROUND(c_double, 2)", + "hex_int" -> "HEX(c_int)", + "hex_long" -> "HEX(c_long)", + "unhex" -> "UNHEX(c_hex_str)", + "unary_minus" -> "-(c_double)") + + // Binary math functions + private val binaryMathFunctions = List( + "atan2" -> "ATAN2(c_double, c_double2)", + "pmod" -> "PMOD(c_int, c_int2)") + + // Logarithm functions + private val logFunctions = List( + "log" -> "LN(c_positive_double)", + "log10" -> "LOG10(c_positive_double)", + "log2" -> "LOG2(c_positive_double)") + + private def generateConfigs(funcs: List[(String, String)]): List[MathExprConfig] = { + funcs.map { + case (name, expr) => + MathExprConfig(name, s"SELECT $expr FROM parquetV1Table") + } + } + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = 1024 * 1024 * 5 // 5M rows + + // Benchmark unary and binary math functions + runBenchmarkWithTable("Math expression benchmarks", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT + CASE WHEN value % 100 = 0 THEN NULL ELSE CAST((value - 2500000) AS INT) END AS c_int, + CASE WHEN value % 100 = 1 THEN NULL ELSE CAST((value % 1000) - 500 AS INT) END AS c_int2, + CASE WHEN value % 100 = 2 THEN NULL ELSE CAST(value * 1000 AS LONG) END AS c_long, + CASE + WHEN value % 100 = 3 THEN NULL + WHEN value % 100 = 4 THEN CAST('NaN' AS DOUBLE) + WHEN value % 100 = 5 THEN CAST('Infinity' AS DOUBLE) + WHEN value % 100 = 6 THEN CAST('-Infinity' AS DOUBLE) + ELSE CAST((value - 2500000) / 100.0 AS DOUBLE) + END AS c_double, + CASE WHEN value % 100 = 7 THEN NULL ELSE CAST((value % 1000) - 500 AS DOUBLE) END AS c_double2, + CASE WHEN value % 100 = 8 THEN NULL ELSE CAST(ABS((value - 2500000) / 100.0) + 1.0 AS DOUBLE) END AS c_positive_double, + CASE WHEN value % 100 = 9 THEN NULL ELSE HEX(CAST(value % 1000000 AS LONG)) END AS c_hex_str + FROM $tbl + """)) + + (generateConfigs(unaryMathFunctions) ++ + generateConfigs(binaryMathFunctions) ++ + generateConfigs(logFunctions)).foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala index 41eabb8513..90a8f28969 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala @@ -62,23 +62,34 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { // Configuration for all string expression benchmarks private val stringExpressions = List( - StringExprConfig("Substring", "select substring(c1, 1, 100) from parquetV1Table"), + StringExprConfig("substring", "select substring(c1, 1, 100) from parquetV1Table"), StringExprConfig("ascii", "select ascii(c1) from parquetV1Table"), - StringExprConfig("bitLength", "select bit_length(c1) from parquetV1Table"), + StringExprConfig("bit_length", "select bit_length(c1) from parquetV1Table"), StringExprConfig("octet_length", "select octet_length(c1) from parquetV1Table"), StringExprConfig("upper", "select upper(c1) from 
parquetV1Table"), StringExprConfig("lower", "select lower(c1) from parquetV1Table"), - StringExprConfig("chr", "select chr(c1) from parquetV1Table"), + StringExprConfig("chr", "select chr(65) from parquetV1Table"), StringExprConfig("initCap", "select initCap(c1) from parquetV1Table"), StringExprConfig("trim", "select trim(c1) from parquetV1Table"), - StringExprConfig("concatws", "select concat_ws(' ', c1, c1) from parquetV1Table"), + StringExprConfig("ltrim", "select ltrim(c1) from parquetV1Table"), + StringExprConfig("rtrim", "select rtrim(c1) from parquetV1Table"), + StringExprConfig("concat", "select concat(c1, c1) from parquetV1Table"), + StringExprConfig("concat_ws", "select concat_ws(' ', c1, c1) from parquetV1Table"), StringExprConfig("length", "select length(c1) from parquetV1Table"), StringExprConfig("repeat", "select repeat(c1, 3) from parquetV1Table"), StringExprConfig("reverse", "select reverse(c1) from parquetV1Table"), StringExprConfig("instr", "select instr(c1, '123') from parquetV1Table"), StringExprConfig("replace", "select replace(c1, '123', 'ab') from parquetV1Table"), StringExprConfig("space", "select space(2) from parquetV1Table"), - StringExprConfig("translate", "select translate(c1, '123456', 'aBcDeF') from parquetV1Table")) + StringExprConfig("translate", "select translate(c1, '123456', 'aBcDeF') from parquetV1Table"), + StringExprConfig("lpad", "select lpad(c1, 150, 'x') from parquetV1Table"), + StringExprConfig("rpad", "select rpad(c1, 150, 'x') from parquetV1Table"), + StringExprConfig("contains", "select contains(c1, '123') from parquetV1Table"), + StringExprConfig("startswith", "select startswith(c1, '1') from parquetV1Table"), + StringExprConfig("endswith", "select endswith(c1, '9') from parquetV1Table"), + StringExprConfig("like", "select c1 like '%123%' from parquetV1Table"), + StringExprConfig("rlike", "select c1 rlike '[0-9]+' from parquetV1Table"), + StringExprConfig("regexp_replace", "select regexp_replace(c1, '[0-9]', 'X') from parquetV1Table")) override def runCometBenchmark(mainArgs: Array[String]): Unit = { val values = 1024 * 1024; From 3ff6ad3d48948217e198c0689d5ece8fb96f14ea Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 24 Dec 2025 09:33:05 -0700 Subject: [PATCH 11/12] format --- .../CometAggregateFunctionBenchmark.scala | 20 ++++++++++---- .../CometArrayExpressionBenchmark.scala | 8 ++++-- .../CometBitwiseExpressionBenchmark.scala | 4 ++- .../benchmark/CometCastBooleanBenchmark.scala | 3 ++- .../CometCastNumericToNumericBenchmark.scala | 7 +++-- .../CometCastNumericToStringBenchmark.scala | 11 ++++++-- .../CometComparisonExpressionBenchmark.scala | 8 ++++-- .../CometDatetimeExpressionBenchmark.scala | 26 ++++++++----------- .../CometMathExpressionBenchmark.scala | 10 +++---- .../CometStringExpressionBenchmark.scala | 4 ++- 10 files changed, 62 insertions(+), 39 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateFunctionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateFunctionBenchmark.scala index 158a9bd7d6..250956a247 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateFunctionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateFunctionBenchmark.scala @@ -36,7 +36,9 @@ object CometAggregateFunctionBenchmark extends CometBenchmarkBase { private val basicAggregates = List( AggExprConfig("count", "SELECT COUNT(*) FROM parquetV1Table GROUP BY grp"), AggExprConfig("count_col", "SELECT COUNT(c_int) FROM 
parquetV1Table GROUP BY grp"), - AggExprConfig("count_distinct", "SELECT COUNT(DISTINCT c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig( + "count_distinct", + "SELECT COUNT(DISTINCT c_int) FROM parquetV1Table GROUP BY grp"), AggExprConfig("min_int", "SELECT MIN(c_int) FROM parquetV1Table GROUP BY grp"), AggExprConfig("max_int", "SELECT MAX(c_int) FROM parquetV1Table GROUP BY grp"), AggExprConfig("min_double", "SELECT MIN(c_double) FROM parquetV1Table GROUP BY grp"), @@ -47,17 +49,25 @@ object CometAggregateFunctionBenchmark extends CometBenchmarkBase { AggExprConfig("avg_int", "SELECT AVG(c_int) FROM parquetV1Table GROUP BY grp"), AggExprConfig("avg_double", "SELECT AVG(c_double) FROM parquetV1Table GROUP BY grp"), AggExprConfig("first", "SELECT FIRST(c_int) FROM parquetV1Table GROUP BY grp"), - AggExprConfig("first_ignore_nulls", "SELECT FIRST(c_int, true) FROM parquetV1Table GROUP BY grp"), + AggExprConfig( + "first_ignore_nulls", + "SELECT FIRST(c_int, true) FROM parquetV1Table GROUP BY grp"), AggExprConfig("last", "SELECT LAST(c_int) FROM parquetV1Table GROUP BY grp"), - AggExprConfig("last_ignore_nulls", "SELECT LAST(c_int, true) FROM parquetV1Table GROUP BY grp")) + AggExprConfig( + "last_ignore_nulls", + "SELECT LAST(c_int, true) FROM parquetV1Table GROUP BY grp")) private val statisticalAggregates = List( AggExprConfig("var_samp", "SELECT VAR_SAMP(c_double) FROM parquetV1Table GROUP BY grp"), AggExprConfig("var_pop", "SELECT VAR_POP(c_double) FROM parquetV1Table GROUP BY grp"), AggExprConfig("stddev_samp", "SELECT STDDEV_SAMP(c_double) FROM parquetV1Table GROUP BY grp"), AggExprConfig("stddev_pop", "SELECT STDDEV_POP(c_double) FROM parquetV1Table GROUP BY grp"), - AggExprConfig("covar_samp", "SELECT COVAR_SAMP(c_double, c_double2) FROM parquetV1Table GROUP BY grp"), - AggExprConfig("covar_pop", "SELECT COVAR_POP(c_double, c_double2) FROM parquetV1Table GROUP BY grp"), + AggExprConfig( + "covar_samp", + "SELECT COVAR_SAMP(c_double, c_double2) FROM parquetV1Table GROUP BY grp"), + AggExprConfig( + "covar_pop", + "SELECT COVAR_POP(c_double, c_double2) FROM parquetV1Table GROUP BY grp"), AggExprConfig("corr", "SELECT CORR(c_double, c_double2) FROM parquetV1Table GROUP BY grp")) private val bitwiseAggregates = List( diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArrayExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArrayExpressionBenchmark.scala index cfa29043a6..c6dccc52bc 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArrayExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArrayExpressionBenchmark.scala @@ -41,10 +41,14 @@ object CometArrayExpressionBenchmark extends CometBenchmarkBase { ArrayExprConfig("array_remove", "SELECT array_remove(arr_int, 5) FROM parquetV1Table"), ArrayExprConfig("array_append", "SELECT array_append(arr_int, 100) FROM parquetV1Table"), ArrayExprConfig("array_compact", "SELECT array_compact(arr_nullable) FROM parquetV1Table"), - ArrayExprConfig("array_intersect", "SELECT array_intersect(arr_int, arr_int2) FROM parquetV1Table"), + ArrayExprConfig( + "array_intersect", + "SELECT array_intersect(arr_int, arr_int2) FROM parquetV1Table"), ArrayExprConfig("array_except", "SELECT array_except(arr_int, arr_int2) FROM parquetV1Table"), ArrayExprConfig("array_union", "SELECT array_union(arr_int, arr_int2) FROM parquetV1Table"), - ArrayExprConfig("arrays_overlap", "SELECT arrays_overlap(arr_int, arr_int2) FROM parquetV1Table"), + 
ArrayExprConfig( + "arrays_overlap", + "SELECT arrays_overlap(arr_int, arr_int2) FROM parquetV1Table"), ArrayExprConfig("array_insert", "SELECT array_insert(arr_int, 2, 999) FROM parquetV1Table"), ArrayExprConfig("array_join", "SELECT array_join(arr_str, ',') FROM parquetV1Table"), ArrayExprConfig("array_repeat", "SELECT array_repeat(elem, 5) FROM parquetV1Table"), diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBitwiseExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBitwiseExpressionBenchmark.scala index 3ca693f24d..47b320e2cb 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBitwiseExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBitwiseExpressionBenchmark.scala @@ -40,7 +40,9 @@ object CometBitwiseExpressionBenchmark extends CometBenchmarkBase { BitwiseExprConfig("bitwise_not", "SELECT ~c_int FROM parquetV1Table"), BitwiseExprConfig("shift_left", "SELECT SHIFTLEFT(c_int, 3) FROM parquetV1Table"), BitwiseExprConfig("shift_right", "SELECT SHIFTRIGHT(c_int, 3) FROM parquetV1Table"), - BitwiseExprConfig("shift_right_unsigned", "SELECT SHIFTRIGHTUNSIGNED(c_int, 3) FROM parquetV1Table"), + BitwiseExprConfig( + "shift_right_unsigned", + "SELECT SHIFTRIGHTUNSIGNED(c_int, 3) FROM parquetV1Table"), BitwiseExprConfig("bit_count", "SELECT BIT_COUNT(c_long) FROM parquetV1Table"), BitwiseExprConfig("bit_get", "SELECT BIT_GET(c_long, c_pos) FROM parquetV1Table")) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBooleanBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBooleanBenchmark.scala index d104f40ffb..678817f962 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBooleanBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBooleanBenchmark.scala @@ -44,7 +44,8 @@ object CometCastBooleanBenchmark extends CometBenchmarkBase { s"SELECT $castFunc(c_bool AS STRING) FROM parquetV1Table") // Boolean to numeric types - private val boolToNumericTypes = Seq("BYTE", "SHORT", "INT", "LONG", "FLOAT", "DOUBLE", "DECIMAL(10,2)") + private val boolToNumericTypes = + Seq("BYTE", "SHORT", "INT", "LONG", "FLOAT", "DOUBLE", "DECIMAL(10,2)") private val boolToNumericConfigs = for { castFunc <- castFunctions targetType <- boolToNumericTypes diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToNumericBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToNumericBenchmark.scala index a923136999..138f2fe06b 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToNumericBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToNumericBenchmark.scala @@ -55,9 +55,7 @@ object CometCastNumericToNumericBenchmark extends CometBenchmarkBase { ("SHORT", "c_short", "BYTE")) // Floating point conversions - private val floatPairs = Seq( - ("FLOAT", "c_float", "DOUBLE"), - ("DOUBLE", "c_double", "FLOAT")) + private val floatPairs = Seq(("FLOAT", "c_float", "DOUBLE"), ("DOUBLE", "c_double", "FLOAT")) // Integer to floating point conversions private val intToFloatPairs = Seq( @@ -84,7 +82,8 @@ object CometCastNumericToNumericBenchmark extends CometBenchmarkBase { ("DECIMAL(10,2)", "c_decimal", "LONG"), ("DECIMAL(10,2)", "c_decimal", "DOUBLE")) - private def generateConfigs(pairs: Seq[(String, String, String)]): Seq[CastNumericToNumericConfig] = { + private def generateConfigs( + pairs: 
Seq[(String, String, String)]): Seq[CastNumericToNumericConfig] = { for { castFunc <- castFunctions (sourceType, colName, targetType) <- pairs diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToStringBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToStringBenchmark.scala index 41331b9a47..0e9bed9b04 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToStringBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToStringBenchmark.scala @@ -36,8 +36,15 @@ object CometCastNumericToStringBenchmark extends CometBenchmarkBase { private val castFunctions = Seq("CAST", "TRY_CAST") private val sourceTypes = - Seq(("BOOLEAN", "c_bool"), ("BYTE", "c_byte"), ("SHORT", "c_short"), ("INT", "c_int"), - ("LONG", "c_long"), ("FLOAT", "c_float"), ("DOUBLE", "c_double"), ("DECIMAL(10,2)", "c_decimal")) + Seq( + ("BOOLEAN", "c_bool"), + ("BYTE", "c_byte"), + ("SHORT", "c_short"), + ("INT", "c_int"), + ("LONG", "c_long"), + ("FLOAT", "c_float"), + ("DOUBLE", "c_double"), + ("DECIMAL(10,2)", "c_decimal")) private val castConfigs = for { castFunc <- castFunctions diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometComparisonExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometComparisonExpressionBenchmark.scala index 7d127044e3..cef98aef9e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometComparisonExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometComparisonExpressionBenchmark.scala @@ -48,8 +48,12 @@ object CometComparisonExpressionBenchmark extends CometBenchmarkBase { ComparisonExprConfig("and", "SELECT (c_int > 0) AND (c_int2 < 100) FROM parquetV1Table"), ComparisonExprConfig("or", "SELECT (c_int > 0) OR (c_int2 < 100) FROM parquetV1Table"), ComparisonExprConfig("not", "SELECT NOT (c_int > 0) FROM parquetV1Table"), - ComparisonExprConfig("in_list", "SELECT c_int IN (1, 10, 100, 1000, 10000) FROM parquetV1Table"), - ComparisonExprConfig("not_in_list", "SELECT c_int NOT IN (1, 10, 100, 1000, 10000) FROM parquetV1Table")) + ComparisonExprConfig( + "in_list", + "SELECT c_int IN (1, 10, 100, 1000, 10000) FROM parquetV1Table"), + ComparisonExprConfig( + "not_in_list", + "SELECT c_int NOT IN (1, 10, 100, 1000, 10000) FROM parquetV1Table")) override def runCometBenchmark(mainArgs: Array[String]): Unit = { val values = 1024 * 1024 * 5 // 5M rows diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala index e4ceee0093..ccc4e45c7d 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala @@ -91,12 +91,11 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase { ("weekday", "WEEKDAY(dt)"), ("day_of_year", "DAYOFYEAR(dt)"), ("week_of_year", "WEEKOFYEAR(dt)"), - ("quarter", "QUARTER(dt)")).foreach { - case (name, expr) => - val isDictionary = if (useDictionary) "(Dictionary)" else "" - val benchName = s"Date Extract $isDictionary - $name" - val query = s"select $expr from parquetV1Table" - runExpressionBenchmark(benchName, values, query) + ("quarter", "QUARTER(dt)")).foreach { case (name, expr) => + val isDictionary = if (useDictionary) "(Dictionary)" else "" + val benchName = s"Date 
Extract $isDictionary - $name" + val query = s"select $expr from parquetV1Table" + runExpressionBenchmark(benchName, values, query) } } } @@ -115,12 +114,11 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase { ("hour", "HOUR(ts)"), ("minute", "MINUTE(ts)"), ("second", "SECOND(ts)"), - ("quarter", "QUARTER(ts)")).foreach { - case (name, expr) => - val isDictionary = if (useDictionary) "(Dictionary)" else "" - val benchName = s"Timestamp Extract $isDictionary - $name" - val query = s"select $expr from parquetV1Table" - runExpressionBenchmark(benchName, values, query) + ("quarter", "QUARTER(ts)")).foreach { case (name, expr) => + val isDictionary = if (useDictionary) "(Dictionary)" else "" + val benchName = s"Timestamp Extract $isDictionary - $name" + val query = s"select $expr from parquetV1Table" + runExpressionBenchmark(benchName, values, query) } } } @@ -133,9 +131,7 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase { dir, spark.sql( s"select cast(timestamp_micros(cast(value/100000 as integer)) as date) as dt, cast(value % 365 as int) as days FROM $tbl")) - Seq( - ("date_add", "DATE_ADD(dt, days)"), - ("date_sub", "DATE_SUB(dt, days)")).foreach { + Seq(("date_add", "DATE_ADD(dt, days)"), ("date_sub", "DATE_SUB(dt, days)")).foreach { case (name, expr) => val isDictionary = if (useDictionary) "(Dictionary)" else "" val benchName = s"Date Arithmetic $isDictionary - $name" diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMathExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMathExpressionBenchmark.scala index f83b9570b1..509d9d8408 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMathExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMathExpressionBenchmark.scala @@ -46,9 +46,8 @@ object CometMathExpressionBenchmark extends CometBenchmarkBase { "unary_minus" -> "-(c_double)") // Binary math functions - private val binaryMathFunctions = List( - "atan2" -> "ATAN2(c_double, c_double2)", - "pmod" -> "PMOD(c_int, c_int2)") + private val binaryMathFunctions = + List("atan2" -> "ATAN2(c_double, c_double2)", "pmod" -> "PMOD(c_int, c_int2)") // Logarithm functions private val logFunctions = List( @@ -57,9 +56,8 @@ object CometMathExpressionBenchmark extends CometBenchmarkBase { "log2" -> "LOG2(c_positive_double)") private def generateConfigs(funcs: List[(String, String)]): List[MathExprConfig] = { - funcs.map { - case (name, expr) => - MathExprConfig(name, s"SELECT $expr FROM parquetV1Table") + funcs.map { case (name, expr) => + MathExprConfig(name, s"SELECT $expr FROM parquetV1Table") } } diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala index 90a8f28969..a9aff529fb 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala @@ -89,7 +89,9 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { StringExprConfig("endswith", "select endswith(c1, '9') from parquetV1Table"), StringExprConfig("like", "select c1 like '%123%' from parquetV1Table"), StringExprConfig("rlike", "select c1 rlike '[0-9]+' from parquetV1Table"), - StringExprConfig("regexp_replace", "select regexp_replace(c1, '[0-9]', 'X') from parquetV1Table")) + StringExprConfig( + "regexp_replace", + 
"select regexp_replace(c1, '[0-9]', 'X') from parquetV1Table")) override def runCometBenchmark(mainArgs: Array[String]): Unit = { val values = 1024 * 1024; From ed22bbbfbc722edc388fa1c81f1025032fe8f722 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 24 Dec 2025 13:23:00 -0700 Subject: [PATCH 12/12] more --- .../CometAggregateFunctionBenchmark.scala | 2 +- .../CometArrayExpressionBenchmark.scala | 2 +- .../sql/benchmark/CometBenchmarkBase.scala | 21 +++++++++++++++++++ .../CometBitwiseExpressionBenchmark.scala | 2 +- .../benchmark/CometCastBooleanBenchmark.scala | 2 +- .../CometCastNumericToNumericBenchmark.scala | 2 +- .../CometCastNumericToStringBenchmark.scala | 2 +- .../CometCastNumericToTemporalBenchmark.scala | 2 +- .../CometCastTemporalToNumericBenchmark.scala | 2 +- .../CometCastTemporalToStringBenchmark.scala | 2 +- .../CometComparisonExpressionBenchmark.scala | 2 +- .../CometHashExpressionBenchmark.scala | 2 +- .../CometMapExpressionBenchmark.scala | 2 +- .../CometMathExpressionBenchmark.scala | 2 +- 14 files changed, 34 insertions(+), 13 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateFunctionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateFunctionBenchmark.scala index 250956a247..e54f98eb35 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateFunctionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateFunctionBenchmark.scala @@ -76,7 +76,7 @@ object CometAggregateFunctionBenchmark extends CometBenchmarkBase { AggExprConfig("bit_xor", "SELECT BIT_XOR(c_long) FROM parquetV1Table GROUP BY grp")) override def runCometBenchmark(mainArgs: Array[String]): Unit = { - val values = 1024 * 1024 * 10 // 10M rows for aggregates + val values = getBenchmarkRows(1024 * 1024 * 10) // 10M rows default for aggregates runBenchmarkWithTable("Aggregate function benchmarks", values) { v => withTempPath { dir => diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArrayExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArrayExpressionBenchmark.scala index c6dccc52bc..77069b0844 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArrayExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArrayExpressionBenchmark.scala @@ -59,7 +59,7 @@ object CometArrayExpressionBenchmark extends CometBenchmarkBase { ArrayExprConfig("size", "SELECT size(arr_int) FROM parquetV1Table")) override def runCometBenchmark(mainArgs: Array[String]): Unit = { - val values = 1024 * 1024 * 2 // 2M rows (arrays are larger) + val values = getBenchmarkRows(1024 * 1024 * 2) // 2M rows default (arrays are larger) runBenchmarkWithTable("Array expression benchmarks", values) { v => withTempPath { dir => diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala index 8d56cefa05..dd5b6ed3a5 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala @@ -69,6 +69,27 @@ trait CometBenchmarkBase extends SqlBasedBenchmark { protected val tbl = "comet_table" + /** + * Get the number of rows to use for benchmarks, with support for quick mode. Set + * COMET_BENCHMARK_QUICK_MODE=1 to run with reduced dataset sizes for faster benchmarking. 
+   *
+   * @param defaultRows
+   *   the default number of rows for the full benchmark
+   * @return
+   *   the number of rows to use (1/100th of the default in quick mode, minimum 1024)
+   */
+  protected def getBenchmarkRows(defaultRows: Int): Int = {
+    val quickMode = sys.env
+      .get("COMET_BENCHMARK_QUICK_MODE")
+      .exists(v => v == "1" || v.equalsIgnoreCase("true"))
+    if (quickMode) {
+      // Use 1% of default size in quick mode (minimum 1024)
+      Math.max(1024, defaultRows / 100)
+    } else {
+      defaultRows
+    }
+  }
+
   protected def withTempTable(tableNames: String*)(f: => Unit): Unit = {
     try f
     finally tableNames.foreach(spark.catalog.dropTempView)
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBitwiseExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBitwiseExpressionBenchmark.scala
index 47b320e2cb..a391c32573 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBitwiseExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBitwiseExpressionBenchmark.scala
@@ -47,7 +47,7 @@ object CometBitwiseExpressionBenchmark extends CometBenchmarkBase {
     BitwiseExprConfig("bit_get", "SELECT BIT_GET(c_long, c_pos) FROM parquetV1Table"))
 
   override def runCometBenchmark(mainArgs: Array[String]): Unit = {
-    val values = 1024 * 1024 * 5 // 5M rows
+    val values = getBenchmarkRows(1024 * 1024 * 5) // 5M rows default
 
     runBenchmarkWithTable("Bitwise expression benchmarks", values) { v =>
       withTempPath { dir =>
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBooleanBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBooleanBenchmark.scala
index 678817f962..085e7388c7 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBooleanBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBooleanBenchmark.scala
@@ -71,7 +71,7 @@ object CometCastBooleanBenchmark extends CometBenchmarkBase {
       s"SELECT $castFunc($colName AS BOOLEAN) FROM parquetV1Table")
 
   override def runCometBenchmark(mainArgs: Array[String]): Unit = {
-    val values = 1024 * 1024 * 5 // 5M rows
+    val values = getBenchmarkRows(1024 * 1024 * 5) // 5M rows default
 
     // Generate boolean data for boolean-to-other casts
     runBenchmarkWithTable("Boolean to other types casts", values) { v =>
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToNumericBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToNumericBenchmark.scala
index 138f2fe06b..5137e50e18 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToNumericBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToNumericBenchmark.scala
@@ -93,7 +93,7 @@ object CometCastNumericToNumericBenchmark extends CometBenchmarkBase {
   }
 
   override def runCometBenchmark(mainArgs: Array[String]): Unit = {
-    val values = 1024 * 1024 * 5 // 5M rows
+    val values = getBenchmarkRows(1024 * 1024 * 5) // 5M rows default
 
     // Generate input data once with all numeric types
     runBenchmarkWithTable("Numeric to Numeric casts", values) { v =>
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToStringBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToStringBenchmark.scala
index 0e9bed9b04..1459ab941f 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToStringBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToStringBenchmark.scala
@@ -54,7 +54,7 @@ object CometCastNumericToStringBenchmark extends CometBenchmarkBase {
       s"SELECT $castFunc($colName AS STRING) FROM parquetV1Table")
 
   override def runCometBenchmark(mainArgs: Array[String]): Unit = {
-    val values = 1024 * 1024 * 5 // 5M rows
+    val values = getBenchmarkRows(1024 * 1024 * 5) // 5M rows default
 
     // Generate input data once with all numeric types
     runBenchmarkWithTable("Numeric to String casts", values) { v =>
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToTemporalBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToTemporalBenchmark.scala
index e0079b0b6b..a8e81a3dff 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToTemporalBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToTemporalBenchmark.scala
@@ -51,7 +51,7 @@ object CometCastNumericToTemporalBenchmark extends CometBenchmarkBase {
       s"SELECT $castFunc(c_long AS TIMESTAMP) FROM parquetV1Table")
 
   override def runCometBenchmark(mainArgs: Array[String]): Unit = {
-    val values = 1024 * 1024 * 5 // 5M rows
+    val values = getBenchmarkRows(1024 * 1024 * 5) // 5M rows default
 
     // Generate data once for INT to DATE conversions
     runBenchmarkWithTable("Int to Date casts", values) { v =>
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastTemporalToNumericBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastTemporalToNumericBenchmark.scala
index 68ea0bbbcf..08850b6a12 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastTemporalToNumericBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastTemporalToNumericBenchmark.scala
@@ -55,7 +55,7 @@ object CometCastTemporalToNumericBenchmark extends CometBenchmarkBase {
       s"SELECT $castFunc(c_timestamp AS $targetType) FROM parquetV1Table")
 
   override def runCometBenchmark(mainArgs: Array[String]): Unit = {
-    val values = 1024 * 1024 * 5 // 5M rows
+    val values = getBenchmarkRows(1024 * 1024 * 5) // 5M rows default
 
     // Generate DATE data once for all date-to-numeric benchmarks
     runBenchmarkWithTable("Date to Numeric casts", values) { v =>
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastTemporalToStringBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastTemporalToStringBenchmark.scala
index 8d091e86fd..5e2316a142 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastTemporalToStringBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastTemporalToStringBenchmark.scala
@@ -49,7 +49,7 @@ object CometCastTemporalToStringBenchmark extends CometBenchmarkBase {
      s"SELECT $castFunc(c_timestamp AS STRING) FROM parquetV1Table")
 
   override def runCometBenchmark(mainArgs: Array[String]): Unit = {
-    val values = 1024 * 1024 * 5 // 5M rows
+    val values = getBenchmarkRows(1024 * 1024 * 5) // 5M rows default
 
     // Generate temporal data once for date benchmarks
     runBenchmarkWithTable("Date to String casts", values) { v =>
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometComparisonExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometComparisonExpressionBenchmark.scala
index cef98aef9e..8360e006fd 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometComparisonExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometComparisonExpressionBenchmark.scala
@@ -56,7 +56,7 @@ object CometComparisonExpressionBenchmark extends CometBenchmarkBase {
       "SELECT c_int NOT IN (1, 10, 100, 1000, 10000) FROM parquetV1Table"))
 
   override def runCometBenchmark(mainArgs: Array[String]): Unit = {
-    val values = 1024 * 1024 * 5 // 5M rows
+    val values = getBenchmarkRows(1024 * 1024 * 5) // 5M rows default
 
     runBenchmarkWithTable("Comparison expression benchmarks", values) { v =>
       withTempPath { dir =>
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometHashExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometHashExpressionBenchmark.scala
index a91f3c2b96..9584322c44 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometHashExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometHashExpressionBenchmark.scala
@@ -45,7 +45,7 @@ object CometHashExpressionBenchmark extends CometBenchmarkBase {
     HashExprConfig("sha2_512", "SELECT sha2(c_str, 512) FROM parquetV1Table"))
 
   override def runCometBenchmark(mainArgs: Array[String]): Unit = {
-    val values = 1024 * 1024 * 5 // 5M rows
+    val values = getBenchmarkRows(1024 * 1024 * 5) // 5M rows default
 
     runBenchmarkWithTable("Hash expression benchmarks", values) { v =>
       withTempPath { dir =>
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMapExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMapExpressionBenchmark.scala
index f41eac4e47..496af42c45 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMapExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMapExpressionBenchmark.scala
@@ -42,7 +42,7 @@ object CometMapExpressionBenchmark extends CometBenchmarkBase {
     MapExprConfig("map_subscript", "SELECT m['key2'] FROM parquetV1Table"))
 
   override def runCometBenchmark(mainArgs: Array[String]): Unit = {
-    val values = 1024 * 1024 * 2 // 2M rows (maps are larger)
+    val values = getBenchmarkRows(1024 * 1024 * 2) // 2M rows default (maps are larger)
 
     runBenchmarkWithTable("Map expression benchmarks", values) { v =>
       withTempPath { dir =>
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMathExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMathExpressionBenchmark.scala
index 509d9d8408..d63008d3b9 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMathExpressionBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMathExpressionBenchmark.scala
@@ -62,7 +62,7 @@ object CometMathExpressionBenchmark extends CometBenchmarkBase {
   }
 
   override def runCometBenchmark(mainArgs: Array[String]): Unit = {
-    val values = 1024 * 1024 * 5 // 5M rows
+    val values = getBenchmarkRows(1024 * 1024 * 5) // 5M rows default
 
     // Benchmark unary and binary math functions
     runBenchmarkWithTable("Math expression benchmarks", values) { v =>
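
For reference, the quick-mode scaling introduced in CometBenchmarkBase above can be exercised standalone. The sketch below mirrors the patched getBenchmarkRows helper; the QuickModeRowCountDemo object and its main method are illustrative only and are not part of this patch series:

    object QuickModeRowCountDemo {
      // Mirrors CometBenchmarkBase.getBenchmarkRows: use 1% of the default row
      // count (floored at 1024) when COMET_BENCHMARK_QUICK_MODE is "1" or "true".
      def getBenchmarkRows(defaultRows: Int): Int = {
        val quickMode = sys.env
          .get("COMET_BENCHMARK_QUICK_MODE")
          .exists(v => v == "1" || v.equalsIgnoreCase("true"))
        if (quickMode) Math.max(1024, defaultRows / 100) else defaultRows
      }

      def main(args: Array[String]): Unit = {
        // With COMET_BENCHMARK_QUICK_MODE=1 in the environment this prints:
        //   10485760 -> 104857, 5242880 -> 52428, 2097152 -> 20971
        Seq(1024 * 1024 * 10, 1024 * 1024 * 5, 1024 * 1024 * 2)
          .foreach(n => println(s"$n -> ${getBenchmarkRows(n)}"))
      }
    }

With quick mode enabled, the 10M/5M/2M-row defaults used by the benchmarks in this patch shrink to 104857, 52428, and 20971 rows respectively, while a run without the variable set is unchanged.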