From d8be20670c7b4ce3b015ca52ae6ad6ba12bd007c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 20 Feb 2026 10:35:02 -0700 Subject: [PATCH 1/2] feat: add benchmark automation bot for running TPC-H and microbenchmarks on PRs Adds a GitHub bot that monitors PR comments for slash commands (/run tpch, /run micro, /help) and automatically runs benchmarks in Kubernetes, posting results back as PR comments. Includes CLI for manual benchmark runs, Docker image build/push, K8s job management, and deployment tooling. Co-Authored-By: Claude Opus 4.6 --- dev/benchmarking-bot/.gitignore | 5 + dev/benchmarking-bot/Dockerfile | 584 +++++++++++++++++ dev/benchmarking-bot/README.md | 141 ++++ dev/benchmarking-bot/authorized_users.txt | 59 ++ dev/benchmarking-bot/deploy/deploy.sh | 183 ++++++ dev/benchmarking-bot/k8s/bot-deployment.yaml | 37 ++ .../k8s/comet-job-template.yaml | 65 ++ dev/benchmarking-bot/pyproject.toml | 22 + dev/benchmarking-bot/src/cometbot/__init__.py | 3 + dev/benchmarking-bot/src/cometbot/bot.py | 603 ++++++++++++++++++ dev/benchmarking-bot/src/cometbot/cli.py | 339 ++++++++++ dev/benchmarking-bot/src/cometbot/k8s.py | 466 ++++++++++++++ dev/benchmarking-bot/src/cometbot/slack.py | 41 ++ .../source/contributor-guide/benchmark-bot.md | 254 ++++++++ docs/source/contributor-guide/index.md | 1 + 15 files changed, 2803 insertions(+) create mode 100644 dev/benchmarking-bot/.gitignore create mode 100644 dev/benchmarking-bot/Dockerfile create mode 100644 dev/benchmarking-bot/README.md create mode 100644 dev/benchmarking-bot/authorized_users.txt create mode 100755 dev/benchmarking-bot/deploy/deploy.sh create mode 100644 dev/benchmarking-bot/k8s/bot-deployment.yaml create mode 100644 dev/benchmarking-bot/k8s/comet-job-template.yaml create mode 100644 dev/benchmarking-bot/pyproject.toml create mode 100644 dev/benchmarking-bot/src/cometbot/__init__.py create mode 100644 dev/benchmarking-bot/src/cometbot/bot.py create mode 100644 dev/benchmarking-bot/src/cometbot/cli.py create mode 100644 dev/benchmarking-bot/src/cometbot/k8s.py create mode 100644 dev/benchmarking-bot/src/cometbot/slack.py create mode 100644 docs/source/contributor-guide/benchmark-bot.md diff --git a/dev/benchmarking-bot/.gitignore b/dev/benchmarking-bot/.gitignore new file mode 100644 index 0000000000..cfd5c5b080 --- /dev/null +++ b/dev/benchmarking-bot/.gitignore @@ -0,0 +1,5 @@ +dist/ +venv/ +__pycache__/ +*.egg-info/ +src/cometbot/_build_info.py diff --git a/dev/benchmarking-bot/Dockerfile b/dev/benchmarking-bot/Dockerfile new file mode 100644 index 0000000000..e91d06d30e --- /dev/null +++ b/dev/benchmarking-bot/Dockerfile @@ -0,0 +1,584 @@ +# Dockerfile for Comet benchmarks (TPC-H and microbenchmarks) +# Includes: JDK 17, Rust, Maven, Spark 3.5.x, Python + +FROM eclipse-temurin:17-jdk-jammy + +# Install system dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + git \ + curl \ + wget \ + build-essential \ + pkg-config \ + libssl-dev \ + python3 \ + python3-pip \ + unzip \ + jq \ + && rm -rf /var/lib/apt/lists/* + +# Install newer protoc (3.15+ required for proto3 optional fields) +ARG PROTOC_VERSION=25.1 +RUN curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip \ + && unzip protoc-${PROTOC_VERSION}-linux-x86_64.zip -d /usr/local \ + && rm protoc-${PROTOC_VERSION}-linux-x86_64.zip + +# Install Rust +RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y +ENV PATH="/root/.cargo/bin:${PATH}" + +# 
Install Maven +ARG MAVEN_VERSION=3.9.6 +RUN wget -q https://archive.apache.org/dist/maven/maven-3/${MAVEN_VERSION}/binaries/apache-maven-${MAVEN_VERSION}-bin.tar.gz \ + && tar -xzf apache-maven-${MAVEN_VERSION}-bin.tar.gz -C /opt \ + && rm apache-maven-${MAVEN_VERSION}-bin.tar.gz \ + && ln -s /opt/apache-maven-${MAVEN_VERSION} /opt/maven +ENV MAVEN_HOME=/opt/maven +ENV PATH="${MAVEN_HOME}/bin:${PATH}" + +# Install Spark +ARG SPARK_VERSION=3.5.3 +ARG HADOOP_VERSION=3 +RUN wget -q https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz \ + && tar -xzf spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz -C /opt \ + && rm spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz \ + && ln -s /opt/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION} /opt/spark +ENV SPARK_HOME=/opt/spark +ENV PATH="${SPARK_HOME}/bin:${SPARK_HOME}/sbin:${PATH}" + +# Install PySpark +RUN pip3 install --no-cache-dir pyspark==${SPARK_VERSION} + +# Clone TPC-H queries from datafusion-benchmarks +RUN git clone --depth 1 https://github.com/apache/datafusion-benchmarks.git /tmp/datafusion-benchmarks \ + && mkdir -p /opt/tpch-queries \ + && cp -r /tmp/datafusion-benchmarks/tpch/queries/* /opt/tpch-queries/ \ + && rm -rf /tmp/datafusion-benchmarks + +# Create non-root user +RUN useradd -m -u 1000 bench \ + && mkdir -p /app \ + && chown bench:bench /app + +# Move cargo/rustup to shared locations accessible by bench user +RUN mv /root/.cargo /opt/cargo && mv /root/.rustup /opt/rustup \ + && chown -R bench:bench /opt/cargo /opt/rustup +ENV CARGO_HOME=/opt/cargo +ENV RUSTUP_HOME=/opt/rustup +ENV PATH="/opt/cargo/bin:${PATH}" + +# Set working directory +WORKDIR /app + +# Create entrypoint script +COPY <<'EOF' /app/entrypoint.sh +#!/bin/bash +set -e + +# Environment variables expected: +# - PR_NUMBER: GitHub PR number to benchmark +# - BENCHMARK_MODE: "tpch" (default) or "micro" +# - GITHUB_TOKEN: (optional) GitHub token to post results as PR comment +# +# For TPC-H mode: +# - TPCH_DATA: Path to TPC-H data (default: /data/tpch/sf100) +# - TPCH_QUERIES: Path to TPC-H queries (default: /opt/tpch-queries) +# - ITERATIONS: Number of iterations (default: 1) +# +# For Micro mode: +# - MICRO_BENCHMARK: Benchmark class name (e.g., "CometStringExpressionBenchmark") + +# Workaround: copy rustup/cargo to writable tmpfs to avoid overlayfs +# cross-device link errors when rustup updates toolchains at runtime +if [ -d /opt/rustup ]; then + echo "=== Preparing Rust toolchain ===" + cp -a /opt/rustup /tmp/rustup + cp -a /opt/cargo /tmp/cargo + export RUSTUP_HOME=/tmp/rustup + export CARGO_HOME=/tmp/cargo + export PATH="/tmp/cargo/bin:$PATH" +fi + +if [ -z "$PR_NUMBER" ]; then + echo "ERROR: PR_NUMBER environment variable is required" + exit 1 +fi + +BENCHMARK_MODE="${BENCHMARK_MODE:-tpch}" + +echo "=== Comet Benchmark ===" +echo "PR: #${PR_NUMBER}" +echo "Mode: ${BENCHMARK_MODE}" +echo "" + +# Clone the repository +echo "=== Cloning apache/datafusion-comet ===" +git clone --depth 50 https://github.com/apache/datafusion-comet.git /app/comet +cd /app/comet + +# Fetch and checkout PR +echo "=== Fetching PR #${PR_NUMBER} ===" +git fetch origin pull/${PR_NUMBER}/head:pr-${PR_NUMBER} +git checkout pr-${PR_NUMBER} +COMMIT_SHA=$(git rev-parse --short HEAD) +COMMIT_MSG=$(git log -1 --pretty=format:'%s') +echo "Commit: ${COMMIT_SHA} - ${COMMIT_MSG}" +echo "" + +# Build Comet +echo "=== Building Comet (make release) ===" +make release + +# Find the built JAR +COMET_JAR=$(find 
/app/comet/spark/target -name "comet-spark-spark*.jar" -not -name "*sources*" -not -name "*javadoc*" | head -1) +if [ -z "$COMET_JAR" ]; then + echo "ERROR: Could not find Comet JAR in spark/target/" + ls -la /app/comet/spark/target/ + exit 1 +fi +echo "Found Comet JAR: ${COMET_JAR}" +export COMET_JAR + +# Create output directory +mkdir -p /app/output + +if [ "$BENCHMARK_MODE" = "micro" ]; then + ################### + # Microbenchmark mode + ################### + if [ -z "$MICRO_BENCHMARK" ]; then + echo "ERROR: MICRO_BENCHMARK environment variable is required for micro mode" + exit 1 + fi + + echo "" + echo "=== Running Microbenchmark: ${MICRO_BENCHMARK} ===" + + # Run the microbenchmark with output generation + export SPARK_GENERATE_BENCHMARK_FILES=1 + cd /app/comet + + # Capture output to file + MICRO_OUTPUT="/app/output/micro-benchmark.txt" + make benchmark-org.apache.spark.sql.benchmark.${MICRO_BENCHMARK} 2>&1 | tee "$MICRO_OUTPUT" + + echo "" + echo "=== Microbenchmark Complete ===" + + # Post results to GitHub PR if token is provided + if [ -n "$GITHUB_TOKEN" ]; then + echo "" + echo "=== Posting Results to GitHub PR ===" + + # Extract benchmark results and create markdown comment + COMMENT_BODY=$(python3 << PYTHON +import re +import sys + +benchmark_name = "${MICRO_BENCHMARK}" +commit_sha = "${COMMIT_SHA}" +commit_msg = "${COMMIT_MSG}" + +# Read the benchmark output +with open("$MICRO_OUTPUT") as f: + content = f.read() + +lines = [] +lines.append(f"## Comet Microbenchmark Results: {benchmark_name}") +lines.append("") +lines.append(f"**Commit:** \`{commit_sha}\` - {commit_msg}") +lines.append("") + +# Extract benchmark result tables +# Format looks like: +# OpenJDK 64-Bit Server VM ... +# AMD Ryzen ... +# Benchmark Name: Best Time(ms) Avg Time(ms) ... +# -------------------------------------------------------- +# Spark 82 83 ... +# Comet (Scan) 79 80 ... + +result_tables = [] +current_table = [] +in_table = False + +for line in content.split('\n'): + # Detect start of a result table (JVM info line) + if 'OpenJDK' in line or 'Java HotSpot' in line: + if current_table: + result_tables.append(current_table) + current_table = [line] + in_table = True + continue + + if in_table: + # End of table detection + if line.startswith('Running benchmark:') or line.startswith('[INFO]') or line.startswith('make'): + if current_table: + result_tables.append(current_table) + current_table = [] + in_table = False + continue + + # Skip empty lines between tables + if not line.strip() and len(current_table) > 5: + result_tables.append(current_table) + current_table = [] + in_table = False + continue + + current_table.append(line) + +# Don't forget the last table +if current_table: + result_tables.append(current_table) + +# Format results +if result_tables: + lines.append("### Benchmark Results") + lines.append("") + lines.append("\`\`\`") + + # Include all tables (GitHub comments can be up to 65536 chars) + total_lines = 0 + max_lines = 500 + + for table in result_tables: + if total_lines >= max_lines: + lines.append(f"... 
(truncated, {len(result_tables)} total benchmarks)") + break + for tline in table: + if total_lines >= max_lines: + break + lines.append(tline) + total_lines += 1 + lines.append("") # blank line between tables + total_lines += 1 + + lines.append("\`\`\`") +else: + # Fallback: look for lines with "Best Time" or result data + lines.append("### Raw Output (last 80 lines)") + lines.append("") + lines.append("\`\`\`") + for line in content.split('\n')[-80:]: + if line.strip() and not line.startswith('[INFO]'): + lines.append(line) + lines.append("\`\`\`") + +lines.append("") +lines.append("---") +lines.append("*Automated benchmark run by cometbot*") + +print("\n".join(lines)) +PYTHON +) + + # Post comment to PR + RESPONSE=$(curl -s -w "\n%{http_code}" -X POST \ + -H "Authorization: token $GITHUB_TOKEN" \ + -H "Accept: application/vnd.github.v3+json" \ + -H "Content-Type: application/json" \ + "https://api.github.com/repos/apache/datafusion-comet/issues/${PR_NUMBER}/comments" \ + -d "$(jq -n --arg body "$COMMENT_BODY" '{body: $body}')") + + HTTP_CODE=$(echo "$RESPONSE" | tail -1) + BODY=$(echo "$RESPONSE" | sed '$d') + + if [ "$HTTP_CODE" = "201" ]; then + COMMENT_URL=$(echo "$BODY" | jq -r '.html_url') + echo "Successfully posted comment: $COMMENT_URL" + else + echo "Failed to post comment (HTTP $HTTP_CODE):" + echo "$BODY" + fi + else + echo "" + echo "GITHUB_TOKEN not set, skipping PR comment" + fi + +else + ################### + # TPC-H mode (default) + ################### + TPCH_DATA="${TPCH_DATA:-/data/tpch/sf100}" + ITERATIONS="${ITERATIONS:-1}" + TPCH_QUERIES="${TPCH_QUERIES:-/opt/tpch-queries}" + BASELINE_BRANCH="${BASELINE_BRANCH:-main}" + + echo "Data path: ${TPCH_DATA}" + echo "Queries path: ${TPCH_QUERIES}" + echo "Iterations: ${ITERATIONS}" + echo "Baseline branch: ${BASELINE_BRANCH}" + + # Use SPARK_MASTER from environment (default to local[*] if not set) + SPARK_MASTER="${SPARK_MASTER:-local[*]}" + echo "Spark master: ${SPARK_MASTER}" + + # Parse COMET_CONFIGS (comma-separated key=value pairs) + EXTRA_CONF_ARGS="" + if [ -n "$COMET_CONFIGS" ]; then + echo "Extra Comet configs: ${COMET_CONFIGS}" + IFS=',' read -ra CONFIGS <<< "$COMET_CONFIGS" + for conf in "${CONFIGS[@]}"; do + if [ -n "$conf" ]; then + EXTRA_CONF_ARGS="$EXTRA_CONF_ARGS --conf $conf" + fi + done + fi + + # Function to run TPC-H benchmark with current build + run_tpch() { + local output_dir="$1" + local label="$2" + local jar + jar=$(find /app/comet/spark/target -name "comet-spark-spark*.jar" -not -name "*sources*" -not -name "*javadoc*" | head -1) + if [ -z "$jar" ]; then + echo "ERROR: Could not find Comet JAR in spark/target/" + ls -la /app/comet/spark/target/ + exit 1 + fi + echo "Found Comet JAR for ${label}: ${jar}" + + mkdir -p "$output_dir" + cd /app/comet/dev/benchmarks + + echo "" + echo "=== Running TPC-H Benchmark (${label}) ===" + $SPARK_HOME/bin/spark-submit \ + --master $SPARK_MASTER \ + --jars $jar \ + --driver-class-path $jar \ + --conf spark.driver.memory=32G \ + --conf spark.driver.cores=8 \ + --conf spark.memory.offHeap.enabled=true \ + --conf spark.memory.offHeap.size=24g \ + --conf spark.driver.extraClassPath=$jar \ + --conf spark.plugins=org.apache.spark.CometPlugin \ + --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \ + --conf spark.comet.exec.replaceSortMergeJoin=true \ + --conf spark.comet.expression.Cast.allowIncompatible=true \ + $EXTRA_CONF_ARGS \ + tpcbench.py \ + --name comet \ + --benchmark tpch \ + --data $TPCH_DATA \ + --queries 
$TPCH_QUERIES \ + --output "$output_dir" \ + --iterations $ITERATIONS \ + --format parquet + } + + # ---- Baseline benchmark ---- + echo "" + echo "=== Checking out baseline branch: ${BASELINE_BRANCH} ===" + cd /app/comet + git fetch origin ${BASELINE_BRANCH} + git checkout origin/${BASELINE_BRANCH} + BASELINE_SHA=$(git rev-parse --short HEAD) + echo "Baseline commit: ${BASELINE_SHA}" + + echo "=== Building baseline (make release) ===" + make release + + run_tpch /app/output/baseline "baseline" + + BASELINE_RESULTS=$(ls /app/output/baseline/*.json | head -1) + echo "" + echo "Baseline results:" + cat "$BASELINE_RESULTS" + + # ---- PR benchmark ---- + echo "" + echo "=== Cleaning build for PR ===" + cd /app/comet + make clean + + echo "=== Checking out PR #${PR_NUMBER} ===" + git checkout pr-${PR_NUMBER} + echo "PR commit: ${COMMIT_SHA}" + + echo "=== Building PR (make release) ===" + make release + + run_tpch /app/output/pr "PR #${PR_NUMBER}" + + PR_RESULTS=$(ls /app/output/pr/*.json | head -1) + echo "" + echo "PR results:" + cat "$PR_RESULTS" + + echo "" + echo "=== Benchmark Complete ===" + + # Post results to GitHub PR if token is provided + if [ -n "$GITHUB_TOKEN" ]; then + echo "" + echo "=== Posting Results to GitHub PR ===" + + # Parse results and create markdown comparison table + COMMENT_BODY=$(python3 << PYTHON +import json +import os + +with open("$BASELINE_RESULTS") as f: + baseline = json.load(f) +with open("$PR_RESULTS") as f: + pr = json.load(f) + +baseline_branch = "${BASELINE_BRANCH}" +baseline_sha = "${BASELINE_SHA}" +pr_sha = "${COMMIT_SHA}" +pr_msg = "${COMMIT_MSG}" + +lines = [] +lines.append("## Comet TPC-H Benchmark Results") +lines.append("") +lines.append(f"**Baseline:** \`{baseline_branch}\` (\`{baseline_sha}\`)") +lines.append(f"**PR:** \`{pr_sha}\` - {pr_msg}") +lines.append(f"**Scale Factor:** SF100") +lines.append(f"**Iterations:** ${ITERATIONS}") +lines.append("") +lines.append("### Query Times") +lines.append("") +lines.append("| Query | Baseline Avg (s) | Baseline Best (s) | PR Avg (s) | PR Best (s) | Change (Avg) | Change (Best) |") +lines.append("|-------|-----------------|-------------------|-----------|------------|-------------|--------------|") + +baseline_avg_total = 0.0 +baseline_best_total = 0.0 +pr_avg_total = 0.0 +pr_best_total = 0.0 +for i in range(1, 23): + key = str(i) + b_val = baseline.get(key) + p_val = pr.get(key) + if b_val is None or p_val is None: + lines.append(f"| Q{i} | N/A | N/A | N/A | N/A | | |") + continue + b_list = b_val if isinstance(b_val, list) else [b_val] + p_list = p_val if isinstance(p_val, list) else [p_val] + b_avg = sum(b_list) / len(b_list) + b_best = min(b_list) + p_avg = sum(p_list) / len(p_list) + p_best = min(p_list) + baseline_avg_total += b_avg + baseline_best_total += b_best + pr_avg_total += p_avg + pr_best_total += p_best + + def fmt_change(base, pr_val): + if base > 0: + pct = (pr_val - base) / base * 100 + if pct < -5: + icon = ":green_circle:" + elif pct > 5: + icon = ":red_circle:" + else: + icon = ":white_circle:" + return f"{icon} {pct:+.1f}%" + return "" + + avg_change = fmt_change(b_avg, p_avg) + best_change = fmt_change(b_best, p_best) + lines.append(f"| Q{i} | {b_avg:.2f} | {b_best:.2f} | {p_avg:.2f} | {p_best:.2f} | {avg_change} | {best_change} |") + +# Total row +def fmt_total_change(base, pr_val): + if base > 0: + pct = (pr_val - base) / base * 100 + if pct < -5: + icon = ":green_circle:" + elif pct > 5: + icon = ":red_circle:" + else: + icon = ":white_circle:" + return f"{icon} {pct:+.1f}%" + 
return "" + +total_avg_change = fmt_total_change(baseline_avg_total, pr_avg_total) +total_best_change = fmt_total_change(baseline_best_total, pr_best_total) +lines.append(f"| **Total** | **{baseline_avg_total:.2f}** | **{baseline_best_total:.2f}** | **{pr_avg_total:.2f}** | **{pr_best_total:.2f}** | {total_avg_change} | {total_best_change} |") + +lines.append("") + +# Add collapsible Spark configuration section +spark_conf = pr.get("spark_conf", {}) +spark_master = spark_conf.get("spark.master", "unknown") +driver_memory = spark_conf.get("spark.driver.memory", "unknown") +driver_cores = spark_conf.get("spark.driver.cores", "unknown") +executor_cores = spark_conf.get("spark.executor.cores", driver_cores) +executor_memory = spark_conf.get("spark.executor.memory", driver_memory) +offheap_enabled = spark_conf.get("spark.memory.offHeap.enabled", "false") +offheap_size = spark_conf.get("spark.memory.offHeap.size", "unknown") +shuffle_manager = spark_conf.get("spark.shuffle.manager", "unknown") +comet_replace_smj = spark_conf.get("spark.comet.exec.replaceSortMergeJoin", "unknown") + +comet_configs_env = os.environ.get("COMET_CONFIGS", "") +user_configs = {} +if comet_configs_env: + for conf in comet_configs_env.split(","): + if "=" in conf: + k, v = conf.split("=", 1) + user_configs[k] = v + +lines.append("
") +lines.append("Spark Configuration") +lines.append("") +lines.append("| Setting | Value |") +lines.append("|---------|-------|") +lines.append(f"| Spark Master | \`{spark_master}\` |") +lines.append(f"| Driver Memory | {driver_memory} |") +lines.append(f"| Driver Cores | {driver_cores} |") +lines.append(f"| Executor Memory | {executor_memory} |") +lines.append(f"| Executor Cores | {executor_cores} |") +lines.append(f"| Off-Heap Enabled | {offheap_enabled} |") +lines.append(f"| Off-Heap Size | {offheap_size} |") +lines.append(f"| Shuffle Manager | \`{shuffle_manager.split('.')[-1]}\` |") +lines.append(f"| Comet Replace SMJ | {comet_replace_smj} |") + +if user_configs: + lines.append("") + lines.append("**User-specified configs:**") + lines.append("") + for k, v in user_configs.items(): + lines.append(f"| {k} | \`{v}\` |") + +lines.append("") +lines.append("
") +lines.append("") +lines.append("---") +lines.append("*Automated benchmark run by cometbot*") + +print("\n".join(lines)) +PYTHON +) + + # Post comment to PR + RESPONSE=$(curl -s -w "\n%{http_code}" -X POST \ + -H "Authorization: token $GITHUB_TOKEN" \ + -H "Accept: application/vnd.github.v3+json" \ + -H "Content-Type: application/json" \ + "https://api.github.com/repos/apache/datafusion-comet/issues/${PR_NUMBER}/comments" \ + -d "$(jq -n --arg body "$COMMENT_BODY" '{body: $body}')") + + HTTP_CODE=$(echo "$RESPONSE" | tail -1) + BODY=$(echo "$RESPONSE" | sed '$d') + + if [ "$HTTP_CODE" = "201" ]; then + COMMENT_URL=$(echo "$BODY" | jq -r '.html_url') + echo "Successfully posted comment: $COMMENT_URL" + else + echo "Failed to post comment (HTTP $HTTP_CODE):" + echo "$BODY" + fi + else + echo "" + echo "GITHUB_TOKEN not set, skipping PR comment" + fi +fi +EOF + +RUN chmod +x /app/entrypoint.sh + +USER bench +ENTRYPOINT ["/app/entrypoint.sh"] diff --git a/dev/benchmarking-bot/README.md b/dev/benchmarking-bot/README.md new file mode 100644 index 0000000000..37cbab6b71 --- /dev/null +++ b/dev/benchmarking-bot/README.md @@ -0,0 +1,141 @@ +# Comet Benchmark Automation Bot + +Automated benchmarking for [Apache DataFusion Comet](https://github.com/apache/datafusion-comet) PRs. Runs TPC-H and microbenchmarks in Kubernetes, triggered by GitHub PR comments. + +## GitHub Bot + +Authorized users can trigger benchmarks by commenting on a Comet PR with slash commands. The bot picks up the comment, runs the benchmark in Kubernetes, and posts results back to the PR. + +### Commands + +``` +/run tpch +/run tpch --iterations 3 +/run tpch --baseline my-branch +/run tpch --conf spark.comet.exec.enabled=true +/run micro CometStringExpressionBenchmark +/run micro CometStringExpressionBenchmark --baseline my-branch +/help +``` + +### Reactions + +The bot uses reactions to signal status: +- :eyes: -- request acknowledged, job submitted +- :rocket: -- job completed successfully +- :thumbsdown: -- job failed or invalid request + +## CLI Usage + +### Install + +```bash +cd dev/benchmarking-bot +pip install -e . +``` + +### Build and run + +```bash +# Build the Comet benchmark Docker image +cometbot comet build + +# Run TPC-H benchmark for a PR +cometbot comet run --pr 1234 + +# With options +cometbot comet run --pr 1234 --iterations 3 --no-cleanup + +# Run microbenchmark instead +cometbot comet run --pr 1234 --micro CometStringExpressionBenchmark + +# Pass Spark/Comet configuration +cometbot comet run --pr 1234 --conf spark.comet.exec.enabled=true +``` + +### Manage jobs + +```bash +cometbot comet status # List all jobs +cometbot comet status --pr 1234 # Check specific PR +cometbot comet logs --pr 1234 # Get logs +cometbot comet logs --pr 1234 -f # Follow logs +cometbot comet delete --pr 1234 # Delete job +``` + +### Run the bot + +```bash +# Start continuous polling +cometbot bot start --github-token + +# Single poll (for testing) +cometbot bot poll-once --github-token +``` + +## Deployment + +Deploy to a remote host: + +```bash +export COMETBOT_DEPLOY_HOST=myhost +export COMETBOT_DEPLOY_USER=myuser +export COMETBOT_DEPLOY_DIR=/home/myuser/cometbot +./deploy/deploy.sh +``` + +This script: +1. Builds the Python wheel +2. Copies files to the remote host +3. Builds the Docker image on the remote host +4. Installs the wheel in a virtualenv +5. 
Generates and installs a systemd service (`cometbot`) + +### Environment variables + +**Required for deployment:** + +| Variable | Description | +|----------|-------------| +| `COMETBOT_DEPLOY_HOST` | Remote hostname to deploy to | +| `COMETBOT_DEPLOY_USER` | SSH username on remote host | +| `COMETBOT_DEPLOY_DIR` | Installation directory on remote host | + +**Runtime (set in `$COMETBOT_DEPLOY_DIR/env` on the deployment host):** + +| Variable | Description | +|----------|-------------| +| `COMETBOT_GITHUB_TOKEN` | GitHub token for API access and posting results | +| `COMETBOT_REGISTRY` | Docker registry for benchmark images (default: `localhost:5000`) | +| `COMETBOT_SLACK_TOKEN` | (Optional) Slack bot token for notifications | +| `COMETBOT_SLACK_CHANNEL` | (Optional) Slack channel for notifications | + +## Security Considerations + +**This bot executes arbitrary code from pull requests.** Anyone who can trigger a benchmark run can +execute code on your Kubernetes cluster. Before deploying, understand these risks: + +- **Code execution**: Benchmark jobs clone a PR branch, build it, and run it inside a container. A + malicious PR could contain code that exfiltrates secrets, attacks the network, or compromises the + node. +- **Authorization is the primary control**: Only GitHub usernames listed in `authorized_users.txt` + can trigger benchmarks. Keep this list to trusted committers and review it regularly. +- **GitHub token scope**: The `COMETBOT_GITHUB_TOKEN` is passed into benchmark containers so they + can post results. Use a token with minimal scope (only `repo` or a fine-grained token limited to + the target repository). Never use a token with org-wide admin permissions. +- **Network isolation**: Benchmark containers have access to the pod network. Use Kubernetes + NetworkPolicies to restrict egress if your cluster contains sensitive services. +- **hostPath volumes**: The TPC-H data mount uses `hostPath`, which gives containers access to the + host filesystem at that path. Do not widen this mount. +- **No sandboxing**: Jobs run as regular containers, not in a sandboxed runtime. Consider using + gVisor, Kata Containers, or a dedicated node pool for benchmark workloads. +- **Resource limits**: The job template should include resource limits to prevent a single benchmark + from consuming all cluster resources. Review `k8s/comet-job-template.yaml` before deploying. + +## Architecture + +- **Bot** (`src/cometbot/bot.py`): Polls GitHub for `/run` slash commands on open Comet PRs +- **K8s** (`src/cometbot/k8s.py`): Builds Docker images, creates/manages Kubernetes jobs +- **CLI** (`src/cometbot/cli.py`): Click-based CLI for manual benchmark runs and bot management +- **Dockerfile**: Builds a container with JDK 17, Rust, Maven, and Spark 3.5 for running benchmarks +- **K8s templates** (`k8s/`): Job and deployment manifests diff --git a/dev/benchmarking-bot/authorized_users.txt b/dev/benchmarking-bot/authorized_users.txt new file mode 100644 index 0000000000..906e356e1f --- /dev/null +++ b/dev/benchmarking-bot/authorized_users.txt @@ -0,0 +1,59 @@ +# GitHub usernames of authorized users (DataFusion committers). +# One username per line. Lines starting with # are comments. 
+# +# Source: https://projects.apache.org/committee.html?datafusion +# +# Apache ID -> GitHub username +adriangb # Adrian Garcia Badaracco +andygrove # agrove - Andrew Grove +mustafasrepo # akurmustafa - Mustafa Akur +alamb # alamb - Andrew Lamb +avantgardnerio # avantgardner - Brent Gardner +berkaysynnada # berkay - Berkay Sahin +blaginin # blaginin - Dmitrii Blaginin +comphead # comphead - Oleks V +Dandandan # dheres - Daniel Heres +findepi # findepi - Piotr Findeisen +gabotechs # gabotechs - Gabriel Musat +goldmedal # goldmedal - Jay Shin +houqp # houqp - QP Hou +huaxingao # huaxingao - Huaxin Gao +isidentical # iffyio - Batuhan Taskaya +jackwener # jakevin - Jack Wener +jayzhan211 # jayzhan - Jay Zhan +Jefffrey # jeffreyvo - Jeffrey +jiangzhx # jiayuliu - Jiayu Liu +jonahgao # jonah - Jonah Gao +korowa # korowa - Oleks Koval +kazuyukitanimura # kazuyukitanimura - Kazuyuki Tanimura +kosiew # kosiew - Ko Siew +lewiszlw # linwei - Lin Wei +liukun4515 # liukun - Kun Liu +mbutrovich # mbutrovich - Matt Butrovich +metesynnada # mete - Mehmet Ozan Kabak +milenkovicm # milenkovicm - Marko Milenkovic +mingmwang # mingmwang - Mingming Wang +matthewmturner # mjward - Matthew Turner +crepererum # mneumann - Marco Neumann +yahoNanJing # nju_yaho - Yaho +nuno-faria # nunofaria - Nuno Faria +ozankabak # ozankabak - Ozan Kabak (see also metesynnada) +paddyhoran # paddyhoran - Paddy Horan +parthchandra # parthc - Parth Chandra +rdettai # rdettai - Raphael Dettai (not in contributor list, keep Apache ID) +rluvaton # rluvaton - Raz Luvaton +sunchao # sunchao - Chao Sun +thinkharderdev # thinkharderdev - Andrew Lamb (alt) +timsaucer # timsaucer - Tim Saucer +tustvold # tustvold - Raphael Taylor-Davies +viirya # viirya - Liang-Chi Hsieh +waynexia # wayne - Wayne Xia +Weijun-H # weijun - Weijun Huang +wesm # wesm - Wes McKinney +wjones127 # wjones127 - Will Jones +xudong963 # xudong963 - Xudong +Ted-Jiang # yangjiang - Ted Jiang +ycohen # ycohen - Yoni Cohen (not in contributor list, keep Apache ID) +yjshen # yjshen - Yijie Shen +2010YOUY01 # ytyou - Yongting You +zhuqi-lucas # zhuqi - Zhu Qi diff --git a/dev/benchmarking-bot/deploy/deploy.sh b/dev/benchmarking-bot/deploy/deploy.sh new file mode 100755 index 0000000000..99a69a64ef --- /dev/null +++ b/dev/benchmarking-bot/deploy/deploy.sh @@ -0,0 +1,183 @@ +#!/bin/bash +# Deploy cometbot to a remote host +# +# Required environment variables: +# COMETBOT_DEPLOY_HOST - Remote hostname +# COMETBOT_DEPLOY_USER - Remote SSH username +# COMETBOT_DEPLOY_DIR - Remote installation directory (e.g., /home/user/cometbot) +# +# Optional environment variables: +# COMETBOT_GITHUB_TOKEN - GitHub token for API access +# COMETBOT_SLACK_TOKEN - Slack bot token for notifications +# COMETBOT_SLACK_CHANNEL - Slack channel for notifications +# +# Prerequisites on remote host: +# - Python 3.10+ +# - pip +# - kubectl (configured with cluster access) +# - docker + +set -e + +# Required configuration - fail if not set +: "${COMETBOT_DEPLOY_HOST:?Error: COMETBOT_DEPLOY_HOST environment variable is required}" +: "${COMETBOT_DEPLOY_USER:?Error: COMETBOT_DEPLOY_USER environment variable is required}" +: "${COMETBOT_DEPLOY_DIR:?Error: COMETBOT_DEPLOY_DIR environment variable is required}" + +REMOTE_HOST="$COMETBOT_DEPLOY_HOST" +REMOTE_USER="$COMETBOT_DEPLOY_USER" +REMOTE_DIR="$COMETBOT_DEPLOY_DIR" +GITHUB_TOKEN="${COMETBOT_GITHUB_TOKEN:-}" +SLACK_TOKEN="${COMETBOT_SLACK_TOKEN:-}" +SLACK_CH="${COMETBOT_SLACK_CHANNEL:-}" + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' 
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

echo -e "${GREEN}=== Deploying cometbot to ${REMOTE_HOST} ===${NC}"

# Get the directory of this script
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_DIR="$(dirname "$SCRIPT_DIR")"

# Generate build timestamp
BUILD_TIMESTAMP=$(date -u +"%Y-%m-%d %H:%M:%S UTC")
echo -e "${YELLOW}Build timestamp: ${BUILD_TIMESTAMP}${NC}"

# Write build info to package
cat > "$PROJECT_DIR/src/cometbot/_build_info.py" << PYEOF
# Auto-generated by deploy script - do not edit
BUILD_TIMESTAMP = "${BUILD_TIMESTAMP}"
PYEOF

# Build the wheel
echo -e "${YELLOW}Building wheel...${NC}"
cd "$PROJECT_DIR"
rm -rf dist/  # Clean old wheels
uv build --wheel --out-dir dist

# Find the wheel file
WHEEL=$(ls -t dist/cometbot-*.whl | head -1)
if [ -z "$WHEEL" ]; then
    echo -e "${RED}Error: No wheel file found${NC}"
    exit 1
fi
echo -e "${GREEN}Built: ${WHEEL}${NC}"

# Copy files to remote host
echo -e "${YELLOW}Copying files to ${REMOTE_HOST}...${NC}"
ssh "${REMOTE_USER}@${REMOTE_HOST}" "mkdir -p ${REMOTE_DIR}/k8s"
scp "$WHEEL" "${REMOTE_USER}@${REMOTE_HOST}:${REMOTE_DIR}/"

# Copy Dockerfile and k8s templates
scp "$PROJECT_DIR/Dockerfile" "${REMOTE_USER}@${REMOTE_HOST}:${REMOTE_DIR}/"
scp "$PROJECT_DIR/k8s/"*.yaml "${REMOTE_USER}@${REMOTE_HOST}:${REMOTE_DIR}/k8s/"
scp "$PROJECT_DIR/authorized_users.txt" "${REMOTE_USER}@${REMOTE_HOST}:${REMOTE_DIR}/"

# Build the Docker image and install the wheel on the remote host
echo -e "${YELLOW}Building Docker image on ${REMOTE_HOST}...${NC}"
ssh "${REMOTE_USER}@${REMOTE_HOST}" bash <<EOF
set -e
cd ${REMOTE_DIR}
docker build -t comet-benchmark-automation:latest .
python3 -m venv venv
venv/bin/pip install --upgrade pip > /dev/null
venv/bin/pip install --force-reinstall ${REMOTE_DIR}/cometbot-*.whl > /dev/null
echo "cometbot installed successfully"
EOF

# Set up systemd service
echo -e "${YELLOW}Setting up systemd service...${NC}"
ssh "${REMOTE_USER}@${REMOTE_HOST}" bash <<EOF
set -e

# Generate the systemd unit
cat > ${REMOTE_DIR}/cometbot.service <<SERVICEEOF
[Unit]
Description=Comet benchmark automation bot
After=network-online.target

[Service]
Type=simple
User=${REMOTE_USER}
WorkingDirectory=${REMOTE_DIR}
EnvironmentFile=${REMOTE_DIR}/env
Environment=COMETBOT_PROJECT_DIR=${REMOTE_DIR}
ExecStart=${REMOTE_DIR}/venv/bin/cometbot bot start
Restart=on-failure

[Install]
WantedBy=multi-user.target
SERVICEEOF

# Create a default env file on first deploy
if [ ! -f ${REMOTE_DIR}/env ]; then
    cat > ${REMOTE_DIR}/env <<'ENVEOF'
COMETBOT_GITHUB_TOKEN=
COMETBOT_REGISTRY=localhost:5000
COMETBOT_SLACK_TOKEN=
COMETBOT_SLACK_CHANNEL=
ENVEOF
    chmod 600 ${REMOTE_DIR}/env
fi

# Update the env file with tokens if provided
if [ -n "${GITHUB_TOKEN}" ]; then
    sed -i "s|^COMETBOT_GITHUB_TOKEN=.*|COMETBOT_GITHUB_TOKEN=${GITHUB_TOKEN}|" ${REMOTE_DIR}/env
fi
if [ -n "${SLACK_TOKEN}" ]; then
    sed -i "s|^COMETBOT_SLACK_TOKEN=.*|COMETBOT_SLACK_TOKEN=${SLACK_TOKEN}|" ${REMOTE_DIR}/env
fi
if [ -n "${SLACK_CH}" ]; then
    sed -i "s|^COMETBOT_SLACK_CHANNEL=.*|COMETBOT_SLACK_CHANNEL=${SLACK_CH}|" ${REMOTE_DIR}/env
fi

# Install the service (requires sudo)
sudo cp ${REMOTE_DIR}/cometbot.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable cometbot

echo "Service installed. To start: sudo systemctl start cometbot"
echo "To check status: sudo systemctl status cometbot"
echo "To view logs: sudo journalctl -u cometbot -f"
EOF

# Restart the bot and tail logs
echo -e "${YELLOW}Restarting cometbot service...${NC}"
ssh "${REMOTE_USER}@${REMOTE_HOST}" "sudo systemctl restart cometbot"
echo -e "${GREEN}=== Deployment complete, tailing logs (Ctrl-C to stop) ===${NC}"
ssh "${REMOTE_USER}@${REMOTE_HOST}" "sudo journalctl -u cometbot -f"
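
The Kubernetes manifests that follow are templates rather than literal manifests: fields such as `{image}` and `{pr_number}` are filled in by the Python side before anything is applied. The real submission logic lives in `cometbot.k8s.create_comet_job`; the snippet below is only a sketch of that substitution, assuming a plain `str.format()` call (the `comet-pr-<N>` job name and `pr-<N>` image tag conventions come from `cli.py` and `k8s.py`):

```python
from pathlib import Path

# Hypothetical illustration of filling k8s/comet-job-template.yaml; the
# placeholder names below all appear in the template that follows.
template = Path("k8s/comet-job-template.yaml").read_text()
manifest = template.format(
    job_name="comet-pr-1234",   # cli.py builds names as f"{job_prefix}-pr-{pr}"
    pr_number=1234,
    image="localhost:5000/comet-benchmark-automation:pr-1234",
    benchmark_mode="tpch",      # or "micro"
    tpch_queries="/opt/tpch-queries",
    iterations=1,
    micro_benchmark="",
    spark_master="local[*]",
    comet_configs="",           # comma-separated key=value pairs
    github_token="",
    baseline_branch="main",
)
Path("/tmp/comet-pr-1234.yaml").write_text(manifest)
# Submit with: kubectl apply -f /tmp/comet-pr-1234.yaml
```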
diff --git a/dev/benchmarking-bot/k8s/bot-deployment.yaml b/dev/benchmarking-bot/k8s/bot-deployment.yaml
new file mode 100644
index 0000000000..1e7bff2775
--- /dev/null
+++ b/dev/benchmarking-bot/k8s/bot-deployment.yaml
@@ -0,0 +1,37 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  name: cometbot-bot
  labels:
    app: cometbot-bot
spec:
  replicas: 1
  selector:
    matchLabels:
      app: cometbot-bot
  template:
    metadata:
      labels:
        app: cometbot-bot
    spec:
      containers:
        - name: bot
          image: {image}
          command: ["cometbot", "bot", "start"]
          env:
            - name: COMETBOT_GITHUB_TOKEN
              valueFrom:
                secretKeyRef:
                  name: cometbot-secrets
                  key: github-token
          resources:
            requests:
              memory: "256Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
      restartPolicy: Always
---
# Secret for GitHub token (create separately or use kubectl create secret)
# kubectl create secret generic cometbot-secrets --from-literal=github-token=<TOKEN>
diff --git a/dev/benchmarking-bot/k8s/comet-job-template.yaml b/dev/benchmarking-bot/k8s/comet-job-template.yaml
new file mode 100644
index 0000000000..36c2ae695f
--- /dev/null
+++ b/dev/benchmarking-bot/k8s/comet-job-template.yaml
@@ -0,0 +1,65 @@
apiVersion: batch/v1
kind: Job
metadata:
  name: {job_name}
  labels:
    app: comet-benchmark-automation
    pr: "{pr_number}"
spec:
  backoffLimit: 0
  template:
    metadata:
      labels:
        app: comet-benchmark-automation
        pr: "{pr_number}"
    spec:
      restartPolicy: Never
      automountServiceAccountToken: false
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
      containers:
        - name: benchmark
          image: {image}
          imagePullPolicy: Always
          env:
            - name: PR_NUMBER
              value: "{pr_number}"
            - name: BENCHMARK_MODE
              value: "{benchmark_mode}"
            - name: TPCH_DATA
              value: "/data/tpch/sf100"
            - name: TPCH_QUERIES
              value: "{tpch_queries}"
            - name: ITERATIONS
              value: "{iterations}"
            - name: MICRO_BENCHMARK
              value: "{micro_benchmark}"
            - name: SPARK_MASTER
              value: "{spark_master}"
            - name: COMET_CONFIGS
              value: "{comet_configs}"
            - name: GITHUB_TOKEN
              value: "{github_token}"
            - name: BASELINE_BRANCH
              value: "{baseline_branch}"
          securityContext:
            allowPrivilegeEscalation: false
            capabilities:
              drop: ["ALL"]
          resources:
            requests:
              memory: "64Gi"
              cpu: "8"
            limits:
              memory: "64Gi"
              cpu: "8"
          volumeMounts:
            - name: tpch-data
              mountPath: /data/tpch
              readOnly: true
      volumes:
        - name: tpch-data
          hostPath:
            path: /mnt/bigdata/tpch
            type: Directory
diff --git a/dev/benchmarking-bot/pyproject.toml b/dev/benchmarking-bot/pyproject.toml
new file mode 100644
index 0000000000..dc88586749
--- /dev/null
+++ b/dev/benchmarking-bot/pyproject.toml
@@ -0,0 +1,22 @@
[project]
name = "cometbot"
version = "0.1.0"
description = "Automate running Comet benchmarks against GitHub PRs"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
    "click>=8.0",
    "rich>=13.0",
    "requests>=2.28",
    "slack-sdk>=3.0",
]

[project.scripts]
cometbot = "cometbot.cli:main"

[build-system]
requires = ["hatchling"]
build-backend = 
"hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/cometbot"] diff --git a/dev/benchmarking-bot/src/cometbot/__init__.py b/dev/benchmarking-bot/src/cometbot/__init__.py new file mode 100644 index 0000000000..d7e66910be --- /dev/null +++ b/dev/benchmarking-bot/src/cometbot/__init__.py @@ -0,0 +1,3 @@ +"""Comet Benchmark Automation CLI.""" + +__version__ = "0.1.0" diff --git a/dev/benchmarking-bot/src/cometbot/bot.py b/dev/benchmarking-bot/src/cometbot/bot.py new file mode 100644 index 0000000000..d8b7134e08 --- /dev/null +++ b/dev/benchmarking-bot/src/cometbot/bot.py @@ -0,0 +1,603 @@ +"""GitHub bot that monitors Comet PRs for benchmark requests.""" + +import json +import os +import re +import time +import traceback +from dataclasses import dataclass +from pathlib import Path + +import requests +from rich.console import Console + +from cometbot.slack import notify as slack_notify + +console = Console() + +# Configuration +GITHUB_API = "https://api.github.com" +POLL_INTERVAL = 60 # 1 minute +K8S_POLL_INTERVAL = 10 # 10 seconds +MAX_CONCURRENT_JOBS = 4 + +# Repos to monitor +REPOS = { + "apache/datafusion-comet": "comet", +} + +# Authorized users - loaded from file +def _load_authorized_users() -> set[str]: + """Load authorized GitHub usernames from authorized_users.txt.""" + users_file = Path(os.environ.get("COMETBOT_PROJECT_DIR", Path(__file__).parent.parent.parent)) / "authorized_users.txt" + users = set() + if users_file.exists(): + with open(users_file) as f: + for line in f: + line = line.split("#")[0].strip() + if line: + users.add(line) + if not users: + # Fallback if file is missing + users = {"andygrove", "comphead", "mbutrovich", "parthchandra"} + return users + +AUTHORIZED_USERS = _load_authorized_users() + +# File to track processed comments (use COMETBOT_PROJECT_DIR if set, otherwise /tmp) +_state_dir = Path(os.environ.get("COMETBOT_PROJECT_DIR", "/tmp")) +STATE_FILE = _state_dir / "cometbot-bot-state.json" + + +@dataclass +class BenchmarkRequest: + """Parsed benchmark request from a comment.""" + repo: str + pr_number: int + comment_id: int + user: str + benchmark_type: str # "tpch" or "micro" + args: list[str] + + +def load_state() -> dict: + """Load processed comments state.""" + if STATE_FILE.exists(): + with open(STATE_FILE) as f: + state = json.load(f) + # Ensure running_jobs exists (for backwards compatibility) + if "running_jobs" not in state: + state["running_jobs"] = {} + return state + return {"processed_comments": [], "running_jobs": {}} + + +def save_state(state: dict) -> None: + """Save processed comments state.""" + with open(STATE_FILE, "w") as f: + json.dump(state, f) + + +def get_headers(token: str) -> dict: + """Get GitHub API headers.""" + return { + "Authorization": f"token {token}", + "Accept": "application/vnd.github.v3+json", + } + + +def get_open_prs(token: str, repo: str) -> list[dict]: + """Get list of open PRs for a repo.""" + url = f"{GITHUB_API}/repos/{repo}/pulls" + params = {"state": "open", "per_page": 100} + + response = requests.get(url, headers=get_headers(token), params=params) + response.raise_for_status() + return response.json() + + +def get_pr_comments(token: str, repo: str, pr_number: int) -> list[dict]: + """Get comments on a PR (issue comments, not review comments).""" + url = f"{GITHUB_API}/repos/{repo}/issues/{pr_number}/comments" + params = {"per_page": 100} + + response = requests.get(url, headers=get_headers(token), params=params) + response.raise_for_status() + return response.json() + + +def 
add_reaction(token: str, repo: str, comment_id: int, reaction: str) -> bool:
    """Add a reaction to a comment.

    Reactions: +1, -1, laugh, confused, heart, hooray, rocket, eyes
    """
    url = f"{GITHUB_API}/repos/{repo}/issues/comments/{comment_id}/reactions"
    headers = get_headers(token)
    headers["Accept"] = "application/vnd.github+json"

    response = requests.post(url, headers=headers, json={"content": reaction})
    return response.status_code in (200, 201)


def has_reaction(token: str, repo: str, comment_id: int, reaction: str) -> bool:
    """Check if a comment already has a specific reaction from the bot."""
    url = f"{GITHUB_API}/repos/{repo}/issues/comments/{comment_id}/reactions"
    headers = get_headers(token)
    headers["Accept"] = "application/vnd.github+json"

    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        return False

    reactions = response.json()
    # Check if any reaction matches (we assume bot reactions are ours)
    for r in reactions:
        if r.get("content") == reaction:
            return True
    return False


def check_completed_jobs(token: str, state: dict) -> None:
    """Check for completed jobs and add appropriate reactions.

    This function checks all running jobs in state, and for any that have
    completed (succeeded or failed), adds the appropriate reaction and
    removes them from the running_jobs tracking.

    State format for running_jobs:
        {job_name: {"comment_id": int, "repo": str, "pr_number": int}}
    """
    from cometbot.k8s import get_job_status

    if not state.get("running_jobs"):
        return

    completed_jobs = []

    for job_name, job_info in list(state["running_jobs"].items()):
        # Handle both old format (just comment_id) and new format (dict with comment_id and repo)
        if isinstance(job_info, dict):
            comment_id = job_info["comment_id"]
            repo = job_info.get("repo", "apache/datafusion-comet")
            pr_number = job_info.get("pr_number")
        else:
            # Old format - just comment_id
            comment_id = job_info
            repo = "apache/datafusion-comet"
            pr_number = None

        status = get_job_status(job_name)

        if status["status"] == "Succeeded":
            console.print(f"[green]Job {job_name} completed successfully[/green]")
            add_reaction(token, repo, comment_id, "rocket")
            slack_notify(f"Job `{job_name}` completed successfully", "success")
            completed_jobs.append(job_name)
        elif status["status"] == "Failed":
            console.print(f"[red]Job {job_name} failed[/red]")
            if pr_number:
                post_comment(token, repo, pr_number, f"Benchmark job `{job_name}` failed due to an error.")
            slack_notify(f"Job `{job_name}` failed", "error")
            completed_jobs.append(job_name)
        elif status["status"] == "NotFound":
            # Job was deleted externally or never existed
            console.print(f"[yellow]Job {job_name} not found, removing from tracking[/yellow]")
            slack_notify(f"Job `{job_name}` not found, removing from tracking", "warning")
            completed_jobs.append(job_name)

    # Remove completed jobs from state
    for job_name in completed_jobs:
        del state["running_jobs"][job_name]

    if completed_jobs:
        save_state(state)


def post_comment(token: str, repo: str, pr_number: int, body: str) -> bool:
    """Post a comment on a PR."""
    url = f"{GITHUB_API}/repos/{repo}/issues/{pr_number}/comments"
    response = requests.post(url, headers=get_headers(token), json={"body": body})
    return response.status_code == 201


def get_help_text(repo: str) -> str:
    """Get help text for Comet benchmarks."""
    lines = [
        "## Benchmark Bot Usage",
        "",
        "Benchmark commands are restricted to DataFusion committers.",
        "",
        "### Commands",
        "",
        "```",
        "/run tpch [--iterations N] [--baseline BRANCH]",
        "/run tpch [--conf spark.comet.KEY=VALUE ...]",
        "/run micro <BENCHMARK_CLASS> [--baseline BRANCH]",
        "/help",
        "```",
        "",
        "### Examples",
        "",
        "```",
        "/run tpch",
        "/run tpch --iterations 3",
        "/run tpch --conf spark.comet.exec.enabled=true",
        "/run micro CometStringExpressionBenchmark",
        "```",
        "",
        "---",
        "*Automated by cometbot*",
    ]

    return "\n".join(lines)
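

# For reference, what the parser below produces for a typical trigger comment
# (repo, PR number, comment id, and user are illustrative values):
#
#   "/run tpch --iterations 3" ->
#       BenchmarkRequest(repo="apache/datafusion-comet", pr_number=1234,
#                        comment_id=9876, user="andygrove",
#                        benchmark_type="tpch", args=["--iterations", "3"])
#
#   "/run micro CometStringExpressionBenchmark" ->
#       benchmark_type="micro", args=["CometStringExpressionBenchmark"]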
"", + "### Commands", + "", + "```", + "/run tpch [--iterations N] [--baseline BRANCH]", + "/run tpch [--conf spark.comet.KEY=VALUE ...]", + "/run micro [--baseline BRANCH]", + "/help", + "```", + "", + "### Examples", + "", + "```", + "/run tpch", + "/run tpch --iterations 3", + "/run tpch --conf spark.comet.exec.enabled=true", + "/run micro CometStringExpressionBenchmark", + "```", + "", + "---", + "*Automated by cometbot*", + ] + + return "\n".join(lines) + + +def parse_benchmark_request(comment: dict, repo: str, pr_number: int) -> BenchmarkRequest | None: + """Parse a comment for benchmark request. + + Expected format: + - /run tpch [--iterations N] [--query Q] + - /run micro + + Returns None if no valid request found. + """ + body = comment.get("body", "") + user = comment.get("user", {}).get("login", "") + comment_id = comment.get("id") + + # Ignore comments posted by the bot itself + if user == "cometbot": + return None + + # Strip quoted lines (starting with '>') to avoid triggering on quoted text + body = "\n".join(line for line in body.splitlines() if not line.lstrip().startswith(">")) + + # Check for help request + help_pattern = r"/help\b" + if re.search(help_pattern, body, re.MULTILINE): + return BenchmarkRequest( + repo=repo, + pr_number=pr_number, + comment_id=comment_id, + user=user, + benchmark_type="help", + args=[], + ) + + # /run tpch|micro [args] + pattern = r"/run\s+(tpch|micro)(\s+.*)?$" + match = re.search(pattern, body, re.IGNORECASE | re.MULTILINE) + + if not match: + if re.search(r"^/run\b", body, re.MULTILINE): + return BenchmarkRequest( + repo=repo, + pr_number=pr_number, + comment_id=comment_id, + user=user, + benchmark_type="invalid", + args=[], + ) + return None + + benchmark_type = match.group(1).lower() + args_str = match.group(2) or "" + args = args_str.split() + + return BenchmarkRequest( + repo=repo, + pr_number=pr_number, + comment_id=comment_id, + user=user, + benchmark_type=benchmark_type, + args=args, + ) + + +def parse_comet_configs(args: list[str]) -> dict[str, str]: + """Parse --conf arguments and filter to only spark.comet.* configs.""" + configs = {} + i = 0 + while i < len(args): + if args[i] == "--conf" and i + 1 < len(args): + conf = args[i + 1] + if "=" in conf: + key, value = conf.split("=", 1) + # Only allow spark.comet.* configs for security + if key.startswith("spark.comet."): + configs[key] = value + else: + console.print(f"[yellow]Ignoring non-comet config: {key}[/yellow]") + i += 2 + else: + i += 1 + return configs + + +def run_tpch_benchmark(token: str, pr_number: int, comment_id: int, args: list[str]) -> str: + """Run TPC-H benchmark for a PR. + + Builds the Docker image with PR-specific tag, submits the K8s job, + and returns immediately without waiting for completion. + + Args: + token: GitHub token + pr_number: PR number + comment_id: Comment ID that triggered this benchmark (used in job name) + args: Command arguments + + Returns: + The job name for tracking. 
+ """ + from cometbot.k8s import ( + build_comet_image, + create_comet_job, + get_comet_image_for_pr, + ) + + # Parse args + iterations = 1 + baseline_branch = "main" + max_iterations = 3 # Limit to prevent abuse + for i, arg in enumerate(args): + if arg in ("--iterations", "-i") and i + 1 < len(args): + try: + iterations = min(int(args[i + 1]), max_iterations) + except ValueError: + pass + elif arg in ("--baseline", "-b") and i + 1 < len(args): + baseline_branch = args[i + 1] + + # Parse --conf arguments (only spark.comet.* allowed) + comet_configs = parse_comet_configs(args) + + console.print(f"[bold blue]Running TPC-H benchmark for PR #{pr_number}[/bold blue]") + console.print(f"Iterations: {iterations}, Baseline: {baseline_branch}") + if comet_configs: + console.print(f"Comet configs: {comet_configs}") + + # Get PR-specific image tags + local_tag, registry_tag = get_comet_image_for_pr(pr_number) + + # Build and push image with PR-specific tag + build_comet_image(tag=local_tag, push=True, registry_tag=registry_tag) + + # Create job with comment_id in name for uniqueness + job_name = create_comet_job( + pr_number=pr_number, + image=registry_tag, + benchmark_mode="tpch", + iterations=iterations, + comet_configs=comet_configs, + github_token=token, + comment_id=comment_id, + baseline_branch=baseline_branch, + ) + + console.print(f"[green]Job {job_name} submitted, will track completion asynchronously[/green]") + return job_name + + +def run_micro_benchmark(token: str, pr_number: int, comment_id: int, args: list[str]) -> str | None: + """Run microbenchmark for a PR. + + Builds the Docker image with PR-specific tag, submits the K8s job, + and returns immediately without waiting for completion. + + Args: + token: GitHub token + pr_number: PR number + comment_id: Comment ID that triggered this benchmark (used in job name) + args: Command arguments + + Returns: + The job name for tracking, or None if args are invalid. + """ + from cometbot.k8s import ( + build_comet_image, + create_comet_job, + get_comet_image_for_pr, + ) + + if not args: + console.print("[red]No benchmark class specified for micro benchmark[/red]") + return None + + benchmark_class = args[0] + baseline_branch = "main" + + # Parse remaining args + for i, arg in enumerate(args): + if arg in ("--baseline", "-b") and i + 1 < len(args): + baseline_branch = args[i + 1] + + console.print(f"[bold blue]Running microbenchmark for PR #{pr_number}[/bold blue]") + console.print(f"Benchmark: {benchmark_class}, Baseline: {baseline_branch}") + + # Get PR-specific image tags + local_tag, registry_tag = get_comet_image_for_pr(pr_number) + + # Build and push image with PR-specific tag + build_comet_image(tag=local_tag, push=True, registry_tag=registry_tag) + + # Create job with comment_id in name for uniqueness + job_name = create_comet_job( + pr_number=pr_number, + image=registry_tag, + benchmark_mode="micro", + micro_benchmark=benchmark_class, + github_token=token, + comment_id=comment_id, + baseline_branch=baseline_branch, + ) + + console.print(f"[green]Job {job_name} submitted, will track completion asynchronously[/green]") + return job_name + + +def process_request(token: str, request: BenchmarkRequest, state: dict) -> bool: + """Process a benchmark request. + + Returns: + True if a job was submitted, False otherwise. 
+ """ + from cometbot.k8s import count_running_comet_jobs + + # Check if already processed (in state) + if request.comment_id in state["processed_comments"]: + return False + + # Check if already processed (has eyes reaction - handles state loss) + if has_reaction(token, request.repo, request.comment_id, "eyes"): + console.print(f"[dim]Skipping comment {request.comment_id} - already has eyes reaction[/dim]") + state["processed_comments"].append(request.comment_id) + save_state(state) + return False + + console.print(f"\n[bold]Processing request from @{request.user} on {request.repo} PR #{request.pr_number}[/bold]") + console.print(f"Type: {request.benchmark_type}, Args: {request.args}") + + # Handle help request (no auth required) + if request.benchmark_type == "help": + console.print("[blue]Posting help text[/blue]") + add_reaction(token, request.repo, request.comment_id, "eyes") + help_text = get_help_text(request.repo) + post_comment(token, request.repo, request.pr_number, help_text) + state["processed_comments"].append(request.comment_id) + save_state(state) + return False + + # Check authorization + if request.user not in AUTHORIZED_USERS: + console.print(f"[yellow]User @{request.user} not authorized[/yellow]") + add_reaction(token, request.repo, request.comment_id, "eyes") + post_comment( + token, request.repo, request.pr_number, + f"Sorry @{request.user}, this feature is restricted to DataFusion committers." + ) + state["processed_comments"].append(request.comment_id) + save_state(state) + return False + + # Handle invalid format - respond with help text + if request.benchmark_type == "invalid": + console.print("[yellow]Invalid command format, posting help text[/yellow]") + add_reaction(token, request.repo, request.comment_id, "eyes") + help_text = get_help_text(request.repo) + post_comment(token, request.repo, request.pr_number, help_text) + state["processed_comments"].append(request.comment_id) + save_state(state) + return False + + # Check concurrent job limit + running_comet = count_running_comet_jobs() + if running_comet >= MAX_CONCURRENT_JOBS: + console.print(f"[yellow]At max concurrent jobs ({running_comet}/{MAX_CONCURRENT_JOBS}), will retry later[/yellow]") + return False + + # Add eyes reaction to acknowledge + console.print("[blue]Adding eyes reaction to acknowledge request[/blue]") + add_reaction(token, request.repo, request.comment_id, "eyes") + + # Mark as processed before running (to avoid re-triggering if bot restarts) + state["processed_comments"].append(request.comment_id) + save_state(state) + + # Run the benchmark (builds image and submits job, returns immediately) + try: + job_name = None + if request.benchmark_type == "tpch": + job_name = run_tpch_benchmark(token, request.pr_number, request.comment_id, request.args) + elif request.benchmark_type == "micro": + job_name = run_micro_benchmark(token, request.pr_number, request.comment_id, request.args) + + # Track the running job so we can add reaction when it completes + if job_name: + state["running_jobs"][job_name] = {"comment_id": request.comment_id, "repo": request.repo, "pr_number": request.pr_number} + save_state(state) + slack_notify(f"Job `{job_name}` submitted for {request.repo} PR #{request.pr_number} ({request.benchmark_type})") + return True + else: + # Job wasn't created (e.g., invalid args for micro benchmark) + post_comment(token, request.repo, request.pr_number, "Benchmark run failed due to an error: job could not be created (invalid arguments?).") + return False + + except Exception as e: + 
console.print(f"[red]Error running benchmark: {e}[/red]") + console.print(f"[red]{traceback.format_exc()}[/red]") + slack_notify(f"Error submitting benchmark for {request.repo} PR #{request.pr_number}: {e}", "error") + # Post error comment on failure during build/submit + post_comment(token, request.repo, request.pr_number, "Benchmark run failed due to an error.") + return False + + +def poll_once(token: str, state: dict) -> None: + """Poll GitHub once for new benchmark requests.""" + # First, check for completed jobs and add reactions + check_completed_jobs(token, state) + + running_count = len(state.get("running_jobs", {})) + console.print(f"[dim]Polling for new benchmark requests... ({running_count}/{MAX_CONCURRENT_JOBS} jobs running)[/dim]") + + try: + for repo in REPOS: + prs = get_open_prs(token, repo) + console.print(f"[dim]{repo}: {len(prs)} open PRs[/dim]") + + for pr in prs: + pr_number = pr["number"] + comments = get_pr_comments(token, repo, pr_number) + + for comment in comments: + request = parse_benchmark_request(comment, repo, pr_number) + if request: + process_request(token, request, state) + + except requests.RequestException as e: + console.print(f"[red]Error polling GitHub: {e}[/red]") + slack_notify(f"GitHub API error: {e}", "error") + + +def run_bot(token: str, poll_interval: int = POLL_INTERVAL) -> None: + """Run the bot continuously.""" + # Try to load build info + try: + from cometbot._build_info import BUILD_TIMESTAMP + except ImportError: + BUILD_TIMESTAMP = "unknown (dev mode)" + + console.print("[bold green]Starting cometbot[/bold green]") + slack_notify("Bot started") + console.print(f"Build: {BUILD_TIMESTAMP}") + console.print(f"Repositories: {', '.join(REPOS.keys())}") + console.print(f"GitHub poll interval: {poll_interval} seconds") + console.print(f"K8s poll interval: {K8S_POLL_INTERVAL} seconds") + console.print(f"Max concurrent jobs: {MAX_CONCURRENT_JOBS}") + console.print(f"Authorized users: {', '.join(sorted(AUTHORIZED_USERS))}") + console.print() + + state = load_state() + console.print(f"[dim]Loaded state: {len(state['processed_comments'])} processed comments, {len(state.get('running_jobs', {}))} running jobs[/dim]") + + time_since_github_poll = poll_interval # poll immediately on startup + + while True: + try: + # Poll GitHub for new requests on the full interval + if time_since_github_poll >= poll_interval: + poll_once(token, state) + time_since_github_poll = 0 + elif state.get("running_jobs"): + # Check K8s job status frequently while jobs are running + check_completed_jobs(token, state) + except KeyboardInterrupt: + console.print("\n[yellow]Bot stopped by user[/yellow]") + slack_notify("Bot stopped") + break + except Exception as e: + console.print(f"[red]Unexpected error: {e}[/red]") + console.print(f"[red]{traceback.format_exc()}[/red]") + slack_notify(f"Unexpected error in bot main loop: {e}", "error") + + time.sleep(K8S_POLL_INTERVAL) + time_since_github_poll += K8S_POLL_INTERVAL diff --git a/dev/benchmarking-bot/src/cometbot/cli.py b/dev/benchmarking-bot/src/cometbot/cli.py new file mode 100644 index 0000000000..4f812995e9 --- /dev/null +++ b/dev/benchmarking-bot/src/cometbot/cli.py @@ -0,0 +1,339 @@ +"""CLI entry point for Comet benchmark automation.""" + +import click +from rich.console import Console + +console = Console() + + +# ============================================================================= +# Shared CLI helper functions for status/logs/delete commands +# 
============================================================================= + + +def _handle_status(pr: int | None, job_prefix: str, app_label: str, display_name: str) -> None: + """Handle status command for any benchmark type. + + Args: + pr: Optional PR number to check specific job + job_prefix: Job name prefix (e.g., "comet") + app_label: K8s app label for listing jobs + display_name: Human-readable name for output + """ + from cometbot.k8s import get_job_status, _list_jobs_by_label + + if pr: + job_name = f"{job_prefix}-pr-{pr}" + status = get_job_status(job_name) + console.print(f"Job {job_name}: {status['status']}") + else: + jobs = _list_jobs_by_label(app_label) + if not jobs: + console.print(f"No {display_name} jobs found") + return + console.print(f"[bold]{display_name} jobs:[/bold]") + for job in jobs: + console.print(f" {job['name']}: {job['status']}") + + +def _handle_logs(pr: int, job_prefix: str, follow: bool) -> None: + """Handle logs command for any benchmark type. + + Args: + pr: PR number to get logs for + job_prefix: Job name prefix (e.g., "comet") + follow: Whether to follow log output + """ + from cometbot.k8s import stream_logs + + job_name = f"{job_prefix}-pr-{pr}" + stream_logs(job_name, follow=follow) + + +def _handle_delete(pr: int, job_prefix: str) -> None: + """Handle delete command for any benchmark type. + + Args: + pr: PR number to delete job for + job_prefix: Job name prefix (e.g., "comet") + """ + from cometbot.k8s import delete_job + + job_name = f"{job_prefix}-pr-{pr}" + delete_job(job_name) + console.print(f"[green]Deleted job: {job_name}[/green]") + + +@click.group() +@click.version_option() +def main(): + """Comet Benchmark Automation CLI. + + Automates running Comet benchmarks comparing a GitHub PR against main. + """ + pass + + +# Comet commands +@main.group() +def comet(): + """Comet benchmark commands for running TPC-H/TPC-DS benchmarks.""" + pass + + +@comet.command("build") +@click.option( + "--tag", + default="comet-benchmark-automation:latest", + help="Docker image tag", +) +def comet_build(tag: str): + """Build the Docker image for Comet benchmarks.""" + from cometbot.k8s import build_comet_image + + build_comet_image(tag) + + +@comet.command("run") +@click.option( + "--pr", + required=True, + type=int, + help="GitHub PR number to benchmark", +) +@click.option( + "--image", + default=None, # Will use COMET_REGISTRY_IMAGE from k8s.py + help="Docker image for K8s to pull (default: $COMETBOT_REGISTRY/comet-benchmark-automation:latest)", +) +@click.option( + "--micro", + default="", + help="Run microbenchmark instead of TPC-H (e.g., 'CometStringExpressionBenchmark')", +) +@click.option( + "--tpch-queries", + default="/opt/tpch-queries", + help="Path to TPC-H query files inside container (default: bundled queries)", +) +@click.option( + "--iterations", + default=1, + type=int, + help="Number of TPC-H benchmark iterations", +) +@click.option( + "--no-build", + is_flag=True, + default=False, + help="Skip building the Docker image", +) +@click.option( + "--no-cleanup", + is_flag=True, + default=False, + help="Don't delete the job after completion", +) +@click.option( + "--spark-master", + default="local[*]", + help="Spark master URL (default: local[*])", +) +@click.option( + "--github-token", + envvar="COMETBOT_GITHUB_TOKEN", + default="", + help="GitHub token to post results as PR comment (or set COMETBOT_GITHUB_TOKEN env var)", +) +@click.option( + "--conf", + multiple=True, + help="Spark/Comet config (e.g., --conf spark.comet.exec.enabled=true). 
Can be repeated.", +) +def comet_run( + pr: int, + image: str | None, + micro: str, + tpch_queries: str, + iterations: int, + no_build: bool, + no_cleanup: bool, + spark_master: str, + github_token: str, + conf: tuple[str, ...], +): + """Run Comet benchmark in Kubernetes (TPC-H or microbenchmark).""" + from cometbot.k8s import ( + COMET_REGISTRY_IMAGE, + build_comet_image, + create_comet_job, + delete_job, + stream_logs, + wait_for_completion, + ) + + # Use default registry image if not specified + if image is None: + image = COMET_REGISTRY_IMAGE + + benchmark_mode = "micro" if micro else "tpch" + + # Parse --conf key=value pairs into a dict + comet_configs = {} + for c in conf: + if "=" in c: + k, v = c.split("=", 1) + comet_configs[k] = v + + if micro: + console.print(f"[bold]Running Comet microbenchmark in Kubernetes: PR #{pr}[/bold]") + console.print(f"Benchmark: {micro}") + else: + console.print(f"[bold]Running Comet TPC-H benchmark in Kubernetes: PR #{pr}[/bold]") + console.print(f"Iterations: {iterations}") + console.print(f"Spark master: {spark_master}") + if comet_configs: + console.print(f"Configs: {comet_configs}") + console.print() + + # Build image if needed + if not no_build: + console.print("[bold blue]Step 1: Building Comet Docker image...[/bold blue]") + build_comet_image() # Builds locally and pushes to registry + else: + console.print("[bold blue]Step 1: Skipping image build[/bold blue]") + + # Create the job + console.print("[bold blue]Step 2: Creating Kubernetes job...[/bold blue]") + if github_token: + console.print("[dim]GitHub token provided - will post results to PR[/dim]") + job_name = create_comet_job( + pr_number=pr, + image=image, + benchmark_mode=benchmark_mode, + tpch_queries=tpch_queries, + iterations=iterations, + micro_benchmark=micro, + spark_master=spark_master, + comet_configs=comet_configs or None, + github_token=github_token, + ) + + try: + # Stream logs + console.print("[bold blue]Step 3: Streaming logs...[/bold blue]") + stream_logs(job_name) + + # Wait for completion + success = wait_for_completion(job_name) + + if success: + console.print("[bold green]Comet benchmark completed successfully![/bold green]") + else: + console.print("[bold red]Comet benchmark failed![/bold red]") + + finally: + if not no_cleanup: + console.print("[bold blue]Step 4: Cleaning up...[/bold blue]") + delete_job(job_name) + + +@comet.command("status") +@click.option( + "--pr", + type=int, + default=None, + help="Show status for a specific PR", +) +def comet_status(pr: int | None): + """Check status of Comet benchmark jobs.""" + from cometbot.k8s import APP_LABEL_COMET + + _handle_status(pr, "comet", APP_LABEL_COMET, "Comet") + + +@comet.command("logs") +@click.option( + "--pr", + required=True, + type=int, + help="PR number to get logs for", +) +@click.option( + "--follow", + "-f", + is_flag=True, + default=False, + help="Follow log output", +) +def comet_logs(pr: int, follow: bool): + """Get logs from a Comet benchmark job.""" + _handle_logs(pr, "comet", follow) + + +@comet.command("delete") +@click.option( + "--pr", + required=True, + type=int, + help="PR number to delete job for", +) +def comet_delete(pr: int): + """Delete a Comet benchmark job.""" + _handle_delete(pr, "comet") + + +# Bot commands +@main.group() +def bot(): + """GitHub bot commands for automated benchmark monitoring.""" + pass + + +@bot.command("start") +@click.option( + "--poll-interval", + default=60, + type=int, + help="GitHub polling interval in seconds (default: 60)", +) +@click.option( + 
"--github-token", + envvar="COMETBOT_GITHUB_TOKEN", + required=True, + help="GitHub token for API access (or set COMETBOT_GITHUB_TOKEN env var)", +) +def bot_start(poll_interval: int, github_token: str): + """Start the benchmark bot to monitor PRs for requests. + + The bot polls GitHub for comments containing slash commands: + + \b + - /run tpch [--iterations N] + - /run micro + - /help + + Only authorized users can trigger benchmarks. + """ + from cometbot.bot import run_bot + + run_bot(github_token, poll_interval) + + +@bot.command("poll-once") +@click.option( + "--github-token", + envvar="COMETBOT_GITHUB_TOKEN", + required=True, + help="GitHub token for API access (or set COMETBOT_GITHUB_TOKEN env var)", +) +def bot_poll_once(github_token: str): + """Poll GitHub once for benchmark requests (for testing).""" + from cometbot.bot import load_state, poll_once + + state = load_state() + poll_once(github_token, state) + + +if __name__ == "__main__": + main() diff --git a/dev/benchmarking-bot/src/cometbot/k8s.py b/dev/benchmarking-bot/src/cometbot/k8s.py new file mode 100644 index 0000000000..bf272990ab --- /dev/null +++ b/dev/benchmarking-bot/src/cometbot/k8s.py @@ -0,0 +1,466 @@ +"""Kubernetes operations for running Comet benchmarks in a cluster.""" + +import os +import subprocess +import time +from pathlib import Path + +from rich.console import Console + +console = Console() + +# Path to the project directory +# Use COMETBOT_PROJECT_DIR env var if set (for deployed bot), otherwise use package location +PACKAGE_DIR = Path(os.environ.get("COMETBOT_PROJECT_DIR", Path(__file__).parent.parent.parent)) +K8S_DIR = PACKAGE_DIR / "k8s" + +# Job templates +COMET_JOB_TEMPLATE = K8S_DIR / "comet-job-template.yaml" + +# Dockerfiles +DOCKERFILE = PACKAGE_DIR / "Dockerfile" + +# Registry configuration (used for both push and pull) +REGISTRY = os.environ.get("COMETBOT_REGISTRY", "localhost:5000") + +# Image name bases (without tags) +COMET_IMAGE_BASE = "comet-benchmark-automation" + +# Default image names (for CLI use) +DEFAULT_COMET_IMAGE = f"{COMET_IMAGE_BASE}:latest" +COMET_REGISTRY_IMAGE = f"{REGISTRY}/{COMET_IMAGE_BASE}:latest" + + +def get_comet_image_for_pr(pr_number: int) -> tuple[str, str]: + """Get local and registry image tags for a specific PR. + + Returns: + Tuple of (local_tag, registry_tag) + """ + local_tag = f"{COMET_IMAGE_BASE}:pr-{pr_number}" + registry_tag = f"{REGISTRY}/{COMET_IMAGE_BASE}:pr-{pr_number}" + return local_tag, registry_tag + + +# App labels for K8s job filtering +APP_LABEL_COMET = "comet-benchmark-automation" + + +def run_kubectl(*args: str, check: bool = True, capture: bool = False) -> subprocess.CompletedProcess: + """Run a kubectl command.""" + cmd = ["kubectl", *args] + return subprocess.run( + cmd, + check=check, + capture_output=capture, + text=True, + ) + + +# ============================================================================= +# Generic helper functions (used by benchmark-specific functions below) +# ============================================================================= + + +def _build_and_push_image( + dockerfile: Path, + tag: str, + registry_tag: str | None = None, + name: str = "", +) -> None: + """Generic function to build a Docker image and optionally push to registry. 
+ + Args: + dockerfile: Path to the Dockerfile + tag: Local image tag + registry_tag: If provided, also tag and push to this registry location + name: Display name for console output (e.g., "Comet") + """ + display_name = f"{name} " if name else "" + console.print(f"[blue]Building {display_name}Docker image: {tag}[/blue]") + + # Build with explicit dockerfile path + build_cmd = ["docker", "build", "-t", tag] + if dockerfile != DOCKERFILE: + build_cmd.extend(["-f", str(dockerfile)]) + build_cmd.append(str(PACKAGE_DIR)) + + result = subprocess.run(build_cmd, check=False, capture_output=True, text=True) + + if result.returncode != 0: + error_msg = result.stderr or result.stdout or "Unknown error" + console.print(f"[red]Docker build failed:[/red]\n{error_msg}") + raise RuntimeError(f"Failed to build {display_name}Docker image: {error_msg}") + + console.print(f"[green]Successfully built {display_name}image: {tag}[/green]") + + if registry_tag: + ensure_registry_running() + console.print(f"[blue]Tagging and pushing to registry: {registry_tag}[/blue]") + subprocess.run(["docker", "tag", tag, registry_tag], check=True) + result = subprocess.run(["docker", "push", registry_tag], check=False, capture_output=True, text=True) + if result.returncode != 0: + error_msg = result.stderr or result.stdout or "Unknown error" + console.print(f"[red]Docker push failed:[/red]\n{error_msg}") + raise RuntimeError(f"Failed to push {display_name}image to registry: {error_msg}") + console.print("[green]Successfully pushed to registry[/green]") + + +def _list_jobs_by_label(app_label: str) -> list[dict]: + """Generic function to list K8s jobs filtered by app label. + + Args: + app_label: The value of the 'app' label to filter by + + Returns: + List of dicts with 'name' and 'status' keys + """ + result = subprocess.run( + [ + "kubectl", "get", "jobs", + "-l", f"app={app_label}", + "-o", "jsonpath={range .items[*]}{.metadata.name},{.status.succeeded},{.status.failed}{\"\\n\"}{end}", + ], + capture_output=True, + text=True, + check=False, + ) + + if result.returncode != 0: + return [] + + jobs = [] + for line in result.stdout.strip().split("\n"): + if not line: + continue + parts = line.split(",") + name = parts[0] + succeeded = parts[1] if len(parts) > 1 else "0" + failed = parts[2] if len(parts) > 2 else "0" + + if succeeded == "1": + status = "Succeeded" + elif failed == "1": + status = "Failed" + else: + status = "Running" + + jobs.append({"name": name, "status": status}) + + return jobs + + +def count_running_jobs_by_label(app_label: str) -> int: + """Count the number of running (not completed/failed) jobs for an app label.""" + jobs = _list_jobs_by_label(app_label) + return sum(1 for job in jobs if job["status"] == "Running") + + +def count_running_comet_jobs() -> int: + """Count the number of running Comet benchmark jobs.""" + return count_running_jobs_by_label(APP_LABEL_COMET) + + +def _apply_job_manifest(manifest: str, job_name: str, name: str = "") -> str: + """Generic function to apply a K8s job manifest. 
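+    The rendered manifest is piped to "kubectl apply -f -" on stdin.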
+ + Args: + manifest: The rendered job manifest YAML + job_name: Name of the job being created + name: Display name for console output (e.g., "Comet") + + Returns: + The job name + """ + display_name = f"{name} " if name else "" + console.print(f"[blue]Creating {display_name}Kubernetes job: {job_name}[/blue]") + + subprocess.run( + ["kubectl", "apply", "-f", "-"], + input=manifest, + check=True, + capture_output=True, + text=True, + ) + + console.print(f"[green]{display_name}Job created: {job_name}[/green]") + return job_name + + +def get_job_status(job_name: str) -> dict: + """Get the status of a job.""" + result = subprocess.run( + [ + "kubectl", "get", "job", job_name, + "-o", "jsonpath={.status.conditions[0].type},{.status.succeeded},{.status.failed}", + ], + capture_output=True, + text=True, + check=False, + ) + + if result.returncode != 0: + return {"status": "NotFound"} + + parts = result.stdout.split(",") + condition = parts[0] if parts else "" + succeeded = parts[1] if len(parts) > 1 else "0" + failed = parts[2] if len(parts) > 2 else "0" + + if succeeded == "1": + return {"status": "Succeeded"} + elif failed == "1": + return {"status": "Failed"} + elif condition == "Complete": + return {"status": "Succeeded"} + else: + return {"status": "Running"} + + +def get_pod_name(job_name: str) -> str | None: + """Get the pod name for a job.""" + result = subprocess.run( + [ + "kubectl", "get", "pods", + "-l", f"job-name={job_name}", + "-o", "jsonpath={.items[0].metadata.name}", + ], + capture_output=True, + text=True, + check=False, + ) + + if result.returncode != 0 or not result.stdout.strip(): + return None + + return result.stdout.strip() + + +def stream_logs(job_name: str, follow: bool = True) -> None: + """Stream logs from a job's pod.""" + # Wait for pod to be created + console.print("[blue]Waiting for pod to start...[/blue]") + pod_name = None + for _ in range(60): # Wait up to 60 seconds + pod_name = get_pod_name(job_name) + if pod_name: + break + time.sleep(1) + + if not pod_name: + console.print("[red]Timeout waiting for pod to start[/red]") + return + + console.print(f"[blue]Streaming logs from pod: {pod_name}[/blue]") + + # Wait for pod to be running or completed + for _ in range(120): # Wait up to 2 minutes + result = subprocess.run( + [ + "kubectl", "get", "pod", pod_name, + "-o", "jsonpath={.status.phase}", + ], + capture_output=True, + text=True, + ) + phase = result.stdout.strip() + if phase in ("Running", "Succeeded", "Failed"): + break + time.sleep(1) + + # Stream logs + cmd = ["kubectl", "logs", pod_name] + if follow: + cmd.append("-f") + + subprocess.run(cmd, check=False) + + +def wait_for_completion(job_name: str, timeout: int = 3600) -> bool: + """Wait for a job to complete.""" + console.print(f"[blue]Waiting for job {job_name} to complete...[/blue]") + + start_time = time.time() + while time.time() - start_time < timeout: + status = get_job_status(job_name) + if status["status"] == "Succeeded": + console.print(f"[green]Job {job_name} completed successfully[/green]") + return True + elif status["status"] == "Failed": + console.print(f"[red]Job {job_name} failed[/red]") + return False + time.sleep(5) + + console.print(f"[red]Timeout waiting for job {job_name}[/red]") + return False + + +def delete_job(job_name: str) -> None: + """Delete a job and its pods.""" + console.print(f"[blue]Deleting job: {job_name}[/blue]") + + subprocess.run( + ["kubectl", "delete", "job", job_name, "--ignore-not-found"], + check=False, + ) + + +# Registry functions + +def 
ensure_registry_running() -> None: + """Ensure the local Docker registry container is running.""" + # Check if registry container exists and is running + result = subprocess.run( + ["docker", "ps", "-q", "-f", "name=registry"], + capture_output=True, + text=True, + ) + + if result.stdout.strip(): + # Registry is running + return + + # Check if container exists but is stopped + result = subprocess.run( + ["docker", "ps", "-aq", "-f", "name=registry"], + capture_output=True, + text=True, + ) + + if result.stdout.strip(): + # Container exists but stopped, start it + console.print("[blue]Starting stopped registry container...[/blue]") + subprocess.run(["docker", "start", "registry"], check=True) + console.print("[green]Registry container started[/green]") + else: + # Container doesn't exist, create and run it + console.print("[blue]Creating and starting registry container...[/blue]") + subprocess.run( + [ + "docker", "run", "-d", + "-p", "5000:5000", + "--restart=always", + "--name", "registry", + "registry:2.7", + ], + check=True, + ) + console.print("[green]Registry container created and started[/green]") + + +# ============================================================================= +# Comet benchmark functions +# ============================================================================= + + +def build_comet_image( + tag: str = DEFAULT_COMET_IMAGE, + push: bool = True, + registry_tag: str | None = None, +) -> None: + """Build the Docker image for Comet benchmarks and optionally push to registry. + + Args: + tag: Local image tag + push: Whether to push to registry + registry_tag: Override registry tag (if None, uses default COMET_REGISTRY_IMAGE) + """ + if push and registry_tag is None: + registry_tag = COMET_REGISTRY_IMAGE + elif not push: + registry_tag = None + _build_and_push_image(DOCKERFILE, tag, registry_tag, name="Comet") + + +def render_comet_job_manifest( + pr_number: int, + job_name: str, + image: str = DEFAULT_COMET_IMAGE, + benchmark_mode: str = "tpch", + tpch_queries: str = "/opt/tpch-queries", + iterations: int = 1, + micro_benchmark: str = "", + spark_master: str = "local[*]", + comet_configs: dict[str, str] | None = None, + github_token: str = "", + baseline_branch: str = "main", +) -> str: + """Render the Comet job template with the given parameters.""" + with open(COMET_JOB_TEMPLATE) as f: + template = f.read() + + # Convert comet_configs dict to comma-separated key=value string + comet_configs_str = "" + if comet_configs: + comet_configs_str = ",".join(f"{k}={v}" for k, v in comet_configs.items()) + + # Simple string replacement + manifest = template.format( + pr_number=pr_number, + job_name=job_name, + image=image, + benchmark_mode=benchmark_mode, + tpch_queries=tpch_queries, + iterations=iterations, + micro_benchmark=micro_benchmark, + spark_master=spark_master, + comet_configs=comet_configs_str, + github_token=github_token, + baseline_branch=baseline_branch, + ) + + return manifest + + +def create_comet_job( + pr_number: int, + image: str = DEFAULT_COMET_IMAGE, + benchmark_mode: str = "tpch", + tpch_queries: str = "/opt/tpch-queries", + iterations: int = 1, + micro_benchmark: str = "", + spark_master: str = "local[*]", + comet_configs: dict[str, str] | None = None, + github_token: str = "", + comment_id: int | None = None, + baseline_branch: str = "main", +) -> str: + """Create a Kubernetes job for Comet benchmark.""" + # Include comment_id in job name for uniqueness (truncate to keep name reasonable) + if comment_id: + job_name = 
f"comet-pr-{pr_number}-c{comment_id}" + else: + job_name = f"comet-pr-{pr_number}" + + console.print(f"[blue]Creating Comet Kubernetes job: {job_name}[/blue]") + + manifest = render_comet_job_manifest( + pr_number=pr_number, + job_name=job_name, + image=image, + benchmark_mode=benchmark_mode, + tpch_queries=tpch_queries, + iterations=iterations, + micro_benchmark=micro_benchmark, + spark_master=spark_master, + comet_configs=comet_configs, + github_token=github_token, + baseline_branch=baseline_branch, + ) + + # Apply the manifest + result = subprocess.run( + ["kubectl", "apply", "-f", "-"], + input=manifest, + check=True, + capture_output=True, + text=True, + ) + + console.print(f"[green]Comet job created: {job_name}[/green]") + return job_name + + +def list_comet_jobs() -> list[dict]: + """List all Comet benchmark jobs.""" + return _list_jobs_by_label(APP_LABEL_COMET) diff --git a/dev/benchmarking-bot/src/cometbot/slack.py b/dev/benchmarking-bot/src/cometbot/slack.py new file mode 100644 index 0000000000..2cc3b6661d --- /dev/null +++ b/dev/benchmarking-bot/src/cometbot/slack.py @@ -0,0 +1,41 @@ +"""Slack notifications for the cometbot.""" + +import os + +from rich.console import Console + +console = Console() + + +def notify(message: str, level: str = "info") -> None: + """Post a message to Slack. + + Silently no-ops if COMETBOT_SLACK_TOKEN is not set. + Never raises exceptions — Slack failures must not break the bot. + + Args: + message: Text to post. + level: One of "info", "success", "warning", "error". + """ + token = os.environ.get("COMETBOT_SLACK_TOKEN") + channel = os.environ.get("COMETBOT_SLACK_CHANNEL") + + if not token or not channel: + return + + level_emoji = { + "info": ":information_source:", + "success": ":white_check_mark:", + "warning": ":warning:", + "error": ":x:", + } + emoji = level_emoji.get(level, "") + text = f"{emoji} {message}" if emoji else message + + try: + from slack_sdk import WebClient + + client = WebClient(token=token) + client.chat_postMessage(channel=channel, text=text) + except Exception as e: + console.print(f"[dim]Slack notification failed: {e}[/dim]") diff --git a/docs/source/contributor-guide/benchmark-bot.md b/docs/source/contributor-guide/benchmark-bot.md new file mode 100644 index 0000000000..20ca915ac0 --- /dev/null +++ b/docs/source/contributor-guide/benchmark-bot.md @@ -0,0 +1,254 @@ + + +# Benchmark Bot + +The Comet benchmark bot automatically runs TPC-H and microbenchmarks against pull requests. Comment on +any Comet PR with a slash command and the bot will build the PR, run benchmarks in Kubernetes, and post +results back as a PR comment. + +## Triggering Benchmarks + +Add a comment on any open Comet PR with one of the following commands. + +### TPC-H Benchmarks + +Run the full TPC-H query suite (SF100) comparing the PR against the main branch: + +``` +/run tpch +``` + +Options: + +| Flag | Description | Default | +|------|-------------|---------| +| `--iterations N` | Number of benchmark iterations (max 3) | 1 | +| `--baseline BRANCH` | Branch to compare against | `main` | +| `--conf KEY=VALUE` | Spark/Comet config override (only `spark.comet.*` keys allowed). Can be repeated. 
| — |
+
+Examples:
+
+```
+/run tpch --iterations 3
+/run tpch --baseline my-feature-branch
+/run tpch --conf spark.comet.exec.enabled=true
+/run tpch --iterations 2 --conf spark.comet.exec.enabled=true --conf spark.comet.exec.replaceSortMergeJoin=false
+```
+
+### Microbenchmarks
+
+Run a specific JMH microbenchmark class:
+
+```
+/run micro <benchmark_class>
+```
+
+Options:
+
+| Flag | Description | Default |
+|------|-------------|---------|
+| `--baseline BRANCH` | Branch to compare against | `main` |
+
+Examples:
+
+```
+/run micro CometStringExpressionBenchmark
+/run micro CometStringExpressionBenchmark --baseline my-branch
+```
+
+Available microbenchmark classes are located in
+[spark/src/test/scala/org/apache/spark/sql/benchmark/](https://github.com/apache/datafusion-comet/tree/main/spark/src/test/scala/org/apache/spark/sql/benchmark).
+
+### Help
+
+```
+/help
+```
+
+Posts the usage reference as a PR comment.
+
+## Understanding Results
+
+### Reactions
+
+The bot uses GitHub reactions to signal progress:
+
+| Reaction | Meaning |
+|----------|---------|
+| :eyes: | Request acknowledged, job submitted |
+| :rocket: | Job completed successfully |
+
+### TPC-H Results
+
+TPC-H results are posted as a comparison table showing baseline vs PR times for all 22 queries, with
+percentage change indicators:
+
+- Green: >5% faster than baseline
+- Red: >5% slower than baseline
+- White: within 5% (neutral)
+
+The comment also includes a collapsible section with the full Spark configuration used.
+
+### Microbenchmark Results
+
+Microbenchmark results are posted as the raw JMH output showing timing comparisons between Spark and
+Comet for each benchmark scenario.
+
+## Authorization
+
+Only DataFusion committers are authorized to trigger benchmarks. The list of authorized GitHub usernames
+is maintained in `dev/benchmarking-bot/authorized_users.txt`. Unauthorized users receive a reply
+explaining the restriction.
+
+## Security Considerations
+
+**This bot executes code from pull requests on a Kubernetes cluster.** Operators should understand the
+following risks before deploying.
+
+- **Code execution**: Benchmark jobs clone a PR branch, build it with `make release`, and run it. A
+  malicious PR could contain arbitrary code that runs during the build or benchmark phase.
+- **Authorization gate**: Only GitHub usernames in `dev/benchmarking-bot/authorized_users.txt` can trigger
+  benchmarks. This is the primary security control. Keep the list limited to trusted committers.
+- **GitHub token**: The `COMETBOT_GITHUB_TOKEN` is passed into benchmark containers to post results. Use a
+  fine-grained token scoped only to the target repository with minimal permissions.
+- **Network access**: Benchmark containers have access to the Kubernetes pod network. Apply NetworkPolicies
+  to restrict egress if your cluster hosts sensitive services.
+- **Host filesystem**: TPC-H data is mounted via `hostPath`. Do not widen this mount beyond the data
+  directory.
+- **No runtime sandboxing**: Jobs run as standard containers. Consider gVisor, Kata Containers, or a
+  dedicated node pool for stronger isolation.
+
+## Implementation Details
+
+The bot source code lives in `dev/benchmarking-bot/`.
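+
+For example, the >5% color thresholds described under "TPC-H Results" reduce to a simple
+relative-change classification. A minimal sketch of that logic (illustrative only;
+`classify_change` is a hypothetical helper, not the bot's exact code):
+
+```python
+def classify_change(baseline_secs: float, pr_secs: float, threshold: float = 0.05) -> str:
+    """Classify a PR query time against its baseline using a 5% threshold."""
+    change = (pr_secs - baseline_secs) / baseline_secs
+    if change < -threshold:
+        return "faster"  # rendered green in the PR comment
+    if change > threshold:
+        return "slower"  # rendered red in the PR comment
+    return "neutral"  # within 5% of baseline
+```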
+
+### Architecture
+
+```
+dev/benchmarking-bot/
+├── src/cometbot/
+│   ├── bot.py                  # GitHub polling loop and comment parsing
+│   ├── cli.py                  # Click CLI (cometbot comet/bot commands)
+│   ├── k8s.py                  # Docker image builds and Kubernetes job management
+│   └── slack.py                # Optional Slack notifications
+├── Dockerfile                  # Benchmark container (JDK 17, Rust, Maven, Spark 3.5)
+├── k8s/
+│   ├── comet-job-template.yaml # K8s Job manifest template
+│   └── bot-deployment.yaml     # K8s Deployment for the bot itself
+├── deploy/
+│   ├── deploy.sh               # Deployment script
+│   └── cometbot-bot.service    # systemd unit file
+├── authorized_users.txt
+└── pyproject.toml
+```
+
+### How It Works
+
+1. **Polling**: The bot polls the GitHub API every 60 seconds for new comments on open Comet PRs.
+2. **Parsing**: Comments are checked for `/run` or `/help` slash commands. Quoted lines (starting with `>`) are
+   ignored to prevent re-triggering on quoted text.
+3. **Authorization**: The commenter's GitHub username is checked against `authorized_users.txt`.
+4. **Image Build**: A Docker image is built containing JDK 17, Rust, Maven, and Spark 3.5. The image is tagged
+   with the PR number and pushed to a local registry.
+5. **Job Submission**: A Kubernetes Job is created from the `comet-job-template.yaml` template. The job
+   clones the Comet repo, checks out the PR, builds Comet (`make release`), and runs the requested benchmark.
+6. **Completion Tracking**: While jobs are running, the bot checks Kubernetes job status every 10 seconds.
+   On completion, a :rocket: reaction is added to the original comment.
+7. **Results Posting**: The benchmark container itself posts results as a PR comment using the GitHub API
+   (the GitHub token is passed as an environment variable to the container).
+
+### Concurrency
+
+The bot enforces a maximum of 4 concurrent benchmark jobs. If the limit is reached, new requests are
+deferred and retried on the next poll cycle.
+
+### State Management
+
+Processed comment IDs are tracked in `cometbot-bot-state.json` to avoid re-processing. As a fallback,
+the bot also checks for the :eyes: reaction on comments — if present, the comment is skipped even if
+the state file is lost.
+
+### CLI
+
+The `cometbot` CLI can also be used directly for manual benchmark runs:
+
+```bash
+# Install
+cd dev/benchmarking-bot
+pip install -e .
+
+# Run a benchmark manually
+cometbot comet run --pr 1234
+cometbot comet run --pr 1234 --micro CometStringExpressionBenchmark
+
+# Manage jobs
+cometbot comet status
+cometbot comet logs --pr 1234
+cometbot comet delete --pr 1234
+
+# Start the bot
+cometbot bot start --github-token <TOKEN>
+```
+
+### TPC-H Data Prerequisite
+
+The benchmark jobs expect TPC-H SF100 data to already exist on the Kubernetes nodes at `/mnt/bigdata/tpch`.
+This path is mounted into the container as `/data/tpch` via a `hostPath` volume (see `k8s/comet-job-template.yaml`).
+
+You must generate and place this data before running TPC-H benchmarks. Data generation scripts are
+available in the [DataFusion Benchmarks](https://github.com/apache/datafusion-benchmarks) repository.
+
+The expected layout on each node:
+
+```
+/mnt/bigdata/tpch/
+└── sf100/
+    ├── customer/
+    ├── lineitem/
+    ├── nation/
+    ├── orders/
+    ├── part/
+    ├── partsupp/
+    ├── region/
+    └── supplier/
+```
+
+Each table directory should contain Parquet files. Microbenchmarks (`/run micro`) do not require TPC-H
+data — they use internally generated datasets.
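+
+To sanity-check the layout before triggering a run, a short PySpark session on a node can verify that
+each table directory is readable (illustrative only; assumes PySpark is installed and uses the paths
+shown above):
+
+```python
+from pyspark.sql import SparkSession
+
+# Spot-check that every expected TPC-H table directory holds readable Parquet files.
+spark = SparkSession.builder.appName("tpch-data-check").getOrCreate()
+for table in ["customer", "lineitem", "nation", "orders",
+              "part", "partsupp", "region", "supplier"]:
+    count = spark.read.parquet(f"/mnt/bigdata/tpch/sf100/{table}").count()
+    print(f"{table}: {count} rows")
+spark.stop()
+```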
+ +### Deployment + +The bot is deployed to a Kubernetes host using the provided deploy script: + +```bash +cd dev/benchmarking-bot +./deploy/deploy.sh [GITHUB_TOKEN] +``` + +This builds a Python wheel, copies it to the remote host, builds the Docker image, installs the +package in a virtualenv, and sets up a systemd service. + +Required environment variables on the deployment host: + +| Variable | Description | +|----------|-------------| +| `COMETBOT_GITHUB_TOKEN` | GitHub token for API access and posting results | +| `COMETBOT_SLACK_TOKEN` | (Optional) Slack bot token for notifications | +| `COMETBOT_SLACK_CHANNEL` | (Optional) Slack channel for notifications | diff --git a/docs/source/contributor-guide/index.md b/docs/source/contributor-guide/index.md index 2b6842e449..99406df64d 100644 --- a/docs/source/contributor-guide/index.md +++ b/docs/source/contributor-guide/index.md @@ -32,6 +32,7 @@ Parquet Scans Development Guide Debugging Guide Benchmarking Guide +Benchmark Bot Adding a New Operator Adding a New Expression Tracing From 51b500872cfc3fcd5afa8282d691cd70abe54e1a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 20 Feb 2026 10:42:07 -0700 Subject: [PATCH 2/2] reduce authorized users --- dev/benchmarking-bot/Dockerfile | 17 +++++ dev/benchmarking-bot/README.md | 43 ++++++++--- dev/benchmarking-bot/authorized_users.txt | 53 +------------ dev/benchmarking-bot/deploy/deploy.sh | 18 +++++ dev/benchmarking-bot/k8s/bot-deployment.yaml | 17 +++++ .../k8s/comet-job-template.yaml | 17 +++++ dev/benchmarking-bot/pyproject.toml | 17 +++++ dev/benchmarking-bot/src/cometbot/__init__.py | 17 +++++ dev/benchmarking-bot/src/cometbot/bot.py | 76 ++++++++++++++++--- dev/benchmarking-bot/src/cometbot/cli.py | 17 +++++ dev/benchmarking-bot/src/cometbot/k8s.py | 17 +++++ dev/benchmarking-bot/src/cometbot/slack.py | 17 +++++ .../source/contributor-guide/benchmark-bot.md | 34 ++++----- 13 files changed, 271 insertions(+), 89 deletions(-) diff --git a/dev/benchmarking-bot/Dockerfile b/dev/benchmarking-bot/Dockerfile index e91d06d30e..9fee3ded2e 100644 --- a/dev/benchmarking-bot/Dockerfile +++ b/dev/benchmarking-bot/Dockerfile @@ -1,3 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + # Dockerfile for Comet benchmarks (TPC-H and microbenchmarks) # Includes: JDK 17, Rust, Maven, Spark 3.5.x, Python diff --git a/dev/benchmarking-bot/README.md b/dev/benchmarking-bot/README.md index 37cbab6b71..5b7d99a19f 100644 --- a/dev/benchmarking-bot/README.md +++ b/dev/benchmarking-bot/README.md @@ -1,3 +1,22 @@ + + # Comet Benchmark Automation Bot Automated benchmarking for [Apache DataFusion Comet](https://github.com/apache/datafusion-comet) PRs. Runs TPC-H and microbenchmarks in Kubernetes, triggered by GitHub PR comments. 
@@ -21,6 +40,7 @@ Authorized users can trigger benchmarks by commenting on a Comet PR with slash c ### Reactions The bot uses reactions to signal status: + - :eyes: -- request acknowledged, job submitted - :rocket: -- job completed successfully - :thumbsdown: -- job failed or invalid request @@ -85,6 +105,7 @@ export COMETBOT_DEPLOY_DIR=/home/myuser/cometbot ``` This script: + 1. Builds the Python wheel 2. Copies files to the remote host 3. Builds the Docker image on the remote host @@ -95,20 +116,20 @@ This script: **Required for deployment:** -| Variable | Description | -|----------|-------------| -| `COMETBOT_DEPLOY_HOST` | Remote hostname to deploy to | -| `COMETBOT_DEPLOY_USER` | SSH username on remote host | -| `COMETBOT_DEPLOY_DIR` | Installation directory on remote host | +| Variable | Description | +| ---------------------- | ------------------------------------- | +| `COMETBOT_DEPLOY_HOST` | Remote hostname to deploy to | +| `COMETBOT_DEPLOY_USER` | SSH username on remote host | +| `COMETBOT_DEPLOY_DIR` | Installation directory on remote host | **Runtime (set in `$COMETBOT_DEPLOY_DIR/env` on the deployment host):** -| Variable | Description | -|----------|-------------| -| `COMETBOT_GITHUB_TOKEN` | GitHub token for API access and posting results | -| `COMETBOT_REGISTRY` | Docker registry for benchmark images (default: `localhost:5000`) | -| `COMETBOT_SLACK_TOKEN` | (Optional) Slack bot token for notifications | -| `COMETBOT_SLACK_CHANNEL` | (Optional) Slack channel for notifications | +| Variable | Description | +| ------------------------ | ---------------------------------------------------------------- | +| `COMETBOT_GITHUB_TOKEN` | GitHub token for API access and posting results | +| `COMETBOT_REGISTRY` | Docker registry for benchmark images (default: `localhost:5000`) | +| `COMETBOT_SLACK_TOKEN` | (Optional) Slack bot token for notifications | +| `COMETBOT_SLACK_CHANNEL` | (Optional) Slack channel for notifications | ## Security Considerations diff --git a/dev/benchmarking-bot/authorized_users.txt b/dev/benchmarking-bot/authorized_users.txt index 906e356e1f..6aa180a0e3 100644 --- a/dev/benchmarking-bot/authorized_users.txt +++ b/dev/benchmarking-bot/authorized_users.txt @@ -1,59 +1,8 @@ -# GitHub usernames of authorized users (DataFusion committers). +# GitHub usernames of authorized users # One username per line. Lines starting with # are comments. 
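+# NOTE: keep this list short; anyone listed here can run arbitrary PR code on the benchmark cluster.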
-# -# Source: https://projects.apache.org/committee.html?datafusion -# # Apache ID -> GitHub username -adriangb # Adrian Garcia Badaracco andygrove # agrove - Andrew Grove -mustafasrepo # akurmustafa - Mustafa Akur -alamb # alamb - Andrew Lamb -avantgardnerio # avantgardner - Brent Gardner -berkaysynnada # berkay - Berkay Sahin -blaginin # blaginin - Dmitrii Blaginin comphead # comphead - Oleks V -Dandandan # dheres - Daniel Heres -findepi # findepi - Piotr Findeisen -gabotechs # gabotechs - Gabriel Musat -goldmedal # goldmedal - Jay Shin -houqp # houqp - QP Hou -huaxingao # huaxingao - Huaxin Gao -isidentical # iffyio - Batuhan Taskaya -jackwener # jakevin - Jack Wener -jayzhan211 # jayzhan - Jay Zhan -Jefffrey # jeffreyvo - Jeffrey -jiangzhx # jiayuliu - Jiayu Liu -jonahgao # jonah - Jonah Gao -korowa # korowa - Oleks Koval kazuyukitanimura # kazuyukitanimura - Kazuyuki Tanimura -kosiew # kosiew - Ko Siew -lewiszlw # linwei - Lin Wei -liukun4515 # liukun - Kun Liu mbutrovich # mbutrovich - Matt Butrovich -metesynnada # mete - Mehmet Ozan Kabak -milenkovicm # milenkovicm - Marko Milenkovic -mingmwang # mingmwang - Mingming Wang -matthewmturner # mjward - Matthew Turner -crepererum # mneumann - Marco Neumann -yahoNanJing # nju_yaho - Yaho -nuno-faria # nunofaria - Nuno Faria -ozankabak # ozankabak - Ozan Kabak (see also metesynnada) -paddyhoran # paddyhoran - Paddy Horan parthchandra # parthc - Parth Chandra -rdettai # rdettai - Raphael Dettai (not in contributor list, keep Apache ID) -rluvaton # rluvaton - Raz Luvaton -sunchao # sunchao - Chao Sun -thinkharderdev # thinkharderdev - Andrew Lamb (alt) -timsaucer # timsaucer - Tim Saucer -tustvold # tustvold - Raphael Taylor-Davies -viirya # viirya - Liang-Chi Hsieh -waynexia # wayne - Wayne Xia -Weijun-H # weijun - Weijun Huang -wesm # wesm - Wes McKinney -wjones127 # wjones127 - Will Jones -xudong963 # xudong963 - Xudong -Ted-Jiang # yangjiang - Ted Jiang -ycohen # ycohen - Yoni Cohen (not in contributor list, keep Apache ID) -yjshen # yjshen - Yijie Shen -2010YOUY01 # ytyou - Yongting You -zhuqi-lucas # zhuqi - Zhu Qi diff --git a/dev/benchmarking-bot/deploy/deploy.sh b/dev/benchmarking-bot/deploy/deploy.sh index 99a69a64ef..7a8ab1a2b7 100755 --- a/dev/benchmarking-bot/deploy/deploy.sh +++ b/dev/benchmarking-bot/deploy/deploy.sh @@ -1,4 +1,22 @@ #!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ # Deploy cometbot to a remote host # # Required environment variables: diff --git a/dev/benchmarking-bot/k8s/bot-deployment.yaml b/dev/benchmarking-bot/k8s/bot-deployment.yaml index 1e7bff2775..9482827c61 100644 --- a/dev/benchmarking-bot/k8s/bot-deployment.yaml +++ b/dev/benchmarking-bot/k8s/bot-deployment.yaml @@ -1,3 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + apiVersion: apps/v1 kind: Deployment metadata: diff --git a/dev/benchmarking-bot/k8s/comet-job-template.yaml b/dev/benchmarking-bot/k8s/comet-job-template.yaml index 36c2ae695f..fcf43f8907 100644 --- a/dev/benchmarking-bot/k8s/comet-job-template.yaml +++ b/dev/benchmarking-bot/k8s/comet-job-template.yaml @@ -1,3 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + apiVersion: batch/v1 kind: Job metadata: diff --git a/dev/benchmarking-bot/pyproject.toml b/dev/benchmarking-bot/pyproject.toml index dc88586749..6648237290 100644 --- a/dev/benchmarking-bot/pyproject.toml +++ b/dev/benchmarking-bot/pyproject.toml @@ -1,3 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ [project] name = "cometbot" version = "0.1.0" diff --git a/dev/benchmarking-bot/src/cometbot/__init__.py b/dev/benchmarking-bot/src/cometbot/__init__.py index d7e66910be..f363cf05de 100644 --- a/dev/benchmarking-bot/src/cometbot/__init__.py +++ b/dev/benchmarking-bot/src/cometbot/__init__.py @@ -1,3 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + """Comet Benchmark Automation CLI.""" __version__ = "0.1.0" diff --git a/dev/benchmarking-bot/src/cometbot/bot.py b/dev/benchmarking-bot/src/cometbot/bot.py index d8b7134e08..c75c729d05 100644 --- a/dev/benchmarking-bot/src/cometbot/bot.py +++ b/dev/benchmarking-bot/src/cometbot/bot.py @@ -1,3 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + """GitHub bot that monitors Comet PRs for benchmark requests.""" import json @@ -38,8 +55,7 @@ def _load_authorized_users() -> set[str]: if line: users.add(line) if not users: - # Fallback if file is missing - users = {"andygrove", "comphead", "mbutrovich", "parthchandra"} + console.print("[bold red]WARNING: No authorized users loaded. No one will be able to trigger benchmarks.[/bold red]") return users AUTHORIZED_USERS = _load_authorized_users() @@ -73,7 +89,10 @@ def load_state() -> dict: def save_state(state: dict) -> None: - """Save processed comments state.""" + """Save processed comments state. 
Prunes to last 10000 comment IDs.""" + max_comments = 10000 + if len(state["processed_comments"]) > max_comments: + state["processed_comments"] = state["processed_comments"][-max_comments:] with open(STATE_FILE, "w") as f: json.dump(state, f) @@ -294,8 +313,21 @@ def parse_benchmark_request(comment: dict, repo: str, pr_number: int) -> Benchma ) -def parse_comet_configs(args: list[str]) -> dict[str, str]: - """Parse --conf arguments and filter to only spark.comet.* configs.""" +def _is_safe_git_ref(ref: str) -> bool: + """Validate that a git ref name is safe (no shell metacharacters).""" + return bool(re.match(r'^[a-zA-Z0-9._\-/]+$', ref)) and len(ref) <= 128 + + +def _is_safe_class_name(name: str) -> bool: + """Validate that a benchmark class name is safe (Java class name chars only).""" + return bool(re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name)) and len(name) <= 128 + + +def parse_comet_configs(args: list[str]) -> dict[str, str] | None: + """Parse --conf arguments and filter to only spark.comet.* configs. + + Returns None if any config value fails validation. + """ configs = {} i = 0 while i < len(args): @@ -304,17 +336,25 @@ def parse_comet_configs(args: list[str]) -> dict[str, str]: if "=" in conf: key, value = conf.split("=", 1) # Only allow spark.comet.* configs for security - if key.startswith("spark.comet."): - configs[key] = value - else: + if not key.startswith("spark.comet."): console.print(f"[yellow]Ignoring non-comet config: {key}[/yellow]") + i += 2 + continue + # Validate key and value contain no shell metacharacters + if not re.match(r'^[a-zA-Z0-9._\-]+$', key): + console.print(f"[red]Invalid config key: {key!r}[/red]") + return None + if not re.match(r'^[a-zA-Z0-9._\-/=:]+$', value): + console.print(f"[red]Invalid config value for {key}: {value!r}[/red]") + return None + configs[key] = value i += 2 else: i += 1 return configs -def run_tpch_benchmark(token: str, pr_number: int, comment_id: int, args: list[str]) -> str: +def run_tpch_benchmark(token: str, pr_number: int, comment_id: int, args: list[str]) -> str | None: """Run TPC-H benchmark for a PR. 
Builds the Docker image with PR-specific tag, submits the K8s job, @@ -348,8 +388,15 @@ def run_tpch_benchmark(token: str, pr_number: int, comment_id: int, args: list[s elif arg in ("--baseline", "-b") and i + 1 < len(args): baseline_branch = args[i + 1] + # Validate baseline branch name to prevent injection + if not _is_safe_git_ref(baseline_branch): + console.print(f"[red]Invalid baseline branch name: {baseline_branch!r}[/red]") + return None + # Parse --conf arguments (only spark.comet.* allowed) comet_configs = parse_comet_configs(args) + if comet_configs is None: + return None # Validation failed console.print(f"[bold blue]Running TPC-H benchmark for PR #{pr_number}[/bold blue]") console.print(f"Iterations: {iterations}, Baseline: {baseline_branch}") @@ -404,6 +451,12 @@ def run_micro_benchmark(token: str, pr_number: int, comment_id: int, args: list[ return None benchmark_class = args[0] + + # Validate benchmark class name to prevent injection + if not _is_safe_class_name(benchmark_class): + console.print(f"[red]Invalid benchmark class name: {benchmark_class!r}[/red]") + return None + baseline_branch = "main" # Parse remaining args @@ -411,6 +464,11 @@ def run_micro_benchmark(token: str, pr_number: int, comment_id: int, args: list[ if arg in ("--baseline", "-b") and i + 1 < len(args): baseline_branch = args[i + 1] + # Validate baseline branch name to prevent injection + if not _is_safe_git_ref(baseline_branch): + console.print(f"[red]Invalid baseline branch name: {baseline_branch!r}[/red]") + return None + console.print(f"[bold blue]Running microbenchmark for PR #{pr_number}[/bold blue]") console.print(f"Benchmark: {benchmark_class}, Baseline: {baseline_branch}") diff --git a/dev/benchmarking-bot/src/cometbot/cli.py b/dev/benchmarking-bot/src/cometbot/cli.py index 4f812995e9..b7f449ba28 100644 --- a/dev/benchmarking-bot/src/cometbot/cli.py +++ b/dev/benchmarking-bot/src/cometbot/cli.py @@ -1,3 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + """CLI entry point for Comet benchmark automation.""" import click diff --git a/dev/benchmarking-bot/src/cometbot/k8s.py b/dev/benchmarking-bot/src/cometbot/k8s.py index bf272990ab..48f552e5b5 100644 --- a/dev/benchmarking-bot/src/cometbot/k8s.py +++ b/dev/benchmarking-bot/src/cometbot/k8s.py @@ -1,3 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + """Kubernetes operations for running Comet benchmarks in a cluster.""" import os diff --git a/dev/benchmarking-bot/src/cometbot/slack.py b/dev/benchmarking-bot/src/cometbot/slack.py index 2cc3b6661d..7e6881bd19 100644 --- a/dev/benchmarking-bot/src/cometbot/slack.py +++ b/dev/benchmarking-bot/src/cometbot/slack.py @@ -1,3 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + """Slack notifications for the cometbot.""" import os diff --git a/docs/source/contributor-guide/benchmark-bot.md b/docs/source/contributor-guide/benchmark-bot.md index 20ca915ac0..f41096ec28 100644 --- a/docs/source/contributor-guide/benchmark-bot.md +++ b/docs/source/contributor-guide/benchmark-bot.md @@ -37,11 +37,11 @@ Run the full TPC-H query suite (SF100) comparing the PR against the main branch: Options: -| Flag | Description | Default | -|------|-------------|---------| -| `--iterations N` | Number of benchmark iterations (max 3) | 1 | -| `--baseline BRANCH` | Branch to compare against | `main` | -| `--conf KEY=VALUE` | Spark/Comet config override (only `spark.comet.*` keys allowed). Can be repeated. | — | +| Flag | Description | Default | +| ------------------- | --------------------------------------------------------------------------------- | ------- | +| `--iterations N` | Number of benchmark iterations (max 3) | 1 | +| `--baseline BRANCH` | Branch to compare against | `main` | +| `--conf KEY=VALUE` | Spark/Comet config override (only `spark.comet.*` keys allowed). Can be repeated. | — | Examples: @@ -62,9 +62,9 @@ Run a specific JMH microbenchmark class: Options: -| Flag | Description | Default | -|------|-------------|---------| -| `--baseline BRANCH` | Branch to compare against | `main` | +| Flag | Description | Default | +| ------------------- | ------------------------- | ------- | +| `--baseline BRANCH` | Branch to compare against | `main` | Examples: @@ -90,10 +90,10 @@ Posts the usage reference as a PR comment. 
The bot uses GitHub reactions to signal progress: -| Reaction | Meaning | -|----------|---------| -| :eyes: | Request acknowledged, job submitted | -| :rocket: | Job completed successfully | +| Reaction | Meaning | +| -------- | ----------------------------------- | +| :eyes: | Request acknowledged, job submitted | +| :rocket: | Job completed successfully | ### TPC-H Results @@ -247,8 +247,8 @@ package in a virtualenv, and sets up a systemd service. Required environment variables on the deployment host: -| Variable | Description | -|----------|-------------| -| `COMETBOT_GITHUB_TOKEN` | GitHub token for API access and posting results | -| `COMETBOT_SLACK_TOKEN` | (Optional) Slack bot token for notifications | -| `COMETBOT_SLACK_CHANNEL` | (Optional) Slack channel for notifications | +| Variable | Description | +| ------------------------ | ----------------------------------------------- | +| `COMETBOT_GITHUB_TOKEN` | GitHub token for API access and posting results | +| `COMETBOT_SLACK_TOKEN` | (Optional) Slack bot token for notifications | +| `COMETBOT_SLACK_CHANNEL` | (Optional) Slack channel for notifications |
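+
+Per the bot's README, these are set in the `env` file under `$COMETBOT_DEPLOY_DIR` on the deployment
+host. A minimal example (illustrative only; all values are placeholders, not real credentials):
+
+```bash
+COMETBOT_GITHUB_TOKEN=ghp_xxxxxxxxxxxxxxxxxxxx
+COMETBOT_SLACK_TOKEN=xoxb-placeholder
+COMETBOT_SLACK_CHANNEL="#comet-benchmarks"
+```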