From b6ba31942db6e94fe7676d7da367bf2e888a2253 Mon Sep 17 00:00:00 2001 From: Borodin Gregory Date: Wed, 22 Jan 2025 17:03:40 +0100 Subject: [PATCH 01/14] Implement update for `remove-snapshot` action --- pyiceberg/table/update/__init__.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 3cf2db630d..0914a2cfd0 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -455,6 +455,16 @@ def _(update: SetSnapshotRefUpdate, base_metadata: TableMetadata, context: _Tabl return base_metadata.model_copy(update=metadata_updates) +@_apply_table_update.register(RemoveSnapshotsUpdate) +def _(update: RemoveSnapshotsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata: + if not any(s.snapshot_id == update.snapshot_id for s in base_metadata.snapshots): + raise ValueError(f"Snapshot with snapshot id {update.snapshot_id} does not exist") + + snapshots = [s for s in base_metadata.snapshots if s.snapshot_id != update.snapshot_id] + context.add_update(update) + return base_metadata.model_copy(update={"snapshots": snapshots}) + + @_apply_table_update.register(AddSortOrderUpdate) def _(update: AddSortOrderUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata: context.add_update(update) From 88064c07aaed654c6dde1adb8eae575e8f9d744a Mon Sep 17 00:00:00 2001 From: Borodin Gregory Date: Wed, 22 Jan 2025 20:19:47 +0100 Subject: [PATCH 02/14] Add tests --- pyiceberg/table/update/__init__.py | 9 ++++++--- tests/table/test_init.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 0914a2cfd0..0dfe3c9a0c 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -457,10 +457,13 @@ def _(update: SetSnapshotRefUpdate, base_metadata: TableMetadata, context: _Tabl @_apply_table_update.register(RemoveSnapshotsUpdate) def _(update: RemoveSnapshotsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata: - if not any(s.snapshot_id == update.snapshot_id for s in base_metadata.snapshots): - raise ValueError(f"Snapshot with snapshot id {update.snapshot_id} does not exist") + for remove_snapshot_id in update.snapshot_ids: + if remove_snapshot_id == base_metadata.current_snapshot_id: + raise ValueError(f"Can't remove current snapshot id {remove_snapshot_id}") + if not any(s.snapshot_id == remove_snapshot_id for s in base_metadata.snapshots): + raise ValueError(f"Snapshot with snapshot id {remove_snapshot_id} does not exist: {base_metadata.snapshots}") - snapshots = [s for s in base_metadata.snapshots if s.snapshot_id != update.snapshot_id] + snapshots = [s for s in base_metadata.snapshots if s.snapshot_id not in update.snapshot_ids] context.add_update(update) return base_metadata.model_copy(update={"snapshots": snapshots}) diff --git a/tests/table/test_init.py b/tests/table/test_init.py index e1f2ccc876..5d3e39e9ee 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -78,6 +78,7 @@ AssertRefSnapshotId, AssertTableUUID, RemovePropertiesUpdate, + RemoveSnapshotsUpdate, RemoveStatisticsUpdate, SetDefaultSortOrderUpdate, SetPropertiesUpdate, @@ -793,6 +794,33 @@ def test_update_metadata_set_snapshot_ref(table_v2: Table) -> None: ) +def test_update_remove_snapshots(table_v2: Table) -> None: + update = RemoveSnapshotsUpdate( + snapshot_ids=[3051729675574597004], + ) + new_metadata = update_table_metadata(table_v2.metadata, (update,)) + assert len(new_metadata.snapshots) == 1 + assert new_metadata.snapshots[0].snapshot_id == 3055729675574597004 + assert new_metadata.current_snapshot_id == 3055729675574597004 + assert new_metadata.last_updated_ms > table_v2.metadata.last_updated_ms + + +def test_update_remove_snapshots_doesnt_exist(table_v2: Table) -> None: + update = RemoveSnapshotsUpdate( + snapshot_ids=[123], + ) + with pytest.raises(ValueError, match="Snapshot with snapshot id 123 does not exist"): + update_table_metadata(table_v2.metadata, (update,)) + + +def test_update_remove_snapshots_cant_remove_current_snapshot_id(table_v2: Table) -> None: + update = RemoveSnapshotsUpdate( + snapshot_ids=[3055729675574597004], + ) + with pytest.raises(ValueError, match="Can't remove current snapshot id 3055729675574597004"): + update_table_metadata(table_v2.metadata, (update,)) + + def test_update_metadata_add_update_sort_order(table_v2: Table) -> None: new_sort_order = SortOrder(order_id=table_v2.sort_order().order_id + 1) new_metadata = update_table_metadata( From 818f77190527de11018d9ac636e75a87326a58b5 Mon Sep 17 00:00:00 2001 From: Borodin Gregory Date: Wed, 22 Jan 2025 20:23:01 +0100 Subject: [PATCH 03/14] Make sure parent_snapshot_id is consistent --- tests/table/test_init.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 5d3e39e9ee..aa27a6f3bd 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -801,6 +801,7 @@ def test_update_remove_snapshots(table_v2: Table) -> None: new_metadata = update_table_metadata(table_v2.metadata, (update,)) assert len(new_metadata.snapshots) == 1 assert new_metadata.snapshots[0].snapshot_id == 3055729675574597004 + assert new_metadata.snapshots[0].parent_snapshot_id == None assert new_metadata.current_snapshot_id == 3055729675574597004 assert new_metadata.last_updated_ms > table_v2.metadata.last_updated_ms From 9b3169088449447409908ef762f5a39e2065ff81 Mon Sep 17 00:00:00 2001 From: Borodin Gregory Date: Thu, 23 Jan 2025 15:38:42 +0100 Subject: [PATCH 04/14] Set parent_snapshot_id to None --- pyiceberg/table/update/__init__.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 0dfe3c9a0c..a671858fde 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -463,7 +463,11 @@ def _(update: RemoveSnapshotsUpdate, base_metadata: TableMetadata, context: _Tab if not any(s.snapshot_id == remove_snapshot_id for s in base_metadata.snapshots): raise ValueError(f"Snapshot with snapshot id {remove_snapshot_id} does not exist: {base_metadata.snapshots}") - snapshots = [s for s in base_metadata.snapshots if s.snapshot_id not in update.snapshot_ids] + snapshots = [ + (s.model_copy(update={"parent_snapshot_id": None}) if s.parent_snapshot_id in update.snapshot_ids else s) + for s in base_metadata.snapshots + if s.snapshot_id not in update.snapshot_ids + ] context.add_update(update) return base_metadata.model_copy(update={"snapshots": snapshots}) From c5cbfe4b9341d9fce7f2774728d591f0fedbbcda Mon Sep 17 00:00:00 2001 From: Borodin Gregory Date: Thu, 23 Jan 2025 22:17:48 +0100 Subject: [PATCH 05/14] cherry-pick: Support Remove Branch or Tag APIs (#822) --- pyiceberg/table/update/__init__.py | 17 ++++++++ pyiceberg/table/update/snapshot.py | 42 +++++++++++++++++++ tests/integration/test_snapshot_operations.py | 32 ++++++++++++++ 3 files changed, 91 insertions(+) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index a671858fde..74dc4644bf 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -472,6 +472,23 @@ def _(update: RemoveSnapshotsUpdate, base_metadata: TableMetadata, context: _Tab return base_metadata.model_copy(update={"snapshots": snapshots}) +@_apply_table_update.register(RemoveSnapshotRefUpdate) +def _(update: RemoveSnapshotRefUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata: + if (existing_ref := base_metadata.refs.get(update.ref_name, None)) is None: + return base_metadata + + if base_metadata.snapshot_by_id(existing_ref.snapshot_id) is None: + raise ValueError(f"Cannot remove {update.ref_name} ref with unknown snapshot {existing_ref.snapshot_id}") + + if update.ref_name == MAIN_BRANCH: + raise ValueError("Cannot remove main branch") + + metadata_refs = {**base_metadata.refs} + metadata_refs.pop(update.ref_name, None) + context.add_update(update) + return base_metadata.model_copy(update={"refs": metadata_refs}) + + @_apply_table_update.register(AddSortOrderUpdate) def _(update: AddSortOrderUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata: context.add_update(update) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index c0d0056e7c..9373c9001c 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -65,6 +65,7 @@ from pyiceberg.table.update import ( AddSnapshotUpdate, AssertRefSnapshotId, + RemoveSnapshotRefUpdate, SetSnapshotRefUpdate, TableRequirement, TableUpdate, @@ -749,6 +750,27 @@ def _commit(self) -> UpdatesAndRequirements: """Apply the pending changes and commit.""" return self._updates, self._requirements + def _remove_ref_snapshot(self, ref_name: str) -> ManageSnapshots: + """Remove a snapshot ref. + Args: + ref_name: branch / tag name to remove + Stages the updates and requirements for the remove-snapshot-ref. + Returns + This method for chaining + """ + updates = (RemoveSnapshotRefUpdate(ref_name=ref_name),) + requirements = ( + AssertRefSnapshotId( + snapshot_id=self._transaction.table_metadata.refs[ref_name].snapshot_id + if ref_name in self._transaction.table_metadata.refs + else None, + ref=ref_name, + ), + ) + self._updates += updates + self._requirements += requirements + return self + def create_tag(self, snapshot_id: int, tag_name: str, max_ref_age_ms: Optional[int] = None) -> ManageSnapshots: """ Create a new tag pointing to the given snapshot id. @@ -771,6 +793,16 @@ def create_tag(self, snapshot_id: int, tag_name: str, max_ref_age_ms: Optional[i self._requirements += requirement return self + def remove_tag(self, tag_name: str) -> ManageSnapshots: + """ + Remove a tag. + Args: + tag_name (str): name of tag to remove + Returns: + This for method chaining + """ + return self._remove_ref_snapshot(ref_name=tag_name) + def create_branch( self, snapshot_id: int, @@ -802,3 +834,13 @@ def create_branch( self._updates += update self._requirements += requirement return self + + def remove_branch(self, branch_name: str) -> ManageSnapshots: + """ + Remove a branch. + Args: + branch_name (str): name of branch to remove + Returns: + This for method chaining + """ + return self._remove_ref_snapshot(ref_name=branch_name) diff --git a/tests/integration/test_snapshot_operations.py b/tests/integration/test_snapshot_operations.py index 639193383e..1b7f2d3a29 100644 --- a/tests/integration/test_snapshot_operations.py +++ b/tests/integration/test_snapshot_operations.py @@ -40,3 +40,35 @@ def test_create_branch(catalog: Catalog) -> None: branch_snapshot_id = tbl.history()[-2].snapshot_id tbl.manage_snapshots().create_branch(snapshot_id=branch_snapshot_id, branch_name="branch123").commit() assert tbl.metadata.refs["branch123"] == SnapshotRef(snapshot_id=branch_snapshot_id, snapshot_ref_type="branch") + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_remove_tag(catalog: Catalog) -> None: + identifier = "default.test_table_snapshot_operations" + tbl = catalog.load_table(identifier) + assert len(tbl.history()) > 3 + # first, create the tag to remove + tag_name = "tag_to_remove" + tag_snapshot_id = tbl.history()[-3].snapshot_id + tbl.manage_snapshots().create_tag(snapshot_id=tag_snapshot_id, tag_name=tag_name).commit() + assert tbl.metadata.refs[tag_name] == SnapshotRef(snapshot_id=tag_snapshot_id, snapshot_ref_type="tag") + # now, remove the tag + tbl.manage_snapshots().remove_tag(tag_name=tag_name).commit() + assert tbl.metadata.refs.get(tag_name, None) is None + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_remove_branch(catalog: Catalog) -> None: + identifier = "default.test_table_snapshot_operations" + tbl = catalog.load_table(identifier) + assert len(tbl.history()) > 2 + # first, create the branch to remove + branch_name = "branch_to_remove" + branch_snapshot_id = tbl.history()[-2].snapshot_id + tbl.manage_snapshots().create_branch(snapshot_id=branch_snapshot_id, branch_name=branch_name).commit() + assert tbl.metadata.refs[branch_name] == SnapshotRef(snapshot_id=branch_snapshot_id, snapshot_ref_type="branch") + # now, remove the branch + tbl.manage_snapshots().remove_branch(branch_name=branch_name).commit() + assert tbl.metadata.refs.get(branch_name, None) is None From 29feaf71cbca7e227bef04b95caef0731680901f Mon Sep 17 00:00:00 2001 From: Borodin Gregory Date: Thu, 23 Jan 2025 22:40:40 +0100 Subject: [PATCH 06/14] Make sure refs are removed when removing snapshots --- pyiceberg/table/update/__init__.py | 12 +++++++++++- tests/table/test_init.py | 9 ++++++--- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 74dc4644bf..2577ed03d4 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -468,8 +468,18 @@ def _(update: RemoveSnapshotsUpdate, base_metadata: TableMetadata, context: _Tab for s in base_metadata.snapshots if s.snapshot_id not in update.snapshot_ids ] + + remove_ref_updates = ( + RemoveSnapshotRefUpdate(ref_name=ref_name) + for ref_name, ref in base_metadata.refs.items() + if ref.snapshot_id in update.snapshot_ids + ) + new_metadata = base_metadata + for remove_ref in remove_ref_updates: + new_metadata = _apply_table_update(remove_ref, new_metadata, context) + context.add_update(update) - return base_metadata.model_copy(update={"snapshots": snapshots}) + return new_metadata.model_copy(update={"snapshots": snapshots}) @_apply_table_update.register(RemoveSnapshotRefUpdate) diff --git a/tests/table/test_init.py b/tests/table/test_init.py index aa27a6f3bd..e76f3de9a6 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -795,15 +795,18 @@ def test_update_metadata_set_snapshot_ref(table_v2: Table) -> None: def test_update_remove_snapshots(table_v2: Table) -> None: - update = RemoveSnapshotsUpdate( - snapshot_ids=[3051729675574597004], - ) + # assert fixture data to easily understand the test assumptions + assert len(table_v2.metadata.snapshots) == 2 + assert len(table_v2.metadata.refs) == 2 + update = RemoveSnapshotsUpdate(snapshot_ids=[3051729675574597004]) new_metadata = update_table_metadata(table_v2.metadata, (update,)) assert len(new_metadata.snapshots) == 1 assert new_metadata.snapshots[0].snapshot_id == 3055729675574597004 assert new_metadata.snapshots[0].parent_snapshot_id == None assert new_metadata.current_snapshot_id == 3055729675574597004 assert new_metadata.last_updated_ms > table_v2.metadata.last_updated_ms + assert len(new_metadata.refs) == 1 + assert new_metadata.refs["main"].snapshot_id == 3055729675574597004 def test_update_remove_snapshots_doesnt_exist(table_v2: Table) -> None: From 9e3f7c7e3efb05a54810d33f188f8193427cf10d Mon Sep 17 00:00:00 2001 From: Borodin Gregory Date: Mon, 27 Jan 2025 13:37:59 +0100 Subject: [PATCH 07/14] Fix ruff and pydocstyle --- pyiceberg/table/update/snapshot.py | 3 +++ tests/table/test_init.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 9373c9001c..c1bf46566f 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -752,6 +752,7 @@ def _commit(self) -> UpdatesAndRequirements: def _remove_ref_snapshot(self, ref_name: str) -> ManageSnapshots: """Remove a snapshot ref. + Args: ref_name: branch / tag name to remove Stages the updates and requirements for the remove-snapshot-ref. @@ -796,6 +797,7 @@ def create_tag(self, snapshot_id: int, tag_name: str, max_ref_age_ms: Optional[i def remove_tag(self, tag_name: str) -> ManageSnapshots: """ Remove a tag. + Args: tag_name (str): name of tag to remove Returns: @@ -838,6 +840,7 @@ def create_branch( def remove_branch(self, branch_name: str) -> ManageSnapshots: """ Remove a branch. + Args: branch_name (str): name of branch to remove Returns: diff --git a/tests/table/test_init.py b/tests/table/test_init.py index e76f3de9a6..49ec329cad 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -802,7 +802,7 @@ def test_update_remove_snapshots(table_v2: Table) -> None: new_metadata = update_table_metadata(table_v2.metadata, (update,)) assert len(new_metadata.snapshots) == 1 assert new_metadata.snapshots[0].snapshot_id == 3055729675574597004 - assert new_metadata.snapshots[0].parent_snapshot_id == None + assert new_metadata.snapshots[0].parent_snapshot_id is None assert new_metadata.current_snapshot_id == 3055729675574597004 assert new_metadata.last_updated_ms > table_v2.metadata.last_updated_ms assert len(new_metadata.refs) == 1 From b9c912de94b7b5d8f2191647144bdaedfa3e6c0d Mon Sep 17 00:00:00 2001 From: Borodin Gregory Date: Thu, 30 Jan 2025 12:52:51 +0100 Subject: [PATCH 08/14] Remove statistics --- pyiceberg/table/update/__init__.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 2577ed03d4..cfd97e0a96 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import itertools import uuid from abc import ABC, abstractmethod from datetime import datetime @@ -474,9 +475,15 @@ def _(update: RemoveSnapshotsUpdate, base_metadata: TableMetadata, context: _Tab for ref_name, ref in base_metadata.refs.items() if ref.snapshot_id in update.snapshot_ids ) + remove_statistics_updates = ( + RemoveStatisticsUpdate(statistics_file.snapshot_id) + for statistics_file in base_metadata.statistics + if statistics_file.snapshot_id in update.snapshot_ids + ) + updates = itertools.chain(remove_ref_updates, remove_statistics_updates) new_metadata = base_metadata - for remove_ref in remove_ref_updates: - new_metadata = _apply_table_update(remove_ref, new_metadata, context) + for update in updates: + new_metadata = _apply_table_update(update, new_metadata, context) context.add_update(update) return new_metadata.model_copy(update={"snapshots": snapshots}) From d515e74ad33e8f093111283ff41e0471f1475552 Mon Sep 17 00:00:00 2001 From: Borodin Gregory Date: Thu, 30 Jan 2025 13:08:58 +0100 Subject: [PATCH 09/14] Fix name collision --- pyiceberg/table/update/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index cfd97e0a96..2e2aea1340 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -482,8 +482,8 @@ def _(update: RemoveSnapshotsUpdate, base_metadata: TableMetadata, context: _Tab ) updates = itertools.chain(remove_ref_updates, remove_statistics_updates) new_metadata = base_metadata - for update in updates: - new_metadata = _apply_table_update(update, new_metadata, context) + for upd in updates: + new_metadata = _apply_table_update(upd, new_metadata, context) context.add_update(update) return new_metadata.model_copy(update={"snapshots": snapshots}) From c7481fe096d9fefc8320c8d1da1e7f816f762da7 Mon Sep 17 00:00:00 2001 From: Borodin Gregory Date: Thu, 30 Jan 2025 13:41:17 +0100 Subject: [PATCH 10/14] Remove entries from snapshot log --- pyiceberg/table/update/__init__.py | 7 ++++++- tests/table/test_init.py | 3 +++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 2e2aea1340..9e04643913 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -469,6 +469,11 @@ def _(update: RemoveSnapshotsUpdate, base_metadata: TableMetadata, context: _Tab for s in base_metadata.snapshots if s.snapshot_id not in update.snapshot_ids ] + snapshot_log = [ + snapshot_log_entry + for snapshot_log_entry in base_metadata.snapshot_log + if snapshot_log_entry.snapshot_id not in update.snapshot_ids + ] remove_ref_updates = ( RemoveSnapshotRefUpdate(ref_name=ref_name) @@ -486,7 +491,7 @@ def _(update: RemoveSnapshotsUpdate, base_metadata: TableMetadata, context: _Tab new_metadata = _apply_table_update(upd, new_metadata, context) context.add_update(update) - return new_metadata.model_copy(update={"snapshots": snapshots}) + return new_metadata.model_copy(update={"snapshots": snapshots, "snapshot_log": snapshot_log}) @_apply_table_update.register(RemoveSnapshotRefUpdate) diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 49ec329cad..7c0c70d145 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -797,6 +797,7 @@ def test_update_metadata_set_snapshot_ref(table_v2: Table) -> None: def test_update_remove_snapshots(table_v2: Table) -> None: # assert fixture data to easily understand the test assumptions assert len(table_v2.metadata.snapshots) == 2 + assert len(table_v2.metadata.snapshot_log) == 2 assert len(table_v2.metadata.refs) == 2 update = RemoveSnapshotsUpdate(snapshot_ids=[3051729675574597004]) new_metadata = update_table_metadata(table_v2.metadata, (update,)) @@ -805,6 +806,8 @@ def test_update_remove_snapshots(table_v2: Table) -> None: assert new_metadata.snapshots[0].parent_snapshot_id is None assert new_metadata.current_snapshot_id == 3055729675574597004 assert new_metadata.last_updated_ms > table_v2.metadata.last_updated_ms + assert len(new_metadata.snapshot_log) == 1 + assert new_metadata.snapshot_log[0].snapshot_id == 3055729675574597004 assert len(new_metadata.refs) == 1 assert new_metadata.refs["main"].snapshot_id == 3055729675574597004 From da4db9f024663aebb427ba7bd3f17830520287c0 Mon Sep 17 00:00:00 2001 From: Borodin Gregory Date: Sat, 1 Feb 2025 10:34:05 +0100 Subject: [PATCH 11/14] Introduce constants for test readability --- tests/table/test_init.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 7c0c70d145..1cede9c2cd 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -795,21 +795,23 @@ def test_update_metadata_set_snapshot_ref(table_v2: Table) -> None: def test_update_remove_snapshots(table_v2: Table) -> None: + REMOVE_SNAPSHOT = 3051729675574597004 + KEEP_SNAPSHOT = 3055729675574597004 # assert fixture data to easily understand the test assumptions assert len(table_v2.metadata.snapshots) == 2 assert len(table_v2.metadata.snapshot_log) == 2 assert len(table_v2.metadata.refs) == 2 - update = RemoveSnapshotsUpdate(snapshot_ids=[3051729675574597004]) + update = RemoveSnapshotsUpdate(snapshot_ids=[REMOVE_SNAPSHOT]) new_metadata = update_table_metadata(table_v2.metadata, (update,)) assert len(new_metadata.snapshots) == 1 - assert new_metadata.snapshots[0].snapshot_id == 3055729675574597004 + assert new_metadata.snapshots[0].snapshot_id == KEEP_SNAPSHOT assert new_metadata.snapshots[0].parent_snapshot_id is None - assert new_metadata.current_snapshot_id == 3055729675574597004 + assert new_metadata.current_snapshot_id == KEEP_SNAPSHOT assert new_metadata.last_updated_ms > table_v2.metadata.last_updated_ms assert len(new_metadata.snapshot_log) == 1 - assert new_metadata.snapshot_log[0].snapshot_id == 3055729675574597004 + assert new_metadata.snapshot_log[0].snapshot_id == KEEP_SNAPSHOT assert len(new_metadata.refs) == 1 - assert new_metadata.refs["main"].snapshot_id == 3055729675574597004 + assert new_metadata.refs["main"].snapshot_id == KEEP_SNAPSHOT def test_update_remove_snapshots_doesnt_exist(table_v2: Table) -> None: From 19e17f0631d51b365c7fa12e4fcfa0cf28368769 Mon Sep 17 00:00:00 2001 From: Borodin Gregory Date: Sat, 1 Feb 2025 10:44:02 +0100 Subject: [PATCH 12/14] Allow to remove main branch --- pyiceberg/table/update/__init__.py | 10 +++------- tests/table/test_init.py | 12 ++++++------ 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 9e04643913..f7b5b8fd31 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -459,8 +459,6 @@ def _(update: SetSnapshotRefUpdate, base_metadata: TableMetadata, context: _Tabl @_apply_table_update.register(RemoveSnapshotsUpdate) def _(update: RemoveSnapshotsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata: for remove_snapshot_id in update.snapshot_ids: - if remove_snapshot_id == base_metadata.current_snapshot_id: - raise ValueError(f"Can't remove current snapshot id {remove_snapshot_id}") if not any(s.snapshot_id == remove_snapshot_id for s in base_metadata.snapshots): raise ValueError(f"Snapshot with snapshot id {remove_snapshot_id} does not exist: {base_metadata.snapshots}") @@ -502,13 +500,11 @@ def _(update: RemoveSnapshotRefUpdate, base_metadata: TableMetadata, context: _T if base_metadata.snapshot_by_id(existing_ref.snapshot_id) is None: raise ValueError(f"Cannot remove {update.ref_name} ref with unknown snapshot {existing_ref.snapshot_id}") - if update.ref_name == MAIN_BRANCH: - raise ValueError("Cannot remove main branch") + current_snapshot_id = None if update.ref_name == MAIN_BRANCH else base_metadata.current_snapshot_id - metadata_refs = {**base_metadata.refs} - metadata_refs.pop(update.ref_name, None) + metadata_refs = {ref_name: ref for ref_name, ref in base_metadata.refs.items() if ref_name != update.ref_name} context.add_update(update) - return base_metadata.model_copy(update={"refs": metadata_refs}) + return base_metadata.model_copy(update={"refs": metadata_refs, "current_snapshot_id": current_snapshot_id}) @_apply_table_update.register(AddSortOrderUpdate) diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 1cede9c2cd..6fbcca357e 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -822,12 +822,12 @@ def test_update_remove_snapshots_doesnt_exist(table_v2: Table) -> None: update_table_metadata(table_v2.metadata, (update,)) -def test_update_remove_snapshots_cant_remove_current_snapshot_id(table_v2: Table) -> None: - update = RemoveSnapshotsUpdate( - snapshot_ids=[3055729675574597004], - ) - with pytest.raises(ValueError, match="Can't remove current snapshot id 3055729675574597004"): - update_table_metadata(table_v2.metadata, (update,)) +def test_update_remove_snapshots_remove_current_snapshot_id(table_v2: Table) -> None: + update = RemoveSnapshotsUpdate(snapshot_ids=[3055729675574597004]) + new_metadata = update_table_metadata(table_v2.metadata, (update,)) + assert len(new_metadata.refs) == 1 + assert new_metadata.refs["test"].snapshot_id == 3051729675574597004 + assert new_metadata.current_snapshot_id is None def test_update_metadata_add_update_sort_order(table_v2: Table) -> None: From 32e1e854ec8d12ece996d7cf00b59e6ddbf7de30 Mon Sep 17 00:00:00 2001 From: Borodin Gregory Date: Sat, 1 Feb 2025 10:46:54 +0100 Subject: [PATCH 13/14] Revert changes (moved to #1598) --- pyiceberg/table/update/snapshot.py | 45 ------------------- tests/integration/test_snapshot_operations.py | 32 ------------- 2 files changed, 77 deletions(-) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index c1bf46566f..c0d0056e7c 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -65,7 +65,6 @@ from pyiceberg.table.update import ( AddSnapshotUpdate, AssertRefSnapshotId, - RemoveSnapshotRefUpdate, SetSnapshotRefUpdate, TableRequirement, TableUpdate, @@ -750,28 +749,6 @@ def _commit(self) -> UpdatesAndRequirements: """Apply the pending changes and commit.""" return self._updates, self._requirements - def _remove_ref_snapshot(self, ref_name: str) -> ManageSnapshots: - """Remove a snapshot ref. - - Args: - ref_name: branch / tag name to remove - Stages the updates and requirements for the remove-snapshot-ref. - Returns - This method for chaining - """ - updates = (RemoveSnapshotRefUpdate(ref_name=ref_name),) - requirements = ( - AssertRefSnapshotId( - snapshot_id=self._transaction.table_metadata.refs[ref_name].snapshot_id - if ref_name in self._transaction.table_metadata.refs - else None, - ref=ref_name, - ), - ) - self._updates += updates - self._requirements += requirements - return self - def create_tag(self, snapshot_id: int, tag_name: str, max_ref_age_ms: Optional[int] = None) -> ManageSnapshots: """ Create a new tag pointing to the given snapshot id. @@ -794,17 +771,6 @@ def create_tag(self, snapshot_id: int, tag_name: str, max_ref_age_ms: Optional[i self._requirements += requirement return self - def remove_tag(self, tag_name: str) -> ManageSnapshots: - """ - Remove a tag. - - Args: - tag_name (str): name of tag to remove - Returns: - This for method chaining - """ - return self._remove_ref_snapshot(ref_name=tag_name) - def create_branch( self, snapshot_id: int, @@ -836,14 +802,3 @@ def create_branch( self._updates += update self._requirements += requirement return self - - def remove_branch(self, branch_name: str) -> ManageSnapshots: - """ - Remove a branch. - - Args: - branch_name (str): name of branch to remove - Returns: - This for method chaining - """ - return self._remove_ref_snapshot(ref_name=branch_name) diff --git a/tests/integration/test_snapshot_operations.py b/tests/integration/test_snapshot_operations.py index 1b7f2d3a29..639193383e 100644 --- a/tests/integration/test_snapshot_operations.py +++ b/tests/integration/test_snapshot_operations.py @@ -40,35 +40,3 @@ def test_create_branch(catalog: Catalog) -> None: branch_snapshot_id = tbl.history()[-2].snapshot_id tbl.manage_snapshots().create_branch(snapshot_id=branch_snapshot_id, branch_name="branch123").commit() assert tbl.metadata.refs["branch123"] == SnapshotRef(snapshot_id=branch_snapshot_id, snapshot_ref_type="branch") - - -@pytest.mark.integration -@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -def test_remove_tag(catalog: Catalog) -> None: - identifier = "default.test_table_snapshot_operations" - tbl = catalog.load_table(identifier) - assert len(tbl.history()) > 3 - # first, create the tag to remove - tag_name = "tag_to_remove" - tag_snapshot_id = tbl.history()[-3].snapshot_id - tbl.manage_snapshots().create_tag(snapshot_id=tag_snapshot_id, tag_name=tag_name).commit() - assert tbl.metadata.refs[tag_name] == SnapshotRef(snapshot_id=tag_snapshot_id, snapshot_ref_type="tag") - # now, remove the tag - tbl.manage_snapshots().remove_tag(tag_name=tag_name).commit() - assert tbl.metadata.refs.get(tag_name, None) is None - - -@pytest.mark.integration -@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -def test_remove_branch(catalog: Catalog) -> None: - identifier = "default.test_table_snapshot_operations" - tbl = catalog.load_table(identifier) - assert len(tbl.history()) > 2 - # first, create the branch to remove - branch_name = "branch_to_remove" - branch_snapshot_id = tbl.history()[-2].snapshot_id - tbl.manage_snapshots().create_branch(snapshot_id=branch_snapshot_id, branch_name=branch_name).commit() - assert tbl.metadata.refs[branch_name] == SnapshotRef(snapshot_id=branch_snapshot_id, snapshot_ref_type="branch") - # now, remove the branch - tbl.manage_snapshots().remove_branch(branch_name=branch_name).commit() - assert tbl.metadata.refs.get(branch_name, None) is None From fb8f350f4c1dbdc9383385a211d97c16efbdfe13 Mon Sep 17 00:00:00 2001 From: Borodin Gregory Date: Mon, 17 Feb 2025 12:55:25 +0100 Subject: [PATCH 14/14] Use verbose variable names --- pyiceberg/table/update/__init__.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index a5a08c5d30..f60ac1e3ee 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -470,13 +470,17 @@ def _(update: SetSnapshotRefUpdate, base_metadata: TableMetadata, context: _Tabl @_apply_table_update.register(RemoveSnapshotsUpdate) def _(update: RemoveSnapshotsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata: for remove_snapshot_id in update.snapshot_ids: - if not any(s.snapshot_id == remove_snapshot_id for s in base_metadata.snapshots): + if not any(snapshot.snapshot_id == remove_snapshot_id for snapshot in base_metadata.snapshots): raise ValueError(f"Snapshot with snapshot id {remove_snapshot_id} does not exist: {base_metadata.snapshots}") snapshots = [ - (s.model_copy(update={"parent_snapshot_id": None}) if s.parent_snapshot_id in update.snapshot_ids else s) - for s in base_metadata.snapshots - if s.snapshot_id not in update.snapshot_ids + ( + snapshot.model_copy(update={"parent_snapshot_id": None}) + if snapshot.parent_snapshot_id in update.snapshot_ids + else snapshot + ) + for snapshot in base_metadata.snapshots + if snapshot.snapshot_id not in update.snapshot_ids ] snapshot_log = [ snapshot_log_entry @@ -503,7 +507,6 @@ def _(update: RemoveSnapshotsUpdate, base_metadata: TableMetadata, context: _Tab return new_metadata.model_copy(update={"snapshots": snapshots, "snapshot_log": snapshot_log}) - @_apply_table_update.register(RemoveSnapshotRefUpdate) def _(update: RemoveSnapshotRefUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata: if update.ref_name not in base_metadata.refs: