From 3ddef45628854c2891b78d66635047af5a66fcb4 Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Fri, 22 Aug 2025 00:19:30 -0700 Subject: [PATCH 1/2] Add transaction tests to catalog tests --- tests/integration/test_catalog.py | 134 +++++++++++++++++++++++++++++- 1 file changed, 133 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_catalog.py b/tests/integration/test_catalog.py index 123aca1bef..1f5b1c0e5f 100644 --- a/tests/integration/test_catalog.py +++ b/tests/integration/test_catalog.py @@ -26,6 +26,7 @@ from pyiceberg.catalog.rest import RestCatalog from pyiceberg.catalog.sql import SqlCatalog from pyiceberg.exceptions import ( + CommitFailedException, NamespaceAlreadyExistsError, NamespaceNotEmptyError, NoSuchNamespaceError, @@ -33,7 +34,12 @@ TableAlreadyExistsError, ) from pyiceberg.io import WAREHOUSE -from pyiceberg.schema import Schema +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec +from pyiceberg.schema import INITIAL_SCHEMA_ID, Schema +from pyiceberg.table.metadata import INITIAL_SPEC_ID +from pyiceberg.table.sorting import INITIAL_SORT_ORDER_ID, SortField, SortOrder +from pyiceberg.transforms import IdentityTransform +from pyiceberg.types import IntegerType, LongType, UUIDType from tests.conftest import clean_up @@ -218,6 +224,132 @@ def test_table_exists(test_catalog: Catalog, table_schema_nested: Schema, databa assert test_catalog.table_exists((database_name, table_name)) is True +@pytest.mark.integration +@pytest.mark.parametrize("test_catalog", CATALOGS) +def test_update_table_transaction(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None: + identifier = (database_name, table_name) + + test_catalog.create_namespace(database_name) + table = test_catalog.create_table(identifier, test_schema) + assert test_catalog.table_exists(identifier) + + expected_schema: Schema = Schema() + expected_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC + + with table.transaction() as transaction: + with transaction.update_schema() as update_schema: + update_schema.add_column("new_col", IntegerType()) + expected_schema = update_schema._apply() + + with transaction.update_spec() as update_spec: + update_spec.add_field("new_col", IdentityTransform()) + expected_spec = update_spec._apply() + + table = test_catalog.load_table(identifier) + assert table.schema().as_struct() == expected_schema.as_struct() + assert table.spec().fields == expected_spec.fields + + +@pytest.mark.integration +@pytest.mark.parametrize("test_catalog", CATALOGS) +def test_update_schema_conflict(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None: + if isinstance(test_catalog, HiveCatalog): + pytest.skip("HiveCatalog fails in this test, need to investigate") + + identifier = (database_name, table_name) + + test_catalog.create_namespace(database_name) + table = test_catalog.create_table(identifier, test_schema) + assert test_catalog.table_exists(identifier) + + original_update = table.update_schema().add_column("new_col", LongType()) + + # Update schema concurrently so that the original update fails + concurrent_update = test_catalog.load_table(identifier).update_schema().delete_column("VendorID") + expected_schema = concurrent_update._apply() + concurrent_update.commit() + + with pytest.raises(CommitFailedException): + original_update.commit() + + table = test_catalog.load_table(identifier) + assert table.schema().as_struct() == expected_schema.as_struct() + + +@pytest.mark.integration +@pytest.mark.parametrize("test_catalog", CATALOGS) +def test_create_table_transaction_simple(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None: + identifier = (database_name, table_name) + + test_catalog.create_namespace(database_name) + table_transaction = test_catalog.create_table_transaction(identifier, test_schema) + assert not test_catalog.table_exists(identifier) + + table_transaction.update_schema().add_column("new_col", IntegerType()).commit() + assert not test_catalog.table_exists(identifier) + + table_transaction.commit_transaction() + assert test_catalog.table_exists(identifier) + + table = test_catalog.load_table(identifier) + assert table.schema().find_type("new_col").is_primitive + + +@pytest.mark.integration +@pytest.mark.parametrize("test_catalog", CATALOGS) +def test_create_table_transaction_multiple_schemas( + test_catalog: Catalog, test_schema: Schema, test_partition_spec: PartitionSpec, table_name: str, database_name: str +) -> None: + identifier = (database_name, table_name) + + test_catalog.create_namespace(database_name) + table_transaction = test_catalog.create_table_transaction( + identifier=identifier, + schema=test_schema, + partition_spec=test_partition_spec, + sort_order=SortOrder(SortField(source_id=1)), + ) + assert not test_catalog.table_exists(identifier) + + table_transaction.update_schema().add_column("new_col", IntegerType()).commit() + assert not test_catalog.table_exists(identifier) + + table_transaction.update_schema().add_column("new_col_1", UUIDType()).commit() + assert not test_catalog.table_exists(identifier) + + table_transaction.update_spec().add_field("new_col", IdentityTransform()).commit() + assert not test_catalog.table_exists(identifier) + + # TODO: test replace sort order when available + + expected_schema = table_transaction.update_schema()._apply() + expected_spec = table_transaction.update_spec()._apply() + + table_transaction.commit_transaction() + assert test_catalog.table_exists(identifier) + + table = test_catalog.load_table(identifier) + assert table.schema().as_struct() == expected_schema.as_struct() + assert table.schema().schema_id == INITIAL_SCHEMA_ID + 2 + assert table.spec().fields == expected_spec.fields + assert table.spec().spec_id == INITIAL_SPEC_ID + 1 + assert table.sort_order().order_id == INITIAL_SORT_ORDER_ID + + +@pytest.mark.integration +@pytest.mark.parametrize("test_catalog", CATALOGS) +def test_concurrent_create_transaction(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None: + identifier = (database_name, table_name) + + test_catalog.create_namespace(database_name) + table = test_catalog.create_table_transaction(identifier=identifier, schema=test_schema) + assert not test_catalog.table_exists(identifier) + + test_catalog.create_table(identifier, test_schema) + with pytest.raises(CommitFailedException): + table.commit_transaction() + + @pytest.mark.integration @pytest.mark.parametrize("test_catalog", CATALOGS) def test_create_namespace(test_catalog: Catalog, database_name: str) -> None: From 962c77165e046c073c3bbac292d0b36943a32169 Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Sat, 30 Aug 2025 15:10:59 -0700 Subject: [PATCH 2/2] explicit decl of expected schemas, lint --- mkdocs/docs/api.md | 2 ++ tests/integration/test_catalog.py | 34 ++++++++++++++++++++++--------- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index 89d5692d0b..0e0dc375de 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -1031,6 +1031,7 @@ Expert Iceberg users may choose to commit existing parquet files to the Iceberg ### Example Add files to Iceberg table: + ```python # Given that these parquet files have schema consistent with the Iceberg table @@ -1047,6 +1048,7 @@ tbl.add_files(file_paths=file_paths) ``` Add files to Iceberg table with custom snapshot properties: + ```python # Assume an existing Iceberg table object `tbl` diff --git a/tests/integration/test_catalog.py b/tests/integration/test_catalog.py index 1f5b1c0e5f..a68b054c4e 100644 --- a/tests/integration/test_catalog.py +++ b/tests/integration/test_catalog.py @@ -34,12 +34,12 @@ TableAlreadyExistsError, ) from pyiceberg.io import WAREHOUSE -from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec +from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import INITIAL_SCHEMA_ID, Schema from pyiceberg.table.metadata import INITIAL_SPEC_ID from pyiceberg.table.sorting import INITIAL_SORT_ORDER_ID, SortField, SortOrder -from pyiceberg.transforms import IdentityTransform -from pyiceberg.types import IntegerType, LongType, UUIDType +from pyiceberg.transforms import DayTransform, IdentityTransform +from pyiceberg.types import IntegerType, LongType, NestedField, TimestampType, UUIDType from tests.conftest import clean_up @@ -233,17 +233,20 @@ def test_update_table_transaction(test_catalog: Catalog, test_schema: Schema, ta table = test_catalog.create_table(identifier, test_schema) assert test_catalog.table_exists(identifier) - expected_schema: Schema = Schema() - expected_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC + expected_schema = Schema( + NestedField(1, "VendorID", IntegerType(), False), + NestedField(2, "tpep_pickup_datetime", TimestampType(), False), + NestedField(3, "new_col", IntegerType(), False), + ) + + expected_spec = PartitionSpec(PartitionField(3, 1000, IdentityTransform(), "new_col")) with table.transaction() as transaction: with transaction.update_schema() as update_schema: update_schema.add_column("new_col", IntegerType()) - expected_schema = update_schema._apply() with transaction.update_spec() as update_spec: update_spec.add_field("new_col", IdentityTransform()) - expected_spec = update_spec._apply() table = test_catalog.load_table(identifier) assert table.schema().as_struct() == expected_schema.as_struct() @@ -266,9 +269,10 @@ def test_update_schema_conflict(test_catalog: Catalog, test_schema: Schema, tabl # Update schema concurrently so that the original update fails concurrent_update = test_catalog.load_table(identifier).update_schema().delete_column("VendorID") - expected_schema = concurrent_update._apply() concurrent_update.commit() + expected_schema = Schema(NestedField(2, "tpep_pickup_datetime", TimestampType(), False)) + with pytest.raises(CommitFailedException): original_update.commit() @@ -322,8 +326,18 @@ def test_create_table_transaction_multiple_schemas( # TODO: test replace sort order when available - expected_schema = table_transaction.update_schema()._apply() - expected_spec = table_transaction.update_spec()._apply() + expected_schema = Schema( + NestedField(1, "VendorID", IntegerType(), False), + NestedField(2, "tpep_pickup_datetime", TimestampType(), False), + NestedField(3, "new_col", IntegerType(), False), + NestedField(4, "new_col_1", UUIDType(), False), + ) + + expected_spec = PartitionSpec( + PartitionField(1, 1000, IdentityTransform(), "VendorID"), + PartitionField(2, 1001, DayTransform(), "tpep_pickup_day"), + PartitionField(3, 1002, IdentityTransform(), "new_col"), + ) table_transaction.commit_transaction() assert test_catalog.table_exists(identifier)