diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateFunctionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateFunctionBenchmark.scala new file mode 100644 index 0000000000..e54f98eb35 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateFunctionBenchmark.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class AggExprConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Comprehensive benchmark for Comet aggregate functions. To run this benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometAggregateFunctionBenchmark` + * Results will be written to "spark/benchmarks/CometAggregateFunctionBenchmark-**results.txt". + */ +// spotless:on +object CometAggregateFunctionBenchmark extends CometBenchmarkBase { + + private val basicAggregates = List( + AggExprConfig("count", "SELECT COUNT(*) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("count_col", "SELECT COUNT(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig( + "count_distinct", + "SELECT COUNT(DISTINCT c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("min_int", "SELECT MIN(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("max_int", "SELECT MAX(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("min_double", "SELECT MIN(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("max_double", "SELECT MAX(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("sum_int", "SELECT SUM(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("sum_long", "SELECT SUM(c_long) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("sum_double", "SELECT SUM(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("avg_int", "SELECT AVG(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("avg_double", "SELECT AVG(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("first", "SELECT FIRST(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig( + "first_ignore_nulls", + "SELECT FIRST(c_int, true) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("last", "SELECT LAST(c_int) FROM parquetV1Table GROUP BY grp"), + AggExprConfig( + "last_ignore_nulls", + "SELECT LAST(c_int, true) FROM parquetV1Table GROUP BY grp")) + + private val statisticalAggregates = List( + AggExprConfig("var_samp", "SELECT VAR_SAMP(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("var_pop", "SELECT VAR_POP(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("stddev_samp", "SELECT STDDEV_SAMP(c_double) FROM parquetV1Table GROUP BY grp"), + 
AggExprConfig("stddev_pop", "SELECT STDDEV_POP(c_double) FROM parquetV1Table GROUP BY grp"), + AggExprConfig( + "covar_samp", + "SELECT COVAR_SAMP(c_double, c_double2) FROM parquetV1Table GROUP BY grp"), + AggExprConfig( + "covar_pop", + "SELECT COVAR_POP(c_double, c_double2) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("corr", "SELECT CORR(c_double, c_double2) FROM parquetV1Table GROUP BY grp")) + + private val bitwiseAggregates = List( + AggExprConfig("bit_and", "SELECT BIT_AND(c_long) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("bit_or", "SELECT BIT_OR(c_long) FROM parquetV1Table GROUP BY grp"), + AggExprConfig("bit_xor", "SELECT BIT_XOR(c_long) FROM parquetV1Table GROUP BY grp")) + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = getBenchmarkRows(1024 * 1024 * 10) // 10M rows default for aggregates + + runBenchmarkWithTable("Aggregate function benchmarks", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT + CAST(value % 1000 AS INT) AS grp, + CASE WHEN value % 100 = 0 THEN NULL ELSE CAST((value % 10000) - 5000 AS INT) END AS c_int, + CASE WHEN value % 100 = 1 THEN NULL ELSE CAST(value * 1000 AS LONG) END AS c_long, + CASE WHEN value % 100 = 2 THEN NULL ELSE CAST((value % 10000) / 100.0 AS DOUBLE) END AS c_double, + CASE WHEN value % 100 = 3 THEN NULL ELSE CAST((value % 5000) / 50.0 AS DOUBLE) END AS c_double2 + FROM $tbl + """)) + + (basicAggregates ++ statisticalAggregates ++ bitwiseAggregates).foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArrayExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArrayExpressionBenchmark.scala new file mode 100644 index 0000000000..77069b0844 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometArrayExpressionBenchmark.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class ArrayExprConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Comprehensive benchmark for Comet array expressions. To run this benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometArrayExpressionBenchmark` + * Results will be written to "spark/benchmarks/CometArrayExpressionBenchmark-**results.txt". 
+ */ +// spotless:on +object CometArrayExpressionBenchmark extends CometBenchmarkBase { + + private val arrayExpressions = List( + ArrayExprConfig("array_contains", "SELECT array_contains(arr_int, 5) FROM parquetV1Table"), + ArrayExprConfig("array_max", "SELECT array_max(arr_int) FROM parquetV1Table"), + ArrayExprConfig("array_min", "SELECT array_min(arr_int) FROM parquetV1Table"), + ArrayExprConfig("array_distinct", "SELECT array_distinct(arr_int) FROM parquetV1Table"), + ArrayExprConfig("array_remove", "SELECT array_remove(arr_int, 5) FROM parquetV1Table"), + ArrayExprConfig("array_append", "SELECT array_append(arr_int, 100) FROM parquetV1Table"), + ArrayExprConfig("array_compact", "SELECT array_compact(arr_nullable) FROM parquetV1Table"), + ArrayExprConfig( + "array_intersect", + "SELECT array_intersect(arr_int, arr_int2) FROM parquetV1Table"), + ArrayExprConfig("array_except", "SELECT array_except(arr_int, arr_int2) FROM parquetV1Table"), + ArrayExprConfig("array_union", "SELECT array_union(arr_int, arr_int2) FROM parquetV1Table"), + ArrayExprConfig( + "arrays_overlap", + "SELECT arrays_overlap(arr_int, arr_int2) FROM parquetV1Table"), + ArrayExprConfig("array_insert", "SELECT array_insert(arr_int, 2, 999) FROM parquetV1Table"), + ArrayExprConfig("array_join", "SELECT array_join(arr_str, ',') FROM parquetV1Table"), + ArrayExprConfig("array_repeat", "SELECT array_repeat(elem, 5) FROM parquetV1Table"), + ArrayExprConfig("get_array_item", "SELECT arr_int[0] FROM parquetV1Table"), + ArrayExprConfig("element_at", "SELECT element_at(arr_int, 1) FROM parquetV1Table"), + ArrayExprConfig("reverse", "SELECT reverse(arr_int) FROM parquetV1Table"), + ArrayExprConfig("flatten", "SELECT flatten(nested_arr) FROM parquetV1Table"), + ArrayExprConfig("size", "SELECT size(arr_int) FROM parquetV1Table")) + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = getBenchmarkRows(1024 * 1024 * 2) // 2M rows default (arrays are larger) + + runBenchmarkWithTable("Array expression benchmarks", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT + CASE WHEN value % 50 = 0 THEN NULL + ELSE array( + cast((value % 10) as int), + cast((value % 20) as int), + cast((value % 30) as int), + cast((value % 10) as int), + 5, 10, 15) + END AS arr_int, + CASE WHEN value % 50 = 1 THEN NULL + ELSE array( + cast((value % 15) as int), + cast((value % 25) as int), + cast((value % 35) as int)) + END AS arr_int2, + CASE WHEN value % 50 = 2 THEN NULL + ELSE array( + CASE WHEN value % 7 = 0 THEN NULL ELSE cast((value % 10) as int) END, + CASE WHEN value % 7 = 1 THEN NULL ELSE cast((value % 20) as int) END, + CASE WHEN value % 7 = 2 THEN NULL ELSE cast((value % 30) as int) END) + END AS arr_nullable, + CASE WHEN value % 50 = 3 THEN NULL + ELSE array( + concat('str_', cast(value % 10 as string)), + concat('val_', cast(value % 20 as string)), + concat('item_', cast(value % 5 as string))) + END AS arr_str, + CASE WHEN value % 50 = 4 THEN NULL + ELSE array( + array(cast((value % 5) as int), cast((value % 10) as int)), + array(cast((value % 15) as int), cast((value % 20) as int))) + END AS nested_arr, + CASE WHEN value % 50 = 5 THEN NULL ELSE cast((value % 100) as int) END AS elem + FROM $tbl + """)) + + arrayExpressions.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala 
b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala index 8d56cefa05..dd5b6ed3a5 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala @@ -69,6 +69,27 @@ trait CometBenchmarkBase extends SqlBasedBenchmark { protected val tbl = "comet_table" + /** + * Get the number of rows to use for benchmarks, with support for quick mode. Set + * COMET_BENCHMARK_QUICK_MODE=1 to run with reduced dataset sizes for faster benchmarking. + * + * @param defaultRows + * the default number of rows for full benchmark + * @return + * the number of rows to use (1/100th of default in quick mode) + */ + protected def getBenchmarkRows(defaultRows: Int): Int = { + val quickMode = sys.env + .get("COMET_BENCHMARK_QUICK_MODE") + .exists(v => v == "1" || v.equalsIgnoreCase("true")) + if (quickMode) { + // Use 1% of default size in quick mode (minimum 1024) + Math.max(1024, defaultRows / 100) + } else { + defaultRows + } + } + protected def withTempTable(tableNames: String*)(f: => Unit): Unit = { try f finally tableNames.foreach(spark.catalog.dropTempView) diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBitwiseExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBitwiseExpressionBenchmark.scala new file mode 100644 index 0000000000..a391c32573 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometBitwiseExpressionBenchmark.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class BitwiseExprConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Comprehensive benchmark for Comet bitwise expressions. To run this benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometBitwiseExpressionBenchmark` + * Results will be written to "spark/benchmarks/CometBitwiseExpressionBenchmark-**results.txt". 
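+ *
+ * Row counts come from `getBenchmarkRows`, so a quick local run with 1% of the rows
+ * (minimum 1024) can be requested through the environment:
+ * {{{
+ * COMET_BENCHMARK_QUICK_MODE=1 SPARK_GENERATE_BENCHMARK_FILES=1 \
+ *   make benchmark-org.apache.spark.sql.benchmark.CometBitwiseExpressionBenchmark
+ * }}}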
+ */ +// spotless:on +object CometBitwiseExpressionBenchmark extends CometBenchmarkBase { + + private val bitwiseExpressions = List( + BitwiseExprConfig("bitwise_and", "SELECT c_int & c_int2 FROM parquetV1Table"), + BitwiseExprConfig("bitwise_or", "SELECT c_int | c_int2 FROM parquetV1Table"), + BitwiseExprConfig("bitwise_xor", "SELECT c_int ^ c_int2 FROM parquetV1Table"), + BitwiseExprConfig("bitwise_not", "SELECT ~c_int FROM parquetV1Table"), + BitwiseExprConfig("shift_left", "SELECT SHIFTLEFT(c_int, 3) FROM parquetV1Table"), + BitwiseExprConfig("shift_right", "SELECT SHIFTRIGHT(c_int, 3) FROM parquetV1Table"), + BitwiseExprConfig( + "shift_right_unsigned", + "SELECT SHIFTRIGHTUNSIGNED(c_int, 3) FROM parquetV1Table"), + BitwiseExprConfig("bit_count", "SELECT BIT_COUNT(c_long) FROM parquetV1Table"), + BitwiseExprConfig("bit_get", "SELECT BIT_GET(c_long, c_pos) FROM parquetV1Table")) + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = getBenchmarkRows(1024 * 1024 * 5) // 5M rows default + + runBenchmarkWithTable("Bitwise expression benchmarks", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT + CASE WHEN value % 100 = 0 THEN NULL ELSE CAST(value % 1000000 AS INT) END AS c_int, + CASE WHEN value % 100 = 1 THEN NULL ELSE CAST((value * 7) % 1000000 AS INT) END AS c_int2, + CASE WHEN value % 100 = 2 THEN NULL ELSE CAST(value * 1000 AS LONG) END AS c_long, + CASE WHEN value % 100 = 3 THEN NULL ELSE CAST(value % 64 AS INT) END AS c_pos + FROM $tbl + """)) + + bitwiseExpressions.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBenchmark.scala deleted file mode 100644 index 975abd632f..0000000000 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBenchmark.scala +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.spark.sql.benchmark - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{DataType, LongType} - -import org.apache.comet.expressions.{CometCast, CometEvalMode} -import org.apache.comet.serde.{Compatible, Incompatible, Unsupported} - -/** - * Benchmark to measure Comet execution performance. To run this benchmark: - * {{{ - * SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometCastBenchmark - * }}} - * - * Results will be written to "spark/benchmarks/CometCastBenchmark-**results.txt". 
- */ - -object CometCastBenchmark extends CometBenchmarkBase { - - override def getSparkSession: SparkSession = { - val session = super.getSparkSession - session.conf.set("parquet.enable.dictionary", "false") - session.conf.set("spark.sql.shuffle.partitions", "2") - session - } - - def castExprSQL(toDataType: DataType, input: String): String = { - s"CAST ($input AS ${toDataType.sql})" - } - - override def runCometBenchmark(args: Array[String]): Unit = { - - // TODO : Create all possible input datatypes. We only have Long inputs for now - CometCast.supportedTypes.foreach { toDataType => - Seq(false, true).foreach { ansiMode => - CometCast.isSupported( - LongType, - toDataType, - None, - if (ansiMode) CometEvalMode.ANSI else CometEvalMode.LEGACY) match { - case Compatible(notes) => - runBenchmarkWithTable( - s"Running benchmark cast operation from : $LongType to : $toDataType", - 1024 * 1024 * 10) { v => - castBenchmark(v, LongType, toDataType, isAnsiMode = ansiMode) - } - case Incompatible(notes) => None - case Unsupported(notes) => None - } - } - } - } - - def castBenchmark( - values: Int, - fromDataType: DataType, - toDataType: DataType, - isAnsiMode: Boolean): Unit = { - - withTempPath { dir => - withTempTable("parquetV1Table") { - prepareTable(dir, spark.sql(s"SELECT value FROM $tbl")) - - val functionSQL = castExprSQL(toDataType, "value") - val query = s"SELECT $functionSQL FROM parquetV1Table" - val name = - s"Cast function to : ${toDataType} , ansi mode enabled : ${isAnsiMode}" - - val extraConfigs = Map(SQLConf.ANSI_ENABLED.key -> isAnsiMode.toString) - - runExpressionBenchmark(name, values, query, extraConfigs) - } - } - } - -} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBooleanBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBooleanBenchmark.scala new file mode 100644 index 0000000000..085e7388c7 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastBooleanBenchmark.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class CastBooleanConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Benchmark to measure performance of Comet cast operations involving Boolean type. To run this + * benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometCastBooleanBenchmark` + * Results will be written to "spark/benchmarks/CometCastBooleanBenchmark-**results.txt". 
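+ *
+ * Benchmark cases are generated as the cross product of the cast functions and target
+ * types declared below; for instance, one of the generated entries expands to:
+ * {{{
+ * CastBooleanConfig(
+ *   "TRY_CAST Boolean to DECIMAL(10,2)",
+ *   "SELECT TRY_CAST(c_bool AS DECIMAL(10,2)) FROM parquetV1Table")
+ * }}}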
+ */ +// spotless:on +object CometCastBooleanBenchmark extends CometBenchmarkBase { + + private val castFunctions = Seq("CAST", "TRY_CAST") + + // Boolean to String + private val boolToStringConfigs = for { + castFunc <- castFunctions + } yield CastBooleanConfig( + s"$castFunc Boolean to String", + s"SELECT $castFunc(c_bool AS STRING) FROM parquetV1Table") + + // Boolean to numeric types + private val boolToNumericTypes = + Seq("BYTE", "SHORT", "INT", "LONG", "FLOAT", "DOUBLE", "DECIMAL(10,2)") + private val boolToNumericConfigs = for { + castFunc <- castFunctions + targetType <- boolToNumericTypes + } yield CastBooleanConfig( + s"$castFunc Boolean to $targetType", + s"SELECT $castFunc(c_bool AS $targetType) FROM parquetV1Table") + + // Numeric to Boolean + private val numericTypes = Seq( + ("BYTE", "c_byte"), + ("SHORT", "c_short"), + ("INT", "c_int"), + ("LONG", "c_long"), + ("FLOAT", "c_float"), + ("DOUBLE", "c_double"), + ("DECIMAL(10,2)", "c_decimal")) + + private val numericToBoolConfigs = for { + castFunc <- castFunctions + (sourceType, colName) <- numericTypes + } yield CastBooleanConfig( + s"$castFunc $sourceType to Boolean", + s"SELECT $castFunc($colName AS BOOLEAN) FROM parquetV1Table") + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = getBenchmarkRows(1024 * 1024 * 5) // 5M rows default + + // Generate boolean data for boolean-to-other casts + runBenchmarkWithTable("Boolean to other types casts", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT CASE + WHEN value % 100 = 0 THEN NULL + ELSE (value % 2 = 0) + END AS c_bool + FROM $tbl + """)) + + (boolToStringConfigs ++ boolToNumericConfigs).foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + + // Generate numeric data for numeric-to-boolean casts + runBenchmarkWithTable("Numeric to Boolean casts", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT + CASE WHEN value % 100 = 0 THEN NULL ELSE CAST((value % 3) - 1 AS BYTE) END AS c_byte, + CASE WHEN value % 100 = 1 THEN NULL ELSE CAST((value % 3) - 1 AS SHORT) END AS c_short, + CASE WHEN value % 100 = 2 THEN NULL ELSE CAST((value % 3) - 1 AS INT) END AS c_int, + CASE WHEN value % 100 = 3 THEN NULL ELSE CAST((value % 3) - 1 AS LONG) END AS c_long, + CASE WHEN value % 100 = 4 THEN NULL ELSE CAST((value % 3) - 1 AS FLOAT) END AS c_float, + CASE WHEN value % 100 = 5 THEN NULL ELSE CAST((value % 3) - 1 AS DOUBLE) END AS c_double, + CASE WHEN value % 100 = 6 THEN NULL ELSE CAST((value % 3) - 1 AS DECIMAL(10,2)) END AS c_decimal + FROM $tbl + """)) + + numericToBoolConfigs.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToNumericBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToNumericBenchmark.scala new file mode 100644 index 0000000000..5137e50e18 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToNumericBenchmark.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class CastNumericToNumericConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Benchmark to measure performance of Comet cast between numeric types. To run this + * benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometCastNumericToNumericBenchmark` + * Results will be written to "spark/benchmarks/CometCastNumericToNumericBenchmark-**results.txt". + */ +// spotless:on +object CometCastNumericToNumericBenchmark extends CometBenchmarkBase { + + private val castFunctions = Seq("CAST", "TRY_CAST") + + // Integer widening conversions + private val integerWideningPairs = Seq( + ("BYTE", "c_byte", "SHORT"), + ("BYTE", "c_byte", "INT"), + ("BYTE", "c_byte", "LONG"), + ("SHORT", "c_short", "INT"), + ("SHORT", "c_short", "LONG"), + ("INT", "c_int", "LONG")) + + // Integer narrowing conversions + private val integerNarrowingPairs = Seq( + ("LONG", "c_long", "INT"), + ("LONG", "c_long", "SHORT"), + ("LONG", "c_long", "BYTE"), + ("INT", "c_int", "SHORT"), + ("INT", "c_int", "BYTE"), + ("SHORT", "c_short", "BYTE")) + + // Floating point conversions + private val floatPairs = Seq(("FLOAT", "c_float", "DOUBLE"), ("DOUBLE", "c_double", "FLOAT")) + + // Integer to floating point conversions + private val intToFloatPairs = Seq( + ("BYTE", "c_byte", "FLOAT"), + ("SHORT", "c_short", "FLOAT"), + ("INT", "c_int", "FLOAT"), + ("LONG", "c_long", "FLOAT"), + ("INT", "c_int", "DOUBLE"), + ("LONG", "c_long", "DOUBLE")) + + // Floating point to integer conversions + private val floatToIntPairs = Seq( + ("FLOAT", "c_float", "INT"), + ("FLOAT", "c_float", "LONG"), + ("DOUBLE", "c_double", "INT"), + ("DOUBLE", "c_double", "LONG")) + + // Decimal conversions + private val decimalPairs = Seq( + ("INT", "c_int", "DECIMAL(10,2)"), + ("LONG", "c_long", "DECIMAL(20,4)"), + ("DOUBLE", "c_double", "DECIMAL(15,5)"), + ("DECIMAL(10,2)", "c_decimal", "INT"), + ("DECIMAL(10,2)", "c_decimal", "LONG"), + ("DECIMAL(10,2)", "c_decimal", "DOUBLE")) + + private def generateConfigs( + pairs: Seq[(String, String, String)]): Seq[CastNumericToNumericConfig] = { + for { + castFunc <- castFunctions + (sourceType, colName, targetType) <- pairs + } yield CastNumericToNumericConfig( + s"$castFunc $sourceType to $targetType", + s"SELECT $castFunc($colName AS $targetType) FROM parquetV1Table") + } + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = getBenchmarkRows(1024 * 1024 * 5) // 5M rows default + + // Generate input data once with all numeric types + runBenchmarkWithTable("Numeric to Numeric casts", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + // Generate varied numeric data including edge cases + prepareTable( + dir, + spark.sql(s""" + SELECT + CASE WHEN value % 100 = 0 THEN NULL ELSE CAST((value % 128) - 64 AS BYTE) END AS c_byte, 
+ CASE WHEN value % 100 = 1 THEN NULL ELSE CAST((value % 32768) - 16384 AS SHORT) END AS c_short, + CASE WHEN value % 100 = 2 THEN NULL ELSE CAST(value - 2500000 AS INT) END AS c_int, + CASE WHEN value % 100 = 3 THEN NULL ELSE CAST(value * 1000 AS LONG) END AS c_long, + CASE + WHEN value % 100 = 4 THEN NULL + WHEN value % 100 = 5 THEN CAST('NaN' AS FLOAT) + WHEN value % 100 = 6 THEN CAST('Infinity' AS FLOAT) + WHEN value % 100 = 7 THEN CAST('-Infinity' AS FLOAT) + ELSE CAST((value - 2500000) / 100.0 AS FLOAT) + END AS c_float, + CASE + WHEN value % 100 = 8 THEN NULL + WHEN value % 100 = 9 THEN CAST('NaN' AS DOUBLE) + WHEN value % 100 = 10 THEN CAST('Infinity' AS DOUBLE) + WHEN value % 100 = 11 THEN CAST('-Infinity' AS DOUBLE) + ELSE CAST((value - 2500000) / 100.0 AS DOUBLE) + END AS c_double, + CASE WHEN value % 100 = 12 THEN NULL ELSE CAST((value - 2500000) / 100.0 AS DECIMAL(10,2)) END AS c_decimal + FROM $tbl + """)) + + // Run all benchmark categories + (generateConfigs(integerWideningPairs) ++ + generateConfigs(integerNarrowingPairs) ++ + generateConfigs(floatPairs) ++ + generateConfigs(intToFloatPairs) ++ + generateConfigs(floatToIntPairs) ++ + generateConfigs(decimalPairs)).foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToStringBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToStringBenchmark.scala new file mode 100644 index 0000000000..1459ab941f --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToStringBenchmark.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class CastNumericToStringConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Benchmark to measure performance of Comet cast from numeric types to String. To run this + * benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometCastNumericToStringBenchmark` + * Results will be written to "spark/benchmarks/CometCastNumericToStringBenchmark-**results.txt". 
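+ *
+ * The generated floating point columns deliberately mix in NULL, NaN, Infinity and
+ * -Infinity (roughly 1% each via `value % 100` buckets) so that rendering of special
+ * values is exercised, e.g.:
+ * {{{
+ * SELECT CAST(c_double AS STRING) FROM parquetV1Table
+ * -- yields 'NaN', 'Infinity' and '-Infinity' alongside ordinary numeric strings
+ * }}}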
+ */ +// spotless:on +object CometCastNumericToStringBenchmark extends CometBenchmarkBase { + + private val castFunctions = Seq("CAST", "TRY_CAST") + private val sourceTypes = + Seq( + ("BOOLEAN", "c_bool"), + ("BYTE", "c_byte"), + ("SHORT", "c_short"), + ("INT", "c_int"), + ("LONG", "c_long"), + ("FLOAT", "c_float"), + ("DOUBLE", "c_double"), + ("DECIMAL(10,2)", "c_decimal")) + + private val castConfigs = for { + castFunc <- castFunctions + (sourceType, colName) <- sourceTypes + } yield CastNumericToStringConfig( + s"$castFunc $sourceType to String", + s"SELECT $castFunc($colName AS STRING) FROM parquetV1Table") + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = getBenchmarkRows(1024 * 1024 * 5) // 5M rows default + + // Generate input data once with all numeric types + runBenchmarkWithTable("Numeric to String casts", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + // Generate varied numeric data including edge cases + prepareTable( + dir, + spark.sql(s""" + SELECT + CASE WHEN value % 100 = 0 THEN NULL ELSE (value % 2 = 0) END AS c_bool, + CASE WHEN value % 100 = 1 THEN NULL ELSE CAST((value % 128) - 64 AS BYTE) END AS c_byte, + CASE WHEN value % 100 = 2 THEN NULL ELSE CAST((value % 32768) - 16384 AS SHORT) END AS c_short, + CASE WHEN value % 100 = 3 THEN NULL ELSE CAST(value - 2500000 AS INT) END AS c_int, + CASE WHEN value % 100 = 4 THEN NULL ELSE CAST(value * 1000000 AS LONG) END AS c_long, + CASE + WHEN value % 100 = 5 THEN NULL + WHEN value % 100 = 6 THEN CAST('NaN' AS FLOAT) + WHEN value % 100 = 7 THEN CAST('Infinity' AS FLOAT) + WHEN value % 100 = 8 THEN CAST('-Infinity' AS FLOAT) + ELSE CAST((value - 2500000) / 1000.0 AS FLOAT) + END AS c_float, + CASE + WHEN value % 100 = 9 THEN NULL + WHEN value % 100 = 10 THEN CAST('NaN' AS DOUBLE) + WHEN value % 100 = 11 THEN CAST('Infinity' AS DOUBLE) + WHEN value % 100 = 12 THEN CAST('-Infinity' AS DOUBLE) + ELSE CAST((value - 2500000) / 100.0 AS DOUBLE) + END AS c_double, + CASE WHEN value % 100 = 13 THEN NULL ELSE CAST((value - 2500000) / 100.0 AS DECIMAL(10,2)) END AS c_decimal + FROM $tbl + """)) + + castConfigs.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToTemporalBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToTemporalBenchmark.scala new file mode 100644 index 0000000000..a8e81a3dff --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastNumericToTemporalBenchmark.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.benchmark + +case class CastNumericToTemporalConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Benchmark to measure performance of Comet cast from numeric types to temporal types. To run + * this benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometCastNumericToTemporalBenchmark` + * Results will be written to "spark/benchmarks/CometCastNumericToTemporalBenchmark-**results.txt". + */ +// spotless:on +object CometCastNumericToTemporalBenchmark extends CometBenchmarkBase { + + private val castFunctions = Seq("CAST", "TRY_CAST") + + // INT to DATE (days since epoch) + private val intToDateConfigs = for { + castFunc <- castFunctions + } yield CastNumericToTemporalConfig( + s"$castFunc Int to Date", + s"SELECT $castFunc(c_int AS DATE) FROM parquetV1Table") + + // LONG to TIMESTAMP (an integral input to a TIMESTAMP cast is interpreted as seconds since the epoch) + private val longToTimestampConfigs = for { + castFunc <- castFunctions + } yield CastNumericToTemporalConfig( + s"$castFunc Long to Timestamp", + s"SELECT $castFunc(c_long AS TIMESTAMP) FROM parquetV1Table") + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = getBenchmarkRows(1024 * 1024 * 5) // 5M rows default + + // Generate data once for INT to DATE conversions + runBenchmarkWithTable("Int to Date casts", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + // Generate INT values representing days since epoch (1970-01-01) + // Range: ~-18000 to +18000 days (roughly 1920 to 2020) + prepareTable( + dir, + spark.sql(s""" + SELECT CASE + WHEN value % 100 = 0 THEN NULL + ELSE CAST((value % 36500) - 18000 AS INT) + END AS c_int + FROM $tbl + """)) + + intToDateConfigs.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + + // Generate data once for LONG to TIMESTAMP conversions + runBenchmarkWithTable("Long to Timestamp casts", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + // Generate LONG values representing seconds since epoch (Spark's integral-to-TIMESTAMP + // cast treats the input as seconds); one second per row starting at 2020-01-01 + prepareTable( + dir, + spark.sql(s""" + SELECT CASE + WHEN value % 100 = 0 THEN NULL + ELSE 1577836800 + (value % 31536000) + END AS c_long + FROM $tbl + """)) + + longToTimestampConfigs.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala new file mode 100644 index 0000000000..403e72f154 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToNumericBenchmark.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +import org.apache.spark.sql.catalyst.expressions.Cast +import org.apache.spark.sql.internal.SQLConf + +import org.apache.comet.CometConf + +case class CastStringToNumericConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Benchmark to measure performance of Comet cast from String to numeric types. To run this + * benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometCastStringToNumericBenchmark` + * Results will be written to "spark/benchmarks/CometCastStringToNumericBenchmark-**results.txt". + */ +// spotless:on +object CometCastStringToNumericBenchmark extends CometBenchmarkBase { + + private val castFunctions = Seq("CAST", "TRY_CAST") + private val targetTypes = + Seq("BOOLEAN", "BYTE", "SHORT", "INT", "LONG", "FLOAT", "DOUBLE", "DECIMAL(10,2)") + + private val castConfigs = for { + castFunc <- castFunctions + targetType <- targetTypes + } yield CastStringToNumericConfig( + s"$castFunc String to $targetType", + s"SELECT $castFunc(c1 AS $targetType) FROM parquetV1Table", + Map( + SQLConf.ANSI_ENABLED.key -> "false", + CometConf.getExprAllowIncompatConfigKey(classOf[Cast]) -> "true")) + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = 1024 * 1024 // 1M rows + + // Generate input data once for all benchmarks + runBenchmarkWithTable("String to numeric casts", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + // Generate numeric strings with both integer and decimal values + // Also include some special values: nulls (~2%), NaN (~2%), Infinity (~2%) + prepareTable( + dir, + spark.sql(s""" + SELECT CASE + WHEN value % 50 = 0 THEN NULL + WHEN value % 50 = 1 THEN 'NaN' + WHEN value % 50 = 2 THEN 'Infinity' + WHEN value % 50 = 3 THEN '-Infinity' + WHEN value % 50 < 10 THEN CAST(value % 99 AS STRING) + WHEN value % 50 < 30 THEN CAST(value % 999999 AS STRING) + ELSE CAST((value - 500000) / 100.0 AS STRING) + END AS c1 + FROM $tbl + """)) + + castConfigs.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToTemporalBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToTemporalBenchmark.scala new file mode 100644 index 0000000000..39337be5c8 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastStringToTemporalBenchmark.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class CastStringToTemporalConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Benchmark to measure performance of Comet cast from String to temporal types. To run this + * benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometCastStringToTemporalBenchmark` + * Results will be written to "spark/benchmarks/CometCastStringToTemporalBenchmark-**results.txt". + */ +// spotless:on +object CometCastStringToTemporalBenchmark extends CometBenchmarkBase { + + // Configuration for String to temporal cast benchmarks + private val dateCastConfigs = List( + CastStringToTemporalConfig( + "CAST String to Date", + "SELECT CAST(c1 AS DATE) FROM parquetV1Table"), + CastStringToTemporalConfig( + "TRY_CAST String to Date", + "SELECT TRY_CAST(c1 AS DATE) FROM parquetV1Table")) + + private val timestampCastConfigs = List( + CastStringToTemporalConfig( + "CAST String to Timestamp", + "SELECT CAST(c1 AS TIMESTAMP) FROM parquetV1Table"), + CastStringToTemporalConfig( + "TRY_CAST String to Timestamp", + "SELECT TRY_CAST(c1 AS TIMESTAMP) FROM parquetV1Table")) + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = getBenchmarkRows(1024 * 1024 * 10) // 10M rows default + + // Generate date data once with ~10% invalid values + runBenchmarkWithTable("String to Date casts", values) { v => + withTempPath { dateDir => + withTempTable("parquetV1Table") { + prepareTable( + dateDir, + spark.sql(s""" + SELECT CASE + WHEN value % 10 = 0 THEN 'invalid-date' + ELSE CAST(DATE_ADD('2020-01-01', CAST(value % 3650 AS INT)) AS STRING) + END AS c1 + FROM $tbl + """)) + + // Run date cast benchmarks with the same data + dateCastConfigs.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + + // Generate timestamp data once with ~10% invalid values; valid rows advance one second + // per row from 2020-01-01 and carry a varying microsecond component + runBenchmarkWithTable("String to Timestamp casts", values) { v => + withTempPath { timestampDir => + withTempTable("parquetV1Table") { + prepareTable( + timestampDir, + spark.sql(s""" + SELECT CASE + WHEN value % 10 = 0 THEN 'not-a-timestamp' + ELSE CAST(TIMESTAMP_MICROS(1577836800000000 + (value % 31536000) * 1000000 + value % 1000000) AS STRING) + END AS c1 + FROM $tbl + """)) + + // Run timestamp cast benchmarks with the same data + timestampCastConfigs.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastTemporalToNumericBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastTemporalToNumericBenchmark.scala new file mode 100644 index 0000000000..08850b6a12 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastTemporalToNumericBenchmark.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class CastTemporalToNumericConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Benchmark to measure performance of Comet cast from temporal types to numeric types. To run + * this benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometCastTemporalToNumericBenchmark` + * Results will be written to "spark/benchmarks/CometCastTemporalToNumericBenchmark-**results.txt". + */ +// spotless:on +object CometCastTemporalToNumericBenchmark extends CometBenchmarkBase { + + private val castFunctions = Seq("CAST", "TRY_CAST") + + // DATE to numeric types + private val dateToNumericTypes = Seq("BYTE", "SHORT", "INT", "LONG") + private val dateToNumericConfigs = for { + castFunc <- castFunctions + targetType <- dateToNumericTypes + } yield CastTemporalToNumericConfig( + s"$castFunc Date to $targetType", + s"SELECT $castFunc(c_date AS $targetType) FROM parquetV1Table") + + // TIMESTAMP to numeric types + private val timestampToNumericTypes = Seq("BYTE", "SHORT", "INT", "LONG") + private val timestampToNumericConfigs = for { + castFunc <- castFunctions + targetType <- timestampToNumericTypes + } yield CastTemporalToNumericConfig( + s"$castFunc Timestamp to $targetType", + s"SELECT $castFunc(c_timestamp AS $targetType) FROM parquetV1Table") + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = getBenchmarkRows(1024 * 1024 * 5) // 5M rows default + + // Generate DATE data once for all date-to-numeric benchmarks + runBenchmarkWithTable("Date to Numeric casts", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT CASE + WHEN value % 100 = 0 THEN NULL + ELSE DATE_ADD('2020-01-01', CAST(value % 3650 AS INT)) + END AS c_date + FROM $tbl + """)) + + dateToNumericConfigs.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + + // Generate TIMESTAMP data once for all timestamp-to-numeric benchmarks + runBenchmarkWithTable("Timestamp to Numeric casts", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + // Timestamps advance one second per row from 2020-01-01, with a varying microsecond component + prepareTable( + dir, + spark.sql(s""" + SELECT CASE + WHEN value % 100 = 0 THEN NULL + ELSE TIMESTAMP_MICROS(1577836800000000 + (value % 31536000) * 1000000 + value % 1000000) + END AS c_timestamp + FROM $tbl + """)) + + timestampToNumericConfigs.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastTemporalToStringBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastTemporalToStringBenchmark.scala new file mode 100644 index 0000000000..5e2316a142 --- /dev/null +++ 
b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometCastTemporalToStringBenchmark.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class CastTemporalToStringConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Benchmark to measure performance of Comet cast from temporal types to String. To run this + * benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometCastTemporalToStringBenchmark` + * Results will be written to "spark/benchmarks/CometCastTemporalToStringBenchmark-**results.txt". + */ +// spotless:on +object CometCastTemporalToStringBenchmark extends CometBenchmarkBase { + + private val castFunctions = Seq("CAST", "TRY_CAST") + + private val dateCastConfigs = for { + castFunc <- castFunctions + } yield CastTemporalToStringConfig( + s"$castFunc Date to String", + s"SELECT $castFunc(c_date AS STRING) FROM parquetV1Table") + + private val timestampCastConfigs = for { + castFunc <- castFunctions + } yield CastTemporalToStringConfig( + s"$castFunc Timestamp to String", + s"SELECT $castFunc(c_timestamp AS STRING) FROM parquetV1Table") + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = getBenchmarkRows(1024 * 1024 * 5) // 5M rows default + + // Generate temporal data once for date benchmarks + runBenchmarkWithTable("Date to String casts", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT CASE + WHEN value % 100 = 0 THEN NULL + ELSE DATE_ADD('2020-01-01', CAST(value % 3650 AS INT)) + END AS c_date + FROM $tbl + """)) + + dateCastConfigs.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + + // Generate temporal data once for timestamp benchmarks + runBenchmarkWithTable("Timestamp to String casts", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + // Timestamps advance one second per row from 2020-01-01, with a varying microsecond + // component so fractional-second rendering is exercised + prepareTable( + dir, + spark.sql(s""" + SELECT CASE + WHEN value % 100 = 0 THEN NULL + ELSE TIMESTAMP_MICROS(1577836800000000 + (value % 31536000) * 1000000 + value % 1000000) + END AS c_timestamp + FROM $tbl + """)) + + timestampCastConfigs.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometComparisonExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometComparisonExpressionBenchmark.scala new file mode 100644 index 0000000000..8360e006fd --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometComparisonExpressionBenchmark.scala @@ -0,0 
+1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class ComparisonExprConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Comprehensive benchmark for Comet comparison and predicate expressions. To run this benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometComparisonExpressionBenchmark` + * Results will be written to "spark/benchmarks/CometComparisonExpressionBenchmark-**results.txt". + */ +// spotless:on +object CometComparisonExpressionBenchmark extends CometBenchmarkBase { + + private val comparisonExpressions = List( + ComparisonExprConfig("equal_to", "SELECT c_int = c_int2 FROM parquetV1Table"), + ComparisonExprConfig("not_equal_to", "SELECT c_int != c_int2 FROM parquetV1Table"), + ComparisonExprConfig("less_than", "SELECT c_int < c_int2 FROM parquetV1Table"), + ComparisonExprConfig("less_than_or_equal", "SELECT c_int <= c_int2 FROM parquetV1Table"), + ComparisonExprConfig("greater_than", "SELECT c_int > c_int2 FROM parquetV1Table"), + ComparisonExprConfig("greater_than_or_equal", "SELECT c_int >= c_int2 FROM parquetV1Table"), + ComparisonExprConfig("equal_null_safe", "SELECT c_int <=> c_int2 FROM parquetV1Table"), + ComparisonExprConfig("is_null", "SELECT c_int IS NULL FROM parquetV1Table"), + ComparisonExprConfig("is_not_null", "SELECT c_int IS NOT NULL FROM parquetV1Table"), + ComparisonExprConfig("is_nan_float", "SELECT isnan(c_float) FROM parquetV1Table"), + ComparisonExprConfig("is_nan_double", "SELECT isnan(c_double) FROM parquetV1Table"), + ComparisonExprConfig("and", "SELECT (c_int > 0) AND (c_int2 < 100) FROM parquetV1Table"), + ComparisonExprConfig("or", "SELECT (c_int > 0) OR (c_int2 < 100) FROM parquetV1Table"), + ComparisonExprConfig("not", "SELECT NOT (c_int > 0) FROM parquetV1Table"), + ComparisonExprConfig( + "in_list", + "SELECT c_int IN (1, 10, 100, 1000, 10000) FROM parquetV1Table"), + ComparisonExprConfig( + "not_in_list", + "SELECT c_int NOT IN (1, 10, 100, 1000, 10000) FROM parquetV1Table")) + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = getBenchmarkRows(1024 * 1024 * 5) // 5M rows default + + runBenchmarkWithTable("Comparison expression benchmarks", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT + CASE WHEN value % 10 = 0 THEN NULL ELSE CAST((value % 100000) - 50000 AS INT) END AS c_int, + CASE WHEN value % 10 = 1 THEN NULL ELSE CAST((value % 1000) AS INT) END AS c_int2, + CASE + WHEN value % 50 = 2 THEN NULL + WHEN value % 50 = 3 THEN CAST('NaN' AS FLOAT) + ELSE CAST((value % 10000) / 100.0 AS FLOAT) + END AS 
c_float, + CASE + WHEN value % 50 = 4 THEN NULL + WHEN value % 50 = 5 THEN CAST('NaN' AS DOUBLE) + ELSE CAST((value % 10000) / 100.0 AS DOUBLE) + END AS c_double + FROM $tbl + """)) + + comparisonExpressions.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala index 47eff41bbd..ccc4e45c7d 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometDatetimeExpressionBenchmark.scala @@ -76,6 +76,72 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase { } } + def datePartExprBenchmark(values: Int, useDictionary: Boolean): Unit = { + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql( + s"select cast(timestamp_micros(cast(value/100000 as integer)) as date) as dt FROM $tbl")) + Seq( + ("year", "YEAR(dt)"), + ("month", "MONTH(dt)"), + ("day_of_month", "DAYOFMONTH(dt)"), + ("day_of_week", "DAYOFWEEK(dt)"), + ("weekday", "WEEKDAY(dt)"), + ("day_of_year", "DAYOFYEAR(dt)"), + ("week_of_year", "WEEKOFYEAR(dt)"), + ("quarter", "QUARTER(dt)")).foreach { case (name, expr) => + val isDictionary = if (useDictionary) "(Dictionary)" else "" + val benchName = s"Date Extract $isDictionary - $name" + val query = s"select $expr from parquetV1Table" + runExpressionBenchmark(benchName, values, query) + } + } + } + } + + def timestampPartExprBenchmark(values: Int, useDictionary: Boolean): Unit = { + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s"select timestamp_micros(cast(value/100000 as integer)) as ts FROM $tbl")) + Seq( + ("year", "YEAR(ts)"), + ("month", "MONTH(ts)"), + ("day_of_month", "DAYOFMONTH(ts)"), + ("hour", "HOUR(ts)"), + ("minute", "MINUTE(ts)"), + ("second", "SECOND(ts)"), + ("quarter", "QUARTER(ts)")).foreach { case (name, expr) => + val isDictionary = if (useDictionary) "(Dictionary)" else "" + val benchName = s"Timestamp Extract $isDictionary - $name" + val query = s"select $expr from parquetV1Table" + runExpressionBenchmark(benchName, values, query) + } + } + } + } + + def dateArithmeticExprBenchmark(values: Int, useDictionary: Boolean): Unit = { + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql( + s"select cast(timestamp_micros(cast(value/100000 as integer)) as date) as dt, cast(value % 365 as int) as days FROM $tbl")) + Seq(("date_add", "DATE_ADD(dt, days)"), ("date_sub", "DATE_SUB(dt, days)")).foreach { + case (name, expr) => + val isDictionary = if (useDictionary) "(Dictionary)" else "" + val benchName = s"Date Arithmetic $isDictionary - $name" + val query = s"select $expr from parquetV1Table" + runExpressionBenchmark(benchName, values, query) + } + } + } + } + override def runCometBenchmark(mainArgs: Array[String]): Unit = { val values = 1024 * 1024; @@ -96,6 +162,24 @@ object CometDatetimeExpressionBenchmark extends CometBenchmarkBase { runBenchmarkWithTable("TimestampTrunc (Dictionary)", values, useDictionary = true) { v => timestampTruncExprBenchmark(v, useDictionary = true) } + runBenchmarkWithTable("DatePart", values) { v => + datePartExprBenchmark(v, useDictionary = false) + } + runBenchmarkWithTable("DatePart (Dictionary)", values, useDictionary = true) { v => + 
datePartExprBenchmark(v, useDictionary = true) + } + runBenchmarkWithTable("TimestampPart", values) { v => + timestampPartExprBenchmark(v, useDictionary = false) + } + runBenchmarkWithTable("TimestampPart (Dictionary)", values, useDictionary = true) { v => + timestampPartExprBenchmark(v, useDictionary = true) + } + runBenchmarkWithTable("DateArithmetic", values) { v => + dateArithmeticExprBenchmark(v, useDictionary = false) + } + runBenchmarkWithTable("DateArithmetic (Dictionary)", values, useDictionary = true) { v => + dateArithmeticExprBenchmark(v, useDictionary = true) + } } } } diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometHashExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometHashExpressionBenchmark.scala new file mode 100644 index 0000000000..9584322c44 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometHashExpressionBenchmark.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class HashExprConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Comprehensive benchmark for Comet hash expressions. To run this benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometHashExpressionBenchmark` + * Results will be written to "spark/benchmarks/CometHashExpressionBenchmark-**results.txt". 
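+ *
+ * Each config below is driven through `runExpressionBenchmark`; a minimal sketch of one
+ * invocation (`rows` stands in for the row count supplied by `runBenchmarkWithTable`):
+ * {{{
+ *   runExpressionBenchmark("sha2_256", rows, "SELECT sha2(c_str, 256) FROM parquetV1Table", Map.empty)
+ * }}}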
+ */ +// spotless:on +object CometHashExpressionBenchmark extends CometBenchmarkBase { + + private val hashExpressions = List( + HashExprConfig("xxhash64_single", "SELECT xxhash64(c_str) FROM parquetV1Table"), + HashExprConfig("xxhash64_multi", "SELECT xxhash64(c_str, c_int, c_long) FROM parquetV1Table"), + HashExprConfig("murmur3_hash_single", "SELECT hash(c_str) FROM parquetV1Table"), + HashExprConfig("murmur3_hash_multi", "SELECT hash(c_str, c_int, c_long) FROM parquetV1Table"), + HashExprConfig("sha1", "SELECT sha1(c_str) FROM parquetV1Table"), + HashExprConfig("sha2_224", "SELECT sha2(c_str, 224) FROM parquetV1Table"), + HashExprConfig("sha2_256", "SELECT sha2(c_str, 256) FROM parquetV1Table"), + HashExprConfig("sha2_384", "SELECT sha2(c_str, 384) FROM parquetV1Table"), + HashExprConfig("sha2_512", "SELECT sha2(c_str, 512) FROM parquetV1Table")) + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = getBenchmarkRows(1024 * 1024 * 5) // 5M rows default + + runBenchmarkWithTable("Hash expression benchmarks", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT + CASE WHEN value % 100 = 0 THEN NULL ELSE CONCAT('string_', CAST(value AS STRING)) END AS c_str, + CASE WHEN value % 100 = 1 THEN NULL ELSE CAST(value % 1000000 AS INT) END AS c_int, + CASE WHEN value % 100 = 2 THEN NULL ELSE CAST(value * 1000 AS LONG) END AS c_long + FROM $tbl + """)) + + hashExpressions.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMapExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMapExpressionBenchmark.scala new file mode 100644 index 0000000000..496af42c45 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMapExpressionBenchmark.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class MapExprConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Comprehensive benchmark for Comet map expressions. To run this benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometMapExpressionBenchmark` + * Results will be written to "spark/benchmarks/CometMapExpressionBenchmark-**results.txt". 
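+ *
+ * The generated table (built in `runCometBenchmark` below) carries three columns: `m` is a
+ * `MAP<STRING, INT>`, while `keys: ARRAY<STRING>` and `values: ARRAY<INT>` feed
+ * `map_from_arrays`; each column is NULL on roughly 2% of rows.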
+ */ +// spotless:on +object CometMapExpressionBenchmark extends CometBenchmarkBase { + + private val mapExpressions = List( + MapExprConfig("map_keys", "SELECT map_keys(m) FROM parquetV1Table"), + MapExprConfig("map_values", "SELECT map_values(m) FROM parquetV1Table"), + MapExprConfig("map_entries", "SELECT map_entries(m) FROM parquetV1Table"), + MapExprConfig("map_from_arrays", "SELECT map_from_arrays(keys, values) FROM parquetV1Table"), + MapExprConfig("element_at_map", "SELECT element_at(m, 'key1') FROM parquetV1Table"), + MapExprConfig("map_subscript", "SELECT m['key2'] FROM parquetV1Table")) + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = getBenchmarkRows(1024 * 1024 * 2) // 2M rows default (maps are larger) + + runBenchmarkWithTable("Map expression benchmarks", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT + CASE WHEN value % 50 = 0 THEN NULL + ELSE map( + 'key1', cast((value % 100) as int), + 'key2', cast((value % 200) as int), + 'key3', cast((value % 300) as int)) + END AS m, + CASE WHEN value % 50 = 1 THEN NULL + ELSE array('key1', 'key2', 'key3', 'key4') + END AS keys, + CASE WHEN value % 50 = 2 THEN NULL + ELSE array( + cast((value % 100) as int), + cast((value % 200) as int), + cast((value % 300) as int), + cast((value % 400) as int)) + END AS values + FROM $tbl + """)) + + mapExpressions.foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMathExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMathExpressionBenchmark.scala new file mode 100644 index 0000000000..d63008d3b9 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometMathExpressionBenchmark.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +case class MathExprConfig( + name: String, + query: String, + extraCometConfigs: Map[String, String] = Map.empty) + +// spotless:off +/** + * Comprehensive benchmark for Comet math expressions. To run this benchmark: + * `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometMathExpressionBenchmark` + * Results will be written to "spark/benchmarks/CometMathExpressionBenchmark-**results.txt". 
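+ *
+ * The `c_double` input deliberately mixes NULL, NaN, and +/-Infinity values (about 1% of
+ * rows each) so the benchmarked kernels hit their special-value paths, while
+ * `c_positive_double` is kept at or above 1.0 so the logarithm cases never see invalid input.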
+ */ +// spotless:on +object CometMathExpressionBenchmark extends CometBenchmarkBase { + + // Unary math functions + private val unaryMathFunctions = List( + "abs" -> "ABS(c_double)", + "ceil" -> "CEIL(c_double)", + "floor" -> "FLOOR(c_double)", + "round" -> "ROUND(c_double)", + "round_scale" -> "ROUND(c_double, 2)", + "hex_int" -> "HEX(c_int)", + "hex_long" -> "HEX(c_long)", + "unhex" -> "UNHEX(c_hex_str)", + "unary_minus" -> "-(c_double)") + + // Binary math functions + private val binaryMathFunctions = + List("atan2" -> "ATAN2(c_double, c_double2)", "pmod" -> "PMOD(c_int, c_int2)") + + // Logarithm functions + private val logFunctions = List( + "log" -> "LN(c_positive_double)", + "log10" -> "LOG10(c_positive_double)", + "log2" -> "LOG2(c_positive_double)") + + private def generateConfigs(funcs: List[(String, String)]): List[MathExprConfig] = { + funcs.map { case (name, expr) => + MathExprConfig(name, s"SELECT $expr FROM parquetV1Table") + } + } + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val values = getBenchmarkRows(1024 * 1024 * 5) // 5M rows default + + // Benchmark unary and binary math functions + runBenchmarkWithTable("Math expression benchmarks", values) { v => + withTempPath { dir => + withTempTable("parquetV1Table") { + prepareTable( + dir, + spark.sql(s""" + SELECT + CASE WHEN value % 100 = 0 THEN NULL ELSE CAST((value - 2500000) AS INT) END AS c_int, + CASE WHEN value % 100 = 1 THEN NULL ELSE CAST((value % 1000) - 500 AS INT) END AS c_int2, + CASE WHEN value % 100 = 2 THEN NULL ELSE CAST(value * 1000 AS LONG) END AS c_long, + CASE + WHEN value % 100 = 3 THEN NULL + WHEN value % 100 = 4 THEN CAST('NaN' AS DOUBLE) + WHEN value % 100 = 5 THEN CAST('Infinity' AS DOUBLE) + WHEN value % 100 = 6 THEN CAST('-Infinity' AS DOUBLE) + ELSE CAST((value - 2500000) / 100.0 AS DOUBLE) + END AS c_double, + CASE WHEN value % 100 = 7 THEN NULL ELSE CAST((value % 1000) - 500 AS DOUBLE) END AS c_double2, + CASE WHEN value % 100 = 8 THEN NULL ELSE CAST(ABS((value - 2500000) / 100.0) + 1.0 AS DOUBLE) END AS c_positive_double, + CASE WHEN value % 100 = 9 THEN NULL ELSE HEX(CAST(value % 1000000 AS LONG)) END AS c_hex_str + FROM $tbl + """)) + + (generateConfigs(unaryMathFunctions) ++ + generateConfigs(binaryMathFunctions) ++ + generateConfigs(logFunctions)).foreach { config => + runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs) + } + } + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala index 41eabb8513..a9aff529fb 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala @@ -62,23 +62,36 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase { // Configuration for all string expression benchmarks private val stringExpressions = List( - StringExprConfig("Substring", "select substring(c1, 1, 100) from parquetV1Table"), + StringExprConfig("substring", "select substring(c1, 1, 100) from parquetV1Table"), StringExprConfig("ascii", "select ascii(c1) from parquetV1Table"), - StringExprConfig("bitLength", "select bit_length(c1) from parquetV1Table"), + StringExprConfig("bit_length", "select bit_length(c1) from parquetV1Table"), StringExprConfig("octet_length", "select octet_length(c1) from parquetV1Table"), StringExprConfig("upper", "select 
upper(c1) from parquetV1Table"),
     StringExprConfig("lower", "select lower(c1) from parquetV1Table"),
-    StringExprConfig("chr", "select chr(c1) from parquetV1Table"),
+    StringExprConfig("chr", "select chr(65) from parquetV1Table"),
-    StringExprConfig("initCap", "select initCap(c1) from parquetV1Table"),
+    StringExprConfig("init_cap", "select initcap(c1) from parquetV1Table"),
     StringExprConfig("trim", "select trim(c1) from parquetV1Table"),
-    StringExprConfig("concatws", "select concat_ws(' ', c1, c1) from parquetV1Table"),
+    StringExprConfig("ltrim", "select ltrim(c1) from parquetV1Table"),
+    StringExprConfig("rtrim", "select rtrim(c1) from parquetV1Table"),
+    StringExprConfig("concat", "select concat(c1, c1) from parquetV1Table"),
+    StringExprConfig("concat_ws", "select concat_ws(' ', c1, c1) from parquetV1Table"),
     StringExprConfig("length", "select length(c1) from parquetV1Table"),
     StringExprConfig("repeat", "select repeat(c1, 3) from parquetV1Table"),
     StringExprConfig("reverse", "select reverse(c1) from parquetV1Table"),
     StringExprConfig("instr", "select instr(c1, '123') from parquetV1Table"),
     StringExprConfig("replace", "select replace(c1, '123', 'ab') from parquetV1Table"),
     StringExprConfig("space", "select space(2) from parquetV1Table"),
-    StringExprConfig("translate", "select translate(c1, '123456', 'aBcDeF') from parquetV1Table"))
+    StringExprConfig("translate", "select translate(c1, '123456', 'aBcDeF') from parquetV1Table"),
+    StringExprConfig("lpad", "select lpad(c1, 150, 'x') from parquetV1Table"),
+    StringExprConfig("rpad", "select rpad(c1, 150, 'x') from parquetV1Table"),
+    StringExprConfig("contains", "select contains(c1, '123') from parquetV1Table"),
+    StringExprConfig("startswith", "select startswith(c1, '1') from parquetV1Table"),
+    StringExprConfig("endswith", "select endswith(c1, '9') from parquetV1Table"),
+    StringExprConfig("like", "select c1 like '%123%' from parquetV1Table"),
+    StringExprConfig("rlike", "select c1 rlike '[0-9]+' from parquetV1Table"),
+    StringExprConfig(
+      "regexp_replace",
+      "select regexp_replace(c1, '[0-9]', 'X') from parquetV1Table"))
 
   override def runCometBenchmark(mainArgs: Array[String]): Unit = {
     val values = 1024 * 1024;
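+    // Row count is fixed at ~1M here; as a sketch, the newer expression benchmark suites
+    // size their input via the shared helper instead: `val values = getBenchmarkRows(1024 * 1024)`.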