diff --git a/dev/benchmarks/.gitignore b/benchmarks/tpc/.gitignore similarity index 100% rename from dev/benchmarks/.gitignore rename to benchmarks/tpc/.gitignore diff --git a/dev/benchmarks/README.md b/benchmarks/tpc/README.md similarity index 59% rename from dev/benchmarks/README.md rename to benchmarks/tpc/README.md index b3ea674199..779ad1753a 100644 --- a/dev/benchmarks/README.md +++ b/benchmarks/tpc/README.md @@ -26,6 +26,26 @@ For full instructions on running these benchmarks on an EC2 instance, see the [C [Comet Benchmarking on EC2 Guide]: https://datafusion.apache.org/comet/contributor-guide/benchmarking_aws_ec2.html +## Usage + +All benchmarks are run via `run.py`: + +``` +python3 run.py --engine <engine> --benchmark <benchmark> [options] +``` + +| Option | Description | | -------------- | ------------------------------------------------ | | `--engine` | Engine name (matches a TOML file in `engines/`) | | `--benchmark` | `tpch` or `tpcds` | | `--iterations` | Number of iterations (default: 1) | | `--output` | Output directory (default: `.`) | | `--query` | Run a single query number | | `--no-restart` | Skip Spark master/worker restart | | `--dry-run` | Print the spark-submit command without executing | + +Available engines: `spark`, `comet`, `comet-iceberg`, `gluten` ## Example usage Set Spark environment variables: @@ -47,7 +67,7 @@ Run Spark benchmark: ```shell export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64 sudo ./drop-caches.sh -./spark-tpch.sh +python3 run.py --engine spark --benchmark tpch ``` Run Comet benchmark: @@ -56,7 +76,7 @@ Run Comet benchmark: ```shell export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64 export COMET_JAR=/opt/comet/comet-spark-spark3.5_2.12-0.10.0.jar sudo ./drop-caches.sh -./comet-tpch.sh +python3 run.py --engine comet --benchmark tpch ``` Run Gluten benchmark: @@ -65,7 +85,13 @@ Run Gluten benchmark: ```shell export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 export GLUTEN_JAR=/opt/gluten/gluten-velox-bundle-spark3.5_2.12-linux_amd64-1.4.0.jar sudo ./drop-caches.sh -./gluten-tpch.sh +python3 run.py --engine gluten --benchmark tpch +``` + +Preview a command without running it: + +```shell +python3 run.py --engine comet --benchmark tpch --dry-run ``` Generating charts: @@ -74,6 +100,11 @@ Generating charts: ```shell python3 generate-comparison.py --benchmark tpch --labels "Spark 3.5.3" "Comet 0.9.0" "Gluten 1.4.0" --title "TPC-H @ 100 GB (single executor, 8 cores, local Parquet files)" spark-tpch-1752338506381.json comet-tpch-1752337818039.json gluten-tpch-1752337474344.json ``` +## Engine Configuration + +Each engine is defined by a TOML file in `engines/`. The config specifies JARs, Spark conf overrides, +required environment variables, and optional defaults/exports. See existing files for examples. + ## Iceberg Benchmarking Comet includes native Iceberg support via iceberg-rust integration. This enables benchmarking TPC-H queries @@ -90,14 +121,16 @@ export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar Note: Table creation uses `--packages` which auto-downloads the dependency. -### Create Iceberg TPC-H tables +### Create Iceberg tables -Convert existing Parquet TPC-H data to Iceberg format: +Convert existing Parquet data to Iceberg format using `create-iceberg-tables.py`. +The script configures the Iceberg catalog automatically -- no `--conf` flags needed. 
```shell export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse -export ICEBERG_CATALOG=${ICEBERG_CATALOG:-local} +mkdir -p $ICEBERG_WAREHOUSE +# TPC-H $SPARK_HOME/bin/spark-submit \ --master $SPARK_MASTER \ --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \ @@ -106,13 +139,24 @@ $SPARK_HOME/bin/spark-submit \ --conf spark.executor.cores=8 \ --conf spark.cores.max=8 \ --conf spark.executor.memory=16g \ - --conf spark.sql.catalog.${ICEBERG_CATALOG}=org.apache.iceberg.spark.SparkCatalog \ - --conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \ - --conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \ - create-iceberg-tpch.py \ + create-iceberg-tables.py \ + --benchmark tpch \ --parquet-path $TPCH_DATA \ - --catalog $ICEBERG_CATALOG \ - --database tpch + --warehouse $ICEBERG_WAREHOUSE + +# TPC-DS +$SPARK_HOME/bin/spark-submit \ + --master $SPARK_MASTER \ + --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \ + --conf spark.driver.memory=8G \ + --conf spark.executor.instances=2 \ + --conf spark.executor.cores=8 \ + --conf spark.cores.max=16 \ + --conf spark.executor.memory=16g \ + create-iceberg-tables.py \ + --benchmark tpcds \ + --parquet-path $TPCDS_DATA \ + --warehouse $ICEBERG_WAREHOUSE ``` ### Run Iceberg benchmark @@ -124,20 +168,22 @@ export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse export TPCH_QUERIES=/mnt/bigdata/tpch/queries/ sudo ./drop-caches.sh -./comet-tpch-iceberg.sh +python3 run.py --engine comet-iceberg --benchmark tpch ``` The benchmark uses `spark.comet.scan.icebergNative.enabled=true` to enable Comet's native iceberg-rust integration. Verify native scanning is active by checking for `CometIcebergNativeScanExec` in the physical plan output. -### Iceberg-specific options +### create-iceberg-tables.py options -| Environment Variable | Default | Description | -| -------------------- | ---------- | ----------------------------------- | -| `ICEBERG_CATALOG` | `local` | Iceberg catalog name | -| `ICEBERG_DATABASE` | `tpch` | Database containing TPC-H tables | -| `ICEBERG_WAREHOUSE` | (required) | Path to Iceberg warehouse directory | +| Option | Required | Default | Description | +| ---------------- | -------- | -------------- | ----------------------------------- | +| `--benchmark` | Yes | | `tpch` or `tpcds` | +| `--parquet-path` | Yes | | Path to source Parquet data | +| `--warehouse` | Yes | | Path to Iceberg warehouse directory | +| `--catalog` | No | `local` | Iceberg catalog name | +| `--database` | No | benchmark name | Database name for the tables | ### Comparing Parquet vs Iceberg performance diff --git a/benchmarks/tpc/create-iceberg-tables.py b/benchmarks/tpc/create-iceberg-tables.py new file mode 100644 index 0000000000..219969bda7 --- /dev/null +++ b/benchmarks/tpc/create-iceberg-tables.py @@ -0,0 +1,171 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Convert TPC-H or TPC-DS Parquet data to Iceberg tables. + +Usage: + spark-submit \ + --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \ + create-iceberg-tables.py \ + --benchmark tpch \ + --parquet-path /path/to/tpch/parquet \ + --warehouse /path/to/iceberg-warehouse + + spark-submit \ + --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \ + create-iceberg-tables.py \ + --benchmark tpcds \ + --parquet-path /path/to/tpcds/parquet \ + --warehouse /path/to/iceberg-warehouse +""" + +import argparse +import os +import sys +from pyspark.sql import SparkSession +import time + +TPCH_TABLES = [ + "customer", + "lineitem", + "nation", + "orders", + "part", + "partsupp", + "region", + "supplier", +] + +TPCDS_TABLES = [ + "call_center", + "catalog_page", + "catalog_returns", + "catalog_sales", + "customer", + "customer_address", + "customer_demographics", + "date_dim", + "time_dim", + "household_demographics", + "income_band", + "inventory", + "item", + "promotion", + "reason", + "ship_mode", + "store", + "store_returns", + "store_sales", + "warehouse", + "web_page", + "web_returns", + "web_sales", + "web_site", +] + +BENCHMARK_TABLES = { + "tpch": TPCH_TABLES, + "tpcds": TPCDS_TABLES, +} + + +def main(benchmark: str, parquet_path: str, warehouse: str, catalog: str, database: str): + table_names = BENCHMARK_TABLES[benchmark] + + # Validate paths before starting Spark + errors = [] + if not os.path.isdir(parquet_path): + errors.append(f"Error: --parquet-path '{parquet_path}' does not exist or is not a directory") + if not os.path.isdir(warehouse): + errors.append(f"Error: --warehouse '{warehouse}' does not exist or is not a directory. 
" + "Create it with: mkdir -p " + warehouse) + if errors: + for e in errors: + print(e, file=sys.stderr) + sys.exit(1) + + spark = SparkSession.builder \ + .appName(f"Create Iceberg {benchmark.upper()} Tables") \ + .config(f"spark.sql.catalog.{catalog}", "org.apache.iceberg.spark.SparkCatalog") \ + .config(f"spark.sql.catalog.{catalog}.type", "hadoop") \ + .config(f"spark.sql.catalog.{catalog}.warehouse", warehouse) \ + .getOrCreate() + + # Set the Iceberg catalog as the current catalog so that + # namespace operations are routed correctly + spark.sql(f"USE {catalog}") + + # Create namespace if it doesn't exist + try: + spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {database}") + except Exception: + # Namespace may already exist + pass + + for table in table_names: + parquet_table_path = f"{parquet_path}/{table}.parquet" + iceberg_table = f"{catalog}.{database}.{table}" + + print(f"Converting {parquet_table_path} -> {iceberg_table}") + start_time = time.time() + + # Drop table if exists to allow re-running + spark.sql(f"DROP TABLE IF EXISTS {iceberg_table}") + + # Read parquet and write as Iceberg + df = spark.read.parquet(parquet_table_path) + df.writeTo(iceberg_table).using("iceberg").create() + + row_count = spark.table(iceberg_table).count() + elapsed = time.time() - start_time + print(f" Created {iceberg_table} with {row_count} rows in {elapsed:.2f}s") + + print(f"\nAll {benchmark.upper()} tables created successfully!") + print(f"Tables available at: {catalog}.{database}.*") + + spark.stop() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Convert TPC-H or TPC-DS Parquet data to Iceberg tables" + ) + parser.add_argument( + "--benchmark", required=True, choices=["tpch", "tpcds"], + help="Benchmark whose tables to convert (tpch or tpcds)" + ) + parser.add_argument( + "--parquet-path", required=True, + help="Path to Parquet data directory" + ) + parser.add_argument( + "--warehouse", required=True, + help="Path to Iceberg warehouse directory" + ) + parser.add_argument( + "--catalog", default="local", + help="Iceberg catalog name (default: 'local')" + ) + parser.add_argument( + "--database", default=None, + help="Database name to create tables in (defaults to benchmark name)" + ) + args = parser.parse_args() + + database = args.database if args.database else args.benchmark + main(args.benchmark, args.parquet_path, args.warehouse, args.catalog, database) diff --git a/dev/benchmarks/drop-caches.sh b/benchmarks/tpc/drop-caches.sh similarity index 100% rename from dev/benchmarks/drop-caches.sh rename to benchmarks/tpc/drop-caches.sh diff --git a/benchmarks/tpc/engines/comet-iceberg.toml b/benchmarks/tpc/engines/comet-iceberg.toml new file mode 100644 index 0000000000..2e01270f13 --- /dev/null +++ b/benchmarks/tpc/engines/comet-iceberg.toml @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +[engine] +name = "comet-iceberg" + +[env] +required = ["COMET_JAR", "ICEBERG_JAR", "ICEBERG_WAREHOUSE"] + +[env.defaults] +ICEBERG_CATALOG = "local" + +[spark_submit] +jars = ["$COMET_JAR", "$ICEBERG_JAR"] +driver_class_path = ["$COMET_JAR", "$ICEBERG_JAR"] + +[spark_conf] +"spark.driver.extraClassPath" = "$COMET_JAR:$ICEBERG_JAR" +"spark.executor.extraClassPath" = "$COMET_JAR:$ICEBERG_JAR" +"spark.plugins" = "org.apache.spark.CometPlugin" +"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager" +"spark.comet.exec.replaceSortMergeJoin" = "true" +"spark.comet.expression.Cast.allowIncompatible" = "true" +"spark.comet.enabled" = "true" +"spark.comet.exec.enabled" = "true" +"spark.comet.scan.icebergNative.enabled" = "true" +"spark.comet.explainFallback.enabled" = "true" +"spark.sql.catalog.${ICEBERG_CATALOG}" = "org.apache.iceberg.spark.SparkCatalog" +"spark.sql.catalog.${ICEBERG_CATALOG}.type" = "hadoop" +"spark.sql.catalog.${ICEBERG_CATALOG}.warehouse" = "$ICEBERG_WAREHOUSE" +"spark.sql.defaultCatalog" = "${ICEBERG_CATALOG}" + +[tpcbench_args] +use_iceberg = true diff --git a/benchmarks/tpc/engines/comet.toml b/benchmarks/tpc/engines/comet.toml new file mode 100644 index 0000000000..8e19165ebf --- /dev/null +++ b/benchmarks/tpc/engines/comet.toml @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[engine] +name = "comet" + +[env] +required = ["COMET_JAR"] + +[spark_submit] +jars = ["$COMET_JAR"] +driver_class_path = ["$COMET_JAR"] + +[spark_conf] +"spark.driver.extraClassPath" = "$COMET_JAR" +"spark.executor.extraClassPath" = "$COMET_JAR" +"spark.plugins" = "org.apache.spark.CometPlugin" +"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager" +"spark.comet.scan.impl" = "native_datafusion" +"spark.comet.exec.replaceSortMergeJoin" = "true" +"spark.comet.expression.Cast.allowIncompatible" = "true" diff --git a/benchmarks/tpc/engines/gluten.toml b/benchmarks/tpc/engines/gluten.toml new file mode 100644 index 0000000000..20165788c2 --- /dev/null +++ b/benchmarks/tpc/engines/gluten.toml @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[engine] +name = "gluten" + +[env] +required = ["GLUTEN_JAR"] +exports = { TZ = "UTC" } + +[spark_submit] +jars = ["$GLUTEN_JAR"] + +[spark_conf] +"spark.plugins" = "org.apache.gluten.GlutenPlugin" +"spark.driver.extraClassPath" = "${GLUTEN_JAR}" +"spark.executor.extraClassPath" = "${GLUTEN_JAR}" +"spark.gluten.sql.columnar.forceShuffledHashJoin" = "true" +"spark.shuffle.manager" = "org.apache.spark.shuffle.sort.ColumnarShuffleManager" +"spark.sql.session.timeZone" = "UTC" diff --git a/benchmarks/tpc/engines/spark.toml b/benchmarks/tpc/engines/spark.toml new file mode 100644 index 0000000000..c02e7a6ad2 --- /dev/null +++ b/benchmarks/tpc/engines/spark.toml @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[engine] +name = "spark" diff --git a/dev/benchmarks/generate-comparison.py b/benchmarks/tpc/generate-comparison.py similarity index 100% rename from dev/benchmarks/generate-comparison.py rename to benchmarks/tpc/generate-comparison.py diff --git a/benchmarks/tpc/run.py b/benchmarks/tpc/run.py new file mode 100755 index 0000000000..41a2d5fdaf --- /dev/null +++ b/benchmarks/tpc/run.py @@ -0,0 +1,402 @@ +#!/usr/bin/env python3 +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Consolidated TPC benchmark runner for Spark-based engines. 
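+ +Each engine is described by a TOML file in engines/<name>.toml; its [spark_conf] entries are merged with the shared memory and S3 settings in COMMON_SPARK_CONF and the per-benchmark executor settings in BENCHMARK_PROFILES to build the final spark-submit command.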
+ +Usage: + python3 run.py --engine comet --benchmark tpch + python3 run.py --engine comet --benchmark tpcds --iterations 3 + python3 run.py --engine comet-iceberg --benchmark tpch + python3 run.py --engine comet --benchmark tpch --dry-run + python3 run.py --engine spark --benchmark tpch --no-restart +""" + +import argparse +import os +import re +import subprocess +import sys + +# --------------------------------------------------------------------------- +# TOML loading – prefer stdlib tomllib (3.11+), else minimal fallback +# --------------------------------------------------------------------------- + +try: + import tomllib # Python 3.11+ + + def load_toml(path): + with open(path, "rb") as f: + return tomllib.load(f) + +except ModuleNotFoundError: + + def _parse_toml(text): + """Minimal TOML parser supporting tables, quoted-key strings, plain + strings, arrays of strings, booleans, and comments. Sufficient for + the engine config files used by this runner.""" + root = {} + current = root + for line in text.splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + # Table header: [env.defaults] or [spark_conf] + m = re.match(r"^\[([^\]]+)\]$", line) + if m: + keys = m.group(1).split(".") + current = root + for k in keys: + current = current.setdefault(k, {}) + continue + # Key = value + m = re.match(r'^("(?:[^"\\]|\\.)*"|[A-Za-z0-9_.]+)\s*=\s*(.+)$', line) + if not m: + continue + raw_key, raw_val = m.group(1), m.group(2).strip() + key = raw_key.strip('"') + val = _parse_value(raw_val) + current[key] = val + return root + + def _parse_value(raw): + if raw == "true": + return True + if raw == "false": + return False + if raw.startswith('"') and raw.endswith('"'): + return raw[1:-1] + if raw.startswith("["): + # Simple array of strings + items = [] + for m in re.finditer(r'"((?:[^"\\]|\\.)*)"', raw): + items.append(m.group(1)) + return items + if raw.startswith("{"): + # Inline table: { KEY = "VAL", ... 
} + tbl = {} + for m in re.finditer(r'([A-Za-z0-9_]+)\s*=\s*"((?:[^"\\]|\\.)*)"', raw): + tbl[m.group(1)] = m.group(2) + return tbl + return raw + + def load_toml(path): + with open(path, "r") as f: + return _parse_toml(f.read()) + + +# --------------------------------------------------------------------------- +# Common Spark configuration (shared across all engines) +# --------------------------------------------------------------------------- + +COMMON_SPARK_CONF = { + "spark.driver.memory": "8G", + "spark.executor.memory": "16g", + "spark.memory.offHeap.enabled": "true", + "spark.memory.offHeap.size": "16g", + "spark.eventLog.enabled": "true", + "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", + "spark.hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain", +} + +# --------------------------------------------------------------------------- +# Benchmark profiles +# --------------------------------------------------------------------------- + +BENCHMARK_PROFILES = { + "tpch": { + "executor_instances": "1", + "executor_cores": "8", + "max_cores": "8", + "data_env": "TPCH_DATA", + "queries_env": "TPCH_QUERIES", + "format": "parquet", + }, + "tpcds": { + "executor_instances": "2", + "executor_cores": "8", + "max_cores": "16", + "data_env": "TPCDS_DATA", + "queries_env": "TPCDS_QUERIES", + "format": None, # omit --format for TPC-DS + }, +} + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def resolve_env(value): + """Expand $VAR and ${VAR} references using os.environ.""" + if not isinstance(value, str): + return value + return re.sub( + r"\$\{([^}]+)\}|\$([A-Za-z_][A-Za-z0-9_]*)", + lambda m: os.environ.get(m.group(1) or m.group(2), ""), + value, + ) + + +def resolve_env_in_list(lst): + return [resolve_env(v) for v in lst] + + +def load_engine_config(engine_name): + """Load and return the TOML config for the given engine.""" + script_dir = os.path.dirname(os.path.abspath(__file__)) + config_path = os.path.join(script_dir, "engines", f"{engine_name}.toml") + if not os.path.exists(config_path): + available = sorted( + f.removesuffix(".toml") + for f in os.listdir(os.path.join(script_dir, "engines")) + if f.endswith(".toml") + ) + print(f"Error: Unknown engine '{engine_name}'", file=sys.stderr) + print(f"Available engines: {', '.join(available)}", file=sys.stderr) + sys.exit(1) + return load_toml(config_path) + + +def apply_env_defaults(config): + """Set environment variable defaults from [env.defaults].""" + defaults = config.get("env", {}).get("defaults", {}) + for key, val in defaults.items(): + if key not in os.environ: + os.environ[key] = val + + +def apply_env_exports(config): + """Export environment variables from [env.exports].""" + exports = config.get("env", {}).get("exports", {}) + for key, val in exports.items(): + os.environ[key] = val + + +def check_required_env(config): + """Validate that required environment variables are set.""" + required = config.get("env", {}).get("required", []) + missing = [v for v in required if not os.environ.get(v)] + if missing: + print( + f"Error: Required environment variable(s) not set: {', '.join(missing)}", + file=sys.stderr, + ) + sys.exit(1) + + +def check_common_env(): + """Validate SPARK_HOME and SPARK_MASTER are set.""" + for var in ("SPARK_HOME", "SPARK_MASTER"): + if not os.environ.get(var): + print(f"Error: {var} is not set", file=sys.stderr) + 
sys.exit(1) + + +def check_benchmark_env(config, benchmark): + """Validate benchmark-specific environment variables.""" + profile = BENCHMARK_PROFILES[benchmark] + use_iceberg = config.get("tpcbench_args", {}).get("use_iceberg", False) + + required = [profile["queries_env"]] + if not use_iceberg: + required.append(profile["data_env"]) + + missing = [v for v in required if not os.environ.get(v)] + if missing: + print( + f"Error: Required environment variable(s) not set for " + f"{benchmark}: {', '.join(missing)}", + file=sys.stderr, + ) + sys.exit(1) + + # Default ICEBERG_DATABASE to the benchmark name if not already set + if use_iceberg and not os.environ.get("ICEBERG_DATABASE"): + os.environ["ICEBERG_DATABASE"] = benchmark + + +def build_spark_submit_cmd(config, benchmark, args): + """Build the spark-submit command list.""" + spark_home = os.environ["SPARK_HOME"] + spark_master = os.environ["SPARK_MASTER"] + profile = BENCHMARK_PROFILES[benchmark] + + cmd = [os.path.join(spark_home, "bin", "spark-submit")] + cmd += ["--master", spark_master] + + # --jars + jars = config.get("spark_submit", {}).get("jars", []) + if jars: + cmd += ["--jars", ",".join(resolve_env_in_list(jars))] + + # --driver-class-path + driver_cp = config.get("spark_submit", {}).get("driver_class_path", []) + if driver_cp: + cmd += ["--driver-class-path", ":".join(resolve_env_in_list(driver_cp))] + + # Merge spark confs: common + benchmark profile + engine overrides + conf = dict(COMMON_SPARK_CONF) + conf["spark.executor.instances"] = profile["executor_instances"] + conf["spark.executor.cores"] = profile["executor_cores"] + conf["spark.cores.max"] = profile["max_cores"] + + engine_conf = config.get("spark_conf", {}) + for key, val in engine_conf.items(): + if isinstance(val, bool): + val = "true" if val else "false" + conf[resolve_env(key)] = resolve_env(str(val)) + + for key, val in sorted(conf.items()): + cmd += ["--conf", f"{key}={val}"] + + # tpcbench.py path + cmd.append("tpcbench.py") + + # tpcbench args + engine_name = config.get("engine", {}).get("name", args.engine) + cmd += ["--name", engine_name] + cmd += ["--benchmark", benchmark] + + use_iceberg = config.get("tpcbench_args", {}).get("use_iceberg", False) + if use_iceberg: + cmd += ["--catalog", resolve_env("${ICEBERG_CATALOG}")] + cmd += ["--database", resolve_env("${ICEBERG_DATABASE}")] + else: + data_var = profile["data_env"] + data_val = os.environ.get(data_var, "") + cmd += ["--data", data_val] + + queries_var = profile["queries_env"] + queries_val = os.environ.get(queries_var, "") + cmd += ["--queries", queries_val] + + cmd += ["--output", args.output] + cmd += ["--iterations", str(args.iterations)] + + if args.query is not None: + cmd += ["--query", str(args.query)] + + if profile["format"] and not use_iceberg: + cmd += ["--format", profile["format"]] + + return cmd + + +def restart_spark(): + """Stop and start Spark master and worker.""" + spark_home = os.environ["SPARK_HOME"] + sbin = os.path.join(spark_home, "sbin") + spark_master = os.environ["SPARK_MASTER"] + + # Stop (ignore errors) + subprocess.run( + [os.path.join(sbin, "stop-master.sh")], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + subprocess.run( + [os.path.join(sbin, "stop-worker.sh")], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + # Start (check errors) + r = subprocess.run([os.path.join(sbin, "start-master.sh")]) + if r.returncode != 0: + print("Error: Failed to start Spark master", file=sys.stderr) + sys.exit(1) + + r = 
subprocess.run([os.path.join(sbin, "start-worker.sh"), spark_master]) + if r.returncode != 0: + print("Error: Failed to start Spark worker", file=sys.stderr) + sys.exit(1) + + +def main(): + parser = argparse.ArgumentParser( + description="Consolidated TPC benchmark runner for Spark-based engines." + ) + parser.add_argument( + "--engine", + required=True, + help="Engine name (matches a TOML file in engines/)", + ) + parser.add_argument( + "--benchmark", + required=True, + choices=["tpch", "tpcds"], + help="Benchmark to run", + ) + parser.add_argument( + "--iterations", type=int, default=1, help="Number of iterations (default: 1)" + ) + parser.add_argument( + "--output", default=".", help="Output directory (default: .)" + ) + parser.add_argument( + "--query", type=int, default=None, help="Run a single query number" + ) + parser.add_argument( + "--no-restart", + action="store_true", + help="Skip Spark master/worker restart", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Print the spark-submit command without executing", + ) + args = parser.parse_args() + + config = load_engine_config(args.engine) + + # Apply env defaults and exports before validation + apply_env_defaults(config) + apply_env_exports(config) + + check_common_env() + check_required_env(config) + check_benchmark_env(config, args.benchmark) + + # Restart Spark unless --no-restart or --dry-run + if not args.no_restart and not args.dry_run: + restart_spark() + + cmd = build_spark_submit_cmd(config, args.benchmark, args) + + if args.dry_run: + # Group paired arguments (e.g. --conf key=value) on one line + parts = [] + i = 0 + while i < len(cmd): + token = cmd[i] + if token.startswith("--") and i + 1 < len(cmd) and not cmd[i + 1].startswith("--"): + parts.append(f"{token} {cmd[i + 1]}") + i += 2 + else: + parts.append(token) + i += 1 + print(" \\\n ".join(parts)) + else: + r = subprocess.run(cmd) + sys.exit(r.returncode) + + +if __name__ == "__main__": + main() diff --git a/dev/benchmarks/tpcbench.py b/benchmarks/tpc/tpcbench.py similarity index 100% rename from dev/benchmarks/tpcbench.py rename to benchmarks/tpc/tpcbench.py diff --git a/dev/benchmarks/blaze-tpcds.sh b/dev/benchmarks/blaze-tpcds.sh deleted file mode 100755 index 90a4a48468..0000000000 --- a/dev/benchmarks/blaze-tpcds.sh +++ /dev/null @@ -1,53 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# - -$SPARK_HOME/sbin/stop-master.sh -$SPARK_HOME/sbin/stop-worker.sh - -$SPARK_HOME/sbin/start-master.sh -$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER - -$SPARK_HOME/bin/spark-submit \ - --master $SPARK_MASTER \ - --jars $BLAZE_JAR \ - --driver-class-path $BLAZE_JAR \ - --conf spark.driver.memory=8G \ - --conf spark.executor.instances=2 \ - --conf spark.executor.cores=8 \ - --conf spark.cores.max=16 \ - --conf spark.executor.memory=16g \ - --conf spark.executor.memoryOverhead=16g \ - --conf spark.memory.offHeap.enabled=false \ - --conf spark.eventLog.enabled=true \ - --conf spark.driver.extraClassPath=$BLAZE_JAR \ - --conf spark.executor.extraClassPath=$BLAZE_JAR \ - --conf spark.sql.extensions=org.apache.spark.sql.blaze.BlazeSparkSessionExtension \ - --conf spark.shuffle.manager=org.apache.spark.sql.execution.blaze.shuffle.BlazeShuffleManager \ - --conf spark.blaze.enable=true \ - --conf spark.blaze.forceShuffledHashJoin=true \ - --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \ - --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \ - tpcbench.py \ - --name blaze \ - --benchmark tpcds \ - --data $TPCDS_DATA \ - --queries $TPCDS_QUERIES \ - --output . \ - --iterations 1 diff --git a/dev/benchmarks/blaze-tpch.sh b/dev/benchmarks/blaze-tpch.sh deleted file mode 100755 index 2c6878737d..0000000000 --- a/dev/benchmarks/blaze-tpch.sh +++ /dev/null @@ -1,53 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -$SPARK_HOME/sbin/stop-master.sh -$SPARK_HOME/sbin/stop-worker.sh - -$SPARK_HOME/sbin/start-master.sh -$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER - -$SPARK_HOME/bin/spark-submit \ - --master $SPARK_MASTER \ - --jars $BLAZE_JAR \ - --driver-class-path $BLAZE_JAR \ - --conf spark.driver.memory=8G \ - --conf spark.executor.instances=1 \ - --conf spark.executor.cores=8 \ - --conf spark.cores.max=8 \ - --conf spark.executor.memory=16g \ - --conf spark.executor.memoryOverhead=16g \ - --conf spark.memory.offHeap.enabled=false \ - --conf spark.eventLog.enabled=true \ - --conf spark.driver.extraClassPath=$BLAZE_JAR \ - --conf spark.executor.extraClassPath=$BLAZE_JAR \ - --conf spark.sql.extensions=org.apache.spark.sql.blaze.BlazeSparkSessionExtension \ - --conf spark.shuffle.manager=org.apache.spark.sql.execution.blaze.shuffle.BlazeShuffleManager \ - --conf spark.blaze.enable=true \ - --conf spark.blaze.forceShuffledHashJoin=true \ - --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \ - --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \ - tpcbench.py \ - --name blaze \ - --benchmark tpch \ - --data $TPCH_DATA \ - --queries $TPCH_QUERIES \ - --output . 
\ - --iterations 1 diff --git a/dev/benchmarks/comet-tpcds.sh b/dev/benchmarks/comet-tpcds.sh deleted file mode 100755 index b55b27188c..0000000000 --- a/dev/benchmarks/comet-tpcds.sh +++ /dev/null @@ -1,53 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -$SPARK_HOME/sbin/stop-master.sh -$SPARK_HOME/sbin/stop-worker.sh - -$SPARK_HOME/sbin/start-master.sh -$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER - -$SPARK_HOME/bin/spark-submit \ - --master $SPARK_MASTER \ - --jars $COMET_JAR \ - --driver-class-path $COMET_JAR \ - --conf spark.driver.memory=8G \ - --conf spark.executor.instances=2 \ - --conf spark.executor.cores=8 \ - --conf spark.cores.max=16 \ - --conf spark.executor.memory=16g \ - --conf spark.memory.offHeap.enabled=true \ - --conf spark.memory.offHeap.size=16g \ - --conf spark.eventLog.enabled=true \ - --conf spark.driver.extraClassPath=$COMET_JAR \ - --conf spark.executor.extraClassPath=$COMET_JAR \ - --conf spark.plugins=org.apache.spark.CometPlugin \ - --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \ - --conf spark.comet.scan.impl=native_datafusion \ - --conf spark.comet.expression.Cast.allowIncompatible=true \ - --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \ - --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \ - tpcbench.py \ - --name comet \ - --benchmark tpcds \ - --data $TPCDS_DATA \ - --queries $TPCDS_QUERIES \ - --output . \ - --iterations 1 diff --git a/dev/benchmarks/comet-tpch-iceberg.sh b/dev/benchmarks/comet-tpch-iceberg.sh deleted file mode 100755 index 7907125c82..0000000000 --- a/dev/benchmarks/comet-tpch-iceberg.sh +++ /dev/null @@ -1,114 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# TPC-H benchmark using Iceberg tables with Comet's native iceberg-rust integration. 
-# -# Required environment variables: -# SPARK_HOME - Path to Spark installation -# SPARK_MASTER - Spark master URL (e.g., spark://localhost:7077) -# COMET_JAR - Path to Comet JAR -# ICEBERG_JAR - Path to Iceberg Spark runtime JAR -# ICEBERG_WAREHOUSE - Path to Iceberg warehouse directory -# TPCH_QUERIES - Path to TPC-H query files -# -# Optional: -# ICEBERG_CATALOG - Catalog name (default: local) -# ICEBERG_DATABASE - Database name (default: tpch) -# -# Setup (run once to create Iceberg tables from Parquet): -# $SPARK_HOME/bin/spark-submit \ -# --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \ -# --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \ -# --conf spark.sql.catalog.local.type=hadoop \ -# --conf spark.sql.catalog.local.warehouse=$ICEBERG_WAREHOUSE \ -# create-iceberg-tpch.py \ -# --parquet-path $TPCH_DATA \ -# --catalog local \ -# --database tpch - -set -e - -# Defaults -ICEBERG_CATALOG=${ICEBERG_CATALOG:-local} -ICEBERG_DATABASE=${ICEBERG_DATABASE:-tpch} - -# Validate required variables -if [ -z "$SPARK_HOME" ]; then - echo "Error: SPARK_HOME is not set" - exit 1 -fi -if [ -z "$COMET_JAR" ]; then - echo "Error: COMET_JAR is not set" - exit 1 -fi -if [ -z "$ICEBERG_JAR" ]; then - echo "Error: ICEBERG_JAR is not set" - echo "Download from: https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.8.1/" - exit 1 -fi -if [ -z "$ICEBERG_WAREHOUSE" ]; then - echo "Error: ICEBERG_WAREHOUSE is not set" - exit 1 -fi -if [ -z "$TPCH_QUERIES" ]; then - echo "Error: TPCH_QUERIES is not set" - exit 1 -fi - -$SPARK_HOME/sbin/stop-master.sh 2>/dev/null || true -$SPARK_HOME/sbin/stop-worker.sh 2>/dev/null || true - -$SPARK_HOME/sbin/start-master.sh -$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER - -$SPARK_HOME/bin/spark-submit \ - --master $SPARK_MASTER \ - --jars $COMET_JAR,$ICEBERG_JAR \ - --driver-class-path $COMET_JAR:$ICEBERG_JAR \ - --conf spark.driver.memory=8G \ - --conf spark.executor.instances=1 \ - --conf spark.executor.cores=8 \ - --conf spark.cores.max=8 \ - --conf spark.executor.memory=16g \ - --conf spark.memory.offHeap.enabled=true \ - --conf spark.memory.offHeap.size=16g \ - --conf spark.eventLog.enabled=true \ - --conf spark.driver.extraClassPath=$COMET_JAR:$ICEBERG_JAR \ - --conf spark.executor.extraClassPath=$COMET_JAR:$ICEBERG_JAR \ - --conf spark.plugins=org.apache.spark.CometPlugin \ - --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \ - --conf spark.comet.exec.replaceSortMergeJoin=true \ - --conf spark.comet.expression.Cast.allowIncompatible=true \ - --conf spark.comet.enabled=true \ - --conf spark.comet.exec.enabled=true \ - --conf spark.comet.scan.icebergNative.enabled=true \ - --conf spark.comet.explainFallback.enabled=true \ - --conf spark.sql.catalog.${ICEBERG_CATALOG}=org.apache.iceberg.spark.SparkCatalog \ - --conf spark.sql.catalog.${ICEBERG_CATALOG}.type=hadoop \ - --conf spark.sql.catalog.${ICEBERG_CATALOG}.warehouse=$ICEBERG_WAREHOUSE \ - --conf spark.sql.defaultCatalog=${ICEBERG_CATALOG} \ - tpcbench.py \ - --name comet-iceberg \ - --benchmark tpch \ - --catalog $ICEBERG_CATALOG \ - --database $ICEBERG_DATABASE \ - --queries $TPCH_QUERIES \ - --output . 
\ - --iterations 1 diff --git a/dev/benchmarks/comet-tpch.sh b/dev/benchmarks/comet-tpch.sh deleted file mode 100755 index a748a02319..0000000000 --- a/dev/benchmarks/comet-tpch.sh +++ /dev/null @@ -1,55 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -$SPARK_HOME/sbin/stop-master.sh -$SPARK_HOME/sbin/stop-worker.sh - -$SPARK_HOME/sbin/start-master.sh -$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER - -$SPARK_HOME/bin/spark-submit \ - --master $SPARK_MASTER \ - --jars $COMET_JAR \ - --driver-class-path $COMET_JAR \ - --conf spark.driver.memory=8G \ - --conf spark.executor.instances=1 \ - --conf spark.executor.cores=8 \ - --conf spark.cores.max=8 \ - --conf spark.executor.memory=16g \ - --conf spark.memory.offHeap.enabled=true \ - --conf spark.memory.offHeap.size=16g \ - --conf spark.eventLog.enabled=true \ - --conf spark.driver.extraClassPath=$COMET_JAR \ - --conf spark.executor.extraClassPath=$COMET_JAR \ - --conf spark.plugins=org.apache.spark.CometPlugin \ - --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \ - --conf spark.comet.scan.impl=native_datafusion \ - --conf spark.comet.exec.replaceSortMergeJoin=true \ - --conf spark.comet.expression.Cast.allowIncompatible=true \ - --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \ - --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \ - tpcbench.py \ - --name comet \ - --benchmark tpch \ - --data $TPCH_DATA \ - --queries $TPCH_QUERIES \ - --output . \ - --iterations 1 \ - --format parquet diff --git a/dev/benchmarks/create-iceberg-tpch.py b/dev/benchmarks/create-iceberg-tpch.py deleted file mode 100644 index 44f0f63a2e..0000000000 --- a/dev/benchmarks/create-iceberg-tpch.py +++ /dev/null @@ -1,88 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -""" -Convert TPC-H Parquet data to Iceberg tables. 
- -Usage: - spark-submit \ - --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \ - --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \ - --conf spark.sql.catalog.local.type=hadoop \ - --conf spark.sql.catalog.local.warehouse=/path/to/iceberg-warehouse \ - create-iceberg-tpch.py \ - --parquet-path /path/to/tpch/parquet \ - --catalog local \ - --database tpch -""" - -import argparse -from pyspark.sql import SparkSession -import time - - -def main(parquet_path: str, catalog: str, database: str): - spark = SparkSession.builder \ - .appName("Create Iceberg TPC-H Tables") \ - .getOrCreate() - - table_names = [ - "customer", - "lineitem", - "nation", - "orders", - "part", - "partsupp", - "region", - "supplier" - ] - - # Create database if it doesn't exist - spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog}.{database}") - - for table in table_names: - parquet_table_path = f"{parquet_path}/{table}.parquet" - iceberg_table = f"{catalog}.{database}.{table}" - - print(f"Converting {parquet_table_path} -> {iceberg_table}") - start_time = time.time() - - # Drop table if exists to allow re-running - spark.sql(f"DROP TABLE IF EXISTS {iceberg_table}") - - # Read parquet and write as Iceberg - df = spark.read.parquet(parquet_table_path) - df.writeTo(iceberg_table).using("iceberg").create() - - row_count = spark.table(iceberg_table).count() - elapsed = time.time() - start_time - print(f" Created {iceberg_table} with {row_count} rows in {elapsed:.2f}s") - - print("\nAll TPC-H tables created successfully!") - print(f"Tables available at: {catalog}.{database}.*") - - spark.stop() - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Convert TPC-H Parquet data to Iceberg tables") - parser.add_argument("--parquet-path", required=True, help="Path to TPC-H Parquet data directory") - parser.add_argument("--catalog", required=True, help="Iceberg catalog name (e.g., 'local')") - parser.add_argument("--database", default="tpch", help="Database name to create tables in") - args = parser.parse_args() - - main(args.parquet_path, args.catalog, args.database) diff --git a/dev/benchmarks/gluten-tpcds.sh b/dev/benchmarks/gluten-tpcds.sh deleted file mode 100755 index 7c475c79c0..0000000000 --- a/dev/benchmarks/gluten-tpcds.sh +++ /dev/null @@ -1,53 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# -export TZ=UTC - -$SPARK_HOME/sbin/stop-master.sh -$SPARK_HOME/sbin/stop-worker.sh - -$SPARK_HOME/sbin/start-master.sh -$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER - -$SPARK_HOME/bin/spark-submit \ - --master $SPARK_MASTER \ - --conf spark.driver.memory=8G \ - --conf spark.executor.instances=2 \ - --conf spark.executor.memory=16G \ - --conf spark.executor.cores=8 \ - --conf spark.cores.max=16 \ - --conf spark.eventLog.enabled=true \ - --jars $GLUTEN_JAR \ - --conf spark.plugins=org.apache.gluten.GlutenPlugin \ - --conf spark.driver.extraClassPath=${GLUTEN_JAR} \ - --conf spark.executor.extraClassPath=${GLUTEN_JAR} \ - --conf spark.memory.offHeap.enabled=true \ - --conf spark.memory.offHeap.size=16g \ - --conf spark.gluten.sql.columnar.forceShuffledHashJoin=true \ - --conf spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager \ - --conf spark.sql.session.timeZone=UTC \ - --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \ - --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \ - tpcbench.py \ - --name gluten \ - --benchmark tpcds \ - --data $TPCDS_DATA \ - --queries $TPCDS_QUERIES \ - --output . \ - --iterations 1 diff --git a/dev/benchmarks/gluten-tpch.sh b/dev/benchmarks/gluten-tpch.sh deleted file mode 100755 index 46c3ed7527..0000000000 --- a/dev/benchmarks/gluten-tpch.sh +++ /dev/null @@ -1,53 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -export TZ=UTC - -$SPARK_HOME/sbin/stop-master.sh -$SPARK_HOME/sbin/stop-worker.sh - -$SPARK_HOME/sbin/start-master.sh -$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER - -$SPARK_HOME/bin/spark-submit \ - --master $SPARK_MASTER \ - --conf spark.driver.memory=8G \ - --conf spark.executor.instances=1 \ - --conf spark.executor.memory=16G \ - --conf spark.executor.cores=8 \ - --conf spark.cores.max=8 \ - --conf spark.eventLog.enabled=true \ - --jars $GLUTEN_JAR \ - --conf spark.plugins=org.apache.gluten.GlutenPlugin \ - --conf spark.driver.extraClassPath=${GLUTEN_JAR} \ - --conf spark.executor.extraClassPath=${GLUTEN_JAR} \ - --conf spark.memory.offHeap.enabled=true \ - --conf spark.memory.offHeap.size=16g \ - --conf spark.gluten.sql.columnar.forceShuffledHashJoin=true \ - --conf spark.shuffle.manager=org.apache.spark.shuffle.sort.ColumnarShuffleManager \ - --conf spark.sql.session.timeZone=UTC \ - --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \ - --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \ - tpcbench.py \ - --name gluten \ - --benchmark tpch \ - --data $TPCH_DATA \ - --queries $TPCH_QUERIES \ - --output . 
\ - --iterations 1 diff --git a/dev/benchmarks/spark-tpcds.sh b/dev/benchmarks/spark-tpcds.sh deleted file mode 100755 index dad079ba23..0000000000 --- a/dev/benchmarks/spark-tpcds.sh +++ /dev/null @@ -1,45 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -$SPARK_HOME/sbin/stop-master.sh -$SPARK_HOME/sbin/stop-worker.sh - -$SPARK_HOME/sbin/start-master.sh -$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER - -$SPARK_HOME/bin/spark-submit \ - --master $SPARK_MASTER \ - --conf spark.driver.memory=8G \ - --conf spark.executor.instances=2 \ - --conf spark.executor.cores=8 \ - --conf spark.cores.max=16 \ - --conf spark.executor.memory=16g \ - --conf spark.memory.offHeap.enabled=true \ - --conf spark.memory.offHeap.size=16g \ - --conf spark.eventLog.enabled=true \ - --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \ - --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \ - tpcbench.py \ - --name spark \ - --benchmark tpcds \ - --data $TPCDS_DATA \ - --queries $TPCDS_QUERIES \ - --output . \ - --iterations 1 diff --git a/dev/benchmarks/spark-tpch.sh b/dev/benchmarks/spark-tpch.sh deleted file mode 100755 index ae359f049f..0000000000 --- a/dev/benchmarks/spark-tpch.sh +++ /dev/null @@ -1,46 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# - -$SPARK_HOME/sbin/stop-master.sh -$SPARK_HOME/sbin/stop-worker.sh - -$SPARK_HOME/sbin/start-master.sh -$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER - -$SPARK_HOME/bin/spark-submit \ - --master $SPARK_MASTER \ - --conf spark.driver.memory=8G \ - --conf spark.executor.instances=1 \ - --conf spark.executor.cores=8 \ - --conf spark.cores.max=8 \ - --conf spark.executor.memory=16g \ - --conf spark.memory.offHeap.enabled=true \ - --conf spark.memory.offHeap.size=16g \ - --conf spark.eventLog.enabled=true \ - --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \ - --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \ - tpcbench.py \ - --name spark \ - --benchmark tpch \ - --data $TPCH_DATA \ - --queries $TPCH_QUERIES \ - --output . \ - --iterations 1 \ - --format parquet diff --git a/docs/source/about/gluten_comparison.md b/docs/source/about/gluten_comparison.md index 492479bb9d..40c6c2741a 100644 --- a/docs/source/about/gluten_comparison.md +++ b/docs/source/about/gluten_comparison.md @@ -86,7 +86,7 @@ on your existing Spark jobs. ![tpch_allqueries_comet_gluten.png](/_static/images/tpch_allqueries_comet_gluten.png) -The scripts that were used to generate these results can be found [here](https://github.com/apache/datafusion-comet/tree/main/dev/benchmarks). +The scripts that were used to generate these results can be found [here](https://github.com/apache/datafusion-comet/tree/main/benchmarks/tpc). ## Ease of Development & Contributing diff --git a/docs/source/contributor-guide/benchmark-results/tpc-ds.md b/docs/source/contributor-guide/benchmark-results/tpc-ds.md index 66ff2e51a7..bea254cc08 100644 --- a/docs/source/contributor-guide/benchmark-results/tpc-ds.md +++ b/docs/source/contributor-guide/benchmark-results/tpc-ds.md @@ -46,4 +46,4 @@ The raw results of these benchmarks in JSON format is available here: - [Spark](spark-3.5.3-tpcds.json) - [Comet](comet-0.11.0-tpcds.json) -The scripts that were used to generate these results can be found [here](https://github.com/apache/datafusion-comet/tree/main/dev/benchmarks). +The scripts that were used to generate these results can be found [here](https://github.com/apache/datafusion-comet/tree/main/benchmarks/tpc). diff --git a/docs/source/contributor-guide/benchmark-results/tpc-h.md b/docs/source/contributor-guide/benchmark-results/tpc-h.md index 4424d9eac7..2170426c05 100644 --- a/docs/source/contributor-guide/benchmark-results/tpc-h.md +++ b/docs/source/contributor-guide/benchmark-results/tpc-h.md @@ -46,4 +46,4 @@ The raw results of these benchmarks in JSON format is available here: - [Spark](spark-3.5.3-tpch.json) - [Comet](comet-0.11.0-tpch.json) -The scripts that were used to generate these results can be found [here](https://github.com/apache/datafusion-comet/tree/main/dev/benchmarks). +The scripts that were used to generate these results can be found [here](https://github.com/apache/datafusion-comet/tree/main/benchmarks/tpc). diff --git a/docs/source/contributor-guide/benchmarking.md b/docs/source/contributor-guide/benchmarking.md index 768089c955..49af73376f 100644 --- a/docs/source/contributor-guide/benchmarking.md +++ b/docs/source/contributor-guide/benchmarking.md @@ -21,7 +21,7 @@ under the License. To track progress on performance, we regularly run benchmarks derived from TPC-H and TPC-DS. 
-The benchmarking scripts are contained at [https://github.com/apache/datafusion-comet/tree/main/dev/benchmarks](https://github.com/apache/datafusion-comet/tree/main/dev/benchmarks). +The benchmarking scripts can be found [here](https://github.com/apache/datafusion-comet/tree/main/benchmarks/tpc). Data generation scripts are available in the [DataFusion Benchmarks](https://github.com/apache/datafusion-benchmarks) GitHub repository. diff --git a/docs/source/contributor-guide/benchmarking_aws_ec2.md b/docs/source/contributor-guide/benchmarking_aws_ec2.md index 922b0379fe..81f15d64ea 100644 --- a/docs/source/contributor-guide/benchmarking_aws_ec2.md +++ b/docs/source/contributor-guide/benchmarking_aws_ec2.md @@ -109,23 +109,23 @@ export COMET_JAR=/home/ec2-user/datafusion-comet/spark/target/comet-spark-spark3 ## Run Benchmarks -Use the scripts in `dev/benchmarks` in the Comet repository. +Use the scripts in `benchmarks/tpc` in the Comet repository. ```shell -cd dev/benchmarks +cd benchmarks/tpc export TPCH_QUERIES=/home/ec2-user/datafusion-benchmarks/tpch/queries/ ``` Run Spark benchmark: ```shell -./spark-tpch.sh +python3 run.py --engine spark --benchmark tpch ``` Run Comet benchmark: ```shell -./comet-tpch.sh +python3 run.py --engine comet --benchmark tpch ``` ## Running Benchmarks with S3 @@ -164,4 +164,9 @@ Modify the scripts to add the following configurations. --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \ ``` -Now run the `spark-tpch.sh` and `comet-tpch.sh` scripts. +Now run the benchmarks: + +```shell +python3 run.py --engine spark --benchmark tpch +python3 run.py --engine comet --benchmark tpch +```
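+ +To preview the spark-submit command that will be executed, add `--dry-run`: + +```shell +python3 run.py --engine comet --benchmark tpch --dry-run +```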