From 21b2d5ac6824ff629a88a11c65e96959f5ec0f11 Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Wed, 6 Aug 2025 16:01:43 +0200
Subject: [PATCH 1/7] Add test for migrated tables

Identified two issues that can be worked on in parallel
---
 tests/conftest.py                        | 17 ++++-
 tests/integration/test_hive_migration.py | 84 ++++++++++++++++++++++++
 2 files changed, 100 insertions(+), 1 deletion(-)
 create mode 100644 tests/integration/test_hive_migration.py

diff --git a/tests/conftest.py b/tests/conftest.py
index 79560bc532..f09af6186a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2502,9 +2502,14 @@ def spark() -> "SparkSession":
     spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2])
     scala_version = "2.12"
     iceberg_version = "1.9.0"
+    # Should match with Spark:
+    hadoop_version = "3.3.4"
+    aws_sdk_version = "1.12.753"
 
     os.environ["PYSPARK_SUBMIT_ARGS"] = (
         f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version},"
+        f"org.apache.hadoop:hadoop-aws:{hadoop_version},"
+        f"com.amazonaws:aws-java-sdk-bundle:{aws_sdk_version},"
         f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version} pyspark-shell"
     )
     os.environ["AWS_REGION"] = "us-east-1"
@@ -2518,6 +2523,8 @@ def spark() -> "SparkSession":
         .config("spark.sql.shuffle.partitions", "1")
         .config("spark.default.parallelism", "1")
         .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
+        .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
+        .config("spark.hadoop.fs.s3a.path.style.access", "true")
         .config("spark.sql.catalog.integration", "org.apache.iceberg.spark.SparkCatalog")
         .config("spark.sql.catalog.integration.catalog-impl", "org.apache.iceberg.rest.RESTCatalog")
         .config("spark.sql.catalog.integration.cache-enabled", "false")
@@ -2526,7 +2533,6 @@ def spark() -> "SparkSession":
         .config("spark.sql.catalog.integration.warehouse", "s3://warehouse/wh/")
         .config("spark.sql.catalog.integration.s3.endpoint", "http://localhost:9000")
         .config("spark.sql.catalog.integration.s3.path-style-access", "true")
-        .config("spark.sql.defaultCatalog", "integration")
         .config("spark.sql.catalog.hive", "org.apache.iceberg.spark.SparkCatalog")
         .config("spark.sql.catalog.hive.type", "hive")
         .config("spark.sql.catalog.hive.uri", "http://localhost:9083")
@@ -2534,6 +2540,15 @@ def spark() -> "SparkSession":
         .config("spark.sql.catalog.hive.warehouse", "s3://warehouse/hive/")
         .config("spark.sql.catalog.hive.s3.endpoint", "http://localhost:9000")
         .config("spark.sql.catalog.hive.s3.path-style-access", "true")
+        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
+        .config("spark.sql.catalog.spark_catalog.type", "hive")
+        .config("spark.sql.catalog.spark_catalog.uri", "http://localhost:9083")
+        .config("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
+        .config("spark.sql.catalog.spark_catalog.warehouse", "s3://warehouse/hive/")
+        .config("spark.sql.catalog.spark_catalog.s3.endpoint", "http://localhost:9000")
+        .config("spark.sql.catalog.spark_catalog.s3.path-style-access", "true")
+        .config("spark.sql.catalogImplementation", "hive")
+        .config("spark.sql.defaultCatalog", "integration")
         .config("spark.sql.execution.arrow.pyspark.enabled", "true")
         .getOrCreate()
     )
