Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.benchmark

/**
 * Configuration for a single aggregate-function benchmark case.
 *
 * @param name
 *   short identifier used as the benchmark case name in the results output
 * @param query
 *   the SQL query to benchmark (written against the `parquetV1Table` temp view)
 * @param extraCometConfigs
 *   additional Comet configuration overrides to apply while running this case
 */
final case class AggExprConfig(
    name: String,
    query: String,
    extraCometConfigs: Map[String, String] = Map.empty)

// spotless:off
/**
* Comprehensive benchmark for Comet aggregate functions. To run this benchmark:
* `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometAggregateFunctionBenchmark`
* Results will be written to "spark/benchmarks/CometAggregateFunctionBenchmark-**results.txt".
*/
// spotless:on
object CometAggregateFunctionBenchmark extends CometBenchmarkBase {

// Basic aggregates: COUNT / COUNT(col) / COUNT(DISTINCT) / MIN / MAX / SUM / AVG /
// FIRST / LAST over the int, long and double columns, all grouped by `grp`.
private val basicAggregates = List(
AggExprConfig("count", "SELECT COUNT(*) FROM parquetV1Table GROUP BY grp"),
AggExprConfig("count_col", "SELECT COUNT(c_int) FROM parquetV1Table GROUP BY grp"),
AggExprConfig(
"count_distinct",
"SELECT COUNT(DISTINCT c_int) FROM parquetV1Table GROUP BY grp"),
AggExprConfig("min_int", "SELECT MIN(c_int) FROM parquetV1Table GROUP BY grp"),
AggExprConfig("max_int", "SELECT MAX(c_int) FROM parquetV1Table GROUP BY grp"),
AggExprConfig("min_double", "SELECT MIN(c_double) FROM parquetV1Table GROUP BY grp"),
AggExprConfig("max_double", "SELECT MAX(c_double) FROM parquetV1Table GROUP BY grp"),
AggExprConfig("sum_int", "SELECT SUM(c_int) FROM parquetV1Table GROUP BY grp"),
AggExprConfig("sum_long", "SELECT SUM(c_long) FROM parquetV1Table GROUP BY grp"),
AggExprConfig("sum_double", "SELECT SUM(c_double) FROM parquetV1Table GROUP BY grp"),
AggExprConfig("avg_int", "SELECT AVG(c_int) FROM parquetV1Table GROUP BY grp"),
AggExprConfig("avg_double", "SELECT AVG(c_double) FROM parquetV1Table GROUP BY grp"),
AggExprConfig("first", "SELECT FIRST(c_int) FROM parquetV1Table GROUP BY grp"),
AggExprConfig(
"first_ignore_nulls",
// FIRST(expr, true) = ignoreNulls variant
"SELECT FIRST(c_int, true) FROM parquetV1Table GROUP BY grp"),
AggExprConfig("last", "SELECT LAST(c_int) FROM parquetV1Table GROUP BY grp"),
AggExprConfig(
"last_ignore_nulls",
// LAST(expr, true) = ignoreNulls variant
"SELECT LAST(c_int, true) FROM parquetV1Table GROUP BY grp"))

// Statistical aggregates: variance / stddev over c_double, plus the two-column
// covariance and correlation aggregates over (c_double, c_double2).
private val statisticalAggregates = List(
AggExprConfig("var_samp", "SELECT VAR_SAMP(c_double) FROM parquetV1Table GROUP BY grp"),
AggExprConfig("var_pop", "SELECT VAR_POP(c_double) FROM parquetV1Table GROUP BY grp"),
AggExprConfig("stddev_samp", "SELECT STDDEV_SAMP(c_double) FROM parquetV1Table GROUP BY grp"),
AggExprConfig("stddev_pop", "SELECT STDDEV_POP(c_double) FROM parquetV1Table GROUP BY grp"),
AggExprConfig(
"covar_samp",
"SELECT COVAR_SAMP(c_double, c_double2) FROM parquetV1Table GROUP BY grp"),
AggExprConfig(
"covar_pop",
"SELECT COVAR_POP(c_double, c_double2) FROM parquetV1Table GROUP BY grp"),
AggExprConfig("corr", "SELECT CORR(c_double, c_double2) FROM parquetV1Table GROUP BY grp"))

// Bitwise aggregates over the long column.
private val bitwiseAggregates = List(
AggExprConfig("bit_and", "SELECT BIT_AND(c_long) FROM parquetV1Table GROUP BY grp"),
AggExprConfig("bit_or", "SELECT BIT_OR(c_long) FROM parquetV1Table GROUP BY grp"),
AggExprConfig("bit_xor", "SELECT BIT_XOR(c_long) FROM parquetV1Table GROUP BY grp"))

/**
 * Entry point invoked by the benchmark harness. Generates a parquet table with a
 * 1000-value grouping key (`grp`) and four numeric columns, each of which is NULL
 * for a distinct 1-in-100 slice of rows (offsets 0-3 of `value % 100`), then runs
 * every configured aggregate query against it.
 */
override def runCometBenchmark(mainArgs: Array[String]): Unit = {
val values = getBenchmarkRows(1024 * 1024 * 10) // 10M rows default for aggregates

runBenchmarkWithTable("Aggregate function benchmarks", values) { v =>
withTempPath { dir =>
withTempTable("parquetV1Table") {
prepareTable(
dir,
spark.sql(s"""
SELECT
CAST(value % 1000 AS INT) AS grp,
CASE WHEN value % 100 = 0 THEN NULL ELSE CAST((value % 10000) - 5000 AS INT) END AS c_int,
CASE WHEN value % 100 = 1 THEN NULL ELSE CAST(value * 1000 AS LONG) END AS c_long,
CASE WHEN value % 100 = 2 THEN NULL ELSE CAST((value % 10000) / 100.0 AS DOUBLE) END AS c_double,
CASE WHEN value % 100 = 3 THEN NULL ELSE CAST((value % 5000) / 50.0 AS DOUBLE) END AS c_double2
FROM $tbl
"""))

// Run every configured case; each config may carry extra Comet config overrides.
(basicAggregates ++ statisticalAggregates ++ bitwiseAggregates).foreach { config =>
runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs)
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.benchmark

/**
 * Configuration for a single array-expression benchmark case.
 *
 * @param name
 *   short identifier used as the benchmark case name in the results output
 * @param query
 *   the SQL query to benchmark (written against the `parquetV1Table` temp view)
 * @param extraCometConfigs
 *   additional Comet configuration overrides to apply while running this case
 */
final case class ArrayExprConfig(
    name: String,
    query: String,
    extraCometConfigs: Map[String, String] = Map.empty)

// spotless:off
/**
* Comprehensive benchmark for Comet array expressions. To run this benchmark:
* `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometArrayExpressionBenchmark`
* Results will be written to "spark/benchmarks/CometArrayExpressionBenchmark-**results.txt".
*/
// spotless:on
object CometArrayExpressionBenchmark extends CometBenchmarkBase {

// Array expressions covering membership, min/max, set operations, element access,
// joins/repeats, flattening and sizing. Queries reference the generated columns:
// arr_int / arr_int2 (int arrays), arr_nullable (int array with NULL elements),
// arr_str (string array), nested_arr (array of int arrays), elem (scalar int).
private val arrayExpressions = List(
ArrayExprConfig("array_contains", "SELECT array_contains(arr_int, 5) FROM parquetV1Table"),
ArrayExprConfig("array_max", "SELECT array_max(arr_int) FROM parquetV1Table"),
ArrayExprConfig("array_min", "SELECT array_min(arr_int) FROM parquetV1Table"),
ArrayExprConfig("array_distinct", "SELECT array_distinct(arr_int) FROM parquetV1Table"),
ArrayExprConfig("array_remove", "SELECT array_remove(arr_int, 5) FROM parquetV1Table"),
ArrayExprConfig("array_append", "SELECT array_append(arr_int, 100) FROM parquetV1Table"),
ArrayExprConfig("array_compact", "SELECT array_compact(arr_nullable) FROM parquetV1Table"),
ArrayExprConfig(
"array_intersect",
"SELECT array_intersect(arr_int, arr_int2) FROM parquetV1Table"),
ArrayExprConfig("array_except", "SELECT array_except(arr_int, arr_int2) FROM parquetV1Table"),
ArrayExprConfig("array_union", "SELECT array_union(arr_int, arr_int2) FROM parquetV1Table"),
ArrayExprConfig(
"arrays_overlap",
"SELECT arrays_overlap(arr_int, arr_int2) FROM parquetV1Table"),
ArrayExprConfig("array_insert", "SELECT array_insert(arr_int, 2, 999) FROM parquetV1Table"),
ArrayExprConfig("array_join", "SELECT array_join(arr_str, ',') FROM parquetV1Table"),
ArrayExprConfig("array_repeat", "SELECT array_repeat(elem, 5) FROM parquetV1Table"),
ArrayExprConfig("get_array_item", "SELECT arr_int[0] FROM parquetV1Table"),
ArrayExprConfig("element_at", "SELECT element_at(arr_int, 1) FROM parquetV1Table"),
ArrayExprConfig("reverse", "SELECT reverse(arr_int) FROM parquetV1Table"),
ArrayExprConfig("flatten", "SELECT flatten(nested_arr) FROM parquetV1Table"),
ArrayExprConfig("size", "SELECT size(arr_int) FROM parquetV1Table"))

/**
 * Entry point invoked by the benchmark harness. Generates a parquet table where
 * each array column is NULL for a distinct 1-in-50 slice of rows (offsets 0-5 of
 * `value % 50`); arr_int contains duplicate values plus constants (5, 10, 15) so
 * distinct/remove/contains have work to do, and arr_nullable embeds NULL elements
 * (selected by `value % 7`) to exercise array_compact. All configured expression
 * queries are then benchmarked against the table.
 */
override def runCometBenchmark(mainArgs: Array[String]): Unit = {
val values = getBenchmarkRows(1024 * 1024 * 2) // 2M rows default (arrays are larger)

runBenchmarkWithTable("Array expression benchmarks", values) { v =>
withTempPath { dir =>
withTempTable("parquetV1Table") {
prepareTable(
dir,
spark.sql(s"""
SELECT
CASE WHEN value % 50 = 0 THEN NULL
ELSE array(
cast((value % 10) as int),
cast((value % 20) as int),
cast((value % 30) as int),
cast((value % 10) as int),
5, 10, 15)
END AS arr_int,
CASE WHEN value % 50 = 1 THEN NULL
ELSE array(
cast((value % 15) as int),
cast((value % 25) as int),
cast((value % 35) as int))
END AS arr_int2,
CASE WHEN value % 50 = 2 THEN NULL
ELSE array(
CASE WHEN value % 7 = 0 THEN NULL ELSE cast((value % 10) as int) END,
CASE WHEN value % 7 = 1 THEN NULL ELSE cast((value % 20) as int) END,
CASE WHEN value % 7 = 2 THEN NULL ELSE cast((value % 30) as int) END)
END AS arr_nullable,
CASE WHEN value % 50 = 3 THEN NULL
ELSE array(
concat('str_', cast(value % 10 as string)),
concat('val_', cast(value % 20 as string)),
concat('item_', cast(value % 5 as string)))
END AS arr_str,
CASE WHEN value % 50 = 4 THEN NULL
ELSE array(
array(cast((value % 5) as int), cast((value % 10) as int)),
array(cast((value % 15) as int), cast((value % 20) as int)))
END AS nested_arr,
CASE WHEN value % 50 = 5 THEN NULL ELSE cast((value % 100) as int) END AS elem
FROM $tbl
"""))

// Run every configured case; each config may carry extra Comet config overrides.
arrayExpressions.foreach { config =>
runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs)
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,27 @@ trait CometBenchmarkBase extends SqlBasedBenchmark {

// Name of the source temp table referenced by the data-generation queries (`FROM $tbl`).
protected val tbl = "comet_table"

/**
 * Resolves the row count a benchmark should use, honoring quick mode.
 *
 * Setting the environment variable COMET_BENCHMARK_QUICK_MODE to "1" or "true"
 * (case-insensitive) shrinks datasets for faster iteration while developing.
 *
 * @param defaultRows
 *   the row count used for a full benchmark run
 * @return
 *   `defaultRows` normally; in quick mode, 1% of it with a floor of 1024 rows
 */
protected def getBenchmarkRows(defaultRows: Int): Int = {
  // Quick mode is on only when the env var is present AND set to "1"/"true".
  val quickModeEnabled = sys.env.get("COMET_BENCHMARK_QUICK_MODE") match {
    case Some(flag) => flag == "1" || flag.equalsIgnoreCase("true")
    case None => false
  }
  if (quickModeEnabled) (defaultRows / 100).max(1024) else defaultRows
}

/**
 * Runs `f`, then drops every named temp view afterwards — even when `f` throws.
 *
 * @param tableNames
 *   names of the temp views to drop on exit
 * @param f
 *   the body to execute while the temp views exist
 */
protected def withTempTable(tableNames: String*)(f: => Unit): Unit = {
  try {
    f
  } finally {
    for (name <- tableNames) {
      spark.catalog.dropTempView(name)
    }
  }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.benchmark

/**
 * Configuration for a single bitwise-expression benchmark case.
 *
 * @param name
 *   short identifier used as the benchmark case name in the results output
 * @param query
 *   the SQL query to benchmark (written against the `parquetV1Table` temp view)
 * @param extraCometConfigs
 *   additional Comet configuration overrides to apply while running this case
 */
final case class BitwiseExprConfig(
    name: String,
    query: String,
    extraCometConfigs: Map[String, String] = Map.empty)

// spotless:off
/**
* Comprehensive benchmark for Comet bitwise expressions. To run this benchmark:
* `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometBitwiseExpressionBenchmark`
* Results will be written to "spark/benchmarks/CometBitwiseExpressionBenchmark-**results.txt".
*/
// spotless:on
object CometBitwiseExpressionBenchmark extends CometBenchmarkBase {

// Bitwise expressions: binary operators (&, |, ^), NOT (~), the three shift
// variants, BIT_COUNT, and BIT_GET with a data-driven bit position (c_pos, 0-63).
private val bitwiseExpressions = List(
BitwiseExprConfig("bitwise_and", "SELECT c_int & c_int2 FROM parquetV1Table"),
BitwiseExprConfig("bitwise_or", "SELECT c_int | c_int2 FROM parquetV1Table"),
BitwiseExprConfig("bitwise_xor", "SELECT c_int ^ c_int2 FROM parquetV1Table"),
BitwiseExprConfig("bitwise_not", "SELECT ~c_int FROM parquetV1Table"),
BitwiseExprConfig("shift_left", "SELECT SHIFTLEFT(c_int, 3) FROM parquetV1Table"),
BitwiseExprConfig("shift_right", "SELECT SHIFTRIGHT(c_int, 3) FROM parquetV1Table"),
BitwiseExprConfig(
"shift_right_unsigned",
"SELECT SHIFTRIGHTUNSIGNED(c_int, 3) FROM parquetV1Table"),
BitwiseExprConfig("bit_count", "SELECT BIT_COUNT(c_long) FROM parquetV1Table"),
BitwiseExprConfig("bit_get", "SELECT BIT_GET(c_long, c_pos) FROM parquetV1Table"))

/**
 * Entry point invoked by the benchmark harness. Generates a parquet table with
 * two int columns, a long column, and a bit-position column (`c_pos`, bounded to
 * 0-63 via `value % 64` so BIT_GET stays in range for a long). Each column is
 * NULL for a distinct 1-in-100 slice of rows (offsets 0-3 of `value % 100`).
 * All configured expression queries are then benchmarked against the table.
 */
override def runCometBenchmark(mainArgs: Array[String]): Unit = {
val values = getBenchmarkRows(1024 * 1024 * 5) // 5M rows default

runBenchmarkWithTable("Bitwise expression benchmarks", values) { v =>
withTempPath { dir =>
withTempTable("parquetV1Table") {
prepareTable(
dir,
spark.sql(s"""
SELECT
CASE WHEN value % 100 = 0 THEN NULL ELSE CAST(value % 1000000 AS INT) END AS c_int,
CASE WHEN value % 100 = 1 THEN NULL ELSE CAST((value * 7) % 1000000 AS INT) END AS c_int2,
CASE WHEN value % 100 = 2 THEN NULL ELSE CAST(value * 1000 AS LONG) END AS c_long,
CASE WHEN value % 100 = 3 THEN NULL ELSE CAST(value % 64 AS INT) END AS c_pos
FROM $tbl
"""))

// Run every configured case; each config may carry extra Comet config overrides.
bitwiseExpressions.foreach { config =>
runExpressionBenchmark(config.name, v, config.query, config.extraCometConfigs)
}
}
}
}
}
}
Loading
Loading