diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index c30d960d38..30315b0cc1 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -193,6 +193,11 @@ class RemoveStatisticsUpdate(IcebergBaseModel): snapshot_id: int = Field(alias="snapshot-id") +class RemoveSchemasUpdate(IcebergBaseModel): + action: Literal["remove-schemas"] = Field(default="remove-schemas") + schema_ids: List[int] = Field(alias="schema-ids") + + class SetPartitionStatisticsUpdate(IcebergBaseModel): action: Literal["set-partition-statistics"] = Field(default="set-partition-statistics") partition_statistics: PartitionStatisticsFile @@ -222,6 +227,7 @@ class RemovePartitionStatisticsUpdate(IcebergBaseModel): RemovePropertiesUpdate, SetStatisticsUpdate, RemoveStatisticsUpdate, + RemoveSchemasUpdate, SetPartitionStatisticsUpdate, RemovePartitionStatisticsUpdate, ], @@ -589,6 +595,23 @@ def _(update: RemoveStatisticsUpdate, base_metadata: TableMetadata, context: _Ta return base_metadata.model_copy(update={"statistics": statistics}) +@_apply_table_update.register(RemoveSchemasUpdate) +def _(update: RemoveSchemasUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata: + # This method should error if any schemas do not exist. + # It should error if the default schema is being removed. + # Otherwise, remove the schemas listed in update.schema_ids. + for remove_schema_id in update.schema_ids: + if not any(schema.schema_id == remove_schema_id for schema in base_metadata.schemas): + raise ValueError(f"Schema with schema id {remove_schema_id} does not exist") + if base_metadata.current_schema_id == remove_schema_id: + raise ValueError(f"Cannot remove current schema with id {remove_schema_id}") + + schemas = [schema for schema in base_metadata.schemas if schema.schema_id not in update.schema_ids] + context.add_update(update) + + return base_metadata.model_copy(update={"schemas": schemas}) + + @_apply_table_update.register(SetPartitionStatisticsUpdate) def _(update: SetPartitionStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata: partition_statistics = filter_statistics_by_snapshot_id( diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 9d284e77f4..dea4e0696d 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -78,6 +78,7 @@ AssertTableUUID, RemovePartitionStatisticsUpdate, RemovePropertiesUpdate, + RemoveSchemasUpdate, RemoveSnapshotRefUpdate, RemoveSnapshotsUpdate, RemoveStatisticsUpdate, @@ -1286,6 +1287,35 @@ def test_update_metadata_log_overflow(table_v2: Table) -> None: assert len(new_metadata.metadata_log) == 1 +def test_remove_schemas_update(table_v2: Table) -> None: + base_metadata = table_v2.metadata + assert len(base_metadata.schemas) == 2 + + update = RemoveSchemasUpdate(schema_ids=[0]) + updated_metadata = update_table_metadata( + base_metadata, + (update,), + ) + + assert len(updated_metadata.schemas) == 1 + + +def test_remove_schemas_update_schema_does_not_exist(table_v2: Table) -> None: + update = RemoveSchemasUpdate( + schema_ids=[123], + ) + with pytest.raises(ValueError, match="Schema with schema id 123 does not exist"): + update_table_metadata(table_v2.metadata, (update,)) + + +def test_remove_schemas_update_current_schema(table_v2: Table) -> None: + update = RemoveSchemasUpdate( + schema_ids=[1], + ) + with pytest.raises(ValueError, match="Cannot remove current schema with id 1"): + update_table_metadata(table_v2.metadata, (update,)) + + def test_set_statistics_update(table_v2_with_statistics: Table) -> None: snapshot_id = table_v2_with_statistics.metadata.current_snapshot_id