From 949e140bf20876b10b29cae6d25af50372998b0a Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Thu, 6 Mar 2025 17:38:33 -0500 Subject: [PATCH 1/8] feat: validate snapshot write compatibility --- pyiceberg/table/update/snapshot.py | 28 ++++++++++++++++ pyiceberg/utils/snapshot.py | 53 ++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+) create mode 100644 pyiceberg/utils/snapshot.py diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index f21c501780..6a7bf3f63c 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -80,6 +80,7 @@ from pyiceberg.utils.bin_packing import ListPacker from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.properties import property_as_bool, property_as_int +from pyiceberg.utils.snapshot import ancestors_between if TYPE_CHECKING: from pyiceberg.table import Transaction @@ -251,6 +252,12 @@ def _commit(self) -> UpdatesAndRequirements: ) location_provider = self._transaction._table.location_provider() manifest_list_file_path = location_provider.new_metadata_location(file_name) + + # get current snapshot id and starting snapshot id, and validate that there are no conflicts + starting_snapshot_id = self._parent_snapshot_id + current_snapshot_id = self._transaction._table.refresh().metadata.current_snapshot_id + self._validate(starting_snapshot_id, current_snapshot_id) + with write_manifest_list( format_version=self._transaction.table_metadata.format_version, output_file=self._io.new_output(manifest_list_file_path), @@ -279,6 +286,27 @@ def _commit(self) -> UpdatesAndRequirements: (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),), ) + def _validate(self, starting_snapshot_id: Optional[int], current_snapshot_id: Optional[int]) -> None: + # get all the snapshots between the current snapshot id and the parent id + snapshots = ancestors_between(starting_snapshot_id, current_snapshot_id, self._transaction._table.metadata.snapshot_by_id) + + # Define allowed operations for each type of operation + allowed_operations = { + Operation.APPEND: {Operation.APPEND, Operation.REPLACE, Operation.OVERWRITE, Operation.DELETE}, + Operation.REPLACE: {Operation.APPEND}, + Operation.OVERWRITE: set(), + Operation.DELETE: set(), + } + + for snapshot in snapshots: + snapshot_operation = snapshot.summary.operation + + if snapshot_operation not in allowed_operations[self._operation]: + raise ValueError( + f"Operation {snapshot_operation} is not allowed when performing {self._operation}. " + "Check for overlaps or conflicts." + ) + @property def snapshot_id(self) -> int: return self._snapshot_id diff --git a/pyiceberg/utils/snapshot.py b/pyiceberg/utils/snapshot.py new file mode 100644 index 0000000000..28f9722ceb --- /dev/null +++ b/pyiceberg/utils/snapshot.py @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import Callable, Iterable, Iterator, Optional +from pyiceberg.table.snapshots import Snapshot + + +def ancestors_of(snapshot_id: Optional[int], lookup_fn: Callable[[int], Optional[Snapshot]]) -> Iterable[Snapshot]: + def _snapshot_iterator(snapshot: Snapshot) -> Iterator[Snapshot]: + next_snapshot: Optional[Snapshot] = snapshot + consumed = False + + while next_snapshot is not None: + if not consumed: + yield next_snapshot + consumed = True + + parent_id = next_snapshot.parent_snapshot_id + if parent_id is None: + break + + next_snapshot = lookup_fn(parent_id) + consumed = False + + snapshot: Optional[Snapshot] = lookup_fn(snapshot_id) + if snapshot is not None: + return _snapshot_iterator(snapshot) + else: + return iter([]) + +def ancestors_between(starting_snapshot_id: Optional[int], current_snapshot_id: Optional[int], lookup_fn: Callable[[int], Optional[Snapshot]]) -> Iterable[Snapshot]: + if starting_snapshot_id == current_snapshot_id: + return iter([]) + + return ancestors_of( + current_snapshot_id, + lambda snapshot_id: lookup_fn(snapshot_id) if snapshot_id != starting_snapshot_id else None + ) + \ No newline at end of file From e631ddfa67d9df07e2b5b2095be68b2dcb8c533f Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Wed, 12 Mar 2025 23:29:09 -0400 Subject: [PATCH 2/8] reuse ancestors_of existing functionality --- pyiceberg/table/update/snapshot.py | 19 +++++++---- pyiceberg/utils/snapshot.py | 53 ------------------------------ 2 files changed, 12 insertions(+), 60 deletions(-) delete mode 100644 pyiceberg/utils/snapshot.py diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 6a7bf3f63c..c2539900c6 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -60,6 +60,7 @@ Snapshot, SnapshotSummaryCollector, Summary, + ancestors_of, update_snapshot_summaries, ) from pyiceberg.table.update import ( @@ -254,9 +255,10 @@ def _commit(self) -> UpdatesAndRequirements: manifest_list_file_path = location_provider.new_metadata_location(file_name) # get current snapshot id and starting snapshot id, and validate that there are no conflicts - starting_snapshot_id = self._parent_snapshot_id - current_snapshot_id = self._transaction._table.refresh().metadata.current_snapshot_id - self._validate(starting_snapshot_id, current_snapshot_id) + if self._transaction._table.__class__.__name__ != "StagedTable": + starting_snapshot = self._transaction.table_metadata.current_snapshot() + current_snapshot = self._transaction._table.refresh().metadata.current_snapshot() + self._validate(starting_snapshot, current_snapshot) with write_manifest_list( format_version=self._transaction.table_metadata.format_version, @@ -286,10 +288,7 @@ def _commit(self) -> UpdatesAndRequirements: (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),), ) - def _validate(self, starting_snapshot_id: Optional[int], current_snapshot_id: Optional[int]) -> None: - # get all the snapshots between the current snapshot id and the parent id - snapshots = ancestors_between(starting_snapshot_id, current_snapshot_id, self._transaction._table.metadata.snapshot_by_id) - + def _validate(self, starting_snapshot: Optional[Snapshot], current_snapshot: Optional[Snapshot]) -> None: # Define allowed operations for each type of operation allowed_operations = { Operation.APPEND: {Operation.APPEND, Operation.REPLACE, Operation.OVERWRITE, Operation.DELETE}, @@ -298,7 +297,13 @@ def _validate(self, starting_snapshot_id: Optional[int], current_snapshot_id: Op Operation.DELETE: set(), } + # get all the snapshots between the current snapshot id and the parent id + snapshots = ancestors_of(current_snapshot, self._transaction._table.metadata) + for snapshot in snapshots: + if snapshot.snapshot_id == starting_snapshot.snapshot_id: + break + snapshot_operation = snapshot.summary.operation if snapshot_operation not in allowed_operations[self._operation]: diff --git a/pyiceberg/utils/snapshot.py b/pyiceberg/utils/snapshot.py deleted file mode 100644 index 28f9722ceb..0000000000 --- a/pyiceberg/utils/snapshot.py +++ /dev/null @@ -1,53 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from typing import Callable, Iterable, Iterator, Optional -from pyiceberg.table.snapshots import Snapshot - - -def ancestors_of(snapshot_id: Optional[int], lookup_fn: Callable[[int], Optional[Snapshot]]) -> Iterable[Snapshot]: - def _snapshot_iterator(snapshot: Snapshot) -> Iterator[Snapshot]: - next_snapshot: Optional[Snapshot] = snapshot - consumed = False - - while next_snapshot is not None: - if not consumed: - yield next_snapshot - consumed = True - - parent_id = next_snapshot.parent_snapshot_id - if parent_id is None: - break - - next_snapshot = lookup_fn(parent_id) - consumed = False - - snapshot: Optional[Snapshot] = lookup_fn(snapshot_id) - if snapshot is not None: - return _snapshot_iterator(snapshot) - else: - return iter([]) - -def ancestors_between(starting_snapshot_id: Optional[int], current_snapshot_id: Optional[int], lookup_fn: Callable[[int], Optional[Snapshot]]) -> Iterable[Snapshot]: - if starting_snapshot_id == current_snapshot_id: - return iter([]) - - return ancestors_of( - current_snapshot_id, - lambda snapshot_id: lookup_fn(snapshot_id) if snapshot_id != starting_snapshot_id else None - ) - \ No newline at end of file From 740db96a287d6421d658f1b7024cd537265dace5 Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Sat, 22 Mar 2025 22:23:22 -0400 Subject: [PATCH 3/8] Update pyiceberg/table/update/snapshot.py Co-authored-by: Fokko Driesprong --- pyiceberg/table/update/snapshot.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index c2539900c6..769dee87d4 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -255,7 +255,8 @@ def _commit(self) -> UpdatesAndRequirements: manifest_list_file_path = location_provider.new_metadata_location(file_name) # get current snapshot id and starting snapshot id, and validate that there are no conflicts - if self._transaction._table.__class__.__name__ != "StagedTable": + from pyiceberg.table import StagedTable + if not isinstance(self._transaction._table, StagedTable): starting_snapshot = self._transaction.table_metadata.current_snapshot() current_snapshot = self._transaction._table.refresh().metadata.current_snapshot() self._validate(starting_snapshot, current_snapshot) From 611b017902a159edcd80f428d32d856bf02325ed Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Tue, 25 Mar 2025 22:03:50 -0400 Subject: [PATCH 4/8] fix mypy errors --- pyiceberg/table/update/snapshot.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 769dee87d4..b77f613784 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -81,7 +81,6 @@ from pyiceberg.utils.bin_packing import ListPacker from pyiceberg.utils.concurrent import ExecutorFactory from pyiceberg.utils.properties import property_as_bool, property_as_int -from pyiceberg.utils.snapshot import ancestors_between if TYPE_CHECKING: from pyiceberg.table import Transaction @@ -256,10 +255,12 @@ def _commit(self) -> UpdatesAndRequirements: # get current snapshot id and starting snapshot id, and validate that there are no conflicts from pyiceberg.table import StagedTable + if not isinstance(self._transaction._table, StagedTable): starting_snapshot = self._transaction.table_metadata.current_snapshot() current_snapshot = self._transaction._table.refresh().metadata.current_snapshot() - self._validate(starting_snapshot, current_snapshot) + if starting_snapshot is not None and current_snapshot is not None: + self._validate(starting_snapshot, current_snapshot) with write_manifest_list( format_version=self._transaction.table_metadata.format_version, @@ -289,7 +290,7 @@ def _commit(self) -> UpdatesAndRequirements: (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),), ) - def _validate(self, starting_snapshot: Optional[Snapshot], current_snapshot: Optional[Snapshot]) -> None: + def _validate(self, starting_snapshot: Snapshot, current_snapshot: Snapshot) -> None: # Define allowed operations for each type of operation allowed_operations = { Operation.APPEND: {Operation.APPEND, Operation.REPLACE, Operation.OVERWRITE, Operation.DELETE}, @@ -305,7 +306,7 @@ def _validate(self, starting_snapshot: Optional[Snapshot], current_snapshot: Opt if snapshot.snapshot_id == starting_snapshot.snapshot_id: break - snapshot_operation = snapshot.summary.operation + snapshot_operation = snapshot.summary.operation if snapshot.summary is not None else None if snapshot_operation not in allowed_operations[self._operation]: raise ValueError( From 0923dc4e9ee34bc5c1853a503bb5af537864002c Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Tue, 25 Mar 2025 22:09:30 -0400 Subject: [PATCH 5/8] add tests for verifying snapshot compatibility --- tests/integration/test_add_files.py | 61 ++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 8713615218..02ab351a52 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -28,7 +28,7 @@ from pytest_mock.plugin import MockerFixture from pyiceberg.catalog import Catalog -from pyiceberg.exceptions import NoSuchTableError +from pyiceberg.exceptions import CommitFailedException, NoSuchTableError from pyiceberg.io import FileIO from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _pyarrow_schema_ensure_large_types from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec @@ -850,3 +850,62 @@ def test_add_files_that_referenced_by_current_snapshot_with_check_duplicate_file with pytest.raises(ValueError) as exc_info: tbl.add_files(file_paths=[existing_files_in_table], check_duplicate_files=True) assert f"Cannot add files that are already referenced by table, files: {existing_files_in_table}" in str(exc_info.value) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_conflict_delete_delete( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + identifier = "default.test_conflict" + tbl1 = _create_table(session_catalog, identifier, format_version, [arrow_table_with_null]) + tbl2 = session_catalog.load_table(identifier) + + tbl1.delete("string == 'z'") + + with pytest.raises(CommitFailedException, match="(branch main has changed: expected id ).*"): + # tbl2 isn't aware of the commit by tbl1 + tbl2.delete("string == 'z'") + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_conflict_delete_append( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + identifier = "default.test_conflict" + tbl1 = _create_table(session_catalog, identifier, format_version, [arrow_table_with_null]) + tbl2 = session_catalog.load_table(identifier) + + # This is allowed + tbl1.delete("string == 'z'") + tbl2.append(arrow_table_with_null) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_conflict_append_delete( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + identifier = "default.test_conflict" + tbl1 = _create_table(session_catalog, identifier, format_version, [arrow_table_with_null]) + tbl2 = session_catalog.load_table(identifier) + + tbl1.append(arrow_table_with_null) + + with pytest.raises(CommitFailedException, match="(branch main has changed: expected id ).*"): + # tbl2 isn't aware of the commit by tbl1 + tbl2.delete("string == 'z'") + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_conflict_append_append( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + identifier = "default.test_conflict" + tbl1 = _create_table(session_catalog, identifier, format_version, [arrow_table_with_null]) + tbl2 = session_catalog.load_table(identifier) + + tbl1.append(arrow_table_with_null) + tbl2.append(arrow_table_with_null) From 57e0f90087d706f8557af09abeaec498f8da67a6 Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Wed, 26 Mar 2025 23:00:18 -0400 Subject: [PATCH 6/8] update parent snapshot when there are conflicts and change exception --- pyiceberg/table/update/snapshot.py | 8 +++++++- tests/integration/test_add_files.py | 20 ++++++++++++++------ 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index b77f613784..dac5edebcf 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -27,6 +27,7 @@ from sortedcontainers import SortedList +from pyiceberg.exceptions import CommitFailedException from pyiceberg.expressions import ( AlwaysFalse, BooleanExpression, @@ -259,9 +260,14 @@ def _commit(self) -> UpdatesAndRequirements: if not isinstance(self._transaction._table, StagedTable): starting_snapshot = self._transaction.table_metadata.current_snapshot() current_snapshot = self._transaction._table.refresh().metadata.current_snapshot() + if starting_snapshot is not None and current_snapshot is not None: self._validate(starting_snapshot, current_snapshot) + # If the current snapshot is not the same as the parent snapshot, update the parent snapshot id + if current_snapshot is not None and current_snapshot.snapshot_id != self._parent_snapshot_id: + self._parent_snapshot_id = current_snapshot.snapshot_id + with write_manifest_list( format_version=self._transaction.table_metadata.format_version, output_file=self._io.new_output(manifest_list_file_path), @@ -309,7 +315,7 @@ def _validate(self, starting_snapshot: Snapshot, current_snapshot: Snapshot) -> snapshot_operation = snapshot.summary.operation if snapshot.summary is not None else None if snapshot_operation not in allowed_operations[self._operation]: - raise ValueError( + raise CommitFailedException( f"Operation {snapshot_operation} is not allowed when performing {self._operation}. " "Check for overlaps or conflicts." ) diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 02ab351a52..a671d4aa9c 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -858,12 +858,15 @@ def test_conflict_delete_delete( spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int ) -> None: identifier = "default.test_conflict" - tbl1 = _create_table(session_catalog, identifier, format_version, [arrow_table_with_null]) + tbl1 = _create_table(session_catalog, identifier, format_version, schema=arrow_table_with_null.schema) + tbl1.append(arrow_table_with_null) tbl2 = session_catalog.load_table(identifier) tbl1.delete("string == 'z'") - with pytest.raises(CommitFailedException, match="(branch main has changed: expected id ).*"): + with pytest.raises( + CommitFailedException, match="Operation .* is not allowed when performing .*. Check for overlaps or conflicts." + ): # tbl2 isn't aware of the commit by tbl1 tbl2.delete("string == 'z'") @@ -874,7 +877,8 @@ def test_conflict_delete_append( spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int ) -> None: identifier = "default.test_conflict" - tbl1 = _create_table(session_catalog, identifier, format_version, [arrow_table_with_null]) + tbl1 = _create_table(session_catalog, identifier, format_version, schema=arrow_table_with_null.schema) + tbl1.append(arrow_table_with_null) tbl2 = session_catalog.load_table(identifier) # This is allowed @@ -888,12 +892,15 @@ def test_conflict_append_delete( spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int ) -> None: identifier = "default.test_conflict" - tbl1 = _create_table(session_catalog, identifier, format_version, [arrow_table_with_null]) + tbl1 = _create_table(session_catalog, identifier, format_version, schema=arrow_table_with_null.schema) + tbl1.append(arrow_table_with_null) tbl2 = session_catalog.load_table(identifier) tbl1.append(arrow_table_with_null) - with pytest.raises(CommitFailedException, match="(branch main has changed: expected id ).*"): + with pytest.raises( + CommitFailedException, match="Operation .* is not allowed when performing .*. Check for overlaps or conflicts." + ): # tbl2 isn't aware of the commit by tbl1 tbl2.delete("string == 'z'") @@ -904,7 +911,8 @@ def test_conflict_append_append( spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int ) -> None: identifier = "default.test_conflict" - tbl1 = _create_table(session_catalog, identifier, format_version, [arrow_table_with_null]) + tbl1 = _create_table(session_catalog, identifier, format_version, schema=arrow_table_with_null.schema) + tbl1.append(arrow_table_with_null) tbl2 = session_catalog.load_table(identifier) tbl1.append(arrow_table_with_null) From 66849dd93a9a587d46ab650cb1a17a255b3c1e8e Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Thu, 8 May 2025 21:12:48 -0400 Subject: [PATCH 7/8] add table content verification for tests --- .gitignore | 2 ++ tests/integration/test_add_files.py | 10 +++++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 7043f0e7d4..fb6e37429b 100644 --- a/.gitignore +++ b/.gitignore @@ -50,3 +50,5 @@ htmlcov pyiceberg/avro/decoder_fast.c pyiceberg/avro/*.html pyiceberg/avro/*.so + +.ks/ \ No newline at end of file diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 9024b83a56..f4ade53b76 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -934,6 +934,10 @@ def test_conflict_delete_append( tbl1.delete("string == 'z'") tbl2.append(arrow_table_with_null) + # verify against expected table + arrow_table_expected = arrow_table_with_null[:2] + assert tbl1.scan().to_arrow() == arrow_table_expected + @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) @@ -955,7 +959,7 @@ def test_conflict_append_delete( @pytest.mark.integration -@pytest.mark.parametrize("format_version", [1, 2]) +@pytest.mark.parametrize("format_version", [2]) def test_conflict_append_append( spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int ) -> None: @@ -967,6 +971,10 @@ def test_conflict_append_append( tbl1.append(arrow_table_with_null) tbl2.append(arrow_table_with_null) + # verify against expected table + arrow_table_expected = pa.concat_tables([arrow_table_with_null, arrow_table_with_null, arrow_table_with_null]) + assert tbl1.scan().to_arrow() == arrow_table_expected + @pytest.mark.integration def test_add_files_hour_transform(session_catalog: Catalog) -> None: From 0824c3580747094b88cd73da907c98c3c0db8c52 Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Thu, 8 May 2025 21:13:16 -0400 Subject: [PATCH 8/8] modify allowed operations for replace --- pyiceberg/table/update/snapshot.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 48ec248e29..14697e3457 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -299,7 +299,7 @@ def _validate(self, starting_snapshot: Snapshot, current_snapshot: Snapshot) -> # Define allowed operations for each type of operation allowed_operations = { Operation.APPEND: {Operation.APPEND, Operation.REPLACE, Operation.OVERWRITE, Operation.DELETE}, - Operation.REPLACE: {Operation.APPEND}, + Operation.REPLACE: {}, Operation.OVERWRITE: set(), Operation.DELETE: set(), }