From 25beeb25069579e391d92f64b755913d90f64ff6 Mon Sep 17 00:00:00 2001 From: Jonnis Date: Mon, 27 Jan 2025 10:33:46 +0200 Subject: [PATCH 1/5] add support for register table in hive --- pyiceberg/catalog/hive.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 40703c072a..0e22154e01 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -404,7 +404,22 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: Raises: TableAlreadyExistsError: If the table already exists """ - raise NotImplementedError + database_name, table_name = self.identifier_to_database_and_table(identifier) + io = self._load_file_io(location=metadata_location) + metadata_file = io.new_input(metadata_location) + staged_table = StagedTable( + identifier=(database_name, table_name), + metadata=FromInputFile.table_metadata(metadata_file), + metadata_location=metadata_location, + io=io, + catalog=self, + ) + tbl = self._convert_iceberg_into_hive(staged_table) + with self._client as open_client: + self._create_hive_table(open_client, tbl) + hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name) + + return self._convert_hive_into_iceberg(hive_table) def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: raise NotImplementedError From 5cedb9ac08d9d4e88380e9c6572441016be4a2c4 Mon Sep 17 00:00:00 2001 From: Jonnis Date: Mon, 27 Jan 2025 10:34:01 +0200 Subject: [PATCH 2/5] hive register table unit tests --- tests/catalog/test_hive.py | 81 ++++++++++++++++++++++++++++++++++++++ tests/conftest.py | 11 ++++++ 2 files changed, 92 insertions(+) diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py index f60cc38b15..ac54d76334 100644 --- a/tests/catalog/test_hive.py +++ b/tests/catalog/test_hive.py @@ -204,6 +204,87 @@ def test_check_number_of_namespaces(table_schema_simple: Schema) -> None: catalog.create_table("table", schema=table_schema_simple) +@pytest.mark.parametrize("hive2_compatible", [True, False]) +@patch("time.time", MagicMock(return_value=12345)) +def test_register_table( + table_schema_with_all_types: Schema, + hive_database: HiveDatabase, + hive_table: HiveTable, + hive2_compatible: bool, + metadata_with_owner_location: str, +) -> None: + catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL) + if hive2_compatible: + catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL, **{"hive.hive2-compatible": "true"}) + + catalog._client = MagicMock() + catalog._client.__enter__().create_table.return_value = None + catalog._client.__enter__().register_table.return_value = None + catalog._client.__enter__().get_table.return_value = hive_table + catalog._client.__enter__().get_database.return_value = hive_database + + catalog.register_table(("default", "table"), metadata_location=metadata_with_owner_location) + + catalog._client.__enter__().create_table.assert_called_with( + HiveTable( + tableName="table", + dbName="default", + owner="test", + createTime=12345, + lastAccessTime=12345, + retention=None, + sd=StorageDescriptor( + cols=[ + FieldSchema(name="x", type="bigint", comment=None), # Corrected columns + FieldSchema(name="y", type="bigint", comment="comment"), + FieldSchema(name="z", type="bigint", comment=None), + ], + location="s3://bucket/test/location", # Corrected location + inputFormat="org.apache.hadoop.mapred.FileInputFormat", + outputFormat="org.apache.hadoop.mapred.FileOutputFormat", + compressed=None, + numBuckets=None, + serdeInfo=SerDeInfo( + name=None, + serializationLib="org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", + parameters=None, + description=None, + serializerClass=None, + deserializerClass=None, + serdeType=None, + ), + bucketCols=None, + sortCols=None, + parameters=None, + skewedInfo=None, + storedAsSubDirectories=None, + ), + partitionKeys=None, + parameters={"EXTERNAL": "TRUE", "table_type": "ICEBERG", "metadata_location": metadata_with_owner_location}, + viewOriginalText=None, + viewExpandedText=None, + tableType="EXTERNAL_TABLE", + privileges=None, + temporary=False, + rewriteEnabled=None, + creationMetadata=None, + catName=None, + ownerType=1, + writeId=-1, + isStatsCompliant=None, + colStats=None, + accessType=None, + requiredReadCapabilities=None, + requiredWriteCapabilities=None, + id=None, + fileMetadata=None, + dictionary=None, + txnId=None, + ) + ) + assert catalog.table_exists(identifier="default.table") + + @pytest.mark.parametrize("hive2_compatible", [True, False]) @patch("time.time", MagicMock(return_value=12345)) def test_create_table( diff --git a/tests/conftest.py b/tests/conftest.py index cfd9796312..b1be21a6fd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1133,6 +1133,17 @@ def metadata_location(tmp_path_factory: pytest.TempPathFactory) -> str: return metadata_location +@pytest.fixture(scope="session") +def metadata_with_owner_location(tmp_path_factory: pytest.TempPathFactory) -> str: + from pyiceberg.io.pyarrow import PyArrowFileIO + + metadata_location = str(tmp_path_factory.mktemp("metadata") / f"{uuid.uuid4()}.metadata.json") + metadata = TableMetadataV2(**EXAMPLE_TABLE_METADATA_V2) + metadata.properties["owner"] = "test" + ToOutputFile.table_metadata(metadata, PyArrowFileIO().new_output(location=metadata_location), overwrite=True) + return metadata_location + + @pytest.fixture(scope="session") def metadata_location_gz(tmp_path_factory: pytest.TempPathFactory) -> str: from pyiceberg.io.pyarrow import PyArrowFileIO From e8f1c9ce606a7901ca126e67c5137ccdb491f6ab Mon Sep 17 00:00:00 2001 From: Jonnis Date: Wed, 29 Jan 2025 10:49:34 +0200 Subject: [PATCH 3/5] register table integration tests --- tests/catalog/test_hive.py | 81 ------------------ tests/conftest.py | 11 --- tests/integration/test_register_table.py | 100 +++++++++++++++++++++++ 3 files changed, 100 insertions(+), 92 deletions(-) create mode 100644 tests/integration/test_register_table.py diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py index ac54d76334..f60cc38b15 100644 --- a/tests/catalog/test_hive.py +++ b/tests/catalog/test_hive.py @@ -204,87 +204,6 @@ def test_check_number_of_namespaces(table_schema_simple: Schema) -> None: catalog.create_table("table", schema=table_schema_simple) -@pytest.mark.parametrize("hive2_compatible", [True, False]) -@patch("time.time", MagicMock(return_value=12345)) -def test_register_table( - table_schema_with_all_types: Schema, - hive_database: HiveDatabase, - hive_table: HiveTable, - hive2_compatible: bool, - metadata_with_owner_location: str, -) -> None: - catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL) - if hive2_compatible: - catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL, **{"hive.hive2-compatible": "true"}) - - catalog._client = MagicMock() - catalog._client.__enter__().create_table.return_value = None - catalog._client.__enter__().register_table.return_value = None - catalog._client.__enter__().get_table.return_value = hive_table - catalog._client.__enter__().get_database.return_value = hive_database - - catalog.register_table(("default", "table"), metadata_location=metadata_with_owner_location) - - catalog._client.__enter__().create_table.assert_called_with( - HiveTable( - tableName="table", - dbName="default", - owner="test", - createTime=12345, - lastAccessTime=12345, - retention=None, - sd=StorageDescriptor( - cols=[ - FieldSchema(name="x", type="bigint", comment=None), # Corrected columns - FieldSchema(name="y", type="bigint", comment="comment"), - FieldSchema(name="z", type="bigint", comment=None), - ], - location="s3://bucket/test/location", # Corrected location - inputFormat="org.apache.hadoop.mapred.FileInputFormat", - outputFormat="org.apache.hadoop.mapred.FileOutputFormat", - compressed=None, - numBuckets=None, - serdeInfo=SerDeInfo( - name=None, - serializationLib="org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", - parameters=None, - description=None, - serializerClass=None, - deserializerClass=None, - serdeType=None, - ), - bucketCols=None, - sortCols=None, - parameters=None, - skewedInfo=None, - storedAsSubDirectories=None, - ), - partitionKeys=None, - parameters={"EXTERNAL": "TRUE", "table_type": "ICEBERG", "metadata_location": metadata_with_owner_location}, - viewOriginalText=None, - viewExpandedText=None, - tableType="EXTERNAL_TABLE", - privileges=None, - temporary=False, - rewriteEnabled=None, - creationMetadata=None, - catName=None, - ownerType=1, - writeId=-1, - isStatsCompliant=None, - colStats=None, - accessType=None, - requiredReadCapabilities=None, - requiredWriteCapabilities=None, - id=None, - fileMetadata=None, - dictionary=None, - txnId=None, - ) - ) - assert catalog.table_exists(identifier="default.table") - - @pytest.mark.parametrize("hive2_compatible", [True, False]) @patch("time.time", MagicMock(return_value=12345)) def test_create_table( diff --git a/tests/conftest.py b/tests/conftest.py index b1be21a6fd..cfd9796312 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1133,17 +1133,6 @@ def metadata_location(tmp_path_factory: pytest.TempPathFactory) -> str: return metadata_location -@pytest.fixture(scope="session") -def metadata_with_owner_location(tmp_path_factory: pytest.TempPathFactory) -> str: - from pyiceberg.io.pyarrow import PyArrowFileIO - - metadata_location = str(tmp_path_factory.mktemp("metadata") / f"{uuid.uuid4()}.metadata.json") - metadata = TableMetadataV2(**EXAMPLE_TABLE_METADATA_V2) - metadata.properties["owner"] = "test" - ToOutputFile.table_metadata(metadata, PyArrowFileIO().new_output(location=metadata_location), overwrite=True) - return metadata_location - - @pytest.fixture(scope="session") def metadata_location_gz(tmp_path_factory: pytest.TempPathFactory) -> str: from pyiceberg.io.pyarrow import PyArrowFileIO diff --git a/tests/integration/test_register_table.py b/tests/integration/test_register_table.py new file mode 100644 index 0000000000..c73619aace --- /dev/null +++ b/tests/integration/test_register_table.py @@ -0,0 +1,100 @@ +import pytest + +from pyiceberg.catalog import Catalog +from pyiceberg.catalog.hive import ( + HiveCatalog, +) +from pyiceberg.exceptions import NoSuchTableError, TableAlreadyExistsError +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.table import Table +from pyiceberg.types import ( + BooleanType, + DateType, + IntegerType, + NestedField, + StringType, +) + +TABLE_SCHEMA = Schema( + NestedField(field_id=1, name="foo", field_type=BooleanType(), required=False), + NestedField(field_id=2, name="bar", field_type=StringType(), required=False), + NestedField(field_id=4, name="baz", field_type=IntegerType(), required=False), + NestedField(field_id=10, name="qux", field_type=DateType(), required=False), +) + + +def _create_table( + session_catalog: Catalog, + identifier: str, + format_version: int, + location: str, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + schema: Schema = TABLE_SCHEMA, +) -> Table: + try: + session_catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + return session_catalog.create_table( + identifier=identifier, + schema=schema, + location=location, + properties={"format-version": str(format_version)}, + partition_spec=partition_spec, + ) + + +@pytest.mark.integration +def test_hive_register_table( + session_catalog: HiveCatalog, +) -> None: + identifier = "default.hive_register_table" + location = "s3a://warehouse/default/hive_register_table" + tbl = _create_table(session_catalog, identifier, 2, location) + assert session_catalog.table_exists(identifier=identifier) + session_catalog.drop_table(identifier=identifier) + assert not session_catalog.table_exists(identifier=identifier) + session_catalog.register_table(("default", "hive_register_table"), metadata_location=tbl.metadata_location) + assert session_catalog.table_exists(identifier=identifier) + + +@pytest.mark.integration +def test_hive_register_table_existing( + session_catalog: HiveCatalog, +) -> None: + identifier = "default.hive_register_table_existing" + location = "s3a://warehouse/default/hive_register_table_existing" + tbl = _create_table(session_catalog, identifier, 2, location) + assert session_catalog.table_exists(identifier=identifier) + # Assert that registering the table again raises TableAlreadyExistsError + with pytest.raises(TableAlreadyExistsError): + session_catalog.register_table(("default", "hive_register_table_existing"), metadata_location=tbl.metadata_location) + + +@pytest.mark.integration +def test_rest_register_table( + session_catalog: Catalog, +) -> None: + identifier = "default.rest_register_table" + location = "s3a://warehouse/default/rest_register_table" + tbl = _create_table(session_catalog, identifier, 2, location) + assert session_catalog.table_exists(identifier=identifier) + session_catalog.drop_table(identifier=identifier) + assert not session_catalog.table_exists(identifier=identifier) + session_catalog.register_table(identifier=identifier, metadata_location=tbl.metadata_location) + assert session_catalog.table_exists(identifier=identifier) + + +@pytest.mark.integration +def test_rest_register_table_existing( + session_catalog: Catalog, +) -> None: + identifier = "default.rest_register_table_existing" + location = "s3a://warehouse/default/rest_register_table_existing" + tbl = _create_table(session_catalog, identifier, 2, location) + assert session_catalog.table_exists(identifier=identifier) + # Assert that registering the table again raises TableAlreadyExistsError + with pytest.raises(TableAlreadyExistsError): + session_catalog.register_table(identifier=identifier, metadata_location=tbl.metadata_location) From b6b864a88a307519ce5d0db298a250fa4ed996fa Mon Sep 17 00:00:00 2001 From: Jonnis Date: Wed, 29 Jan 2025 18:42:36 +0200 Subject: [PATCH 4/5] license --- tests/integration/test_register_table.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tests/integration/test_register_table.py b/tests/integration/test_register_table.py index c73619aace..babfe4cc57 100644 --- a/tests/integration/test_register_table.py +++ b/tests/integration/test_register_table.py @@ -1,3 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. import pytest from pyiceberg.catalog import Catalog From b912fc768774a0d4cf1afa4adecdcfa5f0fe0046 Mon Sep 17 00:00:00 2001 From: Jonnis Date: Thu, 30 Jan 2025 15:04:50 +0200 Subject: [PATCH 5/5] register_table integration test parametrised catalog --- tests/integration/test_register_table.py | 66 +++++++----------------- 1 file changed, 19 insertions(+), 47 deletions(-) diff --git a/tests/integration/test_register_table.py b/tests/integration/test_register_table.py index babfe4cc57..c0db2014af 100644 --- a/tests/integration/test_register_table.py +++ b/tests/integration/test_register_table.py @@ -17,9 +17,6 @@ import pytest from pyiceberg.catalog import Catalog -from pyiceberg.catalog.hive import ( - HiveCatalog, -) from pyiceberg.exceptions import NoSuchTableError, TableAlreadyExistsError from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema @@ -63,54 +60,29 @@ def _create_table( @pytest.mark.integration -def test_hive_register_table( - session_catalog: HiveCatalog, +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_register_table( + catalog: Catalog, ) -> None: - identifier = "default.hive_register_table" - location = "s3a://warehouse/default/hive_register_table" - tbl = _create_table(session_catalog, identifier, 2, location) - assert session_catalog.table_exists(identifier=identifier) - session_catalog.drop_table(identifier=identifier) - assert not session_catalog.table_exists(identifier=identifier) - session_catalog.register_table(("default", "hive_register_table"), metadata_location=tbl.metadata_location) - assert session_catalog.table_exists(identifier=identifier) - - -@pytest.mark.integration -def test_hive_register_table_existing( - session_catalog: HiveCatalog, -) -> None: - identifier = "default.hive_register_table_existing" - location = "s3a://warehouse/default/hive_register_table_existing" - tbl = _create_table(session_catalog, identifier, 2, location) - assert session_catalog.table_exists(identifier=identifier) - # Assert that registering the table again raises TableAlreadyExistsError - with pytest.raises(TableAlreadyExistsError): - session_catalog.register_table(("default", "hive_register_table_existing"), metadata_location=tbl.metadata_location) + identifier = "default.register_table" + location = "s3a://warehouse/default/register_table" + tbl = _create_table(catalog, identifier, 2, location) + assert catalog.table_exists(identifier=identifier) + catalog.drop_table(identifier=identifier) + assert not catalog.table_exists(identifier=identifier) + catalog.register_table(("default", "register_table"), metadata_location=tbl.metadata_location) + assert catalog.table_exists(identifier=identifier) @pytest.mark.integration -def test_rest_register_table( - session_catalog: Catalog, -) -> None: - identifier = "default.rest_register_table" - location = "s3a://warehouse/default/rest_register_table" - tbl = _create_table(session_catalog, identifier, 2, location) - assert session_catalog.table_exists(identifier=identifier) - session_catalog.drop_table(identifier=identifier) - assert not session_catalog.table_exists(identifier=identifier) - session_catalog.register_table(identifier=identifier, metadata_location=tbl.metadata_location) - assert session_catalog.table_exists(identifier=identifier) - - -@pytest.mark.integration -def test_rest_register_table_existing( - session_catalog: Catalog, +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_register_table_existing( + catalog: Catalog, ) -> None: - identifier = "default.rest_register_table_existing" - location = "s3a://warehouse/default/rest_register_table_existing" - tbl = _create_table(session_catalog, identifier, 2, location) - assert session_catalog.table_exists(identifier=identifier) + identifier = "default.register_table_existing" + location = "s3a://warehouse/default/register_table_existing" + tbl = _create_table(catalog, identifier, 2, location) + assert catalog.table_exists(identifier=identifier) # Assert that registering the table again raises TableAlreadyExistsError with pytest.raises(TableAlreadyExistsError): - session_catalog.register_table(identifier=identifier, metadata_location=tbl.metadata_location) + catalog.register_table(("default", "register_table_existing"), metadata_location=tbl.metadata_location)