From 0ce3e56de330c59f7a5d6d1ddea7bb7df2eb01b5 Mon Sep 17 00:00:00 2001 From: Li Jiajia Date: Fri, 13 Feb 2026 16:01:34 +0800 Subject: [PATCH] [python] Introduce Iceberg Table in Rest Catalog --- .../pypaimon/catalog/rest/rest_catalog.py | 29 ++++- .../pypaimon/table/iceberg/__init__.py | 19 ++++ .../pypaimon/table/iceberg/iceberg_table.py | 104 ++++++++++++++++++ .../tests/rest/rest_iceberg_table_test.py | 81 ++++++++++++++ 4 files changed, 230 insertions(+), 3 deletions(-) create mode 100644 paimon-python/pypaimon/table/iceberg/__init__.py create mode 100644 paimon-python/pypaimon/table/iceberg/iceberg_table.py create mode 100644 paimon-python/pypaimon/tests/rest/rest_iceberg_table_test.py diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py index f05113cd9466..c7c0e910521d 100644 --- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py +++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py @@ -43,9 +43,11 @@ from pypaimon.snapshot.snapshot_commit import PartitionStatistics from pypaimon.table.file_store_table import FileStoreTable from pypaimon.table.format.format_table import FormatTable, Format +from pypaimon.table.iceberg.iceberg_table import IcebergTable FORMAT_TABLE_TYPE = "format-table" +ICEBERG_TABLE_TYPE = "iceberg-table" class RESTCatalog(Catalog): @@ -224,10 +226,11 @@ def drop_partitions( if not partitions: raise ValueError("Partitions list cannot be empty.") table = self.get_table(identifier) - if isinstance(table, FormatTable): + if isinstance(table, (FormatTable, IcebergTable)): + unsupported_type = type(table).__name__ raise ValueError( - "drop_partitions is not supported for format tables. " - "Only Paimon (FileStore) tables support partition drop." + f"drop_partitions is not supported for table type '{unsupported_type}'. " + "Only Paimon (FileStore) tables support this operation." ) commit = table.new_batch_write_builder().new_commit() try: @@ -295,6 +298,8 @@ def load_table(self, if table_type == FORMAT_TABLE_TYPE: return self._create_format_table(identifier, metadata, internal_file_io, external_file_io) data_file_io = external_file_io if metadata.is_external else internal_file_io + if table_type == ICEBERG_TABLE_TYPE: + return self._create_iceberg_table(identifier, metadata, data_file_io) catalog_env = CatalogEnvironment( identifier=identifier, uuid=metadata.uuid, @@ -333,6 +338,24 @@ def _create_format_table(self, comment=schema.comment, ) + def _create_iceberg_table(self, + identifier: Identifier, + metadata: TableMetadata, + file_io: Callable[[str], Any], + ) -> IcebergTable: + schema = metadata.schema + location = schema.options.get(CoreOptions.PATH.key()) + file_io = file_io(location) + return IcebergTable( + file_io=file_io, + identifier=identifier, + table_schema=schema, + location=location, + options=dict(schema.options), + comment=schema.comment, + uuid=metadata.uuid, + ) + @staticmethod def create(file_io: FileIO, table_path: str, diff --git a/paimon-python/pypaimon/table/iceberg/__init__.py b/paimon-python/pypaimon/table/iceberg/__init__.py new file mode 100644 index 000000000000..d26c883f3f74 --- /dev/null +++ b/paimon-python/pypaimon/table/iceberg/__init__.py @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pypaimon.table.iceberg.iceberg_table import IcebergTable + +__all__ = ["IcebergTable"] diff --git a/paimon-python/pypaimon/table/iceberg/iceberg_table.py b/paimon-python/pypaimon/table/iceberg/iceberg_table.py new file mode 100644 index 000000000000..ab2b8c9e03e4 --- /dev/null +++ b/paimon-python/pypaimon/table/iceberg/iceberg_table.py @@ -0,0 +1,104 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Dict, List, Optional + +from pypaimon.common.file_io import FileIO +from pypaimon.common.identifier import Identifier +from pypaimon.schema.table_schema import TableSchema +from pypaimon.table.table import Table + + +class IcebergTable(Table): + """Metadata-only Iceberg table. + + Paimon Python currently exposes Iceberg table metadata, but does not + support read / write operations on Iceberg tables. + """ + + def __init__( + self, + file_io: FileIO, + identifier: Identifier, + table_schema: TableSchema, + location: str, + options: Optional[Dict[str, str]] = None, + comment: Optional[str] = None, + uuid: Optional[str] = None, + ): + self.file_io = file_io + self.identifier = identifier + self._table_schema = table_schema + self._location = location.rstrip("/") + self._options = options or dict(table_schema.options) + self.comment = comment + self._uuid = uuid + + self.fields = table_schema.fields + self.field_names = [f.name for f in self.fields] + self.partition_keys: List[str] = table_schema.partition_keys or [] + self.primary_keys: List[str] = [] + + def name(self) -> str: + return self.identifier.get_table_name() + + def full_name(self) -> str: + return self.identifier.get_full_name() + + @property + def table_schema(self) -> TableSchema: + return self._table_schema + + @table_schema.setter + def table_schema(self, value: TableSchema): + self._table_schema = value + + def location(self) -> str: + return self._location + + def options(self) -> Dict[str, str]: + return self._options + + def uuid(self) -> str: + return self._uuid or self.full_name() + + def copy(self, dynamic_options: Dict[str, str]) -> "IcebergTable": + new_options = dict(self._options) + new_options.update(dynamic_options or {}) + return IcebergTable( + file_io=self.file_io, + identifier=self.identifier, + table_schema=self._table_schema, + location=self._location, + options=new_options, + comment=self.comment, + uuid=self._uuid, + ) + + def new_read_builder(self): + raise NotImplementedError( + "IcebergTable does not support read operation in paimon-python yet." + ) + + def new_batch_write_builder(self): + raise NotImplementedError( + "IcebergTable does not support batch write operation in paimon-python yet." + ) + + def new_stream_write_builder(self): + raise NotImplementedError( + "IcebergTable does not support stream write operation in paimon-python yet." + ) diff --git a/paimon-python/pypaimon/tests/rest/rest_iceberg_table_test.py b/paimon-python/pypaimon/tests/rest/rest_iceberg_table_test.py new file mode 100644 index 000000000000..66fa3841957f --- /dev/null +++ b/paimon-python/pypaimon/tests/rest/rest_iceberg_table_test.py @@ -0,0 +1,81 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import pyarrow as pa + +from pypaimon import Schema +from pypaimon.table.iceberg import IcebergTable +from pypaimon.tests.rest.rest_base_test import RESTBaseTest + + +class RESTIcebergTableTest(RESTBaseTest): + + def test_get_iceberg_table(self): + schema = Schema.from_pyarrow_schema( + pa.schema([("id", pa.int32()), ("dt", pa.string())]), + partition_keys=["dt"], + options={"type": "iceberg-table"}, + ) + table_name = "default.iceberg_table_basic" + self.rest_catalog.drop_table(table_name, True) + self.rest_catalog.create_table(table_name, schema, False) + + table = self.rest_catalog.get_table(table_name) + self.assertIsInstance(table, IcebergTable) + self.assertEqual(table.name(), "iceberg_table_basic") + self.assertEqual(table.full_name(), table_name) + self.assertEqual(table.partition_keys, ["dt"]) + self.assertEqual(table.primary_keys, []) + self.assertEqual(table.options().get("type"), "iceberg-table") + self.assertTrue(table.location().startswith("file://")) + + def test_iceberg_table_unsupported_read_write(self): + schema = Schema.from_pyarrow_schema( + pa.schema([("id", pa.int32())]), + options={"type": "iceberg-table"}, + ) + table_name = "default.iceberg_table_unsupported_ops" + self.rest_catalog.drop_table(table_name, True) + self.rest_catalog.create_table(table_name, schema, False) + table = self.rest_catalog.get_table(table_name) + + with self.assertRaises(NotImplementedError): + table.new_read_builder() + with self.assertRaises(NotImplementedError): + table.new_batch_write_builder() + with self.assertRaises(NotImplementedError): + table.new_stream_write_builder() + + def test_iceberg_table_unsupported_drop_partitions(self): + schema = Schema.from_pyarrow_schema( + pa.schema([("id", pa.int32()), ("dt", pa.string())]), + partition_keys=["dt"], + options={"type": "iceberg-table"}, + ) + table_name = "default.iceberg_table_unsupported_drop_partitions" + self.rest_catalog.drop_table(table_name, True) + self.rest_catalog.create_table(table_name, schema, False) + + with self.assertRaisesRegex( + ValueError, + "drop_partitions is not supported for table type 'IcebergTable'", + ): + self.rest_catalog.drop_partitions( + table_name, + [{"dt": "20250101"}], + )