diff --git a/.github/workflows/python-ci.yml b/.github/workflows/python-ci.yml
index 9f8ebf7215..6a2b144f05 100644
--- a/.github/workflows/python-ci.yml
+++ b/.github/workflows/python-ci.yml
@@ -75,6 +75,9 @@ jobs:
     steps:
       - uses: actions/checkout@v5
+      - uses: actions/setup-python@v6
+        with:
+          python-version: ${{ matrix.python }}
       - name: Install system dependencies
         run: sudo apt-get update && sudo apt-get install -y libkrb5-dev # for kerberos
       - name: Install
diff --git a/Makefile b/Makefile
index cbcc26dd21..d142c5ad1c 100644
--- a/Makefile
+++ b/Makefile
@@ -100,10 +100,8 @@ test-integration: test-integration-setup test-integration-exec test-integration-
 test-integration-setup: ## Start Docker services for integration tests
 	docker compose -f dev/docker-compose-integration.yml kill
 	docker compose -f dev/docker-compose-integration.yml rm -f
-	docker compose -f dev/docker-compose-integration.yml up -d
-	sleep 10
-	docker compose -f dev/docker-compose-integration.yml cp ./dev/provision.py spark-iceberg:/opt/spark/provision.py
-	docker compose -f dev/docker-compose-integration.yml exec -T spark-iceberg ipython ./provision.py
+	docker compose -f dev/docker-compose-integration.yml up -d --wait
+	$(POETRY) run python dev/provision.py
 
 test-integration-exec: ## Run integration tests (excluding provision)
 	$(TEST_RUNNER) pytest tests/ -m integration $(PYTEST_ARGS)
diff --git a/dev/Dockerfile b/dev/Dockerfile
index 77ba154851..d0fc6a4fdd 100644
--- a/dev/Dockerfile
+++ b/dev/Dockerfile
@@ -13,86 +13,57 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-FROM python:3.12-bullseye
+ARG BASE_IMAGE_SPARK_VERSION=3.5.6
 
-RUN apt-get -qq update && \
-    apt-get -qq install -y --no-install-recommends \
-    sudo \
-    curl \
-    vim \
-    unzip \
-    openjdk-11-jdk \
-    build-essential \
-    software-properties-common \
-    ssh && \
-    apt-get -qq clean && \
-    rm -rf /var/lib/apt/lists/*
+FROM apache/spark:${BASE_IMAGE_SPARK_VERSION}
 
-# Optional env variables
-ENV SPARK_HOME=${SPARK_HOME:-"/opt/spark"}
-ENV HADOOP_HOME=${HADOOP_HOME:-"/opt/hadoop"}
-ENV PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH
+# Dependency versions - keep these compatible
+ARG ICEBERG_VERSION=1.10.0
+ARG ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12
+ARG SPARK_VERSION=3.5.6
+ARG SCALA_VERSION=2.12
+ARG HADOOP_VERSION=3.3.4
+ARG AWS_SDK_VERSION=1.12.753
+ARG MAVEN_MIRROR=https://repo.maven.apache.org/maven2
 
-RUN mkdir -p ${HADOOP_HOME} && mkdir -p ${SPARK_HOME} && mkdir -p /home/iceberg/spark-events
+USER root
 WORKDIR ${SPARK_HOME}
 
-ENV SPARK_VERSION=3.5.6
-ENV SCALA_VERSION=2.12
-ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_${SCALA_VERSION}
-ENV ICEBERG_VERSION=1.10.0
-ENV PYICEBERG_VERSION=0.10.0
-ENV HADOOP_VERSION=3.3.4
-ENV AWS_SDK_VERSION=1.12.753
-
-# Try the primary Apache mirror (downloads.apache.org) first, then fall back to the archive
-RUN set -eux; \
-    FILE=spark-${SPARK_VERSION}-bin-hadoop3.tgz; \
-    URLS="https://downloads.apache.org/spark/spark-${SPARK_VERSION}/${FILE} https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/${FILE}"; \
-    for url in $URLS; do \
-        echo "Attempting download: $url"; \
-        if curl --retry 3 --retry-delay 5 -f -s -C - "$url" -o "$FILE"; then \
-            echo "Downloaded from: $url"; \
-            break; \
-        else \
-            echo "Failed to download from: $url"; \
-        fi; \
-    done; \
-    if [ ! -f "$FILE" ]; then echo "Failed to download Spark from all mirrors" >&2; exit 1; fi; \
-    tar xzf "$FILE" --directory /opt/spark --strip-components 1; \
-    rm -rf "$FILE"
-
-# Download Spark Connect server JAR
-RUN curl --retry 5 -s -L https://repo1.maven.org/maven2/org/apache/spark/spark-connect_${SCALA_VERSION}/${SPARK_VERSION}/spark-connect_${SCALA_VERSION}-${SPARK_VERSION}.jar \
-    -Lo /opt/spark/jars/spark-connect_${SCALA_VERSION}-${SPARK_VERSION}.jar
-
-# Download iceberg spark runtime
-RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}/${ICEBERG_VERSION}/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar \
-    -Lo /opt/spark/jars/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar
-
-# Download AWS bundle
-RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar \
-    -Lo /opt/spark/jars/iceberg-aws-bundle-${ICEBERG_VERSION}.jar
-
-# Download hadoop-aws (required for S3 support)
-RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar \
-    -Lo /opt/spark/jars/hadoop-aws-${HADOOP_VERSION}.jar
-
-# Download AWS SDK bundle
-RUN curl --retry 5 -s https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar \
-    -Lo /opt/spark/jars/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar
-
-COPY spark-defaults.conf /opt/spark/conf
-ENV PATH="/opt/spark/sbin:/opt/spark/bin:${PATH}"
-
-RUN chmod u+x /opt/spark/sbin/* && \
-    chmod u+x /opt/spark/bin/*
-
-RUN pip3 install -q ipython
-
-RUN pip3 install "pyiceberg[s3fs,hive,pyarrow]==${PYICEBERG_VERSION}"
+# Install curl for JAR downloads
+RUN apt-get update && \
+    apt-get install -y --no-install-recommends curl && \
+    rm -rf /var/lib/apt/lists/*
 
-COPY entrypoint.sh .
-COPY provision.py .
+# Copy configuration (early for better caching)
+COPY --chown=spark:spark spark-defaults.conf ${SPARK_HOME}/conf/
+
+# Create event log directory
+RUN mkdir -p /home/iceberg/spark-events && \
+    chown -R spark:spark /home/iceberg
+
+# Required JAR dependencies
+ENV JARS_TO_DOWNLOAD="\
+    org/apache/spark/spark-connect_${SCALA_VERSION}/${SPARK_VERSION}/spark-connect_${SCALA_VERSION}-${SPARK_VERSION}.jar \
+    org/apache/iceberg/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}/${ICEBERG_VERSION}/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar \
+    org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar \
+    org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar \
+    com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar"
+
+# Download JARs with retry logic
+RUN set -e && \
+    cd "${SPARK_HOME}/jars" && \
+    for jar_path in ${JARS_TO_DOWNLOAD}; do \
+        jar_name=$(basename "${jar_path}") && \
+        echo "Downloading ${jar_name}..." && \
+        curl -fsSL --retry 3 --retry-delay 5 \
+            -o "${jar_name}" \
+            "${MAVEN_MIRROR}/${jar_path}" && \
+        echo "✓ Downloaded ${jar_name}"; \
+    done && \
+    chown -R spark:spark "${SPARK_HOME}/jars"
+
+USER spark
+WORKDIR ${SPARK_HOME}
 
-ENTRYPOINT ["./entrypoint.sh"]
-CMD ["notebook"]
+# Start Spark Connect server
+CMD ["sh", "-c", "SPARK_NO_DAEMONIZE=true ${SPARK_HOME}/sbin/start-connect-server.sh"]
diff --git a/dev/docker-compose-integration.yml b/dev/docker-compose-integration.yml
index ec4245bcbf..9e983356f6 100644
--- a/dev/docker-compose-integration.yml
+++ b/dev/docker-compose-integration.yml
@@ -17,7 +17,6 @@
 
 services:
   spark-iceberg:
-    image: python-integration
     container_name: pyiceberg-spark
     build: .
     networks:
@@ -37,6 +36,12 @@ services:
       - rest:rest
       - hive:hive
      - minio:minio
+    healthcheck:
+      test: ["CMD", "sh", "-c", "netstat -an | grep 15002 | grep LISTEN"]
+      interval: 30s
+      timeout: 10s
+      retries: 5
+      start_period: 90s
   rest:
     image: apache/iceberg-rest-fixture
     container_name: pyiceberg-rest
@@ -87,7 +92,7 @@ services:
       "
   hive:
     build: hive/
-    container_name: hive
+    container_name: pyiceberg-hive
     hostname: hive
     networks:
       iceberg_net:
diff --git a/dev/entrypoint.sh b/dev/entrypoint.sh
deleted file mode 100755
index 3912eb4b15..0000000000
--- a/dev/entrypoint.sh
+++ /dev/null
@@ -1,23 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-start-connect-server.sh
-
-tail -f /dev/null
diff --git a/dev/provision.py b/dev/provision.py
index 71bbbd73c3..695ef9b1bf 100644
--- a/dev/provision.py
+++ b/dev/provision.py
@@ -14,7 +14,6 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import math
 
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import current_date, date_add, expr
@@ -23,35 +22,26 @@
 from pyiceberg.schema import Schema
 from pyiceberg.types import FixedType, NestedField, UUIDType
 
-# The configuration is important, otherwise we get many small
-# parquet files with a single row. When a positional delete
-# hits the Parquet file with one row, the parquet file gets
-# dropped instead of having a merge-on-read delete file.
-spark = (
-    SparkSession
-    .builder
-    .config("spark.sql.shuffle.partitions", "1")
-    .config("spark.default.parallelism", "1")
-    .getOrCreate()
-)
+# Create SparkSession against the remote Spark Connect server
+spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
 
 catalogs = {
-    'rest': load_catalog(
+    "rest": load_catalog(
        "rest",
         **{
             "type": "rest",
-            "uri": "http://rest:8181",
-            "s3.endpoint": "http://minio:9000",
+            "uri": "http://localhost:8181",
+            "s3.endpoint": "http://localhost:9000",
             "s3.access-key-id": "admin",
             "s3.secret-access-key": "password",
         },
     ),
-    'hive': load_catalog(
+    "hive": load_catalog(
         "hive",
         **{
             "type": "hive",
-            "uri": "thrift://hive:9083",
-            "s3.endpoint": "http://minio:9000",
+            "uri": "thrift://localhost:9083",
+            "s3.endpoint": "http://localhost:9000",
             "s3.access-key-id": "admin",
             "s3.secret-access-key": "password",
         },
@@ -119,7 +109,7 @@
 
     # v3: Using deletion vectors
     for format_version in [2, 3]:
-        identifier = f'{catalog_name}.default.test_positional_mor_deletes_v{format_version}'
+        identifier = f"{catalog_name}.default.test_positional_mor_deletes_v{format_version}"
         spark.sql(
             f"""
             CREATE OR REPLACE TABLE {identifier} (
@@ -137,10 +127,8 @@
             """
         )
 
-        spark.sql(
-            f"""
-            INSERT INTO {identifier}
-            VALUES
+        spark.sql("""
+            SELECT * FROM VALUES
             (CAST('2023-03-01' AS date), 1, 'a'),
             (CAST('2023-03-02' AS date), 2, 'b'),
             (CAST('2023-03-03' AS date), 3, 'c'),
@@ -152,9 +140,9 @@
             (CAST('2023-03-09' AS date), 9, 'i'),
             (CAST('2023-03-10' AS date), 10, 'j'),
             (CAST('2023-03-11' AS date), 11, 'k'),
-            (CAST('2023-03-12' AS date), 12, 'l');
-            """
-        )
+            (CAST('2023-03-12' AS date), 12, 'l')
+            AS t(dt, number, letter)
+        """).coalesce(1).writeTo(identifier).append()
 
         spark.sql(f"ALTER TABLE {identifier} CREATE TAG tag_12")
 
@@ -164,7 +152,7 @@
 
         spark.sql(f"DELETE FROM {identifier} WHERE number = 9")
 
-        identifier = f'{catalog_name}.default.test_positional_mor_double_deletes_v{format_version}'
+        identifier = f"{catalog_name}.default.test_positional_mor_double_deletes_v{format_version}"
 
         spark.sql(
             f"""
@@ -178,15 +166,13 @@
                 'write.delete.mode'='merge-on-read',
                 'write.update.mode'='merge-on-read',
                 'write.merge.mode'='merge-on-read',
-                'format-version'='2'
+                'format-version'='{format_version}'
             );
             """
         )
 
-        spark.sql(
-            f"""
-            INSERT INTO {identifier}
-            VALUES
+        spark.sql("""
+            SELECT * FROM VALUES
             (CAST('2023-03-01' AS date), 1, 'a'),
             (CAST('2023-03-02' AS date), 2, 'b'),
             (CAST('2023-03-03' AS date), 3, 'c'),
@@ -198,9 +184,9 @@
             (CAST('2023-03-09' AS date), 9, 'i'),
             (CAST('2023-03-10' AS date), 10, 'j'),
             (CAST('2023-03-11' AS date), 11, 'k'),
-            (CAST('2023-03-12' AS date), 12, 'l');
-            """
-        )
+            (CAST('2023-03-12' AS date), 12, 'l')
+            AS t(dt, number, letter)
+        """).coalesce(1).writeTo(identifier).append()
 
         # Perform two deletes, should produce:
         # v2: two positional delete files in v2
diff --git a/mkdocs/docs/how-to-release.md b/mkdocs/docs/how-to-release.md
index a2fa3f7047..cefc982d54 100644
--- a/mkdocs/docs/how-to-release.md
+++ b/mkdocs/docs/how-to-release.md
@@ -389,10 +389,6 @@ Run the [`Release Docs` Github Action](https://github.com/apache/iceberg-python/
 
 Make sure to create a PR to update the [GitHub issues template](https://github.com/apache/iceberg-python/blob/main/.github/ISSUE_TEMPLATE/iceberg_bug_report.yml) with the latest version.
 
-### Update the integration tests
-
-Ensure to update the `PYICEBERG_VERSION` in the [Dockerfile](https://github.com/apache/iceberg-python/blob/main/dev/Dockerfile).
-
 ## Misc
 
 ### Set up GPG key and Upload to Apache Iceberg KEYS file
diff --git a/ruff.toml b/ruff.toml
index 11fd2a957b..b7bc461cf6 100644
--- a/ruff.toml
+++ b/ruff.toml
@@ -16,7 +16,6 @@
 # under the License.
 
 src = ['pyiceberg','tests']
-extend-exclude = ["dev/provision.py"]
 
 # Exclude a variety of commonly ignored directories.
 exclude = [
diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index 375eb35b2b..99116ad16f 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -432,6 +432,8 @@ def test_pyarrow_deletes(catalog: Catalog, format_version: int) -> None:
     # (11, 'k'),
     # (12, 'l')
     test_positional_mor_deletes = catalog.load_table(f"default.test_positional_mor_deletes_v{format_version}")
+    if format_version == 2:
+        assert len(test_positional_mor_deletes.inspect.delete_files()) > 0, "Table should produce position delete files"
     arrow_table = test_positional_mor_deletes.scan().to_arrow()
     assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]
 
@@ -470,6 +472,8 @@ def test_pyarrow_deletes_double(catalog: Catalog, format_version: int) -> None:
     # (11, 'k'),
     # (12, 'l')
     test_positional_mor_double_deletes = catalog.load_table(f"default.test_positional_mor_double_deletes_v{format_version}")
+    if format_version == 2:
+        assert len(test_positional_mor_double_deletes.inspect.delete_files()) > 0, "Table should produce position delete files"
     arrow_table = test_positional_mor_double_deletes.scan().to_arrow()
     assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 7, 8, 10, 11, 12]
 
@@ -508,6 +512,8 @@ def test_pyarrow_batches_deletes(catalog: Catalog, format_version: int) -> None:
     # (11, 'k'),
     # (12, 'l')
     test_positional_mor_deletes = catalog.load_table(f"default.test_positional_mor_deletes_v{format_version}")
+    if format_version == 2:
+        assert len(test_positional_mor_deletes.inspect.delete_files()) > 0, "Table should produce position delete files"
     arrow_table = test_positional_mor_deletes.scan().to_arrow_batch_reader().read_all()
     assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]
 
@@ -550,6 +556,8 @@ def test_pyarrow_batches_deletes_double(catalog: Catalog, format_version: int) -> None:
     # (11, 'k'),
     # (12, 'l')
     test_positional_mor_double_deletes = catalog.load_table(f"default.test_positional_mor_double_deletes_v{format_version}")
+    if format_version == 2:
+        assert len(test_positional_mor_double_deletes.inspect.delete_files()) > 0, "Table should produce position delete files"
     arrow_table = test_positional_mor_double_deletes.scan().to_arrow_batch_reader().read_all()
     assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 7, 8, 10, 11, 12]
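Reviewer note: with this patch, provisioning no longer runs inside the spark-iceberg container; `make test-integration-setup` brings the compose stack up with `--wait` (blocking until the port-15002 healthcheck passes) and then runs `dev/provision.py` from the host over Spark Connect. Below is a minimal smoke-test sketch, not part of the patch, for checking the stack by hand after setup. It assumes a local environment with `pyspark` (with Spark Connect support) and `pyiceberg` installed; the ports, credentials, and table name are the ones wired up in this diff.

    from pyspark.sql import SparkSession

    from pyiceberg.catalog import load_catalog

    # Spark Connect server published by the spark-iceberg service on localhost:15002.
    spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
    spark.sql("SELECT 1").show()  # confirms the Connect server is reachable

    # REST catalog and MinIO endpoints as exposed by dev/docker-compose-integration.yml.
    catalog = load_catalog(
        "rest",
        **{
            "type": "rest",
            "uri": "http://localhost:8181",
            "s3.endpoint": "http://localhost:9000",
            "s3.access-key-id": "admin",
            "s3.secret-access-key": "password",
        },
    )

    # After provisioning, the merge-on-read test table should be readable with its deletes applied.
    table = catalog.load_table("default.test_positional_mor_deletes_v2")
    print(table.scan().to_arrow()["number"].to_pylist())

Running `dev/provision.py` on the host is what makes dropping `extend-exclude = ["dev/provision.py"]` from ruff.toml and adding `actions/setup-python` to the CI job necessary: the script is now linted and executed in the local Python environment rather than copied into the container.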