From 9cde94081bc612993af75d4c0e9fd73d8e13e83d Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 19 Feb 2026 13:37:38 +0530 Subject: [PATCH 1/7] Add Kind cluster setup scripts for K8s benchmarks --- hack/k8s-benchmark-rbac.yaml | 88 ++++++++++++++++++++++ hack/k8s-benchmark-setup.sh | 125 ++++++++++++++++++++++++++++++++ hack/kind-benchmark-config.yaml | 29 ++++++++ 3 files changed, 242 insertions(+) create mode 100644 hack/k8s-benchmark-rbac.yaml create mode 100755 hack/k8s-benchmark-setup.sh create mode 100644 hack/kind-benchmark-config.yaml diff --git a/hack/k8s-benchmark-rbac.yaml b/hack/k8s-benchmark-rbac.yaml new file mode 100644 index 0000000000..dab4c242bb --- /dev/null +++ b/hack/k8s-benchmark-rbac.yaml @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +--- +apiVersion: v1 +kind: Namespace +metadata: + name: comet-bench + +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: spark + namespace: comet-bench + +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: spark-driver + namespace: comet-bench +rules: + - apiGroups: [""] + resources: ["pods", "services", "configmaps", "persistentvolumeclaims"] + verbs: ["get", "list", "watch", "create", "delete", "patch", "update"] + - apiGroups: [""] + resources: ["pods/log"] + verbs: ["get", "list"] + +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: spark-driver + namespace: comet-bench +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: spark-driver +subjects: + - kind: ServiceAccount + name: spark + namespace: comet-bench + +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: comet-bench-spark-operator +rules: + - apiGroups: [""] + resources: ["pods", "services", "configmaps", "persistentvolumeclaims"] + verbs: ["get", "list", "watch", "create", "delete", "patch", "update"] + - apiGroups: [""] + resources: ["pods/log"] + verbs: ["get", "list"] + - apiGroups: ["sparkoperator.k8s.io"] + resources: ["sparkapplications", "sparkapplications/status", "scheduledsparkapplications", "scheduledsparkapplications/status"] + verbs: ["get", "list", "watch", "create", "delete", "patch", "update"] + +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: comet-bench-spark-operator +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: comet-bench-spark-operator +subjects: + - kind: ServiceAccount + name: spark + namespace: comet-bench diff --git a/hack/k8s-benchmark-setup.sh b/hack/k8s-benchmark-setup.sh new file mode 100755 index 0000000000..fb7c53caeb --- /dev/null +++ b/hack/k8s-benchmark-setup.sh @@ -0,0 +1,125 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor 
license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -euo pipefail + +CLUSTER_NAME="${COMET_BENCH_CLUSTER:-comet-bench}" +NAMESPACE="${COMET_BENCH_NAMESPACE:-comet-bench}" +K8S_VERSION="${K8S_VERSION:-1.32.0}" +KIND_BIN="${KIND:-kind}" +SPARK_OPERATOR_VERSION="${SPARK_OPERATOR_VERSION:-2.1.0}" +KIND_NODE_IMAGE="kindest/node:v${K8S_VERSION}" + +log_info() { echo "[INFO] $1"; } +log_error() { echo "[ERROR] $1" >&2; } + +check_prerequisites() { + local missing=() + command -v "$KIND_BIN" &>/dev/null || missing+=("kind") + command -v kubectl &>/dev/null || missing+=("kubectl") + command -v helm &>/dev/null || missing+=("helm") + command -v docker &>/dev/null || missing+=("docker") + + if [[ ${#missing[@]} -gt 0 ]]; then + log_error "Missing tools: ${missing[*]}" + exit 1 + fi +} + +delete_cluster() { + log_info "Deleting cluster: $CLUSTER_NAME" + "$KIND_BIN" delete cluster --name "$CLUSTER_NAME" 2>/dev/null || true +} + +create_cluster() { + if "$KIND_BIN" get clusters 2>/dev/null | grep -q "^${CLUSTER_NAME}$"; then + log_info "Cluster '$CLUSTER_NAME' already exists" + return 0 + fi + + local script_dir + script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + mkdir -p /tmp/comet-bench-data + + log_info "Creating cluster: $CLUSTER_NAME" + "$KIND_BIN" create cluster \ + --name "$CLUSTER_NAME" \ + --image 
"$KIND_NODE_IMAGE" \ + --config "${script_dir}/kind-benchmark-config.yaml" \ + --wait 120s +} + +setup_namespace() { + local script_dir + script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + log_info "Setting up namespace: $NAMESPACE" + kubectl apply -f "${script_dir}/k8s-benchmark-rbac.yaml" +} + +install_spark_operator() { + log_info "Installing Spark Operator v${SPARK_OPERATOR_VERSION}" + kubectl config use-context "kind-${CLUSTER_NAME}" 2>/dev/null || true + + helm repo add spark-operator https://kubeflow.github.io/spark-operator 2>/dev/null || true + helm repo update + + if helm list -n spark-operator 2>/dev/null | grep -q spark-operator; then + helm upgrade spark-operator spark-operator/spark-operator \ + --namespace spark-operator \ + --version "$SPARK_OPERATOR_VERSION" \ + --set webhook.enable=true \ + --set "spark.jobNamespaces[0]=$NAMESPACE" \ + --timeout 10m --wait || true + else + helm install spark-operator spark-operator/spark-operator \ + --namespace spark-operator --create-namespace \ + --version "$SPARK_OPERATOR_VERSION" \ + --set webhook.enable=true \ + --set "spark.jobNamespaces[0]=$NAMESPACE" \ + --timeout 10m --wait || true + fi + + sleep 5 + kubectl get deployment -n spark-operator 2>/dev/null || true +} + +print_status() { + echo "" + log_info "Cluster: $CLUSTER_NAME" + log_info "API Server: $(kubectl config view --minify -o jsonpath='{.clusters[0].cluster.server}')" + echo "" + kubectl get deployment -n spark-operator 2>/dev/null || true + echo "" + log_info "Run benchmarks: ./benchmarks/scripts/run-k8s-benchmark.sh [spark|comet]" + log_info "Delete cluster: ./hack/k8s-benchmark-setup.sh --delete" +} + +main() { + if [[ "${1:-}" == "--delete" ]]; then + delete_cluster + exit 0 + fi + + check_prerequisites + create_cluster + setup_namespace + install_spark_operator + print_status +} + +main "$@" diff --git a/hack/kind-benchmark-config.yaml b/hack/kind-benchmark-config.yaml new file mode 100644 index 0000000000..277f39ae62 --- 
/dev/null +++ b/hack/kind-benchmark-config.yaml @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +kind: Cluster +apiVersion: kind.x-k8s.io/v1alpha4 +nodes: + - role: control-plane + - role: worker + extraMounts: + - hostPath: /tmp/comet-bench-data + containerPath: /data + - role: worker + extraMounts: + - hostPath: /tmp/comet-bench-data + containerPath: /data From 7c70e52c3744f4999c62a6af97d3497905e5d568 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 19 Feb 2026 13:37:45 +0530 Subject: [PATCH 2/7] Add Dockerfile for K8s benchmark image --- benchmarks/Dockerfile.k8s | 93 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 benchmarks/Dockerfile.k8s diff --git a/benchmarks/Dockerfile.k8s b/benchmarks/Dockerfile.k8s new file mode 100644 index 0000000000..91c99a499b --- /dev/null +++ b/benchmarks/Dockerfile.k8s @@ -0,0 +1,93 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM apache/spark:3.5.8 AS builder + +USER root + +RUN apt-get update \ + && apt-get install -y curl openjdk-17-jdk gcc-10 g++-10 cpp-10 unzip git \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +ENV CC="gcc-10" +ENV CXX="g++-10" + +RUN PB_REL="https://github.com/protocolbuffers/protobuf/releases" \ + && curl -LO $PB_REL/download/v30.2/protoc-30.2-linux-x86_64.zip \ + && unzip protoc-30.2-linux-x86_64.zip -d /root/.local \ + && rm protoc-30.2-linux-x86_64.zip +ENV PATH="$PATH:/root/.local/bin" + +RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y +ENV PATH="/root/.cargo/bin:${PATH}" +ENV RUSTFLAGS="-C debuginfo=line-tables-only -C incremental=false" +ENV SPARK_VERSION=3.5 +ENV SCALA_VERSION=2.12 + +WORKDIR /comet +COPY native /comet/native +RUN cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release + +COPY .mvn /comet/.mvn +COPY mvnw /comet/mvnw +COPY common /comet/common +COPY dev /comet/dev +COPY spark /comet/spark +COPY spark-integration /comet/spark-integration +COPY scalafmt.conf /comet/scalafmt.conf +COPY .scalafix.conf /comet/.scalafix.conf +COPY Makefile /comet/Makefile +COPY pom.xml /comet/pom.xml + +RUN mkdir -p /root/.m2 && \ + echo '<settings><mirrors><mirror><id>central</id><name>central</name><url>https://repo1.maven.org/maven2</url><mirrorOf>central</mirrorOf></mirror></mirrors></settings>' > /root/.m2/settings.xml + +RUN cd /comet \ + && JAVA_HOME=$(readlink -f $(which javac) | sed "s/\/bin\/javac//") \ + make release-nogit
PROFILES="-Pspark-$SPARK_VERSION -Pscala-$SCALA_VERSION" + +FROM apache/spark:3.5.8 + +USER root + +ENV SPARK_VERSION=3.5 +ENV SCALA_VERSION=2.12 + +RUN apt-get update \ + && apt-get install -y python3 python3-pip git curl \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +COPY --from=builder /comet/spark/target/comet-spark-spark${SPARK_VERSION}_${SCALA_VERSION}-*.jar $SPARK_HOME/jars/ + +RUN cd /opt \ + && git clone https://github.com/databricks/tpch-dbgen.git \ + && cd tpch-dbgen \ + && make + +RUN cd /opt \ + && git clone https://github.com/apache/datafusion-benchmarks.git + +COPY benchmarks/pyspark /opt/comet-bench/pyspark +COPY benchmarks/scripts /opt/comet-bench/scripts +COPY benchmarks/conf /opt/comet-bench/conf + +WORKDIR /opt/comet-bench + +ENV COMET_JAR_PATH="$SPARK_HOME/jars" +ENV PYTHONUNBUFFERED=1 + +CMD ["python3", "--version"] From 473a563ddad8bea19c306da7c2313697dd780e30 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 19 Feb 2026 13:37:54 +0530 Subject: [PATCH 3/7] Add TPC-H benchmark runner scripts and config --- benchmarks/conf/k8s.conf | 31 +++++ benchmarks/scripts/compare-results.py | 104 ++++++++++++++++ benchmarks/scripts/generate-tpch-data.sh | 52 ++++++++ benchmarks/scripts/run-k8s-benchmark.sh | 147 +++++++++++++++++++++++ benchmarks/scripts/tpch-benchmark.py | 133 ++++++++++++++++++++ 5 files changed, 467 insertions(+) create mode 100644 benchmarks/conf/k8s.conf create mode 100755 benchmarks/scripts/compare-results.py create mode 100755 benchmarks/scripts/generate-tpch-data.sh create mode 100755 benchmarks/scripts/run-k8s-benchmark.sh create mode 100755 benchmarks/scripts/tpch-benchmark.py diff --git a/benchmarks/conf/k8s.conf b/benchmarks/conf/k8s.conf new file mode 100644 index 0000000000..a9c5b45e78 --- /dev/null +++ b/benchmarks/conf/k8s.conf @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +spark.kubernetes.namespace=comet-bench +spark.kubernetes.container.image=comet-bench:local +spark.kubernetes.authenticate.driver.serviceAccountName=spark +spark.driver.memory=2g +spark.kubernetes.driver.pod.name=comet-bench-driver +spark.executor.instances=2 +spark.executor.memory=2g +spark.executor.cores=2 +spark.kubernetes.driver.volumes.hostPath.data.mount.path=/data +spark.kubernetes.driver.volumes.hostPath.data.options.path=/tmp/comet-bench-data +spark.kubernetes.executor.volumes.hostPath.data.mount.path=/data +spark.kubernetes.executor.volumes.hostPath.data.options.path=/tmp/comet-bench-data +spark.sql.adaptive.enabled=true +spark.dynamicAllocation.enabled=false diff --git a/benchmarks/scripts/compare-results.py b/benchmarks/scripts/compare-results.py new file mode 100755 index 0000000000..25e0edcc71 --- /dev/null +++ b/benchmarks/scripts/compare-results.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import argparse +import json +import sys +from pathlib import Path + + +def load_result(filepath: str) -> dict: + with open(filepath, 'r') as f: + return json.load(f) + + +def calculate_speedup(spark_duration: float, comet_duration: float) -> float: + if comet_duration <= 0: + return float('inf') + return spark_duration / comet_duration + + +def print_comparison(spark_result: dict, comet_result: dict, speedup: float, min_speedup: float): + spark_duration = spark_result['duration_seconds'] + comet_duration = comet_result['duration_seconds'] + + print("\n" + "=" * 50) + print("BENCHMARK COMPARISON") + print("=" * 50) + print(f"Query: {spark_result.get('query', 'unknown')}") + print(f"Spark: {spark_duration:.2f}s") + print(f"Comet: {comet_duration:.2f}s") + print(f"Speedup: {speedup:.2f}x") + print(f"Required: {min_speedup:.2f}x") + print("-" * 50) + + if speedup >= min_speedup: + print(f"PASS: {speedup:.2f}x >= {min_speedup:.2f}x") + else: + print(f"FAIL: {speedup:.2f}x < {min_speedup:.2f}x") + print("=" * 50 + "\n") + + +def main(): + parser = argparse.ArgumentParser(description="Compare benchmark results") + parser.add_argument("--spark", "-s", required=True, help="Spark result JSON") + parser.add_argument("--comet", "-c", required=True, help="Comet result JSON") + parser.add_argument("--min-speedup", type=float, default=1.1, help="Min speedup (default: 1.1)") + parser.add_argument("--output", "-o", help="Output summary JSON") + parser.add_argument("--strict", action="store_true", help="Exit with error if below threshold") + + args = parser.parse_args() 
+ + if not Path(args.spark).exists(): + print(f"Error: {args.spark} not found", file=sys.stderr) + return 1 + + if not Path(args.comet).exists(): + print(f"Error: {args.comet} not found", file=sys.stderr) + return 1 + + spark_result = load_result(args.spark) + comet_result = load_result(args.comet) + + speedup = calculate_speedup( + spark_result['duration_seconds'], + comet_result['duration_seconds'] + ) + + passed = speedup >= args.min_speedup + print_comparison(spark_result, comet_result, speedup, args.min_speedup) + + if args.output: + summary = { + "spark": spark_result, + "comet": comet_result, + "speedup": speedup, + "min_speedup": args.min_speedup, + "passed": passed + } + with open(args.output, 'w') as f: + json.dump(summary, f, indent=2) + + if args.strict and not passed: + return 1 + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/benchmarks/scripts/generate-tpch-data.sh b/benchmarks/scripts/generate-tpch-data.sh new file mode 100755 index 0000000000..13da4a8f7d --- /dev/null +++ b/benchmarks/scripts/generate-tpch-data.sh @@ -0,0 +1,52 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +set -euo pipefail + +SCALE_FACTOR="${1:-1}" +OUTPUT_DIR="${2:-/tmp/comet-bench-data/tpch}" +TPCH_DBGEN_DIR="${TPCH_DBGEN_DIR:-/tmp/tpch-dbgen}" + +log_info() { echo "[INFO] $1"; } +log_error() { echo "[ERROR] $1" >&2; } + +check_dbgen() { + if [[ ! -d "$TPCH_DBGEN_DIR" ]]; then + log_info "Cloning tpch-dbgen" + git clone https://github.com/databricks/tpch-dbgen.git "$TPCH_DBGEN_DIR" + cd "$TPCH_DBGEN_DIR" && make + fi +} + +generate_data() { + log_info "Generating TPC-H SF=$SCALE_FACTOR" + mkdir -p "$OUTPUT_DIR" + cd "$TPCH_DBGEN_DIR" + ./dbgen -s "$SCALE_FACTOR" -f + mv *.tbl "$OUTPUT_DIR/" 2>/dev/null || true + log_info "Data generated: $OUTPUT_DIR" + ls -lh "$OUTPUT_DIR" +} + +main() { + log_info "TPC-H Data Generation (SF=$SCALE_FACTOR)" + check_dbgen + generate_data +} + +main "$@" diff --git a/benchmarks/scripts/run-k8s-benchmark.sh b/benchmarks/scripts/run-k8s-benchmark.sh new file mode 100755 index 0000000000..3bfdc39e9f --- /dev/null +++ b/benchmarks/scripts/run-k8s-benchmark.sh @@ -0,0 +1,147 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +set -euo pipefail + +MODE="${1:-spark}" +QUERY="${2:-q1}" + +CLUSTER_NAME="${COMET_BENCH_CLUSTER:-comet-bench}" +NAMESPACE="${COMET_BENCH_NAMESPACE:-comet-bench}" +DOCKER_IMAGE="${COMET_DOCKER_IMAGE:-comet-bench:local}" +DATA_PATH="${TPCH_DATA_PATH:-/data/tpch}" +RESULTS_DIR="${RESULTS_DIR:-/tmp/comet-bench-results}" + +DRIVER_MEMORY="${DRIVER_MEMORY:-2g}" +EXECUTOR_MEMORY="${EXECUTOR_MEMORY:-2g}" +EXECUTOR_INSTANCES="${EXECUTOR_INSTANCES:-2}" +EXECUTOR_CORES="${EXECUTOR_CORES:-2}" + +log_info() { echo "[INFO] $1"; } +log_error() { echo "[ERROR] $1" >&2; } + +get_k8s_api_server() { + kubectl config view --minify -o jsonpath='{.clusters[0].cluster.server}' +} + +build_spark_submit_cmd() { + local mode="$1" + local query="$2" + local api_server + api_server=$(get_k8s_api_server) + + local cmd="$SPARK_HOME/bin/spark-submit" + cmd+=" --master k8s://${api_server}" + cmd+=" --deploy-mode cluster" + cmd+=" --name comet-bench-${mode}-${query}" + cmd+=" --conf spark.kubernetes.namespace=${NAMESPACE}" + cmd+=" --conf spark.kubernetes.container.image=${DOCKER_IMAGE}" + cmd+=" --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark" + cmd+=" --conf spark.kubernetes.driver.pod.name=comet-bench-driver" + cmd+=" --conf spark.driver.memory=${DRIVER_MEMORY}" + cmd+=" --conf spark.executor.instances=${EXECUTOR_INSTANCES}" + cmd+=" --conf spark.executor.memory=${EXECUTOR_MEMORY}" + cmd+=" --conf spark.executor.cores=${EXECUTOR_CORES}" + cmd+=" --conf spark.kubernetes.driver.volumes.hostPath.data.mount.path=/data" + cmd+=" --conf spark.kubernetes.driver.volumes.hostPath.data.options.path=/tmp/comet-bench-data" + cmd+=" --conf spark.kubernetes.executor.volumes.hostPath.data.mount.path=/data" + cmd+=" --conf spark.kubernetes.executor.volumes.hostPath.data.options.path=/tmp/comet-bench-data" + + if [[ "$mode" == "comet" ]]; then + local comet_jar + comet_jar=$(find "$SPARK_HOME/jars" -name "comet-spark-*.jar" 2>/dev/null | head -1) + if [[ -n "$comet_jar" ]]; then + 
cmd+=" --jars local://${comet_jar}" + cmd+=" --conf spark.executor.extraClassPath=${comet_jar}" + cmd+=" --conf spark.driver.extraClassPath=${comet_jar}" + fi + cmd+=" --conf spark.plugins=org.apache.spark.CometPlugin" + cmd+=" --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions" + cmd+=" --conf spark.comet.enabled=true" + cmd+=" --conf spark.comet.exec.enabled=true" + cmd+=" --conf spark.comet.exec.all.enabled=true" + cmd+=" --conf spark.comet.cast.allowIncompatible=true" + cmd+=" --conf spark.comet.exec.shuffle.enabled=true" + cmd+=" --conf spark.comet.exec.shuffle.mode=auto" + cmd+=" --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager" + fi + + cmd+=" local:///opt/comet-bench/scripts/tpch-benchmark.py" + cmd+=" --query ${query}" + cmd+=" --data ${DATA_PATH}" + cmd+=" --mode ${mode}" + + echo "$cmd" +} + +run_benchmark() { + local mode="$1" + local query="$2" + local result_file="${RESULTS_DIR}/${mode}_${query}_result.json" + + mkdir -p "$RESULTS_DIR" + log_info "Running benchmark: mode=$mode, query=$query" + + local start_time + start_time=$(date +%s.%N) + + local cmd + cmd=$(build_spark_submit_cmd "$mode" "$query") + log_info "Command: $cmd" + + if eval "$cmd"; then + local end_time duration + end_time=$(date +%s.%N) + duration=$(echo "$end_time - $start_time" | bc) + + cat > "$result_file" << EOF +{ + "mode": "$mode", + "query": "$query", + "duration_seconds": $duration, + "timestamp": "$(date -u +"%Y-%m-%dT%H:%M:%SZ")" +} +EOF + log_info "Results: $result_file" + echo "result_file=$result_file" >> "${GITHUB_OUTPUT:-/dev/null}" + else + log_error "Benchmark failed" + exit 1 + fi +} + +cleanup() { + kubectl delete pod comet-bench-driver -n "$NAMESPACE" --ignore-not-found=true 2>/dev/null || true +} + +main() { + if [[ "$MODE" == "-h" ]] || [[ "$MODE" == "--help" ]]; then + echo "Usage: $0 [spark|comet] [query]" + exit 0 + fi + + if [[ "$MODE" != "spark" ]] && [[ "$MODE" != "comet" ]]; then + 
log_error "Invalid mode: $MODE" + exit 1 + fi + + cleanup + run_benchmark "$MODE" "$QUERY" +} + +main "$@" diff --git a/benchmarks/scripts/tpch-benchmark.py b/benchmarks/scripts/tpch-benchmark.py new file mode 100755 index 0000000000..6cc316a908 --- /dev/null +++ b/benchmarks/scripts/tpch-benchmark.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import argparse +import json +import os +import sys +import time +from datetime import datetime + +from pyspark.sql import SparkSession + +TPCH_Q1 = """ +SELECT + l_returnflag, l_linestatus, + SUM(l_quantity) AS sum_qty, + SUM(l_extendedprice) AS sum_base_price, + SUM(l_extendedprice * (1 - l_discount)) AS sum_disc_price, + SUM(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge, + AVG(l_quantity) AS avg_qty, + AVG(l_extendedprice) AS avg_price, + AVG(l_discount) AS avg_disc, + COUNT(*) AS count_order +FROM lineitem +WHERE l_shipdate <= DATE '1998-12-01' - INTERVAL '90' DAY +GROUP BY l_returnflag, l_linestatus +ORDER BY l_returnflag, l_linestatus +""" + +TPCH_Q6 = """ +SELECT SUM(l_extendedprice * l_discount) AS revenue +FROM lineitem +WHERE l_shipdate >= DATE '1994-01-01' + AND l_shipdate < DATE '1994-01-01' + INTERVAL '1' YEAR + AND l_discount BETWEEN 0.05 AND 0.07 + AND l_quantity < 24 +""" + +QUERIES = {"q1": TPCH_Q1, "q6": TPCH_Q6} + + +def load_tables(spark: SparkSession, data_path: str): + parquet_path = os.path.join(data_path, "parquet") + if os.path.exists(parquet_path): + for table in ["lineitem", "part", "orders"]: + table_path = os.path.join(parquet_path, table) + if os.path.exists(table_path): + spark.read.parquet(table_path).createOrReplaceTempView(table) + return + + schemas = { + "lineitem": "l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment", + } + + for table, columns in schemas.items(): + tbl_file = os.path.join(data_path, f"{table}.tbl") + if os.path.exists(tbl_file): + df = spark.read.csv(tbl_file, sep="|", header=False) + col_names = columns.split(",") + for i, col_name in enumerate(col_names): + df = df.withColumnRenamed(f"_c{i}", col_name) + if table == "lineitem": + df = df.withColumn("l_shipdate", df["l_shipdate"].cast("date")) + df.createOrReplaceTempView(table) + + +def run_query(spark: 
SparkSession, query_name: str, query_sql: str) -> dict: + print(f"Running: {query_name}") + + # Warmup + spark.sql(query_sql).collect() + + # Timed runs + durations = [] + for i in range(3): + start = time.time() + spark.sql(query_sql).count() + durations.append(time.time() - start) + print(f" Run {i+1}: {durations[-1]:.2f}s") + + return { + "query": query_name, + "durations": durations, + "min_duration_seconds": min(durations), + "avg_duration_seconds": sum(durations) / len(durations), + } + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--query", "-q", default="q1", choices=list(QUERIES.keys())) + parser.add_argument("--data", "-d", required=True) + parser.add_argument("--mode", "-m", default="spark", choices=["spark", "comet"]) + parser.add_argument("--output", "-o") + args = parser.parse_args() + + spark = SparkSession.builder.appName(f"TPC-H-{args.query}-{args.mode}").getOrCreate() + + try: + load_tables(spark, args.data) + result = run_query(spark, args.query, QUERIES[args.query]) + result["mode"] = args.mode + result["duration_seconds"] = result["min_duration_seconds"] + result["timestamp"] = datetime.utcnow().isoformat() + "Z" + + if args.output: + with open(args.output, 'w') as f: + json.dump(result, f, indent=2) + else: + print(json.dumps(result, indent=2)) + + return 0 + finally: + spark.stop() + + +if __name__ == "__main__": + sys.exit(main()) From c9429a7f178b6391c14b47c290bddb87743e440c Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 19 Feb 2026 13:38:01 +0530 Subject: [PATCH 4/7] Add GitHub CI workflow for K8s benchmark validation --- .github/workflows/k8s_benchmark.yml | 149 ++++++++++++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 .github/workflows/k8s_benchmark.yml diff --git a/.github/workflows/k8s_benchmark.yml b/.github/workflows/k8s_benchmark.yml new file mode 100644 index 0000000000..1df24132fa --- /dev/null +++ b/.github/workflows/k8s_benchmark.yml @@ -0,0 +1,149 @@ +# Licensed to 
the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: K8s Benchmark CI + +concurrency: + group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }} + cancel-in-progress: true + +on: + pull_request: + paths: + - "native/**/*.rs" + - "spark/**/*.scala" + - "spark/**/*.java" + workflow_dispatch: + inputs: + scale_factor: + description: 'TPC-H scale factor' + default: '1' + query: + description: 'TPC-H query' + default: 'q1' + type: choice + options: [q1, q6] + min_speedup: + description: 'Minimum speedup' + default: '1.1' + +env: + RUST_VERSION: stable + K8S_VERSION: "1.32.0" + SPARK_VERSION: "3.5" + SCALA_VERSION: "2.12" + TPCH_SCALE_FACTOR: ${{ github.event.inputs.scale_factor || '1' }} + TPCH_QUERY: ${{ github.event.inputs.query || 'q1' }} + MIN_SPEEDUP: ${{ github.event.inputs.min_speedup || '1.1' }} + +jobs: + k8s-benchmark: + name: K8s Benchmark + runs-on: ubuntu-latest + timeout-minutes: 60 + + steps: + - name: Checkout + uses: actions/checkout@v6 + + - name: Free disk space + run: | + sudo rm -rf /usr/share/dotnet /usr/local/share/boost /opt/ghc + docker system prune -af || true + + - name: Install K8s tools + run: | + curl -Lo ./kind "https://kind.sigs.k8s.io/dl/v0.26.0/kind-linux-amd64" + chmod +x ./kind 
&& sudo mv ./kind /usr/local/bin/kind + curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl" + chmod +x ./kubectl && sudo mv ./kubectl /usr/local/bin/kubectl + curl -sSfL https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash + + - name: Setup Rust + uses: ./.github/actions/setup-builder + with: + rust-version: ${{ env.RUST_VERSION }} + jdk-version: 17 + + - name: Cache Cargo + uses: actions/cache@v5 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + native/target + key: ${{ runner.os }}-cargo-bench-${{ hashFiles('native/**/Cargo.lock') }} + + - name: Cache Maven + uses: actions/cache@v5 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-bench-${{ hashFiles('**/pom.xml') }} + + - name: Build Comet + run: make release PROFILES="-Pspark-${{ env.SPARK_VERSION }} -Pscala-${{ env.SCALA_VERSION }}" + timeout-minutes: 20 + + - name: Setup Kind cluster + run: ./hack/k8s-benchmark-setup.sh + timeout-minutes: 10 + + - name: Build benchmark image + run: | + docker build -t comet-bench:local -f benchmarks/Dockerfile.k8s . 
+ kind load docker-image comet-bench:local --name comet-bench + timeout-minutes: 15 + + - name: Generate TPC-H data + run: ./benchmarks/scripts/generate-tpch-data.sh ${{ env.TPCH_SCALE_FACTOR }} + timeout-minutes: 10 + + - name: Run Spark baseline + id: spark_bench + run: ./benchmarks/scripts/run-k8s-benchmark.sh spark ${{ env.TPCH_QUERY }} + timeout-minutes: 15 + + - name: Run Comet benchmark + id: comet_bench + run: ./benchmarks/scripts/run-k8s-benchmark.sh comet ${{ env.TPCH_QUERY }} + timeout-minutes: 15 + + - name: Validate performance + run: | + python3 benchmarks/scripts/compare-results.py \ + --spark /tmp/comet-bench-results/spark_${{ env.TPCH_QUERY }}_result.json \ + --comet /tmp/comet-bench-results/comet_${{ env.TPCH_QUERY }}_result.json \ + --min-speedup ${{ env.MIN_SPEEDUP }} \ + --output /tmp/comet-bench-results/comparison.json \ + --strict + + - name: Upload results + uses: actions/upload-artifact@v4 + if: always() + with: + name: benchmark-results + path: /tmp/comet-bench-results/ + + - name: Collect logs on failure + if: failure() + run: | + kubectl get pods -A || true + kubectl logs -n comet-bench comet-bench-driver --tail=100 2>/dev/null || true + + - name: Cleanup + if: always() + run: kind delete cluster --name comet-bench || true From d41ce7481e5590f061dce16b8374193267d3c62c Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 19 Feb 2026 13:38:08 +0530 Subject: [PATCH 5/7] Add GitHub CI workflow for K8s benchmark validation --- .github/workflows/k8s_benchmark.yml | 8 +++--- benchmarks/README.md | 38 ++++++++++++++++++++++++++--- 2 files changed, 39 insertions(+), 7 deletions(-) diff --git a/.github/workflows/k8s_benchmark.yml b/.github/workflows/k8s_benchmark.yml index 1df24132fa..bd960a459b 100644 --- a/.github/workflows/k8s_benchmark.yml +++ b/.github/workflows/k8s_benchmark.yml @@ -23,10 +23,10 @@ concurrency: on: pull_request: - paths: - - "native/**/*.rs" - - "spark/**/*.scala" - - "spark/**/*.java" + # paths: + # - "native/**/*.rs" + # 
- "spark/**/*.scala" + # - "spark/**/*.java" workflow_dispatch: inputs: scale_factor: diff --git a/benchmarks/README.md b/benchmarks/README.md index 7e2dfc9f2b..70c77782be 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -17,10 +17,42 @@ specific language governing permissions and limitations under the License. --> -# Running Comet Benchmarks in Microk8s +# Comet Benchmarks -This guide explains how to run benchmarks derived from TPC-H and TPC-DS in Apache DataFusion Comet deployed in a -local Microk8s cluster. +## GitHub CI (Kind) + +The CI runs TPC-H benchmarks on Kind cluster for PRs modifying `native/**/*.rs` or `spark/**/*.scala|java`. +Target: Comet >= 1.1x speedup over Spark. + +## Local Development (Kind) + +```bash +# Setup +./hack/k8s-benchmark-setup.sh + +# Build +make release PROFILES="-Pspark-3.5 -Pscala-2.12" +docker build -t comet-bench:local -f benchmarks/Dockerfile.k8s . +kind load docker-image comet-bench:local --name comet-bench + +# Generate data +./benchmarks/scripts/generate-tpch-data.sh 1 + +# Run benchmarks +./benchmarks/scripts/run-k8s-benchmark.sh spark q1 +./benchmarks/scripts/run-k8s-benchmark.sh comet q1 + +# Compare +python3 benchmarks/scripts/compare-results.py \ + --spark /tmp/comet-bench-results/spark_q1_result.json \ + --comet /tmp/comet-bench-results/comet_q1_result.json \ + --min-speedup 1.1 + +# Cleanup +./hack/k8s-benchmark-setup.sh --delete +``` + +## Microk8s Deployment ## Use Microk8s locally From c81e1533d581c416633817ded02031a5147691cc Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 19 Feb 2026 18:05:34 +0530 Subject: [PATCH 6/7] Fix setup-builder action to use sudo for apt-get --- .github/actions/setup-builder/action.yaml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/.github/actions/setup-builder/action.yaml b/.github/actions/setup-builder/action.yaml index 0ccd01ad72..66569167e4 100644 --- a/.github/actions/setup-builder/action.yaml +++ 
b/.github/actions/setup-builder/action.yaml @@ -32,9 +32,8 @@ runs: - name: Install Build Dependencies shell: bash run: | - apt-get update - apt-get install -y protobuf-compiler - apt-get install -y clang + sudo apt-get update + sudo apt-get install -y protobuf-compiler clang - name: Install JDK ${{inputs.jdk-version}} uses: actions/setup-java@v4 From d65fffd19e4a6411a6410c1d4044707afd315dd8 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 19 Feb 2026 23:09:58 +0530 Subject: [PATCH 7/7] Fix Docker build timeout by using pre-built JAR --- .github/workflows/k8s_benchmark.yml | 8 +++- benchmarks/Dockerfile.k8s | 60 ++--------------------------- 2 files changed, 10 insertions(+), 58 deletions(-) diff --git a/.github/workflows/k8s_benchmark.yml b/.github/workflows/k8s_benchmark.yml index bd960a459b..394e075c16 100644 --- a/.github/workflows/k8s_benchmark.yml +++ b/.github/workflows/k8s_benchmark.yml @@ -104,9 +104,13 @@ jobs: - name: Build benchmark image run: | - docker build -t comet-bench:local -f benchmarks/Dockerfile.k8s . + COMET_JAR=$(find spark/target -name "comet-spark-spark${{ env.SPARK_VERSION }}_${{ env.SCALA_VERSION }}-*.jar" -not -name "*sources*" -not -name "*javadoc*" | head -1) + echo "Using Comet JAR: $COMET_JAR" + docker build -t comet-bench:local \ + --build-arg COMET_JAR=$COMET_JAR \ + -f benchmarks/Dockerfile.k8s . kind load docker-image comet-bench:local --name comet-bench - timeout-minutes: 15 + timeout-minutes: 10 - name: Generate TPC-H data run: ./benchmarks/scripts/generate-tpch-data.sh ${{ env.TPCH_SCALE_FACTOR }} diff --git a/benchmarks/Dockerfile.k8s b/benchmarks/Dockerfile.k8s index 91c99a499b..6096e84954 100644 --- a/benchmarks/Dockerfile.k8s +++ b/benchmarks/Dockerfile.k8s @@ -13,75 +13,23 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-FROM apache/spark:3.5.8 AS builder - -USER root - -RUN apt-get update \ - && apt-get install -y curl openjdk-17-jdk gcc-10 g++-10 cpp-10 unzip git \ - && apt-get clean \ - && rm -rf /var/lib/apt/lists/* - -ENV CC="gcc-10" -ENV CXX="g++-10" - -RUN PB_REL="https://github.com/protocolbuffers/protobuf/releases" \ - && curl -LO $PB_REL/download/v30.2/protoc-30.2-linux-x86_64.zip \ - && unzip protoc-30.2-linux-x86_64.zip -d /root/.local \ - && rm protoc-30.2-linux-x86_64.zip -ENV PATH="$PATH:/root/.local/bin" - -RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y -ENV PATH="/root/.cargo/bin:${PATH}" -ENV RUSTFLAGS="-C debuginfo=line-tables-only -C incremental=false" -ENV SPARK_VERSION=3.5 -ENV SCALA_VERSION=2.12 - -WORKDIR /comet -COPY native /comet/native -RUN cd native && RUSTFLAGS="-Ctarget-cpu=native" cargo build --release - -COPY .mvn /comet/.mvn -COPY mvnw /comet/mvnw -COPY common /comet/common -COPY dev /comet/dev -COPY spark /comet/spark -COPY spark-integration /comet/spark-integration -COPY scalafmt.conf /comet/scalafmt.conf -COPY .scalafix.conf /comet/.scalafix.conf -COPY Makefile /comet/Makefile -COPY pom.xml /comet/pom.xml - -RUN mkdir -p /root/.m2 && \ - echo 'centralcentralhttps://repo1.maven.org/maven2' > /root/.m2/settings.xml - -RUN cd /comet \ - && JAVA_HOME=$(readlink -f $(which javac) | sed "s/\/bin\/javac//") \ - make release-nogit PROFILES="-Pspark-$SPARK_VERSION -Pscala-$SCALA_VERSION" - FROM apache/spark:3.5.8 USER root -ENV SPARK_VERSION=3.5 -ENV SCALA_VERSION=2.12 - RUN apt-get update \ - && apt-get install -y python3 python3-pip git curl \ + && apt-get install -y python3 python3-pip git curl make gcc \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* -COPY --from=builder /comet/spark/target/comet-spark-spark${SPARK_VERSION}_${SCALA_VERSION}-*.jar $SPARK_HOME/jars/ +ARG COMET_JAR +COPY ${COMET_JAR} $SPARK_HOME/jars/ RUN cd /opt \ - && git clone https://github.com/databricks/tpch-dbgen.git \ + && git clone --depth 1 
https://github.com/databricks/tpch-dbgen.git \ && cd tpch-dbgen \ && make -RUN cd /opt \ - && git clone https://github.com/apache/datafusion-benchmarks.git - -COPY benchmarks/pyspark /opt/comet-bench/pyspark COPY benchmarks/scripts /opt/comet-bench/scripts COPY benchmarks/conf /opt/comet-bench/conf