-
Notifications
You must be signed in to change notification settings - Fork 418
feat: add snapshot expiration methods with retention strategies #2369
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
3da1528
a8fe03d
a1a126f
2c6eb0b
855e22a
29ae3a8
ad556e3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -953,7 +953,7 @@ def _get_protected_snapshot_ids(self) -> Set[int]: | |||||||||||||||||||||||||||||||||||||||||||||||||||||
| if ref.snapshot_ref_type in [SnapshotRefType.TAG, SnapshotRefType.BRANCH] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def by_id(self, snapshot_id: int) -> ExpireSnapshots: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def by_id(self, snapshot_id: int) -> "ExpireSnapshots": | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Expire a snapshot by its ID. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -1005,3 +1005,197 @@ def older_than(self, dt: datetime) -> "ExpireSnapshots": | |||||||||||||||||||||||||||||||||||||||||||||||||||||
| if snapshot.timestamp_ms < expire_from and snapshot.snapshot_id not in protected_ids: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._snapshot_ids_to_expire.add(snapshot.snapshot_id) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return self | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def older_than_with_retention( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self, timestamp_ms: int, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) -> "ExpireSnapshots": | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Expire all unprotected snapshots with a timestamp older than a given value, with retention strategies. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| timestamp_ms: Only snapshots with timestamp_ms < this value will be expired. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retain_last_n: Always keep the last N snapshots regardless of age. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| min_snapshots_to_keep: Minimum number of snapshots to keep in total. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Returns: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| This for method chaining. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| snapshots_to_expire = self._get_snapshots_to_expire_with_retention( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| timestamp_ms=timestamp_ms, retain_last_n=retain_last_n, min_snapshots_to_keep=min_snapshots_to_keep | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._snapshot_ids_to_expire.update(snapshots_to_expire) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return self | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def with_retention_policy( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self, timestamp_ms: Optional[int] = None, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) -> "ExpireSnapshots": | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Comprehensive snapshot expiration with multiple retention strategies. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| This method provides a unified interface for snapshot expiration with various | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retention policies to ensure operational resilience while allowing space reclamation. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| The method will use table properties as defaults if they are set: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| - history.expire.max-snapshot-age-ms: Default for timestamp_ms if not provided | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| - history.expire.min-snapshots-to-keep: Default for min_snapshots_to_keep if not provided | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| - history.expire.max-ref-age-ms: Used for ref expiration (branches/tags) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| timestamp_ms: Only snapshots with timestamp_ms < this value will be considered for expiration. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| If None, will use history.expire.max-snapshot-age-ms table property if set. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retain_last_n: Always keep the last N snapshots regardless of age. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Useful when regular snapshot creation occurs and users want to keep | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| the last few for rollback purposes. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
ForeverAngry marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| min_snapshots_to_keep: Minimum number of snapshots to keep in total. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Acts as a guardrail to prevent aggressive expiration logic. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| If None, will use history.expire.min-snapshots-to-keep table property if set. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Returns: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| This for method chaining. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Raises: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ValueError: If retain_last_n or min_snapshots_to_keep is less than 1. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Examples: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Use table property defaults | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table.expire_snapshots().with_retention_policy().commit() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Override defaults with explicit values | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table.expire_snapshots().with_retention_policy( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| timestamp_ms=1234567890000, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retain_last_n=10, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| min_snapshots_to_keep=5 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ).commit() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Get default values from table properties | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| default_max_age, default_min_snapshots, _ = self._get_expiration_properties() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Use defaults from table properties if not explicitly provided | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if timestamp_ms is None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| timestamp_ms = default_max_age | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if min_snapshots_to_keep is None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| min_snapshots_to_keep = default_min_snapshots | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # If no expiration criteria are provided, don't expire anything | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if timestamp_ms is None and retain_last_n is None and min_snapshots_to_keep is None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return self | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if retain_last_n is not None and retain_last_n < 1: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise ValueError("retain_last_n must be at least 1") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if min_snapshots_to_keep is not None and min_snapshots_to_keep < 1: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise ValueError("min_snapshots_to_keep must be at least 1") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| snapshots_to_expire = self._get_snapshots_to_expire_with_retention( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| timestamp_ms=timestamp_ms, retain_last_n=retain_last_n, min_snapshots_to_keep=min_snapshots_to_keep | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._snapshot_ids_to_expire.update(snapshots_to_expire) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return self | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def retain_last_n(self, n: int) -> "ExpireSnapshots": | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Keep only the last N snapshots, expiring all others. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| n: Number of most recent snapshots to keep. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Returns: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| This for method chaining. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Raises: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ValueError: If n is less than 1. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if n < 1: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise ValueError("Number of snapshots to retain must be at least 1") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| protected_ids = self._get_protected_snapshot_ids() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Sort snapshots by timestamp (most recent first) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| sorted_snapshots = sorted(self._transaction.table_metadata.snapshots, key=lambda s: s.timestamp_ms, reverse=True) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Keep the last N snapshots and all protected ones | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| snapshots_to_keep = set() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| snapshots_to_keep.update(protected_ids) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Add the N most recent snapshots | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for i, snapshot in enumerate(sorted_snapshots): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if i < n: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| snapshots_to_keep.add(snapshot.snapshot_id) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Find snapshots to expire | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| snapshots_to_expire = [] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for snapshot in self._transaction.table_metadata.snapshots: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if snapshot.snapshot_id not in snapshots_to_keep: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| snapshots_to_expire.append(snapshot.snapshot_id) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| protected_ids = self._get_protected_snapshot_ids() | |
| # Sort snapshots by timestamp (most recent first) | |
| sorted_snapshots = sorted(self._transaction.table_metadata.snapshots, key=lambda s: s.timestamp_ms, reverse=True) | |
| # Keep the last N snapshots and all protected ones | |
| snapshots_to_keep = set() | |
| snapshots_to_keep.update(protected_ids) | |
| # Add the N most recent snapshots | |
| for i, snapshot in enumerate(sorted_snapshots): | |
| if i < n: | |
| snapshots_to_keep.add(snapshot.snapshot_id) | |
| # Find snapshots to expire | |
| snapshots_to_expire = [] | |
| for snapshot in self._transaction.table_metadata.snapshots: | |
| if snapshot.snapshot_id not in snapshots_to_keep: | |
| snapshots_to_expire.append(snapshot.snapshot_id) | |
| snapshots_to_keep = self._get_protected_snapshot_ids() | |
| # Sort snapshots by timestamp (most recent first), and get most recent N | |
| sorted_snapshots = sorted(self._transaction.table_metadata.snapshots, key=lambda s: s.timestamp_ms, reverse=True) | |
| snapshots_to_keep.update(snapshot.snapshot_id for snapshot in sorted_snapshots[:n]) | |
| snapshots_to_expire = [id for snapshot in self._transaction.table_metadata.snapshots if (id := snapshot.snapshot_id) not in snapshots_to_keep] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
small syntax change to make more pythonic :)
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this code is the same as in retain_last_n, can we refactor to its own function? I think we also need to handle branches and take the last n of each branch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make more pythonic with comprehension?
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| # Sort candidates by timestamp (oldest first) for potential expiration | |
| candidates_for_expiration.sort(key=lambda s: s.timestamp_ms) | |
| # Apply min_snapshots_to_keep constraint | |
| total_snapshots = len(self._transaction.table_metadata.snapshots) | |
| snapshots_to_expire: List[int] = [] | |
| for candidate in candidates_for_expiration: | |
| # Check if expiring this snapshot would violate min_snapshots_to_keep | |
| remaining_after_expiration = total_snapshots - len(snapshots_to_expire) - 1 | |
| if min_snapshots_to_keep is None or remaining_after_expiration >= min_snapshots_to_keep: | |
| snapshots_to_expire.append(candidate.snapshot_id) | |
| else: | |
| # Stop expiring to maintain minimum count | |
| break | |
| # Sort candidates by timestamp (newest first) for potential expiration | |
| candidates_for_expiration.sort(key=lambda s: s.timestamp_ms, reverse=True) | |
| snapshots_to_expire = candidates_for_expiration[min_snapshots_to_keep:] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
double check that I didn't make an off-by-one error here but I believe this is a more concise way to express things :)
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this string and the default value be a named constant somewhere? What do you think about using property_as_int from properties.py to be consistent with how properties are handled elsewhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fwiw since we have
from __future__ import annotationsat the top of the file I think its cleaner to make things consistent to not have quotes. Probably outside of the scope of this PRThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thats a great point. I can log an issue to address these all at one.