From 3da1528a41ddd34e99cdf395ade390059db1de32 Mon Sep 17 00:00:00 2001 From: ForeverAngry <61765732+ForeverAngry@users.noreply.github.com> Date: Thu, 21 Aug 2025 23:17:09 -0400 Subject: [PATCH 1/7] feat: add snapshot expiration methods with retention strategies --- pyiceberg/table/update/snapshot.py | 196 ++++++++++++++++++++++++++- tests/table/test_expire_snapshots.py | 171 +++++++++++++++++++++++ 2 files changed, 366 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 2ed03ced73..816dfd013f 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -953,7 +953,7 @@ def _get_protected_snapshot_ids(self) -> Set[int]: if ref.snapshot_ref_type in [SnapshotRefType.TAG, SnapshotRefType.BRANCH] } - def by_id(self, snapshot_id: int) -> ExpireSnapshots: + def by_id(self, snapshot_id: int) -> "ExpireSnapshots": """ Expire a snapshot by its ID. @@ -1005,3 +1005,197 @@ def older_than(self, dt: datetime) -> "ExpireSnapshots": if snapshot.timestamp_ms < expire_from and snapshot.snapshot_id not in protected_ids: self._snapshot_ids_to_expire.add(snapshot.snapshot_id) return self + + def older_than_with_retention( + self, timestamp_ms: int, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None + ) -> "ExpireSnapshots": + """Expire all unprotected snapshots with a timestamp older than a given value, with retention strategies. + + Args: + timestamp_ms: Only snapshots with timestamp_ms < this value will be expired. + retain_last_n: Always keep the last N snapshots regardless of age. + min_snapshots_to_keep: Minimum number of snapshots to keep in total. + + Returns: + This for method chaining. + """ + snapshots_to_expire = self._get_snapshots_to_expire_with_retention( + timestamp_ms=timestamp_ms, retain_last_n=retain_last_n, min_snapshots_to_keep=min_snapshots_to_keep + ) + self._snapshot_ids_to_expire.update(snapshots_to_expire) + return self + + def with_retention_policy( + self, timestamp_ms: Optional[int] = None, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None + ) -> "ExpireSnapshots": + """Comprehensive snapshot expiration with multiple retention strategies. + + This method provides a unified interface for snapshot expiration with various + retention policies to ensure operational resilience while allowing space reclamation. + + The method will use table properties as defaults if they are set: + - history.expire.max-snapshot-age-ms: Default for timestamp_ms if not provided + - history.expire.min-snapshots-to-keep: Default for min_snapshots_to_keep if not provided + - history.expire.max-ref-age-ms: Used for ref expiration (branches/tags) + + Args: + timestamp_ms: Only snapshots with timestamp_ms < this value will be considered for expiration. + If None, will use history.expire.max-snapshot-age-ms table property if set. + retain_last_n: Always keep the last N snapshots regardless of age. + Useful when regular snapshot creation occurs and users want to keep + the last few for rollback purposes. + min_snapshots_to_keep: Minimum number of snapshots to keep in total. + Acts as a guardrail to prevent aggressive expiration logic. + If None, will use history.expire.min-snapshots-to-keep table property if set. + + Returns: + This for method chaining. + + Raises: + ValueError: If retain_last_n or min_snapshots_to_keep is less than 1. + + Examples: + # Use table property defaults + table.expire_snapshots().with_retention_policy().commit() + + # Override defaults with explicit values + table.expire_snapshots().with_retention_policy( + timestamp_ms=1234567890000, + retain_last_n=10, + min_snapshots_to_keep=5 + ).commit() + """ + # Get default values from table properties + default_max_age, default_min_snapshots, _ = self._get_expiration_properties() + + # Use defaults from table properties if not explicitly provided + if timestamp_ms is None: + timestamp_ms = default_max_age + + if min_snapshots_to_keep is None: + min_snapshots_to_keep = default_min_snapshots + + # If no expiration criteria are provided, don't expire anything + if timestamp_ms is None and retain_last_n is None and min_snapshots_to_keep is None: + return self + + if retain_last_n is not None and retain_last_n < 1: + raise ValueError("retain_last_n must be at least 1") + + if min_snapshots_to_keep is not None and min_snapshots_to_keep < 1: + raise ValueError("min_snapshots_to_keep must be at least 1") + + snapshots_to_expire = self._get_snapshots_to_expire_with_retention( + timestamp_ms=timestamp_ms, retain_last_n=retain_last_n, min_snapshots_to_keep=min_snapshots_to_keep + ) + self._snapshot_ids_to_expire.update(snapshots_to_expire) + return self + + def retain_last_n(self, n: int) -> "ExpireSnapshots": + """Keep only the last N snapshots, expiring all others. + + Args: + n: Number of most recent snapshots to keep. + + Returns: + This for method chaining. + + Raises: + ValueError: If n is less than 1. + """ + if n < 1: + raise ValueError("Number of snapshots to retain must be at least 1") + + protected_ids = self._get_protected_snapshot_ids() + + # Sort snapshots by timestamp (most recent first) + sorted_snapshots = sorted(self._transaction.table_metadata.snapshots, key=lambda s: s.timestamp_ms, reverse=True) + + # Keep the last N snapshots and all protected ones + snapshots_to_keep = set() + snapshots_to_keep.update(protected_ids) + + # Add the N most recent snapshots + for i, snapshot in enumerate(sorted_snapshots): + if i < n: + snapshots_to_keep.add(snapshot.snapshot_id) + + # Find snapshots to expire + snapshots_to_expire = [] + for snapshot in self._transaction.table_metadata.snapshots: + if snapshot.snapshot_id not in snapshots_to_keep: + snapshots_to_expire.append(snapshot.snapshot_id) + + self._snapshot_ids_to_expire.update(snapshots_to_expire) + return self + + def _get_snapshots_to_expire_with_retention( + self, timestamp_ms: Optional[int] = None, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None + ) -> List[int]: + """Get snapshots to expire considering retention strategies. + + Args: + timestamp_ms: Only snapshots with timestamp_ms < this value will be considered for expiration. + retain_last_n: Always keep the last N snapshots regardless of age. + min_snapshots_to_keep: Minimum number of snapshots to keep in total. + + Returns: + List of snapshot IDs to expire. + """ + protected_ids = self._get_protected_snapshot_ids() + + # Sort snapshots by timestamp (most recent first) + sorted_snapshots = sorted(self._transaction.table_metadata.snapshots, key=lambda s: s.timestamp_ms, reverse=True) + + # Start with all snapshots that could be expired + candidates_for_expiration = [] + snapshots_to_keep = set(protected_ids) + + # Apply retain_last_n constraint + if retain_last_n is not None: + for i, snapshot in enumerate(sorted_snapshots): + if i < retain_last_n: + snapshots_to_keep.add(snapshot.snapshot_id) + + # Apply timestamp constraint + for snapshot in self._transaction.table_metadata.snapshots: + if snapshot.snapshot_id not in snapshots_to_keep and (timestamp_ms is None or snapshot.timestamp_ms < timestamp_ms): + candidates_for_expiration.append(snapshot) + + # Sort candidates by timestamp (oldest first) for potential expiration + candidates_for_expiration.sort(key=lambda s: s.timestamp_ms) + + # Apply min_snapshots_to_keep constraint + total_snapshots = len(self._transaction.table_metadata.snapshots) + snapshots_to_expire: List[int] = [] + + for candidate in candidates_for_expiration: + # Check if expiring this snapshot would violate min_snapshots_to_keep + remaining_after_expiration = total_snapshots - len(snapshots_to_expire) - 1 + + if min_snapshots_to_keep is None or remaining_after_expiration >= min_snapshots_to_keep: + snapshots_to_expire.append(candidate.snapshot_id) + else: + # Stop expiring to maintain minimum count + break + + return snapshots_to_expire + + def _get_expiration_properties(self) -> tuple[Optional[int], Optional[int], Optional[int]]: + """Get the default expiration properties from table properties. + + Returns: + Tuple of (max_snapshot_age_ms, min_snapshots_to_keep, max_ref_age_ms) + """ + properties = self._transaction.table_metadata.properties + + max_snapshot_age_ms = properties.get("history.expire.max-snapshot-age-ms") + max_snapshot_age = int(max_snapshot_age_ms) if max_snapshot_age_ms is not None else None + + min_snapshots = properties.get("history.expire.min-snapshots-to-keep") + min_snapshots_to_keep = int(min_snapshots) if min_snapshots is not None else None + + max_ref_age = properties.get("history.expire.max-ref-age-ms") + max_ref_age_ms = int(max_ref_age) if max_ref_age is not None else None + + return max_snapshot_age, min_snapshots_to_keep, max_ref_age_ms diff --git a/tests/table/test_expire_snapshots.py b/tests/table/test_expire_snapshots.py index e2b2d47b67..ddfed2aab9 100644 --- a/tests/table/test_expire_snapshots.py +++ b/tests/table/test_expire_snapshots.py @@ -21,6 +21,7 @@ import pytest from pyiceberg.table import CommitTableResponse, Table +from pyiceberg.table.update.snapshot import ExpireSnapshots def test_cannot_expire_protected_head_snapshot(table_v2: Table) -> None: @@ -223,3 +224,173 @@ def test_expire_snapshots_by_ids(table_v2: Table) -> None: assert EXPIRE_SNAPSHOT_1 not in remaining_snapshots assert EXPIRE_SNAPSHOT_2 not in remaining_snapshots assert len(table_v2.metadata.snapshots) == 1 + + +def test_retain_last_n_with_protection(table_v2: Table) -> None: + """Test retain_last_n keeps most recent snapshots plus protected ones.""" + from types import SimpleNamespace + + # Clear shared state set on the class between tests + ExpireSnapshots._snapshot_ids_to_expire.clear() + + S1 = 101 # oldest (also protected) + S2 = 102 + S3 = 103 + S4 = 104 # newest + + # Protected S1 as branch head + table_v2.metadata = table_v2.metadata.model_copy( + update={ + "refs": { + "main": MagicMock(snapshot_id=S1, snapshot_ref_type="branch"), + }, + "snapshots": [ + SimpleNamespace(snapshot_id=S1, timestamp_ms=1, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=S2, timestamp_ms=2, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=S3, timestamp_ms=3, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=S4, timestamp_ms=4, parent_snapshot_id=None), + ], + } + ) + + table_v2.catalog = MagicMock() + kept_ids = {S1, S3, S4} # retain_last_n=2 keeps S4,S3 plus protected S1 + mock_response = CommitTableResponse( + metadata=table_v2.metadata.model_copy(update={"snapshots": list(kept_ids)}), + metadata_location="mock://metadata/location", + uuid=uuid4(), + ) + table_v2.catalog.commit_table.return_value = mock_response + + table_v2.maintenance.expire_snapshots().retain_last_n(2).commit() + table_v2.metadata = mock_response.metadata + + args, kwargs = table_v2.catalog.commit_table.call_args + updates = args[2] if len(args) > 2 else () + remove_update = next((u for u in updates if getattr(u, "action", None) == "remove-snapshots"), None) + assert remove_update is not None + # Only S2 should be expired + assert set(remove_update.snapshot_ids) == {S2} + assert S2 not in table_v2.metadata.snapshots + + +def test_older_than_with_retention_combination(table_v2: Table) -> None: + """Test older_than_with_retention combining timestamp, retain_last_n and min_snapshots_to_keep.""" + from types import SimpleNamespace + + ExpireSnapshots._snapshot_ids_to_expire.clear() + + # Create 5 snapshots with increasing timestamps + S1, S2, S3, S4, S5 = 201, 202, 203, 204, 205 + snapshots = [ + SimpleNamespace(snapshot_id=S1, timestamp_ms=100, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=S2, timestamp_ms=200, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=S3, timestamp_ms=300, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=S4, timestamp_ms=400, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=S5, timestamp_ms=500, parent_snapshot_id=None), + ] + table_v2.metadata = table_v2.metadata.model_copy(update={"refs": {}, "snapshots": snapshots}) + table_v2.catalog = MagicMock() + + # Expect to expire S1,S2,S3 ; keep S4 (due to min snapshots) and S5 (retain_last_n=1) + mock_response = CommitTableResponse( + metadata=table_v2.metadata.model_copy(update={"snapshots": [S4, S5]}), + metadata_location="mock://metadata/location", + uuid=uuid4(), + ) + table_v2.catalog.commit_table.return_value = mock_response + + table_v2.maintenance.expire_snapshots().older_than_with_retention( + timestamp_ms=450, retain_last_n=1, min_snapshots_to_keep=2 + ).commit() + table_v2.metadata = mock_response.metadata + + args, kwargs = table_v2.catalog.commit_table.call_args + updates = args[2] if len(args) > 2 else () + remove_update = next((u for u in updates if getattr(u, "action", None) == "remove-snapshots"), None) + assert remove_update is not None + assert set(remove_update.snapshot_ids) == {S1, S2, S3} + assert set(table_v2.metadata.snapshots) == {S4, S5} + + +def test_with_retention_policy_defaults(table_v2: Table) -> None: + """Test with_retention_policy uses table property defaults when arguments omitted.""" + from types import SimpleNamespace + + ExpireSnapshots._snapshot_ids_to_expire.clear() + + # Properties: expire snapshots older than 350ms, keep at least 3 snapshots + properties = { + "history.expire.max-snapshot-age-ms": "350", + "history.expire.min-snapshots-to-keep": "3", + } + S1, S2, S3, S4, S5 = 301, 302, 303, 304, 305 + snapshots = [ + SimpleNamespace(snapshot_id=S1, timestamp_ms=100, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=S2, timestamp_ms=200, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=S3, timestamp_ms=300, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=S4, timestamp_ms=400, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=S5, timestamp_ms=500, parent_snapshot_id=None), + ] + table_v2.metadata = table_v2.metadata.model_copy(update={"refs": {}, "snapshots": snapshots, "properties": properties}) + table_v2.catalog = MagicMock() + + # Expect S1,S2 expired; S3 kept due to min_snapshots_to_keep + mock_response = CommitTableResponse( + metadata=table_v2.metadata.model_copy(update={"snapshots": [S3, S4, S5]}), + metadata_location="mock://metadata/location", + uuid=uuid4(), + ) + table_v2.catalog.commit_table.return_value = mock_response + + table_v2.maintenance.expire_snapshots().with_retention_policy().commit() + table_v2.metadata = mock_response.metadata + + args, kwargs = table_v2.catalog.commit_table.call_args + updates = args[2] if len(args) > 2 else () + remove_update = next((u for u in updates if getattr(u, "action", None) == "remove-snapshots"), None) + assert remove_update is not None + assert set(remove_update.snapshot_ids) == {S1, S2} + assert set(table_v2.metadata.snapshots) == {S3, S4, S5} + + +def test_get_expiration_properties(table_v2: Table) -> None: + """Test retrieval of expiration properties from table metadata.""" + ExpireSnapshots._snapshot_ids_to_expire.clear() + properties = { + "history.expire.max-snapshot-age-ms": "60000", + "history.expire.min-snapshots-to-keep": "5", + "history.expire.max-ref-age-ms": "120000", + } + table_v2.metadata = table_v2.metadata.model_copy(update={"properties": properties}) + expire = table_v2.maintenance.expire_snapshots() + max_age, min_snaps, max_ref_age = expire._get_expiration_properties() + assert max_age == 60000 + assert min_snaps == 5 + assert max_ref_age == 120000 + + +def test_get_snapshots_to_expire_with_retention_respects_protection(table_v2: Table) -> None: + """Internal helper should not select protected snapshots for expiration.""" + from types import SimpleNamespace + + ExpireSnapshots._snapshot_ids_to_expire.clear() + + P = 401 # protected + A = 402 + B = 403 + table_v2.metadata = table_v2.metadata.model_copy( + update={ + "refs": {"main": MagicMock(snapshot_id=P, snapshot_ref_type="branch")}, + "snapshots": [ + SimpleNamespace(snapshot_id=P, timestamp_ms=10, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=A, timestamp_ms=20, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=B, timestamp_ms=30, parent_snapshot_id=None), + ], + } + ) + expire = table_v2.maintenance.expire_snapshots() + to_expire = expire._get_snapshots_to_expire_with_retention(timestamp_ms=100, retain_last_n=None, min_snapshots_to_keep=1) + # Protected snapshot P should not be in list; both A and B can expire respecting min keep + assert P not in to_expire + assert set(to_expire) == {A, B} From a8fe03d329d3054d44a06e69208682d454a1cfec Mon Sep 17 00:00:00 2001 From: ForeverAngry <61765732+ForeverAngry@users.noreply.github.com> Date: Thu, 21 Aug 2025 23:31:55 -0400 Subject: [PATCH 2/7] chore: documented new retention methods --- mkdocs/docs/api.md | 55 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index 0e0dc375de..dfa6067652 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -1376,6 +1376,61 @@ def cleanup_old_snapshots(table_name: str, snapshot_ids: list[int]): cleanup_old_snapshots("analytics.user_events", [12345, 67890, 11111]) ``` +#### Advanced Retention Strategies + +PyIceberg provides additional retention helpers on `ExpireSnapshots` to balance safety and cleanup: + +Key table properties used as defaults (all optional): + +- `history.expire.max-snapshot-age-ms`: Default age threshold for `with_retention_policy` +- `history.expire.min-snapshots-to-keep`: Minimum total snapshots to retain +- `history.expire.max-ref-age-ms`: (Reserved for future protected ref/branch cleanup logic) + +Protected snapshots (referenced by branches or tags) are never expired by these APIs. + +Keep only the last N snapshots (plus protected): + +```python +table.maintenance.expire_snapshots().retain_last_n(5).commit() +``` + +Expire older snapshots but always keep the most recent N and a safety floor: + +```python +from datetime import datetime, timedelta + +cutoff = int((datetime.now() - timedelta(days=7)).timestamp() * 1000) +table.maintenance.expire_snapshots().older_than_with_retention( + timestamp_ms=cutoff, + retain_last_n=3, + min_snapshots_to_keep=4, +).commit() +``` + +Unified policy that also reads table property defaults: + +```python +# Uses table properties if arguments omitted +table.maintenance.expire_snapshots().with_retention_policy().commit() + +# Override selectively +table.maintenance.expire_snapshots().with_retention_policy( + retain_last_n=2, # keep 2 newest regardless of age + min_snapshots_to_keep=5, # never go below 5 total + # timestamp_ms omitted -> falls back to history.expire.max-snapshot-age-ms if set +).commit() +``` + +Parameter interaction rules: + +- `retain_last_n` snapshots are always kept (plus protected refs) +- `timestamp_ms` filters candidates (older than only) +- `min_snapshots_to_keep` stops expiration once the floor would be violated +- If all of (`timestamp_ms`, `retain_last_n`, `min_snapshots_to_keep`) are None in `with_retention_policy`, nothing is expired +- Passing invalid values (`< 1`) for counts raises `ValueError` + +Safety tip: Start with higher `min_snapshots_to_keep` when first enabling automated cleanup. + ## Views PyIceberg supports view operations. From a1a126f1050cc1dfdfd3b2561426dd827c987b16 Mon Sep 17 00:00:00 2001 From: ForeverAngry <61765732+ForeverAngry@users.noreply.github.com> Date: Thu, 28 Aug 2025 22:16:56 -0400 Subject: [PATCH 3/7] test: enhance snapshot expiration tests with retention policy validations --- tests/table/test_expire_snapshots.py | 200 +++++++++++++++------------ 1 file changed, 111 insertions(+), 89 deletions(-) diff --git a/tests/table/test_expire_snapshots.py b/tests/table/test_expire_snapshots.py index ddfed2aab9..59491ff767 100644 --- a/tests/table/test_expire_snapshots.py +++ b/tests/table/test_expire_snapshots.py @@ -233,28 +233,28 @@ def test_retain_last_n_with_protection(table_v2: Table) -> None: # Clear shared state set on the class between tests ExpireSnapshots._snapshot_ids_to_expire.clear() - S1 = 101 # oldest (also protected) - S2 = 102 - S3 = 103 - S4 = 104 # newest + PROTECTED_SNAPSHOT = 3051729675574597101 # oldest (also protected) + EXPIRE_SNAPSHOT = 3051729675574597102 + KEEP_SNAPSHOT_1 = 3051729675574597103 + KEEP_SNAPSHOT_2 = 3051729675574597104 # newest - # Protected S1 as branch head + # Protected PROTECTED_SNAPSHOT as branch head table_v2.metadata = table_v2.metadata.model_copy( update={ "refs": { - "main": MagicMock(snapshot_id=S1, snapshot_ref_type="branch"), + "main": MagicMock(snapshot_id=PROTECTED_SNAPSHOT, snapshot_ref_type="branch"), }, "snapshots": [ - SimpleNamespace(snapshot_id=S1, timestamp_ms=1, parent_snapshot_id=None), - SimpleNamespace(snapshot_id=S2, timestamp_ms=2, parent_snapshot_id=None), - SimpleNamespace(snapshot_id=S3, timestamp_ms=3, parent_snapshot_id=None), - SimpleNamespace(snapshot_id=S4, timestamp_ms=4, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=PROTECTED_SNAPSHOT, timestamp_ms=1, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=EXPIRE_SNAPSHOT, timestamp_ms=2, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=KEEP_SNAPSHOT_1, timestamp_ms=3, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=KEEP_SNAPSHOT_2, timestamp_ms=4, parent_snapshot_id=None), ], } ) table_v2.catalog = MagicMock() - kept_ids = {S1, S3, S4} # retain_last_n=2 keeps S4,S3 plus protected S1 + kept_ids = {PROTECTED_SNAPSHOT, KEEP_SNAPSHOT_1, KEEP_SNAPSHOT_2} # retain_last_n=2 keeps newest plus protected mock_response = CommitTableResponse( metadata=table_v2.metadata.model_copy(update={"snapshots": list(kept_ids)}), metadata_location="mock://metadata/location", @@ -269,128 +269,150 @@ def test_retain_last_n_with_protection(table_v2: Table) -> None: updates = args[2] if len(args) > 2 else () remove_update = next((u for u in updates if getattr(u, "action", None) == "remove-snapshots"), None) assert remove_update is not None - # Only S2 should be expired - assert set(remove_update.snapshot_ids) == {S2} - assert S2 not in table_v2.metadata.snapshots + # Only EXPIRE_SNAPSHOT should be expired + assert set(remove_update.snapshot_ids) == {EXPIRE_SNAPSHOT} + assert EXPIRE_SNAPSHOT not in table_v2.metadata.snapshots -def test_older_than_with_retention_combination(table_v2: Table) -> None: - """Test older_than_with_retention combining timestamp, retain_last_n and min_snapshots_to_keep.""" +def test_retain_last_n_validation(table_v2: Table) -> None: + """Test retain_last_n validates n >= 1.""" + ExpireSnapshots._snapshot_ids_to_expire.clear() + + with pytest.raises(ValueError, match="Number of snapshots to retain must be at least 1"): + table_v2.maintenance.expire_snapshots().retain_last_n(0) + + with pytest.raises(ValueError, match="Number of snapshots to retain must be at least 1"): + table_v2.maintenance.expire_snapshots().retain_last_n(-1) + + +def test_with_retention_policy_validation(table_v2: Table) -> None: + """Test with_retention_policy validates parameter ranges.""" + ExpireSnapshots._snapshot_ids_to_expire.clear() + + with pytest.raises(ValueError, match="retain_last_n must be at least 1"): + table_v2.maintenance.expire_snapshots().with_retention_policy(retain_last_n=0).commit() + + with pytest.raises(ValueError, match="min_snapshots_to_keep must be at least 1"): + table_v2.maintenance.expire_snapshots().with_retention_policy(min_snapshots_to_keep=0).commit() + + +def test_with_retention_policy_no_criteria_does_nothing(table_v2: Table) -> None: + """Test with_retention_policy does nothing when no criteria provided.""" + from types import SimpleNamespace + + ExpireSnapshots._snapshot_ids_to_expire.clear() + + SNAPSHOT_1, SNAPSHOT_2 = 3051729675574597501, 3051729675574597502 + snapshots = [ + SimpleNamespace(snapshot_id=SNAPSHOT_1, timestamp_ms=100, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=SNAPSHOT_2, timestamp_ms=200, parent_snapshot_id=None), + ] + table_v2.metadata = table_v2.metadata.model_copy(update={"refs": {}, "snapshots": snapshots, "properties": {}}) + table_v2.catalog = MagicMock() + + # Should be a no-op + result = table_v2.maintenance.expire_snapshots().with_retention_policy() + assert result._snapshot_ids_to_expire == set() + + +def test_get_expiration_properties_missing_values(table_v2: Table) -> None: + """Test _get_expiration_properties handles missing properties gracefully.""" + ExpireSnapshots._snapshot_ids_to_expire.clear() + + # No expiration properties set + table_v2.metadata = table_v2.metadata.model_copy(update={"properties": {}}) + expire = table_v2.maintenance.expire_snapshots() + max_age, min_snaps, max_ref_age = expire._get_expiration_properties() + + assert max_age is None + assert min_snaps is None + assert max_ref_age is None + + +def test_retain_last_n_fewer_snapshots_than_requested(table_v2: Table) -> None: + """Test retain_last_n when table has fewer snapshots than requested.""" from types import SimpleNamespace ExpireSnapshots._snapshot_ids_to_expire.clear() - # Create 5 snapshots with increasing timestamps - S1, S2, S3, S4, S5 = 201, 202, 203, 204, 205 + SNAPSHOT_1, SNAPSHOT_2 = 3051729675574597601, 3051729675574597602 snapshots = [ - SimpleNamespace(snapshot_id=S1, timestamp_ms=100, parent_snapshot_id=None), - SimpleNamespace(snapshot_id=S2, timestamp_ms=200, parent_snapshot_id=None), - SimpleNamespace(snapshot_id=S3, timestamp_ms=300, parent_snapshot_id=None), - SimpleNamespace(snapshot_id=S4, timestamp_ms=400, parent_snapshot_id=None), - SimpleNamespace(snapshot_id=S5, timestamp_ms=500, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=SNAPSHOT_1, timestamp_ms=100, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=SNAPSHOT_2, timestamp_ms=200, parent_snapshot_id=None), + ] + table_v2.metadata = table_v2.metadata.model_copy(update={"refs": {}, "snapshots": snapshots}) + table_v2.catalog = MagicMock() + + # Request to keep 5 snapshots, but only 2 exist + result = table_v2.maintenance.expire_snapshots().retain_last_n(5) + # Should not expire anything + assert result._snapshot_ids_to_expire == set() + + +def test_older_than_with_retention_edge_cases(table_v2: Table) -> None: + """Test edge cases for older_than_with_retention.""" + from types import SimpleNamespace + + ExpireSnapshots._snapshot_ids_to_expire.clear() + + # Test with retain_last_n and a valid timestamp + EXPIRE_SNAPSHOT, KEEP_SNAPSHOT_1, KEEP_SNAPSHOT_2 = 3051729675574597701, 3051729675574597702, 3051729675574597703 + snapshots = [ + SimpleNamespace(snapshot_id=EXPIRE_SNAPSHOT, timestamp_ms=100, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=KEEP_SNAPSHOT_1, timestamp_ms=200, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=KEEP_SNAPSHOT_2, timestamp_ms=300, parent_snapshot_id=None), ] table_v2.metadata = table_v2.metadata.model_copy(update={"refs": {}, "snapshots": snapshots}) table_v2.catalog = MagicMock() - # Expect to expire S1,S2,S3 ; keep S4 (due to min snapshots) and S5 (retain_last_n=1) mock_response = CommitTableResponse( - metadata=table_v2.metadata.model_copy(update={"snapshots": [S4, S5]}), + metadata=table_v2.metadata.model_copy(update={"snapshots": [KEEP_SNAPSHOT_1, KEEP_SNAPSHOT_2]}), metadata_location="mock://metadata/location", uuid=uuid4(), ) table_v2.catalog.commit_table.return_value = mock_response + # Use a timestamp that would include all snapshots, but retain_last_n=2 should limit to keeping KEEP_SNAPSHOT_1, KEEP_SNAPSHOT_2 table_v2.maintenance.expire_snapshots().older_than_with_retention( - timestamp_ms=450, retain_last_n=1, min_snapshots_to_keep=2 + timestamp_ms=400, retain_last_n=2, min_snapshots_to_keep=None ).commit() - table_v2.metadata = mock_response.metadata args, kwargs = table_v2.catalog.commit_table.call_args updates = args[2] if len(args) > 2 else () remove_update = next((u for u in updates if getattr(u, "action", None) == "remove-snapshots"), None) assert remove_update is not None - assert set(remove_update.snapshot_ids) == {S1, S2, S3} - assert set(table_v2.metadata.snapshots) == {S4, S5} + assert set(remove_update.snapshot_ids) == {EXPIRE_SNAPSHOT} -def test_with_retention_policy_defaults(table_v2: Table) -> None: - """Test with_retention_policy uses table property defaults when arguments omitted.""" +def test_with_retention_policy_partial_properties(table_v2: Table) -> None: + """Test with_retention_policy with only some properties set.""" from types import SimpleNamespace ExpireSnapshots._snapshot_ids_to_expire.clear() - # Properties: expire snapshots older than 350ms, keep at least 3 snapshots - properties = { - "history.expire.max-snapshot-age-ms": "350", - "history.expire.min-snapshots-to-keep": "3", - } - S1, S2, S3, S4, S5 = 301, 302, 303, 304, 305 + # Only max-snapshot-age-ms set, no min-snapshots-to-keep + properties = {"history.expire.max-snapshot-age-ms": "250"} + EXPIRE_SNAPSHOT_1, EXPIRE_SNAPSHOT_2, KEEP_SNAPSHOT = 3051729675574597801, 3051729675574597802, 3051729675574597803 snapshots = [ - SimpleNamespace(snapshot_id=S1, timestamp_ms=100, parent_snapshot_id=None), - SimpleNamespace(snapshot_id=S2, timestamp_ms=200, parent_snapshot_id=None), - SimpleNamespace(snapshot_id=S3, timestamp_ms=300, parent_snapshot_id=None), - SimpleNamespace(snapshot_id=S4, timestamp_ms=400, parent_snapshot_id=None), - SimpleNamespace(snapshot_id=S5, timestamp_ms=500, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=EXPIRE_SNAPSHOT_1, timestamp_ms=100, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=EXPIRE_SNAPSHOT_2, timestamp_ms=200, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=KEEP_SNAPSHOT, timestamp_ms=300, parent_snapshot_id=None), ] table_v2.metadata = table_v2.metadata.model_copy(update={"refs": {}, "snapshots": snapshots, "properties": properties}) table_v2.catalog = MagicMock() - # Expect S1,S2 expired; S3 kept due to min_snapshots_to_keep mock_response = CommitTableResponse( - metadata=table_v2.metadata.model_copy(update={"snapshots": [S3, S4, S5]}), + metadata=table_v2.metadata.model_copy(update={"snapshots": [KEEP_SNAPSHOT]}), metadata_location="mock://metadata/location", uuid=uuid4(), ) table_v2.catalog.commit_table.return_value = mock_response + # Should expire EXPIRE_SNAPSHOT_1,EXPIRE_SNAPSHOT_2 (older than 250ms), keep KEEP_SNAPSHOT table_v2.maintenance.expire_snapshots().with_retention_policy().commit() - table_v2.metadata = mock_response.metadata args, kwargs = table_v2.catalog.commit_table.call_args updates = args[2] if len(args) > 2 else () remove_update = next((u for u in updates if getattr(u, "action", None) == "remove-snapshots"), None) assert remove_update is not None - assert set(remove_update.snapshot_ids) == {S1, S2} - assert set(table_v2.metadata.snapshots) == {S3, S4, S5} - - -def test_get_expiration_properties(table_v2: Table) -> None: - """Test retrieval of expiration properties from table metadata.""" - ExpireSnapshots._snapshot_ids_to_expire.clear() - properties = { - "history.expire.max-snapshot-age-ms": "60000", - "history.expire.min-snapshots-to-keep": "5", - "history.expire.max-ref-age-ms": "120000", - } - table_v2.metadata = table_v2.metadata.model_copy(update={"properties": properties}) - expire = table_v2.maintenance.expire_snapshots() - max_age, min_snaps, max_ref_age = expire._get_expiration_properties() - assert max_age == 60000 - assert min_snaps == 5 - assert max_ref_age == 120000 - - -def test_get_snapshots_to_expire_with_retention_respects_protection(table_v2: Table) -> None: - """Internal helper should not select protected snapshots for expiration.""" - from types import SimpleNamespace - - ExpireSnapshots._snapshot_ids_to_expire.clear() - - P = 401 # protected - A = 402 - B = 403 - table_v2.metadata = table_v2.metadata.model_copy( - update={ - "refs": {"main": MagicMock(snapshot_id=P, snapshot_ref_type="branch")}, - "snapshots": [ - SimpleNamespace(snapshot_id=P, timestamp_ms=10, parent_snapshot_id=None), - SimpleNamespace(snapshot_id=A, timestamp_ms=20, parent_snapshot_id=None), - SimpleNamespace(snapshot_id=B, timestamp_ms=30, parent_snapshot_id=None), - ], - } - ) - expire = table_v2.maintenance.expire_snapshots() - to_expire = expire._get_snapshots_to_expire_with_retention(timestamp_ms=100, retain_last_n=None, min_snapshots_to_keep=1) - # Protected snapshot P should not be in list; both A and B can expire respecting min keep - assert P not in to_expire - assert set(to_expire) == {A, B} + assert set(remove_update.snapshot_ids) == {EXPIRE_SNAPSHOT_1, EXPIRE_SNAPSHOT_2} From 2c6eb0b02324828b3f92abd4737ffd05953bcbff Mon Sep 17 00:00:00 2001 From: ForeverAngry <61765732+ForeverAngry@users.noreply.github.com> Date: Thu, 28 Aug 2025 22:31:50 -0400 Subject: [PATCH 4/7] feat: add context manager examples for snapshot expiration with retention policies --- mkdocs/docs/api.md | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index dfa6067652..70a192ff0f 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -1421,6 +1421,27 @@ table.maintenance.expire_snapshots().with_retention_policy( ).commit() ``` +##### Using a context manager + +You can use a context manager to automatically commit on successful exit (and skip commit if an exception occurs): + +```python +# Keep the 3 newest snapshots (plus protected refs) and enforce a floor of 8 total +with table.maintenance.expire_snapshots() as expire: + expire.with_retention_policy(retain_last_n=3, min_snapshots_to_keep=8) + +# Only keep the last 5 snapshots +with table.maintenance.expire_snapshots() as expire: + expire.retain_last_n(5) + +# Combine explicit cutoff with other guards +from datetime import datetime, timedelta +cutoff = int((datetime.utcnow() - timedelta(days=14)).timestamp() * 1000) + +with table.maintenance.expire_snapshots() as expire: + expire.older_than_with_retention(timestamp_ms=cutoff, retain_last_n=2, min_snapshots_to_keep=6) +``` + Parameter interaction rules: - `retain_last_n` snapshots are always kept (plus protected refs) From 855e22abb6c8e35f7b1b80c8a8e20ef6ce8155d8 Mon Sep 17 00:00:00 2001 From: ForeverAngry <61765732+ForeverAngry@users.noreply.github.com> Date: Sun, 14 Sep 2025 20:59:51 -0400 Subject: [PATCH 5/7] chore: update snapshot.py for code cleanup and organization --- pyiceberg/table/update/snapshot.py | 92 ++++++++++++------------------ 1 file changed, 37 insertions(+), 55 deletions(-) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 816dfd013f..cf954e3b7f 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -953,7 +953,7 @@ def _get_protected_snapshot_ids(self) -> Set[int]: if ref.snapshot_ref_type in [SnapshotRefType.TAG, SnapshotRefType.BRANCH] } - def by_id(self, snapshot_id: int) -> "ExpireSnapshots": + def by_id(self, snapshot_id: int) -> ExpireSnapshots: """ Expire a snapshot by its ID. @@ -1008,7 +1008,7 @@ def older_than(self, dt: datetime) -> "ExpireSnapshots": def older_than_with_retention( self, timestamp_ms: int, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None - ) -> "ExpireSnapshots": + ) -> ExpireSnapshots: """Expire all unprotected snapshots with a timestamp older than a given value, with retention strategies. Args: @@ -1027,7 +1027,7 @@ def older_than_with_retention( def with_retention_policy( self, timestamp_ms: Optional[int] = None, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None - ) -> "ExpireSnapshots": + ) -> ExpireSnapshots: """Comprehensive snapshot expiration with multiple retention strategies. This method provides a unified interface for snapshot expiration with various @@ -1091,7 +1091,7 @@ def with_retention_policy( self._snapshot_ids_to_expire.update(snapshots_to_expire) return self - def retain_last_n(self, n: int) -> "ExpireSnapshots": + def retain_last_n(self, n: int) -> ExpireSnapshots: """Keep only the last N snapshots, expiring all others. Args: @@ -1106,28 +1106,31 @@ def retain_last_n(self, n: int) -> "ExpireSnapshots": if n < 1: raise ValueError("Number of snapshots to retain must be at least 1") - protected_ids = self._get_protected_snapshot_ids() + snapshots_to_keep = self._get_snapshots_to_keep(retain_last_n=n) + snapshots_to_expire = [ + id for snapshot in self._transaction.table_metadata.snapshots if (id := snapshot.snapshot_id) not in snapshots_to_keep + ] - # Sort snapshots by timestamp (most recent first) - sorted_snapshots = sorted(self._transaction.table_metadata.snapshots, key=lambda s: s.timestamp_ms, reverse=True) + self._snapshot_ids_to_expire.update(snapshots_to_expire) + return self - # Keep the last N snapshots and all protected ones - snapshots_to_keep = set() - snapshots_to_keep.update(protected_ids) + def _get_snapshots_to_keep(self, retain_last_n: Optional[int] = None) -> Set[int]: + """Get set of snapshot IDs that should be kept based on protection and retention rules. - # Add the N most recent snapshots - for i, snapshot in enumerate(sorted_snapshots): - if i < n: - snapshots_to_keep.add(snapshot.snapshot_id) + Args: + retain_last_n: Number of most recent snapshots to keep. - # Find snapshots to expire - snapshots_to_expire = [] - for snapshot in self._transaction.table_metadata.snapshots: - if snapshot.snapshot_id not in snapshots_to_keep: - snapshots_to_expire.append(snapshot.snapshot_id) + Returns: + Set of snapshot IDs to keep. + """ + snapshots_to_keep = self._get_protected_snapshot_ids() - self._snapshot_ids_to_expire.update(snapshots_to_expire) - return self + if retain_last_n is not None: + # Sort snapshots by timestamp (most recent first), and get most recent N + sorted_snapshots = sorted(self._transaction.table_metadata.snapshots, key=lambda s: s.timestamp_ms, reverse=True) + snapshots_to_keep.update(snapshot.snapshot_id for snapshot in sorted_snapshots[:retain_last_n]) + + return snapshots_to_keep def _get_snapshots_to_expire_with_retention( self, timestamp_ms: Optional[int] = None, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None @@ -1142,22 +1145,10 @@ def _get_snapshots_to_expire_with_retention( Returns: List of snapshot IDs to expire. """ - protected_ids = self._get_protected_snapshot_ids() - - # Sort snapshots by timestamp (most recent first) - sorted_snapshots = sorted(self._transaction.table_metadata.snapshots, key=lambda s: s.timestamp_ms, reverse=True) - - # Start with all snapshots that could be expired - candidates_for_expiration = [] - snapshots_to_keep = set(protected_ids) - - # Apply retain_last_n constraint - if retain_last_n is not None: - for i, snapshot in enumerate(sorted_snapshots): - if i < retain_last_n: - snapshots_to_keep.add(snapshot.snapshot_id) + snapshots_to_keep = self._get_snapshots_to_keep(retain_last_n=retain_last_n) # Apply timestamp constraint + candidates_for_expiration = [] for snapshot in self._transaction.table_metadata.snapshots: if snapshot.snapshot_id not in snapshots_to_keep and (timestamp_ms is None or snapshot.timestamp_ms < timestamp_ms): candidates_for_expiration.append(snapshot) @@ -1166,18 +1157,12 @@ def _get_snapshots_to_expire_with_retention( candidates_for_expiration.sort(key=lambda s: s.timestamp_ms) # Apply min_snapshots_to_keep constraint - total_snapshots = len(self._transaction.table_metadata.snapshots) - snapshots_to_expire: List[int] = [] - - for candidate in candidates_for_expiration: - # Check if expiring this snapshot would violate min_snapshots_to_keep - remaining_after_expiration = total_snapshots - len(snapshots_to_expire) - 1 - - if min_snapshots_to_keep is None or remaining_after_expiration >= min_snapshots_to_keep: - snapshots_to_expire.append(candidate.snapshot_id) - else: - # Stop expiring to maintain minimum count - break + if min_snapshots_to_keep is not None: + total_snapshots = len(self._transaction.table_metadata.snapshots) + max_to_expire = total_snapshots - min_snapshots_to_keep + snapshots_to_expire = [candidate.snapshot_id for candidate in candidates_for_expiration[:max_to_expire]] + else: + snapshots_to_expire = [candidate.snapshot_id for candidate in candidates_for_expiration] return snapshots_to_expire @@ -1187,15 +1172,12 @@ def _get_expiration_properties(self) -> tuple[Optional[int], Optional[int], Opti Returns: Tuple of (max_snapshot_age_ms, min_snapshots_to_keep, max_ref_age_ms) """ - properties = self._transaction.table_metadata.properties - - max_snapshot_age_ms = properties.get("history.expire.max-snapshot-age-ms") - max_snapshot_age = int(max_snapshot_age_ms) if max_snapshot_age_ms is not None else None + from pyiceberg.table import TableProperties - min_snapshots = properties.get("history.expire.min-snapshots-to-keep") - min_snapshots_to_keep = int(min_snapshots) if min_snapshots is not None else None + properties = self._transaction.table_metadata.properties - max_ref_age = properties.get("history.expire.max-ref-age-ms") - max_ref_age_ms = int(max_ref_age) if max_ref_age is not None else None + max_snapshot_age = property_as_int(properties, TableProperties.MAX_SNAPSHOT_AGE_MS) + min_snapshots_to_keep = property_as_int(properties, TableProperties.MIN_SNAPSHOTS_TO_KEEP) + max_ref_age_ms = property_as_int(properties, "history.expire.max-ref-age-ms") return max_snapshot_age, min_snapshots_to_keep, max_ref_age_ms From 29ae3a82aa6aa5a34cde5c28c85badfdbac4fce8 Mon Sep 17 00:00:00 2001 From: ForeverAngry <61765732+ForeverAngry@users.noreply.github.com> Date: Sun, 14 Sep 2025 21:06:30 -0400 Subject: [PATCH 6/7] Fixed: doc string for `retain_last_n` to properly describe the function. --- pyiceberg/table/update/snapshot.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index cf954e3b7f..72aa4e511b 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -1092,10 +1092,14 @@ def with_retention_policy( return self def retain_last_n(self, n: int) -> ExpireSnapshots: - """Keep only the last N snapshots, expiring all others. + """Keep only the last N snapshots globally across all branches, expiring all others. + + Note: This method considers snapshots globally across the entire table history, + not per-branch. Protected snapshots (branch/tag heads) are always preserved + regardless of the retention count. Args: - n: Number of most recent snapshots to keep. + n: Number of most recent snapshots to keep globally. Returns: This for method chaining. From ad556e366a6d1f328a9940cb04961dde2d29130b Mon Sep 17 00:00:00 2001 From: ForeverAngry <61765732+ForeverAngry@users.noreply.github.com> Date: Mon, 15 Sep 2025 18:50:47 -0400 Subject: [PATCH 7/7] Re-trigger CI/CD after fixing linting issues