diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index 28407d4ffa..2b63b8e6a1 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -1287,6 +1287,47 @@ with table.manage_snapshots() as ms: ms.create_branch(snapshot_id1, "Branch_A").create_tag(snapshot_id2, "tag789") ``` +## Table Maintenance + +PyIceberg provides table maintenance operations through the `table.maintenance` API. This provides a clean interface for performing maintenance tasks like snapshot expiration. + +### Snapshot Expiration + +Expire old snapshots to clean up table metadata and reduce storage costs: + +```python +# Basic usage - expire a specific snapshot by ID +table.maintenance.expire_snapshots().by_id(12345).commit() + +# Context manager usage (recommended for multiple operations) +with table.maintenance.expire_snapshots() as expire: + expire.by_id(12345) + expire.by_id(67890) + # Automatically commits when exiting the context + +# Method chaining +table.maintenance.expire_snapshots().by_id(12345).commit() +``` + +#### Real-world Example + +```python +def cleanup_old_snapshots(table_name: str, snapshot_ids: list[int]): + """Remove specific snapshots from a table.""" + catalog = load_catalog("production") + table = catalog.load_table(table_name) + + # Use context manager for safe transaction handling + with table.maintenance.expire_snapshots() as expire: + for snapshot_id in snapshot_ids: + expire.by_id(snapshot_id) + + print(f"Expired {len(snapshot_ids)} snapshots from {table_name}") + +# Usage +cleanup_old_snapshots("analytics.user_events", [12345, 67890, 11111]) +``` + ## Views PyIceberg supports view operations. diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 06368f709c..13bf3e21c5 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -80,6 +80,7 @@ from pyiceberg.schema import Schema from pyiceberg.table.inspect import InspectTable from pyiceberg.table.locations import LocationProvider, load_location_provider +from pyiceberg.table.maintenance import MaintenanceTable from pyiceberg.table.metadata import ( INITIAL_SEQUENCE_NUMBER, TableMetadata, @@ -115,7 +116,7 @@ update_table_metadata, ) from pyiceberg.table.update.schema import UpdateSchema -from pyiceberg.table.update.snapshot import ExpireSnapshots, ManageSnapshots, UpdateSnapshot, _FastAppendFiles +from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, _FastAppendFiles from pyiceberg.table.update.spec import UpdateSpec from pyiceberg.table.update.statistics import UpdateStatistics from pyiceberg.transforms import IdentityTransform @@ -1069,6 +1070,15 @@ def inspect(self) -> InspectTable: """ return InspectTable(self) + @property + def maintenance(self) -> MaintenanceTable: + """Return the MaintenanceTable object for maintenance. + + Returns: + MaintenanceTable object based on this Table. + """ + return MaintenanceTable(self) + def refresh(self) -> Table: """Refresh the current table metadata. @@ -1241,10 +1251,6 @@ def manage_snapshots(self) -> ManageSnapshots: """ return ManageSnapshots(transaction=Transaction(self, autocommit=True)) - def expire_snapshots(self) -> ExpireSnapshots: - """Shorthand to run expire snapshots by id or by a timestamp.""" - return ExpireSnapshots(transaction=Transaction(self, autocommit=True)) - def update_statistics(self) -> UpdateStatistics: """ Shorthand to run statistics management operations like add statistics and remove statistics. diff --git a/pyiceberg/table/maintenance.py b/pyiceberg/table/maintenance.py new file mode 100644 index 0000000000..0fcda35ae9 --- /dev/null +++ b/pyiceberg/table/maintenance.py @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +logger = logging.getLogger(__name__) + + +if TYPE_CHECKING: + from pyiceberg.table import Table + from pyiceberg.table.update.snapshot import ExpireSnapshots + + +class MaintenanceTable: + tbl: Table + + def __init__(self, tbl: Table) -> None: + self.tbl = tbl + + def expire_snapshots(self) -> ExpireSnapshots: + """Return an ExpireSnapshots builder for snapshot expiration operations. + + Returns: + ExpireSnapshots builder for configuring and executing snapshot expiration. + """ + from pyiceberg.table import Transaction + from pyiceberg.table.update.snapshot import ExpireSnapshots + + return ExpireSnapshots(transaction=Transaction(self.tbl, autocommit=True)) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 3ffb275ded..9a44024a3c 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -907,8 +907,7 @@ def remove_branch(self, branch_name: str) -> ManageSnapshots: class ExpireSnapshots(UpdateTableMetadata["ExpireSnapshots"]): - """ - Expire snapshots by ID. + """Expire snapshots by ID. Use table.expire_snapshots().().commit() to run a specific operation. Use table.expire_snapshots().().().commit() to run multiple operations. @@ -953,7 +952,7 @@ def _get_protected_snapshot_ids(self) -> Set[int]: return protected_ids - def expire_snapshot_by_id(self, snapshot_id: int) -> ExpireSnapshots: + def by_id(self, snapshot_id: int) -> ExpireSnapshots: """ Expire a snapshot by its ID. @@ -974,7 +973,7 @@ def expire_snapshot_by_id(self, snapshot_id: int) -> ExpireSnapshots: return self - def expire_snapshots_by_ids(self, snapshot_ids: List[int]) -> "ExpireSnapshots": + def by_ids(self, snapshot_ids: List[int]) -> "ExpireSnapshots": """ Expire multiple snapshots by their IDs. @@ -986,10 +985,10 @@ def expire_snapshots_by_ids(self, snapshot_ids: List[int]) -> "ExpireSnapshots": This for method chaining. """ for snapshot_id in snapshot_ids: - self.expire_snapshot_by_id(snapshot_id) + self.by_id(snapshot_id) return self - def expire_snapshots_older_than(self, timestamp_ms: int) -> "ExpireSnapshots": + def older_than(self, timestamp_ms: int) -> "ExpireSnapshots": """ Expire all unprotected snapshots with a timestamp older than a given value. diff --git a/tests/table/test_expire_snapshots.py b/tests/table/test_expire_snapshots.py index 82ecb9e493..273b4f631b 100644 --- a/tests/table/test_expire_snapshots.py +++ b/tests/table/test_expire_snapshots.py @@ -43,7 +43,7 @@ def test_cannot_expire_protected_head_snapshot(table_v2: Table) -> None: # Attempt to expire the HEAD snapshot and expect a ValueError with pytest.raises(ValueError, match=f"Snapshot with ID {HEAD_SNAPSHOT} is protected and cannot be expired."): - table_v2.expire_snapshots().expire_snapshot_by_id(HEAD_SNAPSHOT).commit() + table_v2.maintenance.expire_snapshots().by_id(HEAD_SNAPSHOT).commit() table_v2.catalog.commit_table.assert_not_called() @@ -66,7 +66,7 @@ def test_cannot_expire_tagged_snapshot(table_v2: Table) -> None: assert any(ref.snapshot_id == TAGGED_SNAPSHOT for ref in table_v2.metadata.refs.values()) with pytest.raises(ValueError, match=f"Snapshot with ID {TAGGED_SNAPSHOT} is protected and cannot be expired."): - table_v2.expire_snapshots().expire_snapshot_by_id(TAGGED_SNAPSHOT).commit() + table_v2.maintenance.expire_snapshots().by_id(TAGGED_SNAPSHOT).commit() table_v2.catalog.commit_table.assert_not_called() @@ -98,7 +98,7 @@ def test_expire_unprotected_snapshot(table_v2: Table) -> None: assert all(ref.snapshot_id != EXPIRE_SNAPSHOT for ref in table_v2.metadata.refs.values()) # Expire the snapshot - table_v2.expire_snapshots().expire_snapshot_by_id(EXPIRE_SNAPSHOT).commit() + table_v2.maintenance.expire_snapshots().by_id(EXPIRE_SNAPSHOT).commit() table_v2.catalog.commit_table.assert_called_once() remaining_snapshots = table_v2.metadata.snapshots @@ -114,7 +114,7 @@ def test_expire_nonexistent_snapshot_raises(table_v2: Table) -> None: table_v2.metadata = table_v2.metadata.model_copy(update={"refs": {}}) with pytest.raises(ValueError, match=f"Snapshot with ID {NONEXISTENT_SNAPSHOT} does not exist."): - table_v2.expire_snapshots().expire_snapshot_by_id(NONEXISTENT_SNAPSHOT).commit() + table_v2.maintenance.expire_snapshots().by_id(NONEXISTENT_SNAPSHOT).commit() table_v2.catalog.commit_table.assert_not_called() @@ -152,7 +152,7 @@ def test_expire_snapshots_by_timestamp_skips_protected(table_v2: Table) -> None: ) table_v2.catalog.commit_table.return_value = mock_response - table_v2.expire_snapshots().expire_snapshots_older_than(future_timestamp).commit() + table_v2.maintenance.expire_snapshots().older_than(future_timestamp).commit() # Update metadata to reflect the commit (as in other tests) table_v2.metadata = mock_response.metadata @@ -215,7 +215,7 @@ def test_expire_snapshots_by_ids(table_v2: Table) -> None: assert all(ref.snapshot_id not in (EXPIRE_SNAPSHOT_1, EXPIRE_SNAPSHOT_2) for ref in table_v2.metadata.refs.values()) # Expire the snapshots - table_v2.expire_snapshots().expire_snapshots_by_ids([EXPIRE_SNAPSHOT_1, EXPIRE_SNAPSHOT_2]).commit() + table_v2.maintenance.expire_snapshots().by_ids([EXPIRE_SNAPSHOT_1, EXPIRE_SNAPSHOT_2]).commit() table_v2.catalog.commit_table.assert_called_once() remaining_snapshots = table_v2.metadata.snapshots