diff --git a/tests/integration/test_hive_migration.py b/tests/integration/test_hive_migration.py
new file mode 100644
index 0000000000..b457c6d046
--- /dev/null
+++ b/tests/integration/test_hive_migration.py
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+
+import pytest
+from pyspark.sql import SparkSession
+
+from pyiceberg.catalog import Catalog
+
+
+@pytest.mark.integration
+def test_migrate_table(
+    session_catalog_hive: Catalog,
+    spark: SparkSession,
+) -> None:
+    """
+    Imported tables are an edge case since the partition column is not stored
+    in the Parquet files:
+
+    test_migrate_table_hive_1754486926/dt=2022-01-01/part-00000-30a9798b-7597-4027-86d9-79d7c529bc87.c000.snappy.parquet
+    {
+      "type" : "record",
+      "name" : "spark_schema",
+      "fields" : [ {
+        "name" : "number",
+        "type" : "int"
+      } ]
+    }
+
+    PyIceberg will project this column when the table is being read
+    """
+    # Create new tables to avoid complex cleanup
+    src_table_identifier = f"spark_catalog.default.test_migrate_table_hive_{int(time.time())}"
+    dst_table_identifier = f"default.test_migrate_table_{int(time.time())}"
+
+    spark.sql(f"""
+        CREATE TABLE {src_table_identifier} (
+            number INTEGER
+        )
+        PARTITIONED BY (dt date)
+        STORED AS parquet
+    """)
+
+    spark.sql(f"""
+        INSERT OVERWRITE TABLE {src_table_identifier}
+        PARTITION (dt='2022-01-01')
+        VALUES (1), (2), (3)
+    """)
+
+    spark.sql(f"""
+        INSERT OVERWRITE TABLE {src_table_identifier}
+        PARTITION (dt='2023-01-01')
+        VALUES (4), (5), (6)
+    """)
+
+    spark.sql(f"""
+        CALL hive.system.snapshot('{src_table_identifier}', 'hive.{dst_table_identifier}')
+    """)
+
+    tbl = session_catalog_hive.load_table(dst_table_identifier)
+    assert tbl.schema().column_names == ["number", "dt"]
+
+    # TODO: Returns the primitive type (int), rather than the logical type
+    # assert set(tbl.scan().to_arrow().column(1).combine_chunks().tolist()) == {'2022-01-01', '2023-01-01'}
+
+    assert tbl.scan(row_filter="number > 3").to_arrow().column(0).combine_chunks().tolist() == [4, 5, 6]
+    assert tbl.scan(row_filter="dt == '2023-01-01'").to_arrow().column(0).combine_chunks().tolist() == []
+
+    # TODO: Issue with filtering the projected column
+    # assert tbl.scan(row_filter="dt == '2022-01-01'").to_arrow().column(0).combine_chunks().tolist() == [1, 2, 3]

From f88b1abafb2804ce5d5b9b066d9bff4ad8c606ce Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Wed, 6 Aug 2025 16:59:48 +0200
Subject: [PATCH 2/7] Fix test

---
 tests/integration/test_hive_migration.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tests/integration/test_hive_migration.py b/tests/integration/test_hive_migration.py
index b457c6d046..65a548e445 100644
--- a/tests/integration/test_hive_migration.py
+++ b/tests/integration/test_hive_migration.py
@@ -78,7 +78,8 @@ def test_migrate_table(
     # assert set(tbl.scan().to_arrow().column(1).combine_chunks().tolist()) == {'2022-01-01', '2023-01-01'}
 
     assert tbl.scan(row_filter="number > 3").to_arrow().column(0).combine_chunks().tolist() == [4, 5, 6]
3").to_arrow().column(0).combine_chunks().tolist() == [4, 5, 6] - assert tbl.scan(row_filter="dt == '2023-01-01'").to_arrow().column(0).combine_chunks().tolist() == [] + + assert tbl.scan(row_filter="dt == '2023-01-01'").to_arrow().column(0).combine_chunks().tolist() == [4, 5, 6] # TODO: Issue with filtering the projected column # assert tbl.scan(row_filter="dt == '2022-01-01'").to_arrow().column(0).combine_chunks().tolist() == [1, 2, 3] From baa1b356cf9f091aa65522af7239cbd7ca2a0519 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Wed, 6 Aug 2025 18:45:42 +0200 Subject: [PATCH 3/7] Remove comment Co-authored-by: Kevin Liu --- tests/conftest.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index 0685b16074..d38ad9b854 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2502,7 +2502,6 @@ def spark() -> "SparkSession": spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2]) scala_version = "2.12" iceberg_version = "1.9.2" - # Should match with Spark: hadoop_version = "3.3.4" aws_sdk_version = "1.12.753" From 7b9a7f05c621ab2ca1f1f8ff9a05409e0668c3d2 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Wed, 6 Aug 2025 18:47:31 +0200 Subject: [PATCH 4/7] Add link --- tests/integration/test_hive_migration.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/test_hive_migration.py b/tests/integration/test_hive_migration.py index 65a548e445..319534d473 100644 --- a/tests/integration/test_hive_migration.py +++ b/tests/integration/test_hive_migration.py @@ -67,6 +67,7 @@ def test_migrate_table( VALUES (4), (5), (6) """) +# Docs: https://iceberg.apache.org/docs/latest/hive-migration/#snapshot-hive-table-to-iceberg spark.sql(f""" CALL hive.system.snapshot('{src_table_identifier}', 'hive.{dst_table_identifier}') """) From 3cc50eced204d27ef1e5c8b7e09743d9c51c1f51 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Wed, 6 Aug 2025 18:54:33 +0200 Subject: [PATCH 5/7] lint --- tests/integration/test_hive_migration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_hive_migration.py b/tests/integration/test_hive_migration.py index 319534d473..060450731e 100644 --- a/tests/integration/test_hive_migration.py +++ b/tests/integration/test_hive_migration.py @@ -67,7 +67,7 @@ def test_migrate_table( VALUES (4), (5), (6) """) -# Docs: https://iceberg.apache.org/docs/latest/hive-migration/#snapshot-hive-table-to-iceberg + # Docs: https://iceberg.apache.org/docs/latest/hive-migration/#snapshot-hive-table-to-iceberg spark.sql(f""" CALL hive.system.snapshot('{src_table_identifier}', 'hive.{dst_table_identifier}') """) From 667e118164c5b83137711d98f7e01eb799605a1b Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Wed, 6 Aug 2025 19:12:31 +0200 Subject: [PATCH 6/7] Cleanup Co-authored-by: Kevin Liu --- tests/conftest.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index d38ad9b854..ffa0d81ba0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2542,10 +2542,9 @@ def spark() -> "SparkSession": .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") .config("spark.sql.catalog.spark_catalog.type", "hive") .config("spark.sql.catalog.spark_catalog.uri", "http://localhost:9083") - .config("spark.sql.catalog.spark_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") .config("spark.sql.catalog.spark_catalog.warehouse", "s3://warehouse/hive/") - 
.config("spark.sql.catalog.spark_catalog.s3.endpoint", "http://localhost:9000") - .config("spark.sql.catalog.spark_catalog.s3.path-style-access", "true") + .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") + .config("spark.hadoop.fs.s3a.path.style.access", "true") .config("spark.sql.catalogImplementation", "hive") .config("spark.sql.defaultCatalog", "integration") .config("spark.sql.execution.arrow.pyspark.enabled", "true") From 046bd89d6dcd4696c1a314a107c72873044ad67e Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Wed, 6 Aug 2025 19:14:28 +0200 Subject: [PATCH 7/7] Cleanup Co-authored-by: Kevin Liu --- tests/conftest.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index ffa0d81ba0..58ed010d95 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2522,8 +2522,6 @@ def spark() -> "SparkSession": .config("spark.sql.shuffle.partitions", "1") .config("spark.default.parallelism", "1") .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") - .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") - .config("spark.hadoop.fs.s3a.path.style.access", "true") .config("spark.sql.catalog.integration", "org.apache.iceberg.spark.SparkCatalog") .config("spark.sql.catalog.integration.catalog-impl", "org.apache.iceberg.rest.RESTCatalog") .config("spark.sql.catalog.integration.cache-enabled", "false")