From 331631f5c9404abf7a7114b9f190db8761334db8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 16 Feb 2026 14:58:42 -0700 Subject: [PATCH 1/7] Consolidate TPC benchmark scripts into single Python runner Replace 9 per-engine shell scripts with a single `run.py` that loads per-engine TOML config files. This eliminates duplicated Spark conf boilerplate and makes it easier to add new engines or modify shared settings. Usage: `python3 run.py --engine comet --benchmark tpch [--dry-run]` Also moves benchmarks from `dev/benchmarks/` to `benchmarks/tpc/` and updates all documentation references. Co-Authored-By: Claude Opus 4.6 --- {dev/benchmarks => benchmarks/tpc}/.gitignore | 0 {dev/benchmarks => benchmarks/tpc}/README.md | 39 +- .../tpc}/create-iceberg-tpch.py | 0 .../tpc}/drop-caches.sh | 0 benchmarks/tpc/engines/blaze.toml | 36 ++ benchmarks/tpc/engines/comet-iceberg.toml | 49 +++ benchmarks/tpc/engines/comet.toml | 35 ++ benchmarks/tpc/engines/gluten.toml | 34 ++ benchmarks/tpc/engines/spark.toml | 19 + .../tpc}/generate-comparison.py | 0 benchmarks/tpc/run.py | 378 ++++++++++++++++++ .../benchmarks => benchmarks/tpc}/tpcbench.py | 0 dev/benchmarks/blaze-tpcds.sh | 53 --- dev/benchmarks/blaze-tpch.sh | 53 --- dev/benchmarks/comet-tpcds.sh | 53 --- dev/benchmarks/comet-tpch-iceberg.sh | 114 ------ dev/benchmarks/comet-tpch.sh | 55 --- dev/benchmarks/gluten-tpcds.sh | 53 --- dev/benchmarks/gluten-tpch.sh | 53 --- dev/benchmarks/spark-tpcds.sh | 45 --- dev/benchmarks/spark-tpch.sh | 46 --- docs/source/about/gluten_comparison.md | 2 +- .../benchmark-results/tpc-ds.md | 2 +- .../benchmark-results/tpc-h.md | 2 +- docs/source/contributor-guide/benchmarking.md | 2 +- .../contributor-guide/benchmarking_aws_ec2.md | 15 +- 26 files changed, 600 insertions(+), 538 deletions(-) rename {dev/benchmarks => benchmarks/tpc}/.gitignore (100%) rename {dev/benchmarks => benchmarks/tpc}/README.md (79%) rename {dev/benchmarks => benchmarks/tpc}/create-iceberg-tpch.py (100%) rename {dev/benchmarks => benchmarks/tpc}/drop-caches.sh (100%) create mode 100644 benchmarks/tpc/engines/blaze.toml create mode 100644 benchmarks/tpc/engines/comet-iceberg.toml create mode 100644 benchmarks/tpc/engines/comet.toml create mode 100644 benchmarks/tpc/engines/gluten.toml create mode 100644 benchmarks/tpc/engines/spark.toml rename {dev/benchmarks => benchmarks/tpc}/generate-comparison.py (100%) create mode 100755 benchmarks/tpc/run.py rename {dev/benchmarks => benchmarks/tpc}/tpcbench.py (100%) delete mode 100755 dev/benchmarks/blaze-tpcds.sh delete mode 100755 dev/benchmarks/blaze-tpch.sh delete mode 100755 dev/benchmarks/comet-tpcds.sh delete mode 100755 dev/benchmarks/comet-tpch-iceberg.sh delete mode 100755 dev/benchmarks/comet-tpch.sh delete mode 100755 dev/benchmarks/gluten-tpcds.sh delete mode 100755 dev/benchmarks/gluten-tpch.sh delete mode 100755 dev/benchmarks/spark-tpcds.sh delete mode 100755 dev/benchmarks/spark-tpch.sh diff --git a/dev/benchmarks/.gitignore b/benchmarks/tpc/.gitignore similarity index 100% rename from dev/benchmarks/.gitignore rename to benchmarks/tpc/.gitignore diff --git a/dev/benchmarks/README.md b/benchmarks/tpc/README.md similarity index 79% rename from dev/benchmarks/README.md rename to benchmarks/tpc/README.md index b3ea674199..48df9c63d1 100644 --- a/dev/benchmarks/README.md +++ b/benchmarks/tpc/README.md @@ -26,6 +26,26 @@ For full instructions on running these benchmarks on an EC2 instance, see the [C [Comet Benchmarking on EC2 Guide]: 
https://datafusion.apache.org/comet/contributor-guide/benchmarking_aws_ec2.html

+## Usage
+
+All benchmarks are run via `run.py`:
+
+```
+python3 run.py --engine <engine> --benchmark <benchmark> [options]
+```
+
+| Option | Description |
+| --------------- | ---------------------------------------- |
+| `--engine` | Engine name (matches a TOML file in `engines/`) |
+| `--benchmark` | `tpch` or `tpcds` |
+| `--iterations` | Number of iterations (default: 1) |
+| `--output` | Output directory (default: `.`) |
+| `--query` | Run a single query number |
+| `--no-restart` | Skip Spark master/worker restart |
+| `--dry-run` | Print the spark-submit command without executing |
+
+Available engines: `spark`, `comet`, `comet-iceberg`, `gluten`, `blaze`
+
 ## Example usage

 Set Spark environment variables:
@@ -47,7 +67,7 @@ Run Spark benchmark:

 ```shell
 export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
 sudo ./drop-caches.sh
-./spark-tpch.sh
+python3 run.py --engine spark --benchmark tpch
 ```

 Run Comet benchmark:
@@ -56,7 +76,7 @@
 export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
 export COMET_JAR=/opt/comet/comet-spark-spark3.5_2.12-0.10.0.jar
 sudo ./drop-caches.sh
-./comet-tpch.sh
+python3 run.py --engine comet --benchmark tpch
 ```

 Run Gluten benchmark:
@@ -65,7 +85,13 @@
 export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
 export GLUTEN_JAR=/opt/gluten/gluten-velox-bundle-spark3.5_2.12-linux_amd64-1.4.0.jar
 sudo ./drop-caches.sh
-./gluten-tpch.sh
+python3 run.py --engine gluten --benchmark tpch
+```
+
+Preview a command without running it:
+
+```shell
+python3 run.py --engine comet --benchmark tpch --dry-run
 ```

 Generating charts:
@@ -74,6 +100,11 @@
 python3 generate-comparison.py --benchmark tpch --labels "Spark 3.5.3" "Comet 0.9.0" "Gluten 1.4.0" --title "TPC-H @ 100 GB (single executor, 8 cores, local Parquet files)" spark-tpch-1752338506381.json comet-tpch-1752337818039.json gluten-tpch-1752337474344.json
 ```

+## Engine Configuration
+
+Each engine is defined by a TOML file in `engines/`. The config specifies JARs, Spark conf overrides,
+required environment variables, and optional defaults/exports. See existing files for examples.
+
 ## Iceberg Benchmarking

 Comet includes native Iceberg support via iceberg-rust integration. This enables benchmarking TPC-H queries
@@ -124,7 +155,7 @@ export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar
 export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse
 export TPCH_QUERIES=/mnt/bigdata/tpch/queries/
 sudo ./drop-caches.sh
-./comet-tpch-iceberg.sh
+python3 run.py --engine comet-iceberg --benchmark tpch
 ```

 The benchmark uses `spark.comet.scan.icebergNative.enabled=true` to enable Comet's native iceberg-rust
diff --git a/dev/benchmarks/create-iceberg-tpch.py b/benchmarks/tpc/create-iceberg-tpch.py
similarity index 100%
rename from dev/benchmarks/create-iceberg-tpch.py
rename to benchmarks/tpc/create-iceberg-tpch.py
diff --git a/dev/benchmarks/drop-caches.sh b/benchmarks/tpc/drop-caches.sh
similarity index 100%
rename from dev/benchmarks/drop-caches.sh
rename to benchmarks/tpc/drop-caches.sh
diff --git a/benchmarks/tpc/engines/blaze.toml b/benchmarks/tpc/engines/blaze.toml
new file mode 100644
index 0000000000..7298a8f3ab
--- /dev/null
+++ b/benchmarks/tpc/engines/blaze.toml
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[engine] +name = "blaze" + +[env] +required = ["BLAZE_JAR"] + +[spark_submit] +jars = ["$BLAZE_JAR"] +driver_class_path = ["$BLAZE_JAR"] + +[spark_conf] +"spark.driver.extraClassPath" = "$BLAZE_JAR" +"spark.executor.extraClassPath" = "$BLAZE_JAR" +"spark.sql.extensions" = "org.apache.spark.sql.blaze.BlazeSparkSessionExtension" +"spark.shuffle.manager" = "org.apache.spark.sql.execution.blaze.shuffle.BlazeShuffleManager" +"spark.blaze.enable" = "true" +"spark.blaze.forceShuffledHashJoin" = "true" +"spark.executor.memoryOverhead" = "16g" +"spark.memory.offHeap.enabled" = "false" diff --git a/benchmarks/tpc/engines/comet-iceberg.toml b/benchmarks/tpc/engines/comet-iceberg.toml new file mode 100644 index 0000000000..ebabfce7b4 --- /dev/null +++ b/benchmarks/tpc/engines/comet-iceberg.toml @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
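+#
+# Engine definition for Comet's native Iceberg scan path. run.py overlays the
+# [spark_conf] entries below on its shared Spark settings and expands $VAR /
+# ${VAR} references (in both keys and values) from the environment at submit time.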
+ +[engine] +name = "comet-iceberg" + +[env] +required = ["COMET_JAR", "ICEBERG_JAR", "ICEBERG_WAREHOUSE", "TPCH_QUERIES"] + +[env.defaults] +ICEBERG_CATALOG = "local" +ICEBERG_DATABASE = "tpch" + +[spark_submit] +jars = ["$COMET_JAR", "$ICEBERG_JAR"] +driver_class_path = ["$COMET_JAR", "$ICEBERG_JAR"] + +[spark_conf] +"spark.driver.extraClassPath" = "$COMET_JAR:$ICEBERG_JAR" +"spark.executor.extraClassPath" = "$COMET_JAR:$ICEBERG_JAR" +"spark.plugins" = "org.apache.spark.CometPlugin" +"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager" +"spark.comet.exec.replaceSortMergeJoin" = "true" +"spark.comet.expression.Cast.allowIncompatible" = "true" +"spark.comet.enabled" = "true" +"spark.comet.exec.enabled" = "true" +"spark.comet.scan.icebergNative.enabled" = "true" +"spark.comet.explainFallback.enabled" = "true" +"spark.sql.catalog.${ICEBERG_CATALOG}" = "org.apache.iceberg.spark.SparkCatalog" +"spark.sql.catalog.${ICEBERG_CATALOG}.type" = "hadoop" +"spark.sql.catalog.${ICEBERG_CATALOG}.warehouse" = "$ICEBERG_WAREHOUSE" +"spark.sql.defaultCatalog" = "${ICEBERG_CATALOG}" + +[tpcbench_args] +use_iceberg = true diff --git a/benchmarks/tpc/engines/comet.toml b/benchmarks/tpc/engines/comet.toml new file mode 100644 index 0000000000..8e19165ebf --- /dev/null +++ b/benchmarks/tpc/engines/comet.toml @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[engine] +name = "comet" + +[env] +required = ["COMET_JAR"] + +[spark_submit] +jars = ["$COMET_JAR"] +driver_class_path = ["$COMET_JAR"] + +[spark_conf] +"spark.driver.extraClassPath" = "$COMET_JAR" +"spark.executor.extraClassPath" = "$COMET_JAR" +"spark.plugins" = "org.apache.spark.CometPlugin" +"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager" +"spark.comet.scan.impl" = "native_datafusion" +"spark.comet.exec.replaceSortMergeJoin" = "true" +"spark.comet.expression.Cast.allowIncompatible" = "true" diff --git a/benchmarks/tpc/engines/gluten.toml b/benchmarks/tpc/engines/gluten.toml new file mode 100644 index 0000000000..20165788c2 --- /dev/null +++ b/benchmarks/tpc/engines/gluten.toml @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[engine] +name = "gluten" + +[env] +required = ["GLUTEN_JAR"] +exports = { TZ = "UTC" } + +[spark_submit] +jars = ["$GLUTEN_JAR"] + +[spark_conf] +"spark.plugins" = "org.apache.gluten.GlutenPlugin" +"spark.driver.extraClassPath" = "${GLUTEN_JAR}" +"spark.executor.extraClassPath" = "${GLUTEN_JAR}" +"spark.gluten.sql.columnar.forceShuffledHashJoin" = "true" +"spark.shuffle.manager" = "org.apache.spark.shuffle.sort.ColumnarShuffleManager" +"spark.sql.session.timeZone" = "UTC" diff --git a/benchmarks/tpc/engines/spark.toml b/benchmarks/tpc/engines/spark.toml new file mode 100644 index 0000000000..c02e7a6ad2 --- /dev/null +++ b/benchmarks/tpc/engines/spark.toml @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[engine] +name = "spark" diff --git a/dev/benchmarks/generate-comparison.py b/benchmarks/tpc/generate-comparison.py similarity index 100% rename from dev/benchmarks/generate-comparison.py rename to benchmarks/tpc/generate-comparison.py diff --git a/benchmarks/tpc/run.py b/benchmarks/tpc/run.py new file mode 100755 index 0000000000..122f90f821 --- /dev/null +++ b/benchmarks/tpc/run.py @@ -0,0 +1,378 @@ +#!/usr/bin/env python3 +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Consolidated TPC benchmark runner for Spark-based engines. 
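+
+Each engine is described by a TOML file under engines/. At submit time the
+engine's [spark_conf] section is merged over shared defaults and a
+per-benchmark resource profile before tpcbench.py is launched via spark-submit.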
+ +Usage: + python3 run.py --engine comet --benchmark tpch + python3 run.py --engine blaze --benchmark tpcds --iterations 3 + python3 run.py --engine comet-iceberg --benchmark tpch + python3 run.py --engine comet --benchmark tpch --dry-run + python3 run.py --engine spark --benchmark tpch --no-restart +""" + +import argparse +import os +import re +import subprocess +import sys + +# --------------------------------------------------------------------------- +# TOML loading – prefer stdlib tomllib (3.11+), else minimal fallback +# --------------------------------------------------------------------------- + +try: + import tomllib # Python 3.11+ + + def load_toml(path): + with open(path, "rb") as f: + return tomllib.load(f) + +except ModuleNotFoundError: + + def _parse_toml(text): + """Minimal TOML parser supporting tables, quoted-key strings, plain + strings, arrays of strings, booleans, and comments. Sufficient for + the engine config files used by this runner.""" + root = {} + current = root + for line in text.splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + # Table header: [env.defaults] or [spark_conf] + m = re.match(r"^\[([^\]]+)\]$", line) + if m: + keys = m.group(1).split(".") + current = root + for k in keys: + current = current.setdefault(k, {}) + continue + # Key = value + m = re.match(r'^("(?:[^"\\]|\\.)*"|[A-Za-z0-9_.]+)\s*=\s*(.+)$', line) + if not m: + continue + raw_key, raw_val = m.group(1), m.group(2).strip() + key = raw_key.strip('"') + val = _parse_value(raw_val) + current[key] = val + return root + + def _parse_value(raw): + if raw == "true": + return True + if raw == "false": + return False + if raw.startswith('"') and raw.endswith('"'): + return raw[1:-1] + if raw.startswith("["): + # Simple array of strings + items = [] + for m in re.finditer(r'"((?:[^"\\]|\\.)*)"', raw): + items.append(m.group(1)) + return items + if raw.startswith("{"): + # Inline table: { KEY = "VAL", ... 
} + tbl = {} + for m in re.finditer(r'([A-Za-z0-9_]+)\s*=\s*"((?:[^"\\]|\\.)*)"', raw): + tbl[m.group(1)] = m.group(2) + return tbl + return raw + + def load_toml(path): + with open(path, "r") as f: + return _parse_toml(f.read()) + + +# --------------------------------------------------------------------------- +# Common Spark configuration (shared across all engines) +# --------------------------------------------------------------------------- + +COMMON_SPARK_CONF = { + "spark.driver.memory": "8G", + "spark.executor.memory": "16g", + "spark.memory.offHeap.enabled": "true", + "spark.memory.offHeap.size": "16g", + "spark.eventLog.enabled": "true", + "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", + "spark.hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain", +} + +# --------------------------------------------------------------------------- +# Benchmark profiles +# --------------------------------------------------------------------------- + +BENCHMARK_PROFILES = { + "tpch": { + "executor_instances": "1", + "executor_cores": "8", + "max_cores": "8", + "data_env": "TPCH_DATA", + "queries_env": "TPCH_QUERIES", + "format": "parquet", + }, + "tpcds": { + "executor_instances": "2", + "executor_cores": "8", + "max_cores": "16", + "data_env": "TPCDS_DATA", + "queries_env": "TPCDS_QUERIES", + "format": None, # omit --format for TPC-DS + }, +} + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def resolve_env(value): + """Expand $VAR and ${VAR} references using os.environ.""" + if not isinstance(value, str): + return value + return re.sub( + r"\$\{([^}]+)\}|\$([A-Za-z_][A-Za-z0-9_]*)", + lambda m: os.environ.get(m.group(1) or m.group(2), ""), + value, + ) + + +def resolve_env_in_list(lst): + return [resolve_env(v) for v in lst] + + +def load_engine_config(engine_name): + """Load and return the TOML config for the given engine.""" + script_dir = os.path.dirname(os.path.abspath(__file__)) + config_path = os.path.join(script_dir, "engines", f"{engine_name}.toml") + if not os.path.exists(config_path): + available = sorted( + f.removesuffix(".toml") + for f in os.listdir(os.path.join(script_dir, "engines")) + if f.endswith(".toml") + ) + print(f"Error: Unknown engine '{engine_name}'", file=sys.stderr) + print(f"Available engines: {', '.join(available)}", file=sys.stderr) + sys.exit(1) + return load_toml(config_path) + + +def apply_env_defaults(config): + """Set environment variable defaults from [env.defaults].""" + defaults = config.get("env", {}).get("defaults", {}) + for key, val in defaults.items(): + if key not in os.environ: + os.environ[key] = val + + +def apply_env_exports(config): + """Export environment variables from [env.exports].""" + exports = config.get("env", {}).get("exports", {}) + for key, val in exports.items(): + os.environ[key] = val + + +def check_required_env(config): + """Validate that required environment variables are set.""" + required = config.get("env", {}).get("required", []) + missing = [v for v in required if not os.environ.get(v)] + if missing: + print( + f"Error: Required environment variable(s) not set: {', '.join(missing)}", + file=sys.stderr, + ) + sys.exit(1) + + +def check_common_env(): + """Validate SPARK_HOME and SPARK_MASTER are set.""" + for var in ("SPARK_HOME", "SPARK_MASTER"): + if not os.environ.get(var): + print(f"Error: {var} is not set", file=sys.stderr) + 
sys.exit(1) + + +def build_spark_submit_cmd(config, benchmark, args): + """Build the spark-submit command list.""" + spark_home = os.environ["SPARK_HOME"] + spark_master = os.environ["SPARK_MASTER"] + profile = BENCHMARK_PROFILES[benchmark] + + cmd = [os.path.join(spark_home, "bin", "spark-submit")] + cmd += ["--master", spark_master] + + # --jars + jars = config.get("spark_submit", {}).get("jars", []) + if jars: + cmd += ["--jars", ",".join(resolve_env_in_list(jars))] + + # --driver-class-path + driver_cp = config.get("spark_submit", {}).get("driver_class_path", []) + if driver_cp: + cmd += ["--driver-class-path", ":".join(resolve_env_in_list(driver_cp))] + + # Merge spark confs: common + benchmark profile + engine overrides + conf = dict(COMMON_SPARK_CONF) + conf["spark.executor.instances"] = profile["executor_instances"] + conf["spark.executor.cores"] = profile["executor_cores"] + conf["spark.cores.max"] = profile["max_cores"] + + engine_conf = config.get("spark_conf", {}) + for key, val in engine_conf.items(): + if isinstance(val, bool): + val = "true" if val else "false" + conf[resolve_env(key)] = resolve_env(str(val)) + + for key, val in sorted(conf.items()): + cmd += ["--conf", f"{key}={val}"] + + # tpcbench.py path + cmd.append("tpcbench.py") + + # tpcbench args + engine_name = config.get("engine", {}).get("name", args.engine) + cmd += ["--name", engine_name] + cmd += ["--benchmark", benchmark] + + use_iceberg = config.get("tpcbench_args", {}).get("use_iceberg", False) + if use_iceberg: + cmd += ["--catalog", resolve_env("${ICEBERG_CATALOG}")] + cmd += ["--database", resolve_env("${ICEBERG_DATABASE}")] + else: + data_var = profile["data_env"] + data_val = os.environ.get(data_var, "") + cmd += ["--data", data_val] + + queries_var = profile["queries_env"] + queries_val = os.environ.get(queries_var, "") + cmd += ["--queries", queries_val] + + cmd += ["--output", args.output] + cmd += ["--iterations", str(args.iterations)] + + if args.query is not None: + cmd += ["--query", str(args.query)] + + if profile["format"] and not use_iceberg: + cmd += ["--format", profile["format"]] + + return cmd + + +def restart_spark(): + """Stop and start Spark master and worker.""" + spark_home = os.environ["SPARK_HOME"] + sbin = os.path.join(spark_home, "sbin") + spark_master = os.environ["SPARK_MASTER"] + + # Stop (ignore errors) + subprocess.run( + [os.path.join(sbin, "stop-master.sh")], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + subprocess.run( + [os.path.join(sbin, "stop-worker.sh")], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + # Start (check errors) + r = subprocess.run([os.path.join(sbin, "start-master.sh")]) + if r.returncode != 0: + print("Error: Failed to start Spark master", file=sys.stderr) + sys.exit(1) + + r = subprocess.run([os.path.join(sbin, "start-worker.sh"), spark_master]) + if r.returncode != 0: + print("Error: Failed to start Spark worker", file=sys.stderr) + sys.exit(1) + + +def main(): + parser = argparse.ArgumentParser( + description="Consolidated TPC benchmark runner for Spark-based engines." 
+ ) + parser.add_argument( + "--engine", + required=True, + help="Engine name (matches a TOML file in engines/)", + ) + parser.add_argument( + "--benchmark", + required=True, + choices=["tpch", "tpcds"], + help="Benchmark to run", + ) + parser.add_argument( + "--iterations", type=int, default=1, help="Number of iterations (default: 1)" + ) + parser.add_argument( + "--output", default=".", help="Output directory (default: .)" + ) + parser.add_argument( + "--query", type=int, default=None, help="Run a single query number" + ) + parser.add_argument( + "--no-restart", + action="store_true", + help="Skip Spark master/worker restart", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Print the spark-submit command without executing", + ) + args = parser.parse_args() + + config = load_engine_config(args.engine) + + # Apply env defaults and exports before validation + apply_env_defaults(config) + apply_env_exports(config) + + check_common_env() + check_required_env(config) + + # Restart Spark unless --no-restart or --dry-run + if not args.no_restart and not args.dry_run: + restart_spark() + + cmd = build_spark_submit_cmd(config, args.benchmark, args) + + if args.dry_run: + # Group paired arguments (e.g. --conf key=value) on one line + parts = [] + i = 0 + while i < len(cmd): + token = cmd[i] + if token.startswith("--") and i + 1 < len(cmd) and not cmd[i + 1].startswith("--"): + parts.append(f"{token} {cmd[i + 1]}") + i += 2 + else: + parts.append(token) + i += 1 + print(" \\\n ".join(parts)) + else: + r = subprocess.run(cmd) + sys.exit(r.returncode) + + +if __name__ == "__main__": + main() diff --git a/dev/benchmarks/tpcbench.py b/benchmarks/tpc/tpcbench.py similarity index 100% rename from dev/benchmarks/tpcbench.py rename to benchmarks/tpc/tpcbench.py diff --git a/dev/benchmarks/blaze-tpcds.sh b/dev/benchmarks/blaze-tpcds.sh deleted file mode 100755 index 90a4a48468..0000000000 --- a/dev/benchmarks/blaze-tpcds.sh +++ /dev/null @@ -1,53 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# - -$SPARK_HOME/sbin/stop-master.sh -$SPARK_HOME/sbin/stop-worker.sh - -$SPARK_HOME/sbin/start-master.sh -$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER - -$SPARK_HOME/bin/spark-submit \ - --master $SPARK_MASTER \ - --jars $BLAZE_JAR \ - --driver-class-path $BLAZE_JAR \ - --conf spark.driver.memory=8G \ - --conf spark.executor.instances=2 \ - --conf spark.executor.cores=8 \ - --conf spark.cores.max=16 \ - --conf spark.executor.memory=16g \ - --conf spark.executor.memoryOverhead=16g \ - --conf spark.memory.offHeap.enabled=false \ - --conf spark.eventLog.enabled=true \ - --conf spark.driver.extraClassPath=$BLAZE_JAR \ - --conf spark.executor.extraClassPath=$BLAZE_JAR \ - --conf spark.sql.extensions=org.apache.spark.sql.blaze.BlazeSparkSessionExtension \ - --conf spark.shuffle.manager=org.apache.spark.sql.execution.blaze.shuffle.BlazeShuffleManager \ - --conf spark.blaze.enable=true \ - --conf spark.blaze.forceShuffledHashJoin=true \ - --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \ - --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \ - tpcbench.py \ - --name blaze \ - --benchmark tpcds \ - --data $TPCDS_DATA \ - --queries $TPCDS_QUERIES \ - --output . \ - --iterations 1 diff --git a/dev/benchmarks/blaze-tpch.sh b/dev/benchmarks/blaze-tpch.sh deleted file mode 100755 index 2c6878737d..0000000000 --- a/dev/benchmarks/blaze-tpch.sh +++ /dev/null @@ -1,53 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -$SPARK_HOME/sbin/stop-master.sh -$SPARK_HOME/sbin/stop-worker.sh - -$SPARK_HOME/sbin/start-master.sh -$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER - -$SPARK_HOME/bin/spark-submit \ - --master $SPARK_MASTER \ - --jars $BLAZE_JAR \ - --driver-class-path $BLAZE_JAR \ - --conf spark.driver.memory=8G \ - --conf spark.executor.instances=1 \ - --conf spark.executor.cores=8 \ - --conf spark.cores.max=8 \ - --conf spark.executor.memory=16g \ - --conf spark.executor.memoryOverhead=16g \ - --conf spark.memory.offHeap.enabled=false \ - --conf spark.eventLog.enabled=true \ - --conf spark.driver.extraClassPath=$BLAZE_JAR \ - --conf spark.executor.extraClassPath=$BLAZE_JAR \ - --conf spark.sql.extensions=org.apache.spark.sql.blaze.BlazeSparkSessionExtension \ - --conf spark.shuffle.manager=org.apache.spark.sql.execution.blaze.shuffle.BlazeShuffleManager \ - --conf spark.blaze.enable=true \ - --conf spark.blaze.forceShuffledHashJoin=true \ - --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \ - --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \ - tpcbench.py \ - --name blaze \ - --benchmark tpch \ - --data $TPCH_DATA \ - --queries $TPCH_QUERIES \ - --output . 
\ - --iterations 1 diff --git a/dev/benchmarks/comet-tpcds.sh b/dev/benchmarks/comet-tpcds.sh deleted file mode 100755 index b55b27188c..0000000000 --- a/dev/benchmarks/comet-tpcds.sh +++ /dev/null @@ -1,53 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -$SPARK_HOME/sbin/stop-master.sh -$SPARK_HOME/sbin/stop-worker.sh - -$SPARK_HOME/sbin/start-master.sh -$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER - -$SPARK_HOME/bin/spark-submit \ - --master $SPARK_MASTER \ - --jars $COMET_JAR \ - --driver-class-path $COMET_JAR \ - --conf spark.driver.memory=8G \ - --conf spark.executor.instances=2 \ - --conf spark.executor.cores=8 \ - --conf spark.cores.max=16 \ - --conf spark.executor.memory=16g \ - --conf spark.memory.offHeap.enabled=true \ - --conf spark.memory.offHeap.size=16g \ - --conf spark.eventLog.enabled=true \ - --conf spark.driver.extraClassPath=$COMET_JAR \ - --conf spark.executor.extraClassPath=$COMET_JAR \ - --conf spark.plugins=org.apache.spark.CometPlugin \ - --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \ - --conf spark.comet.scan.impl=native_datafusion \ - --conf spark.comet.expression.Cast.allowIncompatible=true \ - --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \ - --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \ - tpcbench.py \ - --name comet \ - --benchmark tpcds \ - --data $TPCDS_DATA \ - --queries $TPCDS_QUERIES \ - --output . \ - --iterations 1 diff --git a/dev/benchmarks/comet-tpch-iceberg.sh b/dev/benchmarks/comet-tpch-iceberg.sh deleted file mode 100755 index 7907125c82..0000000000 --- a/dev/benchmarks/comet-tpch-iceberg.sh +++ /dev/null @@ -1,114 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# TPC-H benchmark using Iceberg tables with Comet's native iceberg-rust integration. 
-# -# Required environment variables: -# SPARK_HOME - Path to Spark installation -# SPARK_MASTER - Spark master URL (e.g., spark://localhost:7077) -# COMET_JAR - Path to Comet JAR -# ICEBERG_JAR - Path to Iceberg Spark runtime JAR -# ICEBERG_WAREHOUSE - Path to Iceberg warehouse directory -# TPCH_QUERIES - Path to TPC-H query files -# -# Optional: -# ICEBERG_CATALOG - Catalog name (default: local) -# ICEBERG_DATABASE - Database name (default: tpch) -# -# Setup (run once to create Iceberg tables from Parquet): -# $SPARK_HOME/bin/spark-submit \ -# --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \ -# --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \ -# --conf spark.sql.catalog.local.type=hadoop \ -# --conf spark.sql.catalog.local.warehouse=$ICEBERG_WAREHOUSE \ -# create-iceberg-tpch.py \ -# --parquet-path $TPCH_DATA \ -# --catalog local \ -# --database tpch - -set -e - -# Defaults -ICEBERG_CATALOG=${ICEBERG_CATALOG:-local} -ICEBERG_DATABASE=${ICEBERG_DATABASE:-tpch} - -# Validate required variables -if [ -z "$SPARK_HOME" ]; then - echo "Error: SPARK_HOME is not set" - exit 1 -fi -if [ -z "$COMET_JAR" ]; then - echo "Error: COMET_JAR is not set" - exit 1 -fi -if [ -z "$ICEBERG_JAR" ]; then - echo "Error: ICEBERG_JAR is not set" - echo "Download from: https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.8.1/" - exit 1 -fi -if [ -z "$ICEBERG_WAREHOUSE" ]; then - echo "Error: ICEBERG_WAREHOUSE is not set" - exit 1 -fi -if [ -z "$TPCH_QUERIES" ]; then - echo "Error: TPCH_QUERIES is not set" - exit 1 -fi - -$SPARK_HOME/sbin/stop-master.sh 2>/dev/null || true -$SPARK_HOME/sbin/stop-worker.sh 2>/dev/null || true - -$SPARK_HOME/sbin/start-master.sh -$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER - -$SPARK_HOME/bin/spark-submit \ - --master $SPARK_MASTER \ - --jars $COMET_JAR,$ICEBERG_JAR \ - --driver-class-path $COMET_JAR:$ICEBERG_JAR \ - --conf spark.driver.memory=8G \ - --conf spark.executor.instances=1 \ - --conf spark.executor.cores=8 \ - --conf spark.cores.max=8 \ - --conf spark.executor.memory=16g \ - --conf spark.memory.offHeap.enabled=true \ - --conf spark.memory.offHeap.size=16g \ - --conf spark.eventLog.enabled=true \ - --conf spark.driver.extraClassPath=$COMET_JAR:$ICEBERG_JAR \ - --conf spark.executor.extraClassPath=$COMET_JAR:$ICEBERG_JAR \ - --conf spark.plugins=org.apache.spark.CometPlugin \ - --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \ - --conf spark.comet.exec.replaceSortMergeJoin=true \ - --conf spark.comet.expression.Cast.allowIncompatible=true \ - --conf spark.comet.enabled=true \ - --conf spark.comet.exec.enabled=true \ - --conf spark.comet.scan.icebergNative.enabled=true \ - --conf spark.comet.explainFallback.enabled=true \ - --conf spark.sql.catalog.${ICEBERG_CATALOG}=org.apache.iceberg.spark.SparkCatalog \ - --conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \ - --conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \ - --conf spark.sql.defaultCatalog=${ICEBERG_CATALOG} \ - tpcbench.py \ - --name comet-iceberg \ - --benchmark tpch \ - --catalog $ICEBERG_CATALOG \ - --database $ICEBERG_DATABASE \ - --queries $TPCH_QUERIES \ - --output . 
\ - --iterations 1 diff --git a/dev/benchmarks/comet-tpch.sh b/dev/benchmarks/comet-tpch.sh deleted file mode 100755 index a748a02319..0000000000 --- a/dev/benchmarks/comet-tpch.sh +++ /dev/null @@ -1,55 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -$SPARK_HOME/sbin/stop-master.sh -$SPARK_HOME/sbin/stop-worker.sh - -$SPARK_HOME/sbin/start-master.sh -$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER - -$SPARK_HOME/bin/spark-submit \ - --master $SPARK_MASTER \ - --jars $COMET_JAR \ - --driver-class-path $COMET_JAR \ - --conf spark.driver.memory=8G \ - --conf spark.executor.instances=1 \ - --conf spark.executor.cores=8 \ - --conf spark.cores.max=8 \ - --conf spark.executor.memory=16g \ - --conf spark.memory.offHeap.enabled=true \ - --conf spark.memory.offHeap.size=16g \ - --conf spark.eventLog.enabled=true \ - --conf spark.driver.extraClassPath=$COMET_JAR \ - --conf spark.executor.extraClassPath=$COMET_JAR \ - --conf spark.plugins=org.apache.spark.CometPlugin \ - --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \ - --conf spark.comet.scan.impl=native_datafusion \ - --conf spark.comet.exec.replaceSortMergeJoin=true \ - --conf spark.comet.expression.Cast.allowIncompatible=true \ - --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \ - --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \ - tpcbench.py \ - --name comet \ - --benchmark tpch \ - --data $TPCH_DATA \ - --queries $TPCH_QUERIES \ - --output . \ - --iterations 1 \ - --format parquet diff --git a/dev/benchmarks/gluten-tpcds.sh b/dev/benchmarks/gluten-tpcds.sh deleted file mode 100755 index 7c475c79c0..0000000000 --- a/dev/benchmarks/gluten-tpcds.sh +++ /dev/null @@ -1,53 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# -export TZ=UTC - -$SPARK_HOME/sbin/stop-master.sh -$SPARK_HOME/sbin/stop-worker.sh - -$SPARK_HOME/sbin/start-master.sh -$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER - -$SPARK_HOME/bin/spark-submit \ - --master $SPARK_MASTER \ - --conf spark.driver.memory=8G \ - --conf spark.executor.instances=2 \ - --conf spark.executor.memory=16G \ - --conf spark.executor.cores=8 \ - --conf spark.cores.max=16 \ - --conf spark.eventLog.enabled=true \ - --jars $GLUTEN_JAR \ - --conf spark.plugins=org.apache.gluten.GlutenPlugin \ - --conf spark.driver.extraClassPath=${GLUTEN_JAR} \ - --conf spark.executor.extraClassPath=${GLUTEN_JAR} \ - --conf spark.memory.offHeap.enabled=true \ - --conf spark.memory.offHeap.size=16g \ - --conf spark.gluten.sql.columnar.forceShuffledHashJoin=true \ - --conf spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager \ - --conf spark.sql.session.timeZone=UTC \ - --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \ - --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \ - tpcbench.py \ - --name gluten \ - --benchmark tpcds \ - --data $TPCDS_DATA \ - --queries $TPCDS_QUERIES \ - --output . \ - --iterations 1 diff --git a/dev/benchmarks/gluten-tpch.sh b/dev/benchmarks/gluten-tpch.sh deleted file mode 100755 index 46c3ed7527..0000000000 --- a/dev/benchmarks/gluten-tpch.sh +++ /dev/null @@ -1,53 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -export TZ=UTC - -$SPARK_HOME/sbin/stop-master.sh -$SPARK_HOME/sbin/stop-worker.sh - -$SPARK_HOME/sbin/start-master.sh -$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER - -$SPARK_HOME/bin/spark-submit \ - --master $SPARK_MASTER \ - --conf spark.driver.memory=8G \ - --conf spark.executor.instances=1 \ - --conf spark.executor.memory=16G \ - --conf spark.executor.cores=8 \ - --conf spark.cores.max=8 \ - --conf spark.eventLog.enabled=true \ - --jars $GLUTEN_JAR \ - --conf spark.plugins=org.apache.gluten.GlutenPlugin \ - --conf spark.driver.extraClassPath=${GLUTEN_JAR} \ - --conf spark.executor.extraClassPath=${GLUTEN_JAR} \ - --conf spark.memory.offHeap.enabled=true \ - --conf spark.memory.offHeap.size=16g \ - --conf spark.gluten.sql.columnar.forceShuffledHashJoin=true \ - --conf spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager \ - --conf spark.sql.session.timeZone=UTC \ - --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \ - --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \ - tpcbench.py \ - --name gluten \ - --benchmark tpch \ - --data $TPCH_DATA \ - --queries $TPCH_QUERIES \ - --output . 
\ - --iterations 1 diff --git a/dev/benchmarks/spark-tpcds.sh b/dev/benchmarks/spark-tpcds.sh deleted file mode 100755 index dad079ba23..0000000000 --- a/dev/benchmarks/spark-tpcds.sh +++ /dev/null @@ -1,45 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -$SPARK_HOME/sbin/stop-master.sh -$SPARK_HOME/sbin/stop-worker.sh - -$SPARK_HOME/sbin/start-master.sh -$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER - -$SPARK_HOME/bin/spark-submit \ - --master $SPARK_MASTER \ - --conf spark.driver.memory=8G \ - --conf spark.executor.instances=2 \ - --conf spark.executor.cores=8 \ - --conf spark.cores.max=16 \ - --conf spark.executor.memory=16g \ - --conf spark.memory.offHeap.enabled=true \ - --conf spark.memory.offHeap.size=16g \ - --conf spark.eventLog.enabled=true \ - --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \ - --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \ - tpcbench.py \ - --name spark \ - --benchmark tpcds \ - --data $TPCDS_DATA \ - --queries $TPCDS_QUERIES \ - --output . \ - --iterations 1 diff --git a/dev/benchmarks/spark-tpch.sh b/dev/benchmarks/spark-tpch.sh deleted file mode 100755 index ae359f049f..0000000000 --- a/dev/benchmarks/spark-tpch.sh +++ /dev/null @@ -1,46 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# - -$SPARK_HOME/sbin/stop-master.sh -$SPARK_HOME/sbin/stop-worker.sh - -$SPARK_HOME/sbin/start-master.sh -$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER - -$SPARK_HOME/bin/spark-submit \ - --master $SPARK_MASTER \ - --conf spark.driver.memory=8G \ - --conf spark.executor.instances=1 \ - --conf spark.executor.cores=8 \ - --conf spark.cores.max=8 \ - --conf spark.executor.memory=16g \ - --conf spark.memory.offHeap.enabled=true \ - --conf spark.memory.offHeap.size=16g \ - --conf spark.eventLog.enabled=true \ - --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \ - --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \ - tpcbench.py \ - --name spark \ - --benchmark tpch \ - --data $TPCH_DATA \ - --queries $TPCH_QUERIES \ - --output . \ - --iterations 1 \ - --format parquet diff --git a/docs/source/about/gluten_comparison.md b/docs/source/about/gluten_comparison.md index 492479bb9d..40c6c2741a 100644 --- a/docs/source/about/gluten_comparison.md +++ b/docs/source/about/gluten_comparison.md @@ -86,7 +86,7 @@ on your existing Spark jobs. ![tpch_allqueries_comet_gluten.png](/_static/images/tpch_allqueries_comet_gluten.png) -The scripts that were used to generate these results can be found [here](https://github.com/apache/datafusion-comet/tree/main/dev/benchmarks). +The scripts that were used to generate these results can be found [here](https://github.com/apache/datafusion-comet/tree/main/benchmarks/tpc). ## Ease of Development & Contributing diff --git a/docs/source/contributor-guide/benchmark-results/tpc-ds.md b/docs/source/contributor-guide/benchmark-results/tpc-ds.md index 66ff2e51a7..bea254cc08 100644 --- a/docs/source/contributor-guide/benchmark-results/tpc-ds.md +++ b/docs/source/contributor-guide/benchmark-results/tpc-ds.md @@ -46,4 +46,4 @@ The raw results of these benchmarks in JSON format is available here: - [Spark](spark-3.5.3-tpcds.json) - [Comet](comet-0.11.0-tpcds.json) -The scripts that were used to generate these results can be found [here](https://github.com/apache/datafusion-comet/tree/main/dev/benchmarks). +The scripts that were used to generate these results can be found [here](https://github.com/apache/datafusion-comet/tree/main/benchmarks/tpc). diff --git a/docs/source/contributor-guide/benchmark-results/tpc-h.md b/docs/source/contributor-guide/benchmark-results/tpc-h.md index 4424d9eac7..2170426c05 100644 --- a/docs/source/contributor-guide/benchmark-results/tpc-h.md +++ b/docs/source/contributor-guide/benchmark-results/tpc-h.md @@ -46,4 +46,4 @@ The raw results of these benchmarks in JSON format is available here: - [Spark](spark-3.5.3-tpch.json) - [Comet](comet-0.11.0-tpch.json) -The scripts that were used to generate these results can be found [here](https://github.com/apache/datafusion-comet/tree/main/dev/benchmarks). +The scripts that were used to generate these results can be found [here](https://github.com/apache/datafusion-comet/tree/main/benchmarks/tpc). diff --git a/docs/source/contributor-guide/benchmarking.md b/docs/source/contributor-guide/benchmarking.md index 768089c955..ce98fc4be6 100644 --- a/docs/source/contributor-guide/benchmarking.md +++ b/docs/source/contributor-guide/benchmarking.md @@ -21,7 +21,7 @@ under the License. To track progress on performance, we regularly run benchmarks derived from TPC-H and TPC-DS. 
-The benchmarking scripts are contained at [https://github.com/apache/datafusion-comet/tree/main/dev/benchmarks](https://github.com/apache/datafusion-comet/tree/main/dev/benchmarks). +The benchmarking scripts are contained at [https://github.com/apache/datafusion-comet/tree/main/benchmarks/tpc](https://github.com/apache/datafusion-comet/tree/main/benchmarks/tpc). Data generation scripts are available in the [DataFusion Benchmarks](https://github.com/apache/datafusion-benchmarks) GitHub repository. diff --git a/docs/source/contributor-guide/benchmarking_aws_ec2.md b/docs/source/contributor-guide/benchmarking_aws_ec2.md index 922b0379fe..81f15d64ea 100644 --- a/docs/source/contributor-guide/benchmarking_aws_ec2.md +++ b/docs/source/contributor-guide/benchmarking_aws_ec2.md @@ -109,23 +109,23 @@ export COMET_JAR=/home/ec2-user/datafusion-comet/spark/target/comet-spark-spark3 ## Run Benchmarks -Use the scripts in `dev/benchmarks` in the Comet repository. +Use the scripts in `benchmarks/tpc` in the Comet repository. ```shell -cd dev/benchmarks +cd benchmarks/tpc export TPCH_QUERIES=/home/ec2-user/datafusion-benchmarks/tpch/queries/ ``` Run Spark benchmark: ```shell -./spark-tpch.sh +python3 run.py --engine spark --benchmark tpch ``` Run Comet benchmark: ```shell -./comet-tpch.sh +python3 run.py --engine comet --benchmark tpch ``` ## Running Benchmarks with S3 @@ -164,4 +164,9 @@ Modify the scripts to add the following configurations. --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \ ``` -Now run the `spark-tpch.sh` and `comet-tpch.sh` scripts. +Now run the benchmarks: + +```shell +python3 run.py --engine spark --benchmark tpch +python3 run.py --engine comet --benchmark tpch +``` From 93b9069d0d3c3274f5b8ee2c32e9d0465b9caa1b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 16 Feb 2026 15:10:10 -0700 Subject: [PATCH 2/7] Add TPC-DS support for Iceberg benchmarking - Rename create-iceberg-tpch.py to create-iceberg-tables.py with --benchmark flag supporting both tpch and tpcds table sets - Remove hardcoded TPCH_QUERIES from comet-iceberg.toml required env vars - Remove hardcoded ICEBERG_DATABASE default of "tpch" from comet-iceberg.toml - Add check_benchmark_env() in run.py to validate benchmark-specific env vars and default ICEBERG_DATABASE to the benchmark name - Update README with TPC-DS Iceberg table creation examples Co-Authored-By: Claude Opus 4.6 --- benchmarks/tpc/README.md | 30 ++++- benchmarks/tpc/create-iceberg-tables.py | 150 ++++++++++++++++++++++ benchmarks/tpc/create-iceberg-tpch.py | 88 ------------- benchmarks/tpc/engines/comet-iceberg.toml | 3 +- benchmarks/tpc/run.py | 24 ++++ 5 files changed, 199 insertions(+), 96 deletions(-) create mode 100644 benchmarks/tpc/create-iceberg-tables.py delete mode 100644 benchmarks/tpc/create-iceberg-tpch.py diff --git a/benchmarks/tpc/README.md b/benchmarks/tpc/README.md index 48df9c63d1..36752668bd 100644 --- a/benchmarks/tpc/README.md +++ b/benchmarks/tpc/README.md @@ -121,14 +121,15 @@ export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar Note: Table creation uses `--packages` which auto-downloads the dependency. 
-### Create Iceberg TPC-H tables
+### Create Iceberg tables

-Convert existing Parquet TPC-H data to Iceberg format:
+Convert existing Parquet data to Iceberg format using `create-iceberg-tables.py`:

 ```shell
 export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse
 export ICEBERG_CATALOG=${ICEBERG_CATALOG:-local}

+# TPC-H
 $SPARK_HOME/bin/spark-submit \
   --master $SPARK_MASTER \
   --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
   --conf spark.driver.memory=8G \
   --conf spark.executor.instances=2 \
   --conf spark.executor.cores=8 \
   --conf spark.cores.max=16 \
   --conf spark.executor.memory=16g \
   --conf spark.sql.catalog.${ICEBERG_CATALOG}=org.apache.iceberg.spark.SparkCatalog \
   --conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \
   --conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \
-  create-iceberg-tpch.py \
+  create-iceberg-tables.py \
+  --benchmark tpch \
   --parquet-path $TPCH_DATA \
-  --catalog $ICEBERG_CATALOG \
-  --database tpch
+  --catalog $ICEBERG_CATALOG
+
+# TPC-DS
+$SPARK_HOME/bin/spark-submit \
+  --master $SPARK_MASTER \
+  --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \
+  --conf spark.driver.memory=8G \
+  --conf spark.executor.instances=2 \
+  --conf spark.executor.cores=8 \
+  --conf spark.cores.max=16 \
+  --conf spark.executor.memory=16g \
+  --conf spark.sql.catalog.${ICEBERG_CATALOG}=org.apache.iceberg.spark.SparkCatalog \
+  --conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \
+  --conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \
+  create-iceberg-tables.py \
+  --benchmark tpcds \
+  --parquet-path $TPCDS_DATA \
+  --catalog $ICEBERG_CATALOG
 ```

 ### Run Iceberg benchmark
@@ -167,7 +185,7 @@ physical plan output.
 | Environment Variable | Default | Description |
 | -------------------- | ---------- | ----------------------------------- |
 | `ICEBERG_CATALOG` | `local` | Iceberg catalog name |
-| `ICEBERG_DATABASE` | `tpch` | Database containing TPC-H tables |
+| `ICEBERG_DATABASE` | benchmark name | Database containing TPC tables |
 | `ICEBERG_WAREHOUSE` | (required) | Path to Iceberg warehouse directory |

 ### Comparing Parquet vs Iceberg performance
diff --git a/benchmarks/tpc/create-iceberg-tables.py b/benchmarks/tpc/create-iceberg-tables.py
new file mode 100644
index 0000000000..56002fe3b5
--- /dev/null
+++ b/benchmarks/tpc/create-iceberg-tables.py
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Convert TPC-H or TPC-DS Parquet data to Iceberg tables.
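+
+Existing tables are dropped and recreated, so the script can safely be re-run.
+When --database is omitted, tables are created in a database named after the
+benchmark (tpch or tpcds).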
+ +Usage: + spark-submit \ + --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \ + --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \ + --conf spark.sql.catalog.local.type=hadoop \ + --conf spark.sql.catalog.local.warehouse=/path/to/iceberg-warehouse \ + create-iceberg-tables.py \ + --benchmark tpch \ + --parquet-path /path/to/tpch/parquet \ + --catalog local \ + --database tpch + + spark-submit \ + --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \ + --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \ + --conf spark.sql.catalog.local.type=hadoop \ + --conf spark.sql.catalog.local.warehouse=/path/to/iceberg-warehouse \ + create-iceberg-tables.py \ + --benchmark tpcds \ + --parquet-path /path/to/tpcds/parquet \ + --catalog local \ + --database tpcds +""" + +import argparse +from pyspark.sql import SparkSession +import time + +TPCH_TABLES = [ + "customer", + "lineitem", + "nation", + "orders", + "part", + "partsupp", + "region", + "supplier", +] + +TPCDS_TABLES = [ + "call_center", + "catalog_page", + "catalog_returns", + "catalog_sales", + "customer", + "customer_address", + "customer_demographics", + "date_dim", + "time_dim", + "household_demographics", + "income_band", + "inventory", + "item", + "promotion", + "reason", + "ship_mode", + "store", + "store_returns", + "store_sales", + "warehouse", + "web_page", + "web_returns", + "web_sales", + "web_site", +] + +BENCHMARK_TABLES = { + "tpch": TPCH_TABLES, + "tpcds": TPCDS_TABLES, +} + + +def main(benchmark: str, parquet_path: str, catalog: str, database: str): + table_names = BENCHMARK_TABLES[benchmark] + + spark = SparkSession.builder \ + .appName(f"Create Iceberg {benchmark.upper()} Tables") \ + .getOrCreate() + + # Create database if it doesn't exist + spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog}.{database}") + + for table in table_names: + parquet_table_path = f"{parquet_path}/{table}.parquet" + iceberg_table = f"{catalog}.{database}.{table}" + + print(f"Converting {parquet_table_path} -> {iceberg_table}") + start_time = time.time() + + # Drop table if exists to allow re-running + spark.sql(f"DROP TABLE IF EXISTS {iceberg_table}") + + # Read parquet and write as Iceberg + df = spark.read.parquet(parquet_table_path) + df.writeTo(iceberg_table).using("iceberg").create() + + row_count = spark.table(iceberg_table).count() + elapsed = time.time() - start_time + print(f" Created {iceberg_table} with {row_count} rows in {elapsed:.2f}s") + + print(f"\nAll {benchmark.upper()} tables created successfully!") + print(f"Tables available at: {catalog}.{database}.*") + + spark.stop() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Convert TPC-H or TPC-DS Parquet data to Iceberg tables" + ) + parser.add_argument( + "--benchmark", required=True, choices=["tpch", "tpcds"], + help="Benchmark whose tables to convert (tpch or tpcds)" + ) + parser.add_argument( + "--parquet-path", required=True, + help="Path to Parquet data directory" + ) + parser.add_argument( + "--catalog", required=True, + help="Iceberg catalog name (e.g., 'local')" + ) + parser.add_argument( + "--database", default=None, + help="Database name to create tables in (defaults to benchmark name)" + ) + args = parser.parse_args() + + database = args.database if args.database else args.benchmark + main(args.benchmark, args.parquet_path, args.catalog, database) diff --git a/benchmarks/tpc/create-iceberg-tpch.py b/benchmarks/tpc/create-iceberg-tpch.py deleted file mode 100644 index 
44f0f63a2e..0000000000 --- a/benchmarks/tpc/create-iceberg-tpch.py +++ /dev/null @@ -1,88 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -""" -Convert TPC-H Parquet data to Iceberg tables. - -Usage: - spark-submit \ - --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \ - --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \ - --conf spark.sql.catalog.local.type=hadoop \ - --conf spark.sql.catalog.local.warehouse=/path/to/iceberg-warehouse \ - create-iceberg-tpch.py \ - --parquet-path /path/to/tpch/parquet \ - --catalog local \ - --database tpch -""" - -import argparse -from pyspark.sql import SparkSession -import time - - -def main(parquet_path: str, catalog: str, database: str): - spark = SparkSession.builder \ - .appName("Create Iceberg TPC-H Tables") \ - .getOrCreate() - - table_names = [ - "customer", - "lineitem", - "nation", - "orders", - "part", - "partsupp", - "region", - "supplier" - ] - - # Create database if it doesn't exist - spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog}.{database}") - - for table in table_names: - parquet_table_path = f"{parquet_path}/{table}.parquet" - iceberg_table = f"{catalog}.{database}.{table}" - - print(f"Converting {parquet_table_path} -> {iceberg_table}") - start_time = time.time() - - # Drop table if exists to allow re-running - spark.sql(f"DROP TABLE IF EXISTS {iceberg_table}") - - # Read parquet and write as Iceberg - df = spark.read.parquet(parquet_table_path) - df.writeTo(iceberg_table).using("iceberg").create() - - row_count = spark.table(iceberg_table).count() - elapsed = time.time() - start_time - print(f" Created {iceberg_table} with {row_count} rows in {elapsed:.2f}s") - - print("\nAll TPC-H tables created successfully!") - print(f"Tables available at: {catalog}.{database}.*") - - spark.stop() - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Convert TPC-H Parquet data to Iceberg tables") - parser.add_argument("--parquet-path", required=True, help="Path to TPC-H Parquet data directory") - parser.add_argument("--catalog", required=True, help="Iceberg catalog name (e.g., 'local')") - parser.add_argument("--database", default="tpch", help="Database name to create tables in") - args = parser.parse_args() - - main(args.parquet_path, args.catalog, args.database) diff --git a/benchmarks/tpc/engines/comet-iceberg.toml b/benchmarks/tpc/engines/comet-iceberg.toml index ebabfce7b4..2e01270f13 100644 --- a/benchmarks/tpc/engines/comet-iceberg.toml +++ b/benchmarks/tpc/engines/comet-iceberg.toml @@ -19,11 +19,10 @@ name = "comet-iceberg" [env] -required = ["COMET_JAR", "ICEBERG_JAR", "ICEBERG_WAREHOUSE", "TPCH_QUERIES"] +required = ["COMET_JAR", "ICEBERG_JAR", "ICEBERG_WAREHOUSE"] [env.defaults] ICEBERG_CATALOG = "local" -ICEBERG_DATABASE = 
"tpch" [spark_submit] jars = ["$COMET_JAR", "$ICEBERG_JAR"] diff --git a/benchmarks/tpc/run.py b/benchmarks/tpc/run.py index 122f90f821..d2c65ce012 100755 --- a/benchmarks/tpc/run.py +++ b/benchmarks/tpc/run.py @@ -208,6 +208,29 @@ def check_common_env(): sys.exit(1) +def check_benchmark_env(config, benchmark): + """Validate benchmark-specific environment variables.""" + profile = BENCHMARK_PROFILES[benchmark] + use_iceberg = config.get("tpcbench_args", {}).get("use_iceberg", False) + + required = [profile["queries_env"]] + if not use_iceberg: + required.append(profile["data_env"]) + + missing = [v for v in required if not os.environ.get(v)] + if missing: + print( + f"Error: Required environment variable(s) not set for " + f"{benchmark}: {', '.join(missing)}", + file=sys.stderr, + ) + sys.exit(1) + + # Default ICEBERG_DATABASE to the benchmark name if not already set + if use_iceberg and not os.environ.get("ICEBERG_DATABASE"): + os.environ["ICEBERG_DATABASE"] = benchmark + + def build_spark_submit_cmd(config, benchmark, args): """Build the spark-submit command list.""" spark_home = os.environ["SPARK_HOME"] @@ -349,6 +372,7 @@ def main(): check_common_env() check_required_env(config) + check_benchmark_env(config, args.benchmark) # Restart Spark unless --no-restart or --dry-run if not args.no_restart and not args.dry_run: From 49be49442ff3688ecc7506ec9d8416e2bfc997b8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 16 Feb 2026 15:12:24 -0700 Subject: [PATCH 3/7] Remove blaze engine configuration and references Co-Authored-By: Claude Opus 4.6 --- benchmarks/tpc/README.md | 2 +- benchmarks/tpc/engines/blaze.toml | 36 ------------------------------- benchmarks/tpc/run.py | 2 +- 3 files changed, 2 insertions(+), 38 deletions(-) delete mode 100644 benchmarks/tpc/engines/blaze.toml diff --git a/benchmarks/tpc/README.md b/benchmarks/tpc/README.md index 36752668bd..374f240523 100644 --- a/benchmarks/tpc/README.md +++ b/benchmarks/tpc/README.md @@ -44,7 +44,7 @@ python3 run.py --engine --benchmark [options] | `--no-restart` | Skip Spark master/worker restart | | `--dry-run` | Print the spark-submit command without executing | -Available engines: `spark`, `comet`, `comet-iceberg`, `gluten`, `blaze` +Available engines: `spark`, `comet`, `comet-iceberg`, `gluten` ## Example usage diff --git a/benchmarks/tpc/engines/blaze.toml b/benchmarks/tpc/engines/blaze.toml deleted file mode 100644 index 7298a8f3ab..0000000000 --- a/benchmarks/tpc/engines/blaze.toml +++ /dev/null @@ -1,36 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -[engine] -name = "blaze" - -[env] -required = ["BLAZE_JAR"] - -[spark_submit] -jars = ["$BLAZE_JAR"] -driver_class_path = ["$BLAZE_JAR"] - -[spark_conf] -"spark.driver.extraClassPath" = "$BLAZE_JAR" -"spark.executor.extraClassPath" = "$BLAZE_JAR" -"spark.sql.extensions" = "org.apache.spark.sql.blaze.BlazeSparkSessionExtension" -"spark.shuffle.manager" = "org.apache.spark.sql.execution.blaze.shuffle.BlazeShuffleManager" -"spark.blaze.enable" = "true" -"spark.blaze.forceShuffledHashJoin" = "true" -"spark.executor.memoryOverhead" = "16g" -"spark.memory.offHeap.enabled" = "false" diff --git a/benchmarks/tpc/run.py b/benchmarks/tpc/run.py index d2c65ce012..41a2d5fdaf 100755 --- a/benchmarks/tpc/run.py +++ b/benchmarks/tpc/run.py @@ -21,7 +21,7 @@ Usage: python3 run.py --engine comet --benchmark tpch - python3 run.py --engine blaze --benchmark tpcds --iterations 3 + python3 run.py --engine comet --benchmark tpcds --iterations 3 python3 run.py --engine comet-iceberg --benchmark tpch python3 run.py --engine comet --benchmark tpch --dry-run python3 run.py --engine spark --benchmark tpch --no-restart From 233f919ba1a2a90fdf9cca84717d8466eee286c5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 16 Feb 2026 15:24:16 -0700 Subject: [PATCH 4/7] Format README.md with prettier Co-Authored-By: Claude Opus 4.6 --- benchmarks/tpc/README.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/benchmarks/tpc/README.md b/benchmarks/tpc/README.md index 374f240523..fbdb64fef6 100644 --- a/benchmarks/tpc/README.md +++ b/benchmarks/tpc/README.md @@ -34,15 +34,15 @@ All benchmarks are run via `run.py`: python3 run.py --engine --benchmark [options] ``` -| Option | Description | -| --------------- | ---------------------------------------- | -| `--engine` | Engine name (matches a TOML file in `engines/`) | -| `--benchmark` | `tpch` or `tpcds` | -| `--iterations` | Number of iterations (default: 1) | -| `--output` | Output directory (default: `.`) | -| `--query` | Run a single query number | -| `--no-restart` | Skip Spark master/worker restart | -| `--dry-run` | Print the spark-submit command without executing | +| Option | Description | +| -------------- | ------------------------------------------------ | +| `--engine` | Engine name (matches a TOML file in `engines/`) | +| `--benchmark` | `tpch` or `tpcds` | +| `--iterations` | Number of iterations (default: 1) | +| `--output` | Output directory (default: `.`) | +| `--query` | Run a single query number | +| `--no-restart` | Skip Spark master/worker restart | +| `--dry-run` | Print the spark-submit command without executing | Available engines: `spark`, `comet`, `comet-iceberg`, `gluten` From fe0308bb85cafd26b56956b91a52eafe0de12a73 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Feb 2026 06:22:07 -0700 Subject: [PATCH 5/7] Move Iceberg catalog config into create-iceberg-tables.py The script now configures the Iceberg catalog via SparkSession.builder instead of requiring --conf flags on the spark-submit command line. This adds --warehouse as a required CLI arg, makes --catalog optional (default: local), and validates paths with clear error messages before starting Spark. 
Co-Authored-By: Claude Opus 4.6 --- benchmarks/tpc/README.md | 29 ++++++-------- benchmarks/tpc/create-iceberg-tables.py | 53 +++++++++++++++++-------- 2 files changed, 50 insertions(+), 32 deletions(-) diff --git a/benchmarks/tpc/README.md b/benchmarks/tpc/README.md index fbdb64fef6..e2b0e8bcde 100644 --- a/benchmarks/tpc/README.md +++ b/benchmarks/tpc/README.md @@ -123,11 +123,12 @@ Note: Table creation uses `--packages` which auto-downloads the dependency. ### Create Iceberg tables -Convert existing Parquet data to Iceberg format using `create-iceberg-tables.py`: +Convert existing Parquet data to Iceberg format using `create-iceberg-tables.py`. +The script configures the Iceberg catalog automatically -- no `--conf` flags needed. ```shell export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse -export ICEBERG_CATALOG=${ICEBERG_CATALOG:-local} +mkdir -p $ICEBERG_WAREHOUSE # TPC-H $SPARK_HOME/bin/spark-submit \ @@ -138,13 +139,10 @@ $SPARK_HOME/bin/spark-submit \ --conf spark.executor.cores=8 \ --conf spark.cores.max=8 \ --conf spark.executor.memory=16g \ - --conf spark.sql.catalog.${ICEBERG_CATALOG}=org.apache.iceberg.spark.SparkCatalog \ - --conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \ - --conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \ create-iceberg-tables.py \ --benchmark tpch \ --parquet-path $TPCH_DATA \ - --catalog $ICEBERG_CATALOG + --warehouse $ICEBERG_WAREHOUSE # TPC-DS $SPARK_HOME/bin/spark-submit \ @@ -155,13 +153,10 @@ $SPARK_HOME/bin/spark-submit \ --conf spark.executor.cores=8 \ --conf spark.cores.max=16 \ --conf spark.executor.memory=16g \ - --conf spark.sql.catalog.${ICEBERG_CATALOG}=org.apache.iceberg.spark.SparkCatalog \ - --conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \ - --conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \ create-iceberg-tables.py \ --benchmark tpcds \ --parquet-path $TPCDS_DATA \ - --catalog $ICEBERG_CATALOG + --warehouse $ICEBERG_WAREHOUSE ``` ### Run Iceberg benchmark @@ -180,13 +175,15 @@ The benchmark uses `spark.comet.scan.icebergNative.enabled=true` to enable Comet integration. Verify native scanning is active by checking for `CometIcebergNativeScanExec` in the physical plan output. 
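The plan check above can also be scripted rather than eyeballed. A minimal sketch, assuming a PySpark session already launched with the `comet-iceberg` engine settings and an existing `local.tpch.lineitem` table (both assumptions, not guarantees), and reaching the executed plan through Spark's internal py4j bridge:

```python
# Sketch: programmatically confirm Comet's native Iceberg scan is active.
# Assumes the session was started with the comet-iceberg configuration
# (including spark.comet.scan.icebergNative.enabled=true) and that the
# Iceberg table local.tpch.lineitem already exists.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.sql("SELECT count(*) FROM local.tpch.lineitem")

# queryExecution()/executedPlan() are Spark-internal APIs reached via the
# _jdf py4j bridge; df.explain() prints the same plan for manual checks.
plan = df._jdf.queryExecution().executedPlan().toString()
if "CometIcebergNativeScanExec" in plan:
    print("Native Iceberg scan is active")
else:
    print("Warning: query fell back to the JVM scan path")
```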
-### Iceberg-specific options +### create-iceberg-tables.py options -| Environment Variable | Default | Description | -| -------------------- | ---------- | ----------------------------------- | -| `ICEBERG_CATALOG` | `local` | Iceberg catalog name | -| `ICEBERG_DATABASE` | `tpch` | Database containing TPC tables | -| `ICEBERG_WAREHOUSE` | (required) | Path to Iceberg warehouse directory | +| Option | Required | Default | Description | +| ---------------- | -------- | ---------------- | ----------------------------------- | +| `--benchmark` | Yes | | `tpch` or `tpcds` | +| `--parquet-path` | Yes | | Path to source Parquet data | +| `--warehouse` | Yes | | Path to Iceberg warehouse directory | +| `--catalog` | No | `local` | Iceberg catalog name | +| `--database` | No | benchmark name | Database name for the tables | ### Comparing Parquet vs Iceberg performance diff --git a/benchmarks/tpc/create-iceberg-tables.py b/benchmarks/tpc/create-iceberg-tables.py index 56002fe3b5..219969bda7 100644 --- a/benchmarks/tpc/create-iceberg-tables.py +++ b/benchmarks/tpc/create-iceberg-tables.py @@ -21,28 +21,22 @@ Usage: spark-submit \ --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \ - --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \ - --conf spark.sql.catalog.local.type=hadoop \ - --conf spark.sql.catalog.local.warehouse=/path/to/iceberg-warehouse \ create-iceberg-tables.py \ --benchmark tpch \ --parquet-path /path/to/tpch/parquet \ - --catalog local \ - --database tpch + --warehouse /path/to/iceberg-warehouse spark-submit \ --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \ - --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \ - --conf spark.sql.catalog.local.type=hadoop \ - --conf spark.sql.catalog.local.warehouse=/path/to/iceberg-warehouse \ create-iceberg-tables.py \ --benchmark tpcds \ --parquet-path /path/to/tpcds/parquet \ - --catalog local \ - --database tpcds + --warehouse /path/to/iceberg-warehouse """ import argparse +import os +import sys from pyspark.sql import SparkSession import time @@ -90,15 +84,38 @@ } -def main(benchmark: str, parquet_path: str, catalog: str, database: str): +def main(benchmark: str, parquet_path: str, warehouse: str, catalog: str, database: str): table_names = BENCHMARK_TABLES[benchmark] + # Validate paths before starting Spark + errors = [] + if not os.path.isdir(parquet_path): + errors.append(f"Error: --parquet-path '{parquet_path}' does not exist or is not a directory") + if not os.path.isdir(warehouse): + errors.append(f"Error: --warehouse '{warehouse}' does not exist or is not a directory. 
" + "Create it with: mkdir -p " + warehouse) + if errors: + for e in errors: + print(e, file=sys.stderr) + sys.exit(1) + spark = SparkSession.builder \ .appName(f"Create Iceberg {benchmark.upper()} Tables") \ + .config(f"spark.sql.catalog.{catalog}", "org.apache.iceberg.spark.SparkCatalog") \ + .config(f"spark.sql.catalog.{catalog}.type", "hadoop") \ + .config(f"spark.sql.catalog.{catalog}.warehouse", warehouse) \ .getOrCreate() - # Create database if it doesn't exist - spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog}.{database}") + # Set the Iceberg catalog as the current catalog so that + # namespace operations are routed correctly + spark.sql(f"USE {catalog}") + + # Create namespace if it doesn't exist + try: + spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {database}") + except Exception: + # Namespace may already exist + pass for table in table_names: parquet_table_path = f"{parquet_path}/{table}.parquet" @@ -137,8 +154,12 @@ def main(benchmark: str, parquet_path: str, catalog: str, database: str): help="Path to Parquet data directory" ) parser.add_argument( - "--catalog", required=True, - help="Iceberg catalog name (e.g., 'local')" + "--warehouse", required=True, + help="Path to Iceberg warehouse directory" + ) + parser.add_argument( + "--catalog", default="local", + help="Iceberg catalog name (default: 'local')" ) parser.add_argument( "--database", default=None, @@ -147,4 +168,4 @@ def main(benchmark: str, parquet_path: str, catalog: str, database: str): args = parser.parse_args() database = args.database if args.database else args.benchmark - main(args.benchmark, args.parquet_path, args.catalog, database) + main(args.benchmark, args.parquet_path, args.warehouse, args.catalog, database) From b4f2af02901de1cec7b72a6bd5e7c627633a75ec Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Feb 2026 06:33:25 -0700 Subject: [PATCH 6/7] prettier --- benchmarks/tpc/README.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/benchmarks/tpc/README.md b/benchmarks/tpc/README.md index e2b0e8bcde..779ad1753a 100644 --- a/benchmarks/tpc/README.md +++ b/benchmarks/tpc/README.md @@ -177,13 +177,13 @@ physical plan output. 
### create-iceberg-tables.py options
 
-| Option           | Required | Default          | Description                         |
-| ---------------- | -------- | ---------------- | ----------------------------------- |
-| `--benchmark`    | Yes      |                  | `tpch` or `tpcds`                   |
-| `--parquet-path` | Yes      |                  | Path to source Parquet data         |
-| `--warehouse`    | Yes      |                  | Path to Iceberg warehouse directory |
-| `--catalog`      | No       | `local`          | Iceberg catalog name                |
-| `--database`     | No       | benchmark name   | Database name for the tables        |
+| Option           | Required | Default        | Description                         |
+| ---------------- | -------- | -------------- | ----------------------------------- |
+| `--benchmark`    | Yes      |                | `tpch` or `tpcds`                   |
+| `--parquet-path` | Yes      |                | Path to source Parquet data         |
+| `--warehouse`    | Yes      |                | Path to Iceberg warehouse directory |
+| `--catalog`      | No       | `local`        | Iceberg catalog name                |
+| `--database`     | No       | benchmark name | Database name for the tables        |
 
 ### Comparing Parquet vs Iceberg performance

From 482aaf30873d3d09b7f6d1d75fd70cca1ab10af3 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 20 Feb 2026 14:36:10 -0700
Subject: [PATCH 7/7] Update docs/source/contributor-guide/benchmarking.md

Co-authored-by: Oleks V
---
 docs/source/contributor-guide/benchmarking.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/source/contributor-guide/benchmarking.md b/docs/source/contributor-guide/benchmarking.md
index ce98fc4be6..49af73376f 100644
--- a/docs/source/contributor-guide/benchmarking.md
+++ b/docs/source/contributor-guide/benchmarking.md
@@ -21,7 +21,7 @@ under the License.
 
 To track progress on performance, we regularly run benchmarks derived from TPC-H and TPC-DS.
 
-The benchmarking scripts are contained at [https://github.com/apache/datafusion-comet/tree/main/benchmarks/tpc](https://github.com/apache/datafusion-comet/tree/main/benchmarks/tpc).
+The benchmarking scripts are located in [benchmarks/tpc](https://github.com/apache/datafusion-comet/tree/main/benchmarks/tpc).
 
 Data generation scripts are available in the [DataFusion Benchmarks](https://github.com/apache/datafusion-benchmarks) GitHub repository.
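For contributors adding a new engine config, the core mechanic throughout this series is that `$VAR` references in the engine TOML (such as `"$COMET_JAR"` under `[spark_submit]`, or values in `[spark_conf]`) are expanded against the environment when the spark-submit command line is assembled. Below is a minimal standalone sketch of that expansion, assuming Python 3.11+ for `tomllib`; `build_cmd` is an illustrative helper rather than the actual function in `run.py`:

```python
# Standalone sketch of the TOML-to-spark-submit expansion performed by
# the consolidated runner; build_cmd is illustrative, not run.py's code.
import os
import tomllib  # stdlib in Python 3.11+; earlier versions can use tomli

def build_cmd(engine_toml: str, benchmark_script: str) -> list[str]:
    """Translate an engine TOML file into a spark-submit command list."""
    with open(engine_toml, "rb") as f:
        config = tomllib.load(f)

    cmd = [os.path.join(os.environ["SPARK_HOME"], "bin", "spark-submit")]

    # Jar paths are stored as $VAR references (e.g. "$COMET_JAR") and
    # resolved against the current environment at run time.
    jars = [os.path.expandvars(j)
            for j in config.get("spark_submit", {}).get("jars", [])]
    if jars:
        cmd += ["--jars", ",".join(jars)]

    # Every [spark_conf] entry becomes a --conf key=value argument.
    for key, value in config.get("spark_conf", {}).items():
        cmd += ["--conf", f"{key}={os.path.expandvars(value)}"]

    cmd.append(benchmark_script)
    return cmd

if __name__ == "__main__":
    # Roughly what --dry-run prints for the comet engine.
    print(" ".join(build_cmd("engines/comet.toml", "tpcbench.py")))
```

Keeping the expansion in one place is what lets a new engine be added by dropping a single TOML file into `engines/` instead of copying another shell script.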