From e0df693dd0f720daa0529eb124034c02932c438a Mon Sep 17 00:00:00 2001 From: Leon Lin Date: Thu, 31 Jul 2025 12:35:15 -0700 Subject: [PATCH 1/5] fix snapshot id in deletes --- pyiceberg/table/update/snapshot.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 3ffb275ded..ed64332303 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -405,7 +405,7 @@ def _compute_deletes(self) -> Tuple[List[ManifestFile], List[ManifestEntry], boo def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ManifestEntry: return ManifestEntry.from_args( status=status, - snapshot_id=entry.snapshot_id, + snapshot_id=self.snapshot_id if status != ManifestEntryStatus.EXISTING else entry.snapshot_id, sequence_number=entry.sequence_number, file_sequence_number=entry.file_sequence_number, data_file=entry.data_file, From ca52be5ea8406f3aae26f3d0578e45b792e3be3b Mon Sep 17 00:00:00 2001 From: Leon Lin Date: Fri, 1 Aug 2025 11:18:59 -0700 Subject: [PATCH 2/5] Add unit tests --- tests/table/test_deletes.py | 82 +++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 tests/table/test_deletes.py diff --git a/tests/table/test_deletes.py b/tests/table/test_deletes.py new file mode 100644 index 0000000000..61feb03e2b --- /dev/null +++ b/tests/table/test_deletes.py @@ -0,0 +1,82 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from pathlib import PosixPath + +import pyarrow as pa +import pytest + +from pyiceberg.catalog import Catalog +from pyiceberg.catalog.memory import InMemoryCatalog +from pyiceberg.exceptions import NoSuchTableError +from pyiceberg.expressions import LessThanOrEqual +from pyiceberg.manifest import ManifestContent, ManifestEntryStatus + + +@pytest.fixture +def catalog(tmp_path: PosixPath) -> InMemoryCatalog: + catalog = InMemoryCatalog("test.in_memory.catalog", warehouse=tmp_path.absolute().as_posix()) + catalog.create_namespace("default") + return catalog + + +def _drop_table(catalog: Catalog, identifier: str) -> None: + try: + catalog.drop_table(identifier) + except NoSuchTableError: + pass + + +def test_manifest_entry_after_deletes(catalog: Catalog) -> None: + identifier = "default.test_manifest_entry_after_deletes" + _drop_table(catalog, identifier) + + schema = pa.schema( + [ + ("id", pa.int32()), + ("name", pa.string()), + ] + ) + + table = catalog.create_table(identifier, schema) + data = pa.Table.from_pylist( + [ + {"id": 1, "name": "foo"}, + {"id": 2, "name": "bar"}, + {"id": 3, "name": "bar"}, + {"id": 4, "name": "bar"}, + ], + schema=schema, + ) + table.append(data) + + def assert_manifest_entry(expected_status: ManifestEntryStatus, expected_snapshot_id: int) -> None: + current_snapshot = table.refresh().current_snapshot() + manifest_files = current_snapshot.manifests(table.io) + assert len(manifest_files) == 1 + + entries = manifest_files[0].fetch_manifest_entry(table.io, discard_deleted=False) + assert len(entries) == 1 + entry = entries[0] + assert entry.status == expected_status + assert entry.snapshot_id == expected_snapshot_id + + before_delete_snapshot_id = table.current_snapshot().snapshot_id + assert_manifest_entry(ManifestEntryStatus.ADDED, before_delete_snapshot_id) + + table.delete(LessThanOrEqual("id", 4)) + after_delete_snapshot_id = table.refresh().current_snapshot().snapshot_id + assert_manifest_entry(ManifestEntryStatus.DELETED, after_delete_snapshot_id) From d3bd2286b8de98ab0a019ebe0f7f0008600bcac4 Mon Sep 17 00:00:00 2001 From: Leon Lin Date: Fri, 1 Aug 2025 13:23:44 -0700 Subject: [PATCH 3/5] rename test file to avoid conflict --- tests/table/{test_deletes.py => test_delete.py} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename tests/table/{test_deletes.py => test_delete.py} (97%) diff --git a/tests/table/test_deletes.py b/tests/table/test_delete.py similarity index 97% rename from tests/table/test_deletes.py rename to tests/table/test_delete.py index 61feb03e2b..ab20cd3874 100644 --- a/tests/table/test_deletes.py +++ b/tests/table/test_delete.py @@ -23,7 +23,7 @@ from pyiceberg.catalog.memory import InMemoryCatalog from pyiceberg.exceptions import NoSuchTableError from pyiceberg.expressions import LessThanOrEqual -from pyiceberg.manifest import ManifestContent, ManifestEntryStatus +from pyiceberg.manifest import ManifestEntryStatus @pytest.fixture From 658c012be536c31e479725bf583e875f7256efa8 Mon Sep 17 00:00:00 2001 From: Leon Lin Date: Mon, 11 Aug 2025 10:56:47 -0700 Subject: [PATCH 4/5] Update condition Co-authored-by: Fokko Driesprong --- pyiceberg/table/update/snapshot.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index ed64332303..e6ca4104a1 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -405,7 +405,8 @@ def _compute_deletes(self) -> Tuple[List[ManifestFile], List[ManifestEntry], boo def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ManifestEntry: return ManifestEntry.from_args( status=status, - snapshot_id=self.snapshot_id if status != ManifestEntryStatus.EXISTING else entry.snapshot_id, + # When a file is replaced or deleted from the dataset, its manifest entry fields store the snapshot ID in which the file was deleted and status 2 (deleted). + snapshot_id=self.snapshot_id if status == ManifestEntryStatus.DELETED else entry.snapshot_id, sequence_number=entry.sequence_number, file_sequence_number=entry.file_sequence_number, data_file=entry.data_file, From c554bda3fb347e09d2bdf3ab8e486d4d6ab13bf4 Mon Sep 17 00:00:00 2001 From: Leon Lin Date: Mon, 11 Aug 2025 23:25:34 -0700 Subject: [PATCH 5/5] Fix lint and move tests to test_deletes --- tests/integration/test_deletes.py | 54 +++++++++++++++++++- tests/table/test_delete.py | 82 ------------------------------- 2 files changed, 53 insertions(+), 83 deletions(-) delete mode 100644 tests/table/test_delete.py diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index abf8502ac7..21c3d12999 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -24,7 +24,7 @@ from pyiceberg.catalog.rest import RestCatalog from pyiceberg.exceptions import NoSuchTableError -from pyiceberg.expressions import AlwaysTrue, EqualTo +from pyiceberg.expressions import AlwaysTrue, EqualTo, LessThanOrEqual from pyiceberg.manifest import ManifestEntryStatus from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema @@ -923,3 +923,55 @@ def test_delete_on_empty_table(spark: SparkSession, session_catalog: RestCatalog # Assert that no new snapshot was created because no rows were deleted assert len(tbl.snapshots()) == 0 + + +@pytest.mark.integration +def test_manifest_entry_after_deletes(session_catalog: RestCatalog) -> None: + identifier = "default.test_manifest_entry_after_deletes" + try: + session_catalog.drop_table(identifier) + except NoSuchTableError: + pass + + schema = pa.schema( + [ + ("id", pa.int32()), + ("name", pa.string()), + ] + ) + + table = session_catalog.create_table(identifier, schema) + data = pa.Table.from_pylist( + [ + {"id": 1, "name": "foo"}, + {"id": 2, "name": "bar"}, + {"id": 3, "name": "bar"}, + {"id": 4, "name": "bar"}, + ], + schema=schema, + ) + table.append(data) + + def assert_manifest_entry(expected_status: ManifestEntryStatus, expected_snapshot_id: int) -> None: + current_snapshot = table.refresh().current_snapshot() + assert current_snapshot is not None + + manifest_files = current_snapshot.manifests(table.io) + assert len(manifest_files) == 1 + + entries = manifest_files[0].fetch_manifest_entry(table.io, discard_deleted=False) + assert len(entries) == 1 + entry = entries[0] + assert entry.status == expected_status + assert entry.snapshot_id == expected_snapshot_id + + before_delete_snapshot = table.current_snapshot() + assert before_delete_snapshot is not None + + assert_manifest_entry(ManifestEntryStatus.ADDED, before_delete_snapshot.snapshot_id) + + table.delete(LessThanOrEqual("id", 4)) + after_delete_snapshot = table.refresh().current_snapshot() + assert after_delete_snapshot is not None + + assert_manifest_entry(ManifestEntryStatus.DELETED, after_delete_snapshot.snapshot_id) diff --git a/tests/table/test_delete.py b/tests/table/test_delete.py deleted file mode 100644 index ab20cd3874..0000000000 --- a/tests/table/test_delete.py +++ /dev/null @@ -1,82 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from pathlib import PosixPath - -import pyarrow as pa -import pytest - -from pyiceberg.catalog import Catalog -from pyiceberg.catalog.memory import InMemoryCatalog -from pyiceberg.exceptions import NoSuchTableError -from pyiceberg.expressions import LessThanOrEqual -from pyiceberg.manifest import ManifestEntryStatus - - -@pytest.fixture -def catalog(tmp_path: PosixPath) -> InMemoryCatalog: - catalog = InMemoryCatalog("test.in_memory.catalog", warehouse=tmp_path.absolute().as_posix()) - catalog.create_namespace("default") - return catalog - - -def _drop_table(catalog: Catalog, identifier: str) -> None: - try: - catalog.drop_table(identifier) - except NoSuchTableError: - pass - - -def test_manifest_entry_after_deletes(catalog: Catalog) -> None: - identifier = "default.test_manifest_entry_after_deletes" - _drop_table(catalog, identifier) - - schema = pa.schema( - [ - ("id", pa.int32()), - ("name", pa.string()), - ] - ) - - table = catalog.create_table(identifier, schema) - data = pa.Table.from_pylist( - [ - {"id": 1, "name": "foo"}, - {"id": 2, "name": "bar"}, - {"id": 3, "name": "bar"}, - {"id": 4, "name": "bar"}, - ], - schema=schema, - ) - table.append(data) - - def assert_manifest_entry(expected_status: ManifestEntryStatus, expected_snapshot_id: int) -> None: - current_snapshot = table.refresh().current_snapshot() - manifest_files = current_snapshot.manifests(table.io) - assert len(manifest_files) == 1 - - entries = manifest_files[0].fetch_manifest_entry(table.io, discard_deleted=False) - assert len(entries) == 1 - entry = entries[0] - assert entry.status == expected_status - assert entry.snapshot_id == expected_snapshot_id - - before_delete_snapshot_id = table.current_snapshot().snapshot_id - assert_manifest_entry(ManifestEntryStatus.ADDED, before_delete_snapshot_id) - - table.delete(LessThanOrEqual("id", 4)) - after_delete_snapshot_id = table.refresh().current_snapshot().snapshot_id - assert_manifest_entry(ManifestEntryStatus.DELETED, after_delete_snapshot_id)