diff --git a/tests/conftest.py b/tests/conftest.py
index 967180bc5a..58ed010d95 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2502,9 +2502,13 @@ def spark() -> "SparkSession":
     spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2])
     scala_version = "2.12"
     iceberg_version = "1.9.2"
+    hadoop_version = "3.3.4"
+    aws_sdk_version = "1.12.753"
 
     os.environ["PYSPARK_SUBMIT_ARGS"] = (
         f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version},"
+        f"org.apache.hadoop:hadoop-aws:{hadoop_version},"
+        f"com.amazonaws:aws-java-sdk-bundle:{aws_sdk_version},"
         f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version} pyspark-shell"
     )
     os.environ["AWS_REGION"] = "us-east-1"
@@ -2526,7 +2530,6 @@ def spark() -> "SparkSession":
         .config("spark.sql.catalog.integration.warehouse", "s3://warehouse/wh/")
         .config("spark.sql.catalog.integration.s3.endpoint", "http://localhost:9000")
         .config("spark.sql.catalog.integration.s3.path-style-access", "true")
-        .config("spark.sql.defaultCatalog", "integration")
         .config("spark.sql.catalog.hive", "org.apache.iceberg.spark.SparkCatalog")
         .config("spark.sql.catalog.hive.type", "hive")
         .config("spark.sql.catalog.hive.uri", "http://localhost:9083")
@@ -2534,6 +2537,14 @@ def spark() -> "SparkSession":
         .config("spark.sql.catalog.hive.warehouse", "s3://warehouse/hive/")
         .config("spark.sql.catalog.hive.s3.endpoint", "http://localhost:9000")
         .config("spark.sql.catalog.hive.s3.path-style-access", "true")
+        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
+        .config("spark.sql.catalog.spark_catalog.type", "hive")
+        .config("spark.sql.catalog.spark_catalog.uri", "http://localhost:9083")
+        .config("spark.sql.catalog.spark_catalog.warehouse", "s3://warehouse/hive/")
+        .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
+        .config("spark.hadoop.fs.s3a.path.style.access", "true")
+        .config("spark.sql.catalogImplementation", "hive")
+        .config("spark.sql.defaultCatalog", "integration")
         .config("spark.sql.execution.arrow.pyspark.enabled", "true")
         .getOrCreate()
     )
diff --git a/tests/integration/test_hive_migration.py b/tests/integration/test_hive_migration.py
new file mode 100644
index 0000000000..060450731e
--- /dev/null
+++ b/tests/integration/test_hive_migration.py
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+
+import pytest
+from pyspark.sql import SparkSession
+
+from pyiceberg.catalog import Catalog
+
+
+@pytest.mark.integration
+def test_migrate_table(
+    session_catalog_hive: Catalog,
+    spark: SparkSession,
+) -> None:
+    """
+    Imported tables are an edge case since the partition column is not stored
+    in the Parquet files:
+
+    test_migrate_table_hive_1754486926/dt=2022-01-01/part-00000-30a9798b-7597-4027-86d9-79d7c529bc87.c000.snappy.parquet
+    {
+      "type" : "record",
+      "name" : "spark_schema",
+      "fields" : [ {
+        "name" : "number",
+        "type" : "int"
+      } ]
+    }
+
+    PyIceberg will project this column when the table is being read
+    """
+    # Create new tables to avoid complex cleanup
+    src_table_identifier = f"spark_catalog.default.test_migrate_table_hive_{int(time.time())}"
+    dst_table_identifier = f"default.test_migrate_table_{int(time.time())}"
+
+    spark.sql(f"""
+        CREATE TABLE {src_table_identifier} (
+            number INTEGER
+        )
+        PARTITIONED BY (dt date)
+        STORED AS parquet
+    """)
+
+    spark.sql(f"""
+        INSERT OVERWRITE TABLE {src_table_identifier}
+        PARTITION (dt='2022-01-01')
+        VALUES (1), (2), (3)
+    """)
+
+    spark.sql(f"""
+        INSERT OVERWRITE TABLE {src_table_identifier}
+        PARTITION (dt='2023-01-01')
+        VALUES (4), (5), (6)
+    """)
+
+    # Docs: https://iceberg.apache.org/docs/latest/hive-migration/#snapshot-hive-table-to-iceberg
+    spark.sql(f"""
+        CALL hive.system.snapshot('{src_table_identifier}', 'hive.{dst_table_identifier}')
+    """)
+
+    tbl = session_catalog_hive.load_table(dst_table_identifier)
+    assert tbl.schema().column_names == ["number", "dt"]
+
+    # TODO: Returns the primitive type (int), rather than the logical type
+    # assert set(tbl.scan().to_arrow().column(1).combine_chunks().tolist()) == {'2022-01-01', '2023-01-01'}
+
+    assert tbl.scan(row_filter="number > 3").to_arrow().column(0).combine_chunks().tolist() == [4, 5, 6]
+
+    assert tbl.scan(row_filter="dt == '2023-01-01'").to_arrow().column(0).combine_chunks().tolist() == [4, 5, 6]
+
+    # TODO: Issue with filtering the projected column
+    # assert tbl.scan(row_filter="dt == '2022-01-01'").to_arrow().column(0).combine_chunks().tolist() == [1, 2, 3]
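Note (not part of the diff above): a minimal sketch of how a table created by `CALL hive.system.snapshot(...)` could be inspected with PyIceberg directly, outside the pytest fixtures. The catalog properties, credentials, and table name are assumptions that mirror the integration-test environment (Hive Metastore on localhost:9083, MinIO on localhost:9000); adjust them for your setup.

# Hypothetical standalone check of a migrated table; values below are assumed, not from the PR.
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "hive",
    **{
        "type": "hive",
        "uri": "thrift://localhost:9083",        # assumed Hive Metastore endpoint
        "s3.endpoint": "http://localhost:9000",  # assumed MinIO endpoint
        "s3.access-key-id": "admin",             # assumed test credentials
        "s3.secret-access-key": "password",
    },
)

# Hypothetical table name; the test derives one from the current timestamp.
tbl = catalog.load_table("default.test_migrate_table_1754486926")

# The identity-partitioned column `dt` shows up in the schema and partition spec even
# though it is absent from the Parquet data files; PyIceberg projects it from the
# partition metadata at scan time.
print(tbl.schema().column_names)  # expected: ['number', 'dt']
print(tbl.spec())
print(tbl.scan(row_filter="number > 3").to_arrow())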