diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index 0e0dc375de..70a192ff0f 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -1376,6 +1376,82 @@ def cleanup_old_snapshots(table_name: str, snapshot_ids: list[int]): cleanup_old_snapshots("analytics.user_events", [12345, 67890, 11111]) ``` +#### Advanced Retention Strategies + +PyIceberg provides additional retention helpers on `ExpireSnapshots` to balance safety and cleanup: + +Key table properties used as defaults (all optional): + +- `history.expire.max-snapshot-age-ms`: Default age threshold for `with_retention_policy` +- `history.expire.min-snapshots-to-keep`: Minimum total snapshots to retain +- `history.expire.max-ref-age-ms`: (Reserved for future protected ref/branch cleanup logic) + +Protected snapshots (referenced by branches or tags) are never expired by these APIs. + +Keep only the last N snapshots (plus protected): + +```python +table.maintenance.expire_snapshots().retain_last_n(5).commit() +``` + +Expire older snapshots but always keep the most recent N and a safety floor: + +```python +from datetime import datetime, timedelta + +cutoff = int((datetime.now() - timedelta(days=7)).timestamp() * 1000) +table.maintenance.expire_snapshots().older_than_with_retention( + timestamp_ms=cutoff, + retain_last_n=3, + min_snapshots_to_keep=4, +).commit() +``` + +Unified policy that also reads table property defaults: + +```python +# Uses table properties if arguments omitted +table.maintenance.expire_snapshots().with_retention_policy().commit() + +# Override selectively +table.maintenance.expire_snapshots().with_retention_policy( + retain_last_n=2, # keep 2 newest regardless of age + min_snapshots_to_keep=5, # never go below 5 total + # timestamp_ms omitted -> falls back to history.expire.max-snapshot-age-ms if set +).commit() +``` + +##### Using a context manager + +You can use a context manager to automatically commit on successful exit (and skip commit if an exception occurs): + +```python +# Keep the 3 newest snapshots (plus protected refs) and enforce a floor of 8 total +with table.maintenance.expire_snapshots() as expire: + expire.with_retention_policy(retain_last_n=3, min_snapshots_to_keep=8) + +# Only keep the last 5 snapshots +with table.maintenance.expire_snapshots() as expire: + expire.retain_last_n(5) + +# Combine explicit cutoff with other guards +from datetime import datetime, timedelta +cutoff = int((datetime.utcnow() - timedelta(days=14)).timestamp() * 1000) + +with table.maintenance.expire_snapshots() as expire: + expire.older_than_with_retention(timestamp_ms=cutoff, retain_last_n=2, min_snapshots_to_keep=6) +``` + +Parameter interaction rules: + +- `retain_last_n` snapshots are always kept (plus protected refs) +- `timestamp_ms` filters candidates (older than only) +- `min_snapshots_to_keep` stops expiration once the floor would be violated +- If all of (`timestamp_ms`, `retain_last_n`, `min_snapshots_to_keep`) are None in `with_retention_policy`, nothing is expired +- Passing invalid values (`< 1`) for counts raises `ValueError` + +Safety tip: Start with higher `min_snapshots_to_keep` when first enabling automated cleanup. + ## Views PyIceberg supports view operations. diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 2ed03ced73..72aa4e511b 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -1005,3 +1005,183 @@ def older_than(self, dt: datetime) -> "ExpireSnapshots": if snapshot.timestamp_ms < expire_from and snapshot.snapshot_id not in protected_ids: self._snapshot_ids_to_expire.add(snapshot.snapshot_id) return self + + def older_than_with_retention( + self, timestamp_ms: int, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None + ) -> ExpireSnapshots: + """Expire all unprotected snapshots with a timestamp older than a given value, with retention strategies. + + Args: + timestamp_ms: Only snapshots with timestamp_ms < this value will be expired. + retain_last_n: Always keep the last N snapshots regardless of age. + min_snapshots_to_keep: Minimum number of snapshots to keep in total. + + Returns: + This for method chaining. + """ + snapshots_to_expire = self._get_snapshots_to_expire_with_retention( + timestamp_ms=timestamp_ms, retain_last_n=retain_last_n, min_snapshots_to_keep=min_snapshots_to_keep + ) + self._snapshot_ids_to_expire.update(snapshots_to_expire) + return self + + def with_retention_policy( + self, timestamp_ms: Optional[int] = None, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None + ) -> ExpireSnapshots: + """Comprehensive snapshot expiration with multiple retention strategies. + + This method provides a unified interface for snapshot expiration with various + retention policies to ensure operational resilience while allowing space reclamation. + + The method will use table properties as defaults if they are set: + - history.expire.max-snapshot-age-ms: Default for timestamp_ms if not provided + - history.expire.min-snapshots-to-keep: Default for min_snapshots_to_keep if not provided + - history.expire.max-ref-age-ms: Used for ref expiration (branches/tags) + + Args: + timestamp_ms: Only snapshots with timestamp_ms < this value will be considered for expiration. + If None, will use history.expire.max-snapshot-age-ms table property if set. + retain_last_n: Always keep the last N snapshots regardless of age. + Useful when regular snapshot creation occurs and users want to keep + the last few for rollback purposes. + min_snapshots_to_keep: Minimum number of snapshots to keep in total. + Acts as a guardrail to prevent aggressive expiration logic. + If None, will use history.expire.min-snapshots-to-keep table property if set. + + Returns: + This for method chaining. + + Raises: + ValueError: If retain_last_n or min_snapshots_to_keep is less than 1. + + Examples: + # Use table property defaults + table.expire_snapshots().with_retention_policy().commit() + + # Override defaults with explicit values + table.expire_snapshots().with_retention_policy( + timestamp_ms=1234567890000, + retain_last_n=10, + min_snapshots_to_keep=5 + ).commit() + """ + # Get default values from table properties + default_max_age, default_min_snapshots, _ = self._get_expiration_properties() + + # Use defaults from table properties if not explicitly provided + if timestamp_ms is None: + timestamp_ms = default_max_age + + if min_snapshots_to_keep is None: + min_snapshots_to_keep = default_min_snapshots + + # If no expiration criteria are provided, don't expire anything + if timestamp_ms is None and retain_last_n is None and min_snapshots_to_keep is None: + return self + + if retain_last_n is not None and retain_last_n < 1: + raise ValueError("retain_last_n must be at least 1") + + if min_snapshots_to_keep is not None and min_snapshots_to_keep < 1: + raise ValueError("min_snapshots_to_keep must be at least 1") + + snapshots_to_expire = self._get_snapshots_to_expire_with_retention( + timestamp_ms=timestamp_ms, retain_last_n=retain_last_n, min_snapshots_to_keep=min_snapshots_to_keep + ) + self._snapshot_ids_to_expire.update(snapshots_to_expire) + return self + + def retain_last_n(self, n: int) -> ExpireSnapshots: + """Keep only the last N snapshots globally across all branches, expiring all others. + + Note: This method considers snapshots globally across the entire table history, + not per-branch. Protected snapshots (branch/tag heads) are always preserved + regardless of the retention count. + + Args: + n: Number of most recent snapshots to keep globally. + + Returns: + This for method chaining. + + Raises: + ValueError: If n is less than 1. + """ + if n < 1: + raise ValueError("Number of snapshots to retain must be at least 1") + + snapshots_to_keep = self._get_snapshots_to_keep(retain_last_n=n) + snapshots_to_expire = [ + id for snapshot in self._transaction.table_metadata.snapshots if (id := snapshot.snapshot_id) not in snapshots_to_keep + ] + + self._snapshot_ids_to_expire.update(snapshots_to_expire) + return self + + def _get_snapshots_to_keep(self, retain_last_n: Optional[int] = None) -> Set[int]: + """Get set of snapshot IDs that should be kept based on protection and retention rules. + + Args: + retain_last_n: Number of most recent snapshots to keep. + + Returns: + Set of snapshot IDs to keep. + """ + snapshots_to_keep = self._get_protected_snapshot_ids() + + if retain_last_n is not None: + # Sort snapshots by timestamp (most recent first), and get most recent N + sorted_snapshots = sorted(self._transaction.table_metadata.snapshots, key=lambda s: s.timestamp_ms, reverse=True) + snapshots_to_keep.update(snapshot.snapshot_id for snapshot in sorted_snapshots[:retain_last_n]) + + return snapshots_to_keep + + def _get_snapshots_to_expire_with_retention( + self, timestamp_ms: Optional[int] = None, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None + ) -> List[int]: + """Get snapshots to expire considering retention strategies. + + Args: + timestamp_ms: Only snapshots with timestamp_ms < this value will be considered for expiration. + retain_last_n: Always keep the last N snapshots regardless of age. + min_snapshots_to_keep: Minimum number of snapshots to keep in total. + + Returns: + List of snapshot IDs to expire. + """ + snapshots_to_keep = self._get_snapshots_to_keep(retain_last_n=retain_last_n) + + # Apply timestamp constraint + candidates_for_expiration = [] + for snapshot in self._transaction.table_metadata.snapshots: + if snapshot.snapshot_id not in snapshots_to_keep and (timestamp_ms is None or snapshot.timestamp_ms < timestamp_ms): + candidates_for_expiration.append(snapshot) + + # Sort candidates by timestamp (oldest first) for potential expiration + candidates_for_expiration.sort(key=lambda s: s.timestamp_ms) + + # Apply min_snapshots_to_keep constraint + if min_snapshots_to_keep is not None: + total_snapshots = len(self._transaction.table_metadata.snapshots) + max_to_expire = total_snapshots - min_snapshots_to_keep + snapshots_to_expire = [candidate.snapshot_id for candidate in candidates_for_expiration[:max_to_expire]] + else: + snapshots_to_expire = [candidate.snapshot_id for candidate in candidates_for_expiration] + + return snapshots_to_expire + + def _get_expiration_properties(self) -> tuple[Optional[int], Optional[int], Optional[int]]: + """Get the default expiration properties from table properties. + + Returns: + Tuple of (max_snapshot_age_ms, min_snapshots_to_keep, max_ref_age_ms) + """ + from pyiceberg.table import TableProperties + + properties = self._transaction.table_metadata.properties + + max_snapshot_age = property_as_int(properties, TableProperties.MAX_SNAPSHOT_AGE_MS) + min_snapshots_to_keep = property_as_int(properties, TableProperties.MIN_SNAPSHOTS_TO_KEEP) + max_ref_age_ms = property_as_int(properties, "history.expire.max-ref-age-ms") + + return max_snapshot_age, min_snapshots_to_keep, max_ref_age_ms diff --git a/tests/table/test_expire_snapshots.py b/tests/table/test_expire_snapshots.py index e2b2d47b67..59491ff767 100644 --- a/tests/table/test_expire_snapshots.py +++ b/tests/table/test_expire_snapshots.py @@ -21,6 +21,7 @@ import pytest from pyiceberg.table import CommitTableResponse, Table +from pyiceberg.table.update.snapshot import ExpireSnapshots def test_cannot_expire_protected_head_snapshot(table_v2: Table) -> None: @@ -223,3 +224,195 @@ def test_expire_snapshots_by_ids(table_v2: Table) -> None: assert EXPIRE_SNAPSHOT_1 not in remaining_snapshots assert EXPIRE_SNAPSHOT_2 not in remaining_snapshots assert len(table_v2.metadata.snapshots) == 1 + + +def test_retain_last_n_with_protection(table_v2: Table) -> None: + """Test retain_last_n keeps most recent snapshots plus protected ones.""" + from types import SimpleNamespace + + # Clear shared state set on the class between tests + ExpireSnapshots._snapshot_ids_to_expire.clear() + + PROTECTED_SNAPSHOT = 3051729675574597101 # oldest (also protected) + EXPIRE_SNAPSHOT = 3051729675574597102 + KEEP_SNAPSHOT_1 = 3051729675574597103 + KEEP_SNAPSHOT_2 = 3051729675574597104 # newest + + # Protected PROTECTED_SNAPSHOT as branch head + table_v2.metadata = table_v2.metadata.model_copy( + update={ + "refs": { + "main": MagicMock(snapshot_id=PROTECTED_SNAPSHOT, snapshot_ref_type="branch"), + }, + "snapshots": [ + SimpleNamespace(snapshot_id=PROTECTED_SNAPSHOT, timestamp_ms=1, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=EXPIRE_SNAPSHOT, timestamp_ms=2, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=KEEP_SNAPSHOT_1, timestamp_ms=3, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=KEEP_SNAPSHOT_2, timestamp_ms=4, parent_snapshot_id=None), + ], + } + ) + + table_v2.catalog = MagicMock() + kept_ids = {PROTECTED_SNAPSHOT, KEEP_SNAPSHOT_1, KEEP_SNAPSHOT_2} # retain_last_n=2 keeps newest plus protected + mock_response = CommitTableResponse( + metadata=table_v2.metadata.model_copy(update={"snapshots": list(kept_ids)}), + metadata_location="mock://metadata/location", + uuid=uuid4(), + ) + table_v2.catalog.commit_table.return_value = mock_response + + table_v2.maintenance.expire_snapshots().retain_last_n(2).commit() + table_v2.metadata = mock_response.metadata + + args, kwargs = table_v2.catalog.commit_table.call_args + updates = args[2] if len(args) > 2 else () + remove_update = next((u for u in updates if getattr(u, "action", None) == "remove-snapshots"), None) + assert remove_update is not None + # Only EXPIRE_SNAPSHOT should be expired + assert set(remove_update.snapshot_ids) == {EXPIRE_SNAPSHOT} + assert EXPIRE_SNAPSHOT not in table_v2.metadata.snapshots + + +def test_retain_last_n_validation(table_v2: Table) -> None: + """Test retain_last_n validates n >= 1.""" + ExpireSnapshots._snapshot_ids_to_expire.clear() + + with pytest.raises(ValueError, match="Number of snapshots to retain must be at least 1"): + table_v2.maintenance.expire_snapshots().retain_last_n(0) + + with pytest.raises(ValueError, match="Number of snapshots to retain must be at least 1"): + table_v2.maintenance.expire_snapshots().retain_last_n(-1) + + +def test_with_retention_policy_validation(table_v2: Table) -> None: + """Test with_retention_policy validates parameter ranges.""" + ExpireSnapshots._snapshot_ids_to_expire.clear() + + with pytest.raises(ValueError, match="retain_last_n must be at least 1"): + table_v2.maintenance.expire_snapshots().with_retention_policy(retain_last_n=0).commit() + + with pytest.raises(ValueError, match="min_snapshots_to_keep must be at least 1"): + table_v2.maintenance.expire_snapshots().with_retention_policy(min_snapshots_to_keep=0).commit() + + +def test_with_retention_policy_no_criteria_does_nothing(table_v2: Table) -> None: + """Test with_retention_policy does nothing when no criteria provided.""" + from types import SimpleNamespace + + ExpireSnapshots._snapshot_ids_to_expire.clear() + + SNAPSHOT_1, SNAPSHOT_2 = 3051729675574597501, 3051729675574597502 + snapshots = [ + SimpleNamespace(snapshot_id=SNAPSHOT_1, timestamp_ms=100, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=SNAPSHOT_2, timestamp_ms=200, parent_snapshot_id=None), + ] + table_v2.metadata = table_v2.metadata.model_copy(update={"refs": {}, "snapshots": snapshots, "properties": {}}) + table_v2.catalog = MagicMock() + + # Should be a no-op + result = table_v2.maintenance.expire_snapshots().with_retention_policy() + assert result._snapshot_ids_to_expire == set() + + +def test_get_expiration_properties_missing_values(table_v2: Table) -> None: + """Test _get_expiration_properties handles missing properties gracefully.""" + ExpireSnapshots._snapshot_ids_to_expire.clear() + + # No expiration properties set + table_v2.metadata = table_v2.metadata.model_copy(update={"properties": {}}) + expire = table_v2.maintenance.expire_snapshots() + max_age, min_snaps, max_ref_age = expire._get_expiration_properties() + + assert max_age is None + assert min_snaps is None + assert max_ref_age is None + + +def test_retain_last_n_fewer_snapshots_than_requested(table_v2: Table) -> None: + """Test retain_last_n when table has fewer snapshots than requested.""" + from types import SimpleNamespace + + ExpireSnapshots._snapshot_ids_to_expire.clear() + + SNAPSHOT_1, SNAPSHOT_2 = 3051729675574597601, 3051729675574597602 + snapshots = [ + SimpleNamespace(snapshot_id=SNAPSHOT_1, timestamp_ms=100, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=SNAPSHOT_2, timestamp_ms=200, parent_snapshot_id=None), + ] + table_v2.metadata = table_v2.metadata.model_copy(update={"refs": {}, "snapshots": snapshots}) + table_v2.catalog = MagicMock() + + # Request to keep 5 snapshots, but only 2 exist + result = table_v2.maintenance.expire_snapshots().retain_last_n(5) + # Should not expire anything + assert result._snapshot_ids_to_expire == set() + + +def test_older_than_with_retention_edge_cases(table_v2: Table) -> None: + """Test edge cases for older_than_with_retention.""" + from types import SimpleNamespace + + ExpireSnapshots._snapshot_ids_to_expire.clear() + + # Test with retain_last_n and a valid timestamp + EXPIRE_SNAPSHOT, KEEP_SNAPSHOT_1, KEEP_SNAPSHOT_2 = 3051729675574597701, 3051729675574597702, 3051729675574597703 + snapshots = [ + SimpleNamespace(snapshot_id=EXPIRE_SNAPSHOT, timestamp_ms=100, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=KEEP_SNAPSHOT_1, timestamp_ms=200, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=KEEP_SNAPSHOT_2, timestamp_ms=300, parent_snapshot_id=None), + ] + table_v2.metadata = table_v2.metadata.model_copy(update={"refs": {}, "snapshots": snapshots}) + table_v2.catalog = MagicMock() + + mock_response = CommitTableResponse( + metadata=table_v2.metadata.model_copy(update={"snapshots": [KEEP_SNAPSHOT_1, KEEP_SNAPSHOT_2]}), + metadata_location="mock://metadata/location", + uuid=uuid4(), + ) + table_v2.catalog.commit_table.return_value = mock_response + + # Use a timestamp that would include all snapshots, but retain_last_n=2 should limit to keeping KEEP_SNAPSHOT_1, KEEP_SNAPSHOT_2 + table_v2.maintenance.expire_snapshots().older_than_with_retention( + timestamp_ms=400, retain_last_n=2, min_snapshots_to_keep=None + ).commit() + + args, kwargs = table_v2.catalog.commit_table.call_args + updates = args[2] if len(args) > 2 else () + remove_update = next((u for u in updates if getattr(u, "action", None) == "remove-snapshots"), None) + assert remove_update is not None + assert set(remove_update.snapshot_ids) == {EXPIRE_SNAPSHOT} + + +def test_with_retention_policy_partial_properties(table_v2: Table) -> None: + """Test with_retention_policy with only some properties set.""" + from types import SimpleNamespace + + ExpireSnapshots._snapshot_ids_to_expire.clear() + + # Only max-snapshot-age-ms set, no min-snapshots-to-keep + properties = {"history.expire.max-snapshot-age-ms": "250"} + EXPIRE_SNAPSHOT_1, EXPIRE_SNAPSHOT_2, KEEP_SNAPSHOT = 3051729675574597801, 3051729675574597802, 3051729675574597803 + snapshots = [ + SimpleNamespace(snapshot_id=EXPIRE_SNAPSHOT_1, timestamp_ms=100, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=EXPIRE_SNAPSHOT_2, timestamp_ms=200, parent_snapshot_id=None), + SimpleNamespace(snapshot_id=KEEP_SNAPSHOT, timestamp_ms=300, parent_snapshot_id=None), + ] + table_v2.metadata = table_v2.metadata.model_copy(update={"refs": {}, "snapshots": snapshots, "properties": properties}) + table_v2.catalog = MagicMock() + + mock_response = CommitTableResponse( + metadata=table_v2.metadata.model_copy(update={"snapshots": [KEEP_SNAPSHOT]}), + metadata_location="mock://metadata/location", + uuid=uuid4(), + ) + table_v2.catalog.commit_table.return_value = mock_response + + # Should expire EXPIRE_SNAPSHOT_1,EXPIRE_SNAPSHOT_2 (older than 250ms), keep KEEP_SNAPSHOT + table_v2.maintenance.expire_snapshots().with_retention_policy().commit() + + args, kwargs = table_v2.catalog.commit_table.call_args + updates = args[2] if len(args) > 2 else () + remove_update = next((u for u in updates if getattr(u, "action", None) == "remove-snapshots"), None) + assert remove_update is not None + assert set(remove_update.snapshot_ids) == {EXPIRE_SNAPSHOT_1, EXPIRE_SNAPSHOT_2}