3 changes: 3 additions & 0 deletions .github/workflows/python-ci.yml
@@ -75,6 +75,9 @@ jobs:

steps:
- uses: actions/checkout@v5
- uses: actions/setup-python@v6
with:
python-version: ${{ matrix.python }}
- name: Install system dependencies
run: sudo apt-get update && sudo apt-get install -y libkrb5-dev # for kerberos
- name: Install
6 changes: 2 additions & 4 deletions Makefile
@@ -100,10 +100,8 @@ test-integration: test-integration-setup test-integration-exec test-integration-
test-integration-setup: ## Start Docker services for integration tests
docker compose -f dev/docker-compose-integration.yml kill
docker compose -f dev/docker-compose-integration.yml rm -f
docker compose -f dev/docker-compose-integration.yml up -d
sleep 10
docker compose -f dev/docker-compose-integration.yml cp ./dev/provision.py spark-iceberg:/opt/spark/provision.py
docker compose -f dev/docker-compose-integration.yml exec -T spark-iceberg ipython ./provision.py
docker compose -f dev/docker-compose-integration.yml up -d --wait
$(POETRY) run python dev/provision.py

test-integration-exec: ## Run integration tests (excluding provision)
$(TEST_RUNNER) pytest tests/ -m integration $(PYTEST_ARGS)
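
In `test-integration-setup`, the `--wait` flag makes `docker compose up` block until the `spark-iceberg` healthcheck (added in the compose file below) reports healthy, after which `dev/provision.py` runs on the host via Poetry against the published Spark Connect port. Purely as an illustration, a host-side readiness probe equivalent to what `--wait` relies on could look like this (the helper name, timeout, and poll interval are made up for the sketch):

import socket
import time

def wait_for_spark_connect(host: str = "localhost", port: int = 15002, timeout: int = 120) -> None:
    """Poll until the Spark Connect port accepts TCP connections."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            # A successful TCP connect means the server is listening on 15002,
            # which is what the container healthcheck verifies with netstat.
            with socket.create_connection((host, port), timeout=5):
                return
        except OSError:
            time.sleep(2)
    raise TimeoutError(f"Spark Connect is not reachable on {host}:{port}")

wait_for_spark_connect()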
123 changes: 47 additions & 76 deletions dev/Dockerfile
@@ -13,86 +13,57 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM python:3.12-bullseye
ARG BASE_IMAGE_SPARK_VERSION=3.5.6

RUN apt-get -qq update && \
apt-get -qq install -y --no-install-recommends \
sudo \
curl \
vim \
unzip \
openjdk-11-jdk \
build-essential \
software-properties-common \
ssh && \
apt-get -qq clean && \
rm -rf /var/lib/apt/lists/*
FROM apache/spark:${BASE_IMAGE_SPARK_VERSION}

# Optional env variables
ENV SPARK_HOME=${SPARK_HOME:-"/opt/spark"}
ENV HADOOP_HOME=${HADOOP_HOME:-"/opt/hadoop"}
ENV PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH
# Dependency versions - keep these compatible
ARG ICEBERG_VERSION=1.10.0
ARG ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12
ARG SPARK_VERSION=3.5.6
ARG SCALA_VERSION=2.12
ARG HADOOP_VERSION=3.3.4
ARG AWS_SDK_VERSION=1.12.753
ARG MAVEN_MIRROR=https://repo.maven.apache.org/maven2

RUN mkdir -p ${HADOOP_HOME} && mkdir -p ${SPARK_HOME} && mkdir -p /home/iceberg/spark-events
USER root
WORKDIR ${SPARK_HOME}

ENV SPARK_VERSION=3.5.6
ENV SCALA_VERSION=2.12
ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_${SCALA_VERSION}
ENV ICEBERG_VERSION=1.10.0
ENV PYICEBERG_VERSION=0.10.0
ENV HADOOP_VERSION=3.3.4
ENV AWS_SDK_VERSION=1.12.753

# Try the primary Apache mirror (downloads.apache.org) first, then fall back to the archive
RUN set -eux; \
FILE=spark-${SPARK_VERSION}-bin-hadoop3.tgz; \
URLS="https://downloads.apache.org/spark/spark-${SPARK_VERSION}/${FILE} https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/${FILE}"; \
for url in $URLS; do \
echo "Attempting download: $url"; \
if curl --retry 3 --retry-delay 5 -f -s -C - "$url" -o "$FILE"; then \
echo "Downloaded from: $url"; \
break; \
else \
echo "Failed to download from: $url"; \
fi; \
done; \
if [ ! -f "$FILE" ]; then echo "Failed to download Spark from all mirrors" >&2; exit 1; fi; \
tar xzf "$FILE" --directory /opt/spark --strip-components 1; \
rm -rf "$FILE"

# Download Spark Connect server JAR
RUN curl --retry 5 -s -L https://repo1.maven.org/maven2/org/apache/spark/spark-connect_${SCALA_VERSION}/${SPARK_VERSION}/spark-connect_${SCALA_VERSION}-${SPARK_VERSION}.jar \
-Lo /opt/spark/jars/spark-connect_${SCALA_VERSION}-${SPARK_VERSION}.jar

# Download iceberg spark runtime
RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}/${ICEBERG_VERSION}/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar \
-Lo /opt/spark/jars/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar

# Download AWS bundle
RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar \
-Lo /opt/spark/jars/iceberg-aws-bundle-${ICEBERG_VERSION}.jar

# Download hadoop-aws (required for S3 support)
RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar \
-Lo /opt/spark/jars/hadoop-aws-${HADOOP_VERSION}.jar

# Download AWS SDK bundle
RUN curl --retry 5 -s https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar \
-Lo /opt/spark/jars/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar

COPY spark-defaults.conf /opt/spark/conf
ENV PATH="/opt/spark/sbin:/opt/spark/bin:${PATH}"

RUN chmod u+x /opt/spark/sbin/* && \
chmod u+x /opt/spark/bin/*

RUN pip3 install -q ipython

RUN pip3 install "pyiceberg[s3fs,hive,pyarrow]==${PYICEBERG_VERSION}"
# Install curl for JAR downloads
RUN apt-get update && \
apt-get install -y --no-install-recommends curl && \
rm -rf /var/lib/apt/lists/*

COPY entrypoint.sh .
COPY provision.py .
# Copy configuration (early for better caching)
COPY --chown=spark:spark spark-defaults.conf ${SPARK_HOME}/conf/

# Create event log directory
RUN mkdir -p /home/iceberg/spark-events && \
chown -R spark:spark /home/iceberg

# Required JAR dependencies
ENV JARS_TO_DOWNLOAD="\
org/apache/spark/spark-connect_${SCALA_VERSION}/${SPARK_VERSION}/spark-connect_${SCALA_VERSION}-${SPARK_VERSION}.jar \
org/apache/iceberg/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}/${ICEBERG_VERSION}/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar \
org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar \
org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar \
com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar"

# Download JARs with retry logic
RUN set -e && \
cd "${SPARK_HOME}/jars" && \
for jar_path in ${JARS_TO_DOWNLOAD}; do \
jar_name=$(basename "${jar_path}") && \
echo "Downloading ${jar_name}..." && \
curl -fsSL --retry 3 --retry-delay 5 \
-o "${jar_name}" \
"${MAVEN_MIRROR}/${jar_path}" && \
echo "✓ Downloaded ${jar_name}"; \
done && \
chown -R spark:spark "${SPARK_HOME}/jars"

USER spark
WORKDIR ${SPARK_HOME}

ENTRYPOINT ["./entrypoint.sh"]
CMD ["notebook"]
# Start Spark Connect server
CMD ["sh", "-c", "SPARK_NO_DAEMONIZE=true ${SPARK_HOME}/sbin/start-connect-server.sh"]
9 changes: 7 additions & 2 deletions dev/docker-compose-integration.yml
@@ -17,7 +17,6 @@

services:
spark-iceberg:
image: python-integration
container_name: pyiceberg-spark
build: .
networks:
@@ -37,6 +36,12 @@ services:
- rest:rest
- hive:hive
- minio:minio
healthcheck:
test: ["CMD", "sh", "-c", "netstat -an | grep 15002 | grep LISTEN"]
interval: 30s
timeout: 10s
retries: 5
start_period: 90s
rest:
image: apache/iceberg-rest-fixture
container_name: pyiceberg-rest
@@ -87,7 +92,7 @@ services:
"
hive:
build: hive/
container_name: hive
container_name: pyiceberg-hive
hostname: hive
networks:
iceberg_net:
23 changes: 0 additions & 23 deletions dev/entrypoint.sh

This file was deleted.

56 changes: 21 additions & 35 deletions dev/provision.py
@@ -14,7 +14,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import math

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date, date_add, expr
@@ -23,35 +22,26 @@
from pyiceberg.schema import Schema
from pyiceberg.types import FixedType, NestedField, UUIDType

# The configuration is important, otherwise we get many small
# parquet files with a single row. When a positional delete
# hits the Parquet file with one row, the parquet file gets
# dropped instead of having a merge-on-read delete file.
spark = (
SparkSession
.builder
.config("spark.sql.shuffle.partitions", "1")
.config("spark.default.parallelism", "1")
Comment on lines -33 to -34
Contributor

I think we've set these to avoid creating multiple files with just one row. This way, when a shuffle is performed, it will be coalesced into a single file. This affects tests such as positional deletes, because when all the rows in a single file are marked for deletion, the whole file is dropped instead of creating merge-on-read deletes.

Contributor Author

Yeah, I remember these. I can add them to spark-defaults, but the tests are passing now without them 🤷

Contributor

The tests are passing, but we're no longer testing positional deletes, since Spark throws away the whole file instead of creating positional deletes:

The following test illustrates the problem:

diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index 375eb35b2..ed6e805e3 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -432,6 +432,11 @@ def test_pyarrow_deletes(catalog: Catalog, format_version: int) -> None:
     #  (11, 'k'),
     #  (12, 'l')
     test_positional_mor_deletes = catalog.load_table(f"default.test_positional_mor_deletes_v{format_version}")
+
+    if format_version == 2:
+        files = test_positional_mor_deletes.scan().plan_files()
+        assert all([len(file.delete_files) > 0 for file in files])
+
     arrow_table = test_positional_mor_deletes.scan().to_arrow()
     assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]

This one passes on main but fails on this branch.

Contributor Author

Looks like Spark Connect doesn't support these options:

        .config("spark.sql.shuffle.partitions", "1")
        .config("spark.default.parallelism", "1")

And INSERT INTO writes one data file per row. In order to force a single data file, I'm using

.coalesce(1).writeTo(identifier).append()
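
To illustrate that workaround outside the provision script, a rough sketch (the table identifier and values are made up; it assumes a Spark Connect session and an existing Iceberg table configured for merge-on-read deletes):

from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
identifier = "rest.default.example_mor_table"  # hypothetical table

# Coalesce to a single partition so the append writes one data file;
# a later row-level DELETE then produces a positional delete file
# instead of Spark dropping the whole single-row data file.
rows = spark.sql("SELECT * FROM VALUES (1, 'a'), (2, 'b'), (3, 'c') AS t(number, letter)")
rows.coalesce(1).writeTo(identifier).append()

spark.sql(f"DELETE FROM {identifier} WHERE number = 2")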

.getOrCreate()
)
# Create SparkSession against the remote Spark Connect server
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

catalogs = {
'rest': load_catalog(
"rest": load_catalog(
"rest",
**{
"type": "rest",
"uri": "http://rest:8181",
"s3.endpoint": "http://minio:9000",
"uri": "http://localhost:8181",
"s3.endpoint": "http://localhost:9000",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
},
),
'hive': load_catalog(
"hive": load_catalog(
"hive",
**{
"type": "hive",
"uri": "thrift://hive:9083",
"s3.endpoint": "http://minio:9000",
"uri": "thrift://localhost:9083",
"s3.endpoint": "http://localhost:9000",
"s3.access-key-id": "admin",
"s3.secret-access-key": "password",
},
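
Because the provision script now runs on the host rather than inside the spark-iceberg container, the catalog and S3 endpoints point at localhost instead of the compose service names. Loading the REST catalog the same way from a local shell would look roughly like this (assumes the compose file publishes ports 8181 and 9000; the `list_namespaces` call is only a smoke test):

from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "rest",
    **{
        "type": "rest",
        "uri": "http://localhost:8181",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    },
)
print(catalog.list_namespaces())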
@@ -119,7 +109,7 @@
# v3: Using deletion vectors

for format_version in [2, 3]:
identifier = f'{catalog_name}.default.test_positional_mor_deletes_v{format_version}'
identifier = f"{catalog_name}.default.test_positional_mor_deletes_v{format_version}"
spark.sql(
f"""
CREATE OR REPLACE TABLE {identifier} (
@@ -137,10 +127,8 @@
"""
)

spark.sql(
f"""
INSERT INTO {identifier}
VALUES
spark.sql("""
SELECT * FROM VALUES
(CAST('2023-03-01' AS date), 1, 'a'),
(CAST('2023-03-02' AS date), 2, 'b'),
(CAST('2023-03-03' AS date), 3, 'c'),
@@ -152,9 +140,9 @@
(CAST('2023-03-09' AS date), 9, 'i'),
(CAST('2023-03-10' AS date), 10, 'j'),
(CAST('2023-03-11' AS date), 11, 'k'),
(CAST('2023-03-12' AS date), 12, 'l');
"""
)
(CAST('2023-03-12' AS date), 12, 'l')
AS t(dt, number, letter)
""").coalesce(1).writeTo(identifier).append()

spark.sql(f"ALTER TABLE {identifier} CREATE TAG tag_12")

@@ -164,7 +152,7 @@

spark.sql(f"DELETE FROM {identifier} WHERE number = 9")

identifier = f'{catalog_name}.default.test_positional_mor_double_deletes_v{format_version}'
identifier = f"{catalog_name}.default.test_positional_mor_double_deletes_v{format_version}"

spark.sql(
f"""
@@ -178,15 +166,13 @@
'write.delete.mode'='merge-on-read',
'write.update.mode'='merge-on-read',
'write.merge.mode'='merge-on-read',
'format-version'='2'
'format-version'='{format_version}'
);
"""
)

spark.sql(
f"""
INSERT INTO {identifier}
VALUES
spark.sql("""
SELECT * FROM VALUES
(CAST('2023-03-01' AS date), 1, 'a'),
(CAST('2023-03-02' AS date), 2, 'b'),
(CAST('2023-03-03' AS date), 3, 'c'),
@@ -198,9 +184,9 @@
(CAST('2023-03-09' AS date), 9, 'i'),
(CAST('2023-03-10' AS date), 10, 'j'),
(CAST('2023-03-11' AS date), 11, 'k'),
(CAST('2023-03-12' AS date), 12, 'l');
"""
)
(CAST('2023-03-12' AS date), 12, 'l')
AS t(dt, number, letter)
""").coalesce(1).writeTo(identifier).append()

# Perform two deletes, should produce:
# v2: two positional delete files in v2
4 changes: 0 additions & 4 deletions mkdocs/docs/how-to-release.md
@@ -389,10 +389,6 @@ Run the [`Release Docs` Github Action](https://github.com/apache/iceberg-python/

Make sure to create a PR to update the [GitHub issues template](https://github.com/apache/iceberg-python/blob/main/.github/ISSUE_TEMPLATE/iceberg_bug_report.yml) with the latest version.

### Update the integration tests

Ensure to update the `PYICEBERG_VERSION` in the [Dockerfile](https://github.com/apache/iceberg-python/blob/main/dev/Dockerfile).

## Misc

### Set up GPG key and Upload to Apache Iceberg KEYS file
1 change: 0 additions & 1 deletion ruff.toml
@@ -16,7 +16,6 @@
# under the License.

src = ['pyiceberg','tests']
extend-exclude = ["dev/provision.py"]

# Exclude a variety of commonly ignored directories.
exclude = [
8 changes: 8 additions & 0 deletions tests/integration/test_reads.py
@@ -432,6 +432,8 @@ def test_pyarrow_deletes(catalog: Catalog, format_version: int) -> None:
# (11, 'k'),
# (12, 'l')
test_positional_mor_deletes = catalog.load_table(f"default.test_positional_mor_deletes_v{format_version}")
if format_version == 2:
assert len(test_positional_mor_deletes.inspect.delete_files()) > 0, "Table should produce position delete files"
arrow_table = test_positional_mor_deletes.scan().to_arrow()
assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]

@@ -470,6 +472,8 @@ def test_pyarrow_deletes_double(catalog: Catalog, format_version: int) -> None:
# (11, 'k'),
# (12, 'l')
test_positional_mor_double_deletes = catalog.load_table(f"default.test_positional_mor_double_deletes_v{format_version}")
if format_version == 2:
assert len(test_positional_mor_double_deletes.inspect.delete_files()) > 0, "Table should produce position delete files"
arrow_table = test_positional_mor_double_deletes.scan().to_arrow()
assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 7, 8, 10, 11, 12]

@@ -508,6 +512,8 @@ def test_pyarrow_batches_deletes(catalog: Catalog, format_version: int) -> None:
# (11, 'k'),
# (12, 'l')
test_positional_mor_deletes = catalog.load_table(f"default.test_positional_mor_deletes_v{format_version}")
if format_version == 2:
assert len(test_positional_mor_deletes.inspect.delete_files()) > 0, "Table should produce position delete files"
arrow_table = test_positional_mor_deletes.scan().to_arrow_batch_reader().read_all()
assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]

@@ -550,6 +556,8 @@ def test_pyarrow_batches_deletes_double(catalog: Catalog, format_version: int) -
# (11, 'k'),
# (12, 'l')
test_positional_mor_double_deletes = catalog.load_table(f"default.test_positional_mor_double_deletes_v{format_version}")
if format_version == 2:
assert len(test_positional_mor_double_deletes.inspect.delete_files()) > 0, "Table should produce position delete files"
arrow_table = test_positional_mor_double_deletes.scan().to_arrow_batch_reader().read_all()
assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 7, 8, 10, 11, 12]
