Merged
77 commits
0a94d96
Added initial units tests and Class for Removing a Snapshot
ForeverAngry Mar 29, 2025
5f0b62b
Added methods needed to expire snapshots by id, and optionally cleanu…
ForeverAngry Mar 31, 2025
f995daa
Update test_expire_snapshots.py
ForeverAngry Mar 31, 2025
65365e1
Added the builder method to __init__.py, updated the snapshot api wit…
ForeverAngry Apr 1, 2025
e28815f
Snapshots are not being transacted on, but need to re-assign refs
ForeverAngry Apr 1, 2025
4628ede
Fixed the test case.
ForeverAngry Apr 3, 2025
e80c41c
adding print statements to help with debugging
ForeverAngry Apr 3, 2025
cb9f0c9
Draft ready
ForeverAngry Apr 3, 2025
ebcff2b
Applied suggestions to Fix CICD
ForeverAngry Apr 3, 2025
97399bf
Merge branch 'main' into main
ForeverAngry Apr 3, 2025
95e5af2
Rebuild the poetry lock file.
ForeverAngry Apr 3, 2025
5ab5890
Merge branch 'main' into main
ForeverAngry Apr 4, 2025
5acd690
Refactor implementation of `ExpireSnapshots`
ForeverAngry Apr 13, 2025
d30a08c
Fixed format and linting issues
ForeverAngry Apr 13, 2025
e62ab58
Merge branch 'main' into main
ForeverAngry Apr 13, 2025
1af3258
Fixed format and linting issues
ForeverAngry Apr 13, 2025
352b48f
Merge branch 'main' of https://github.com/ForeverAngry/iceberg-python
ForeverAngry Apr 13, 2025
382e0ea
Merge branch 'main' into main
ForeverAngry Apr 18, 2025
549c183
rebased: from main
ForeverAngry Apr 19, 2025
386cb15
fixed: typo
ForeverAngry Apr 19, 2025
12729fa
removed errant files
ForeverAngry Apr 22, 2025
ce3515c
Added: public method signature to the init table file.
ForeverAngry Apr 22, 2025
28fce4b
Removed: `expire_snapshots_older_than` method, in favor of implementi…
ForeverAngry Apr 24, 2025
2c3153e
Update tests/table/test_expire_snapshots.py
ForeverAngry Apr 26, 2025
27c3ece
Removed: unrelated changes, Added: logic to expire snapshot method.
ForeverAngry Apr 26, 2025
fe73a34
feat: implement deduplication of data files in Iceberg table and remo…
ForeverAngry Jul 5, 2025
8dfa038
Closes:
ForeverAngry Jul 5, 2025
42e55c9
refactor: remove obsolete `expire_snapshots_older_than` method
ForeverAngry Jul 5, 2025
e1627c4
### Features & Enhancements
ForeverAngry Jul 5, 2025
0e6d45c
feat: enhance table maintenance with deduplication and snapshot reten…
ForeverAngry Jul 5, 2025
311c442
feat: update maintenance features with deduplication and retention st…
ForeverAngry Jul 5, 2025
fba592d
Update .gitignore
ForeverAngry Jul 5, 2025
b837f86
Update test_writes.py
ForeverAngry Jul 5, 2025
4605a04
Merge branch 'main' into refactor/consolidate-snapshot-expiration
ForeverAngry Jul 5, 2025
536528e
refactor: remove obsolete test file for snapshot expiration
ForeverAngry Jul 5, 2025
6036e12
wip: enhance deduplication logic and improve data file handling in ma…
ForeverAngry Jul 5, 2025
9dc9c82
wip - refactor: update deduplication tests to use file names instead …
ForeverAngry Jul 5, 2025
635a1d9
fix(table): correct deduplication logic for data files in Maintenance…
ForeverAngry Jul 5, 2025
73658e0
fix(tests): ensure commit_table is not called when no snapshots are e…
ForeverAngry Jul 5, 2025
a9a01ee
refactor: remove unused expire_snapshots method and clean up transact…
ForeverAngry Jul 5, 2025
8c906d2
refactor: streamline data file retrieval in MaintenanceTable and enha…
ForeverAngry Jul 6, 2025
0e72ccc
Reverted changes back to prior commit version for `_get_all_datafiles`
ForeverAngry Jul 6, 2025
cfb4061
refactor: simplify snapshot expiration logic and clean up unused imports
ForeverAngry Jul 6, 2025
9371bca
Merge branch 'main' into refactor/consolidate-snapshot-expiration
ForeverAngry Jul 6, 2025
881fab9
fix: add missing newline in API documentation for clarity
ForeverAngry Jul 6, 2025
acb70da
refactor: update license header in test_retention_strategies.py
ForeverAngry Jul 6, 2025
54c1f7f
feat: add license header to test_overwrite_files.py
ForeverAngry Jul 6, 2025
4c6f86c
Update test_literals.py
ForeverAngry Jul 7, 2025
03acf03
fix: update typing-extensions and mkdocs-material versions
ForeverAngry Jul 9, 2025
55a156f
fix: update mkdocs-material and typing-extensions versions
ForeverAngry Jul 9, 2025
6cf08b5
Commit Summary
ForeverAngry Jul 10, 2025
3a5c8e4
fix: remove unused parameter from _get_protected_snapshot_ids method
ForeverAngry Jul 10, 2025
2fc6758
SQLite Connection Cleanup: Added proper cleanup of SQLAlchemy engine …
ForeverAngry Jul 10, 2025
2e7e4cb
fix: remove unnecessary whitespace and improve code formatting in mai…
ForeverAngry Jul 10, 2025
93a79b9
Merge branch 'main' into refactor/consolidate-snapshot-expiration
ForeverAngry Jul 10, 2025
6cfc329
Moved the deduplicate logic found here: https://github.com/apache/ice…
ForeverAngry Jul 15, 2025
ee94c47
Update inspect.py
ForeverAngry Jul 16, 2025
cee4017
Fixed linting issues for the CI/CD Process
ForeverAngry Jul 28, 2025
dfd8a93
chore: remove unused ruff configuration and test file
ForeverAngry Jul 28, 2025
6f1d1a7
add back ruff.toml
kevinjqliu Jul 30, 2025
8c66829
refactor: introduce ExpireSnapshots builder for snapshot expiration
ForeverAngry Jul 31, 2025
c30ea6e
reverted `inspect.py` to be at parity with the main branch
ForeverAngry Aug 8, 2025
3591c39
reverted `api.md` changes
ForeverAngry Aug 8, 2025
f13227e
implemented the changes suggest by @kevinjqliu to
ForeverAngry Aug 8, 2025
c5ff202
docs: add table maintenance section with snapshot expiration examples
ForeverAngry Aug 9, 2025
af78e52
refactor: rename expire_snapshot_by_id to by_id to align back with wh…
ForeverAngry Aug 10, 2025
1c2f631
feat: implement snapshot expiration functionality with tests for prot…
ForeverAngry Aug 11, 2025
64ba8f0
feat: add methods to expire snapshots by IDs and older than a timesta…
ForeverAngry Aug 11, 2025
bf9427a
refactor: rename snapshot expiration methods for clarity and consistency
ForeverAngry Aug 11, 2025
1b3a95c
refactor: update snapshot expiration method calls for consistency and…
ForeverAngry Aug 11, 2025
cab890f
fixed: unrelated changes
ForeverAngry Aug 11, 2025
4df0e83
fixed: unrelated changes
ForeverAngry Aug 11, 2025
44da743
fix: update error messages for protected snapshot expiration tests fo…
ForeverAngry Aug 11, 2025
36f89e6
refactor: remove outdated snapshot expiration documentation for clarity
ForeverAngry Aug 11, 2025
d6ec64d
linted update
ForeverAngry Aug 11, 2025
3ba85f0
resolve merge conflict
kevinjqliu Aug 11, 2025
c980a16
extra?
kevinjqliu Aug 11, 2025
41 changes: 41 additions & 0 deletions mkdocs/docs/api.md
@@ -1287,6 +1287,47 @@ with table.manage_snapshots() as ms:
    ms.create_branch(snapshot_id1, "Branch_A").create_tag(snapshot_id2, "tag789")
```

## Table Maintenance

PyIceberg provides table maintenance operations through the `table.maintenance` API. This provides a clean interface for performing maintenance tasks like snapshot expiration.

### Snapshot Expiration

Expire old snapshots to clean up table metadata and reduce storage costs:

```python
# Basic usage - expire a specific snapshot by ID
table.maintenance.expire_snapshots().by_id(12345).commit()

# Context manager usage (recommended for multiple operations)
with table.maintenance.expire_snapshots() as expire:
    expire.by_id(12345)
    expire.by_id(67890)
    # Automatically commits when exiting the context

# Method chaining
table.maintenance.expire_snapshots().by_id(12345).commit()
Comment on lines +1307 to +1309
Contributor


This is the same example as above?

Suggested change
-# Method chaining
-table.maintenance.expire_snapshots().by_id(12345).commit()

```

#### Real-world Example

```python
def cleanup_old_snapshots(table_name: str, snapshot_ids: list[int]):
    """Remove specific snapshots from a table."""
    catalog = load_catalog("production")
    table = catalog.load_table(table_name)

    # Use context manager for safe transaction handling
    with table.maintenance.expire_snapshots() as expire:
        for snapshot_id in snapshot_ids:
            expire.by_id(snapshot_id)
Comment on lines +1322 to +1323
Contributor


Why not use `by_ids`? This makes the example a bit more concise:

Suggested change
-        for snapshot_id in snapshot_ids:
-            expire.by_id(snapshot_id)
+        expire.by_ids(snapshot_ids)


    print(f"Expired {len(snapshot_ids)} snapshots from {table_name}")

# Usage
cleanup_old_snapshots("analytics.user_events", [12345, 67890, 11111])
```
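
The docs hunk above only exercises `by_id`. For the `older_than(timestamp_ms)` variant that this PR also renames, the cutoff has to be supplied as epoch milliseconds. A minimal sketch of computing that value follows; the helper name `cutoff_timestamp_ms` is hypothetical and not part of PyIceberg, and the final call is shown only as a comment because it assumes an already loaded `table`.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def cutoff_timestamp_ms(days: int, now: Optional[datetime] = None) -> int:
    """Return the epoch-millisecond timestamp `days` days before `now` (UTC)."""
    # Hypothetical helper for illustration; not part of PyIceberg.
    reference = now if now is not None else datetime.now(timezone.utc)
    return int((reference - timedelta(days=days)).timestamp() * 1000)


# Assumed usage with the builder from this PR (requires a loaded `table`):
# table.maintenance.expire_snapshots().older_than(cutoff_timestamp_ms(7)).commit()
```

Passing `now` explicitly keeps the helper deterministic in tests; in normal use it can be omitted.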

## Views

PyIceberg supports view operations.
16 changes: 11 additions & 5 deletions pyiceberg/table/__init__.py
@@ -80,6 +80,7 @@
 from pyiceberg.schema import Schema
 from pyiceberg.table.inspect import InspectTable
 from pyiceberg.table.locations import LocationProvider, load_location_provider
+from pyiceberg.table.maintenance import MaintenanceTable
 from pyiceberg.table.metadata import (
     INITIAL_SEQUENCE_NUMBER,
     TableMetadata,
@@ -115,7 +116,7 @@
     update_table_metadata,
 )
 from pyiceberg.table.update.schema import UpdateSchema
-from pyiceberg.table.update.snapshot import ExpireSnapshots, ManageSnapshots, UpdateSnapshot, _FastAppendFiles
+from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, _FastAppendFiles
 from pyiceberg.table.update.spec import UpdateSpec
 from pyiceberg.table.update.statistics import UpdateStatistics
 from pyiceberg.transforms import IdentityTransform
@@ -1069,6 +1070,15 @@ def inspect(self) -> InspectTable:
         """
         return InspectTable(self)

+    @property
+    def maintenance(self) -> MaintenanceTable:
+        """Return the MaintenanceTable object for maintenance.
+
+        Returns:
+            MaintenanceTable object based on this Table.
+        """
+        return MaintenanceTable(self)
+
     def refresh(self) -> Table:
         """Refresh the current table metadata.

@@ -1241,10 +1251,6 @@ def manage_snapshots(self) -> ManageSnapshots:
         """
         return ManageSnapshots(transaction=Transaction(self, autocommit=True))

-    def expire_snapshots(self) -> ExpireSnapshots:
-        """Shorthand to run expire snapshots by id or by a timestamp."""
-        return ExpireSnapshots(transaction=Transaction(self, autocommit=True))
-
     def update_statistics(self) -> UpdateStatistics:
         """
         Shorthand to run statistics management operations like add statistics and remove statistics.
45 changes: 45 additions & 0 deletions pyiceberg/table/maintenance.py
@@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
from typing import TYPE_CHECKING

logger = logging.getLogger(__name__)


if TYPE_CHECKING:
    from pyiceberg.table import Table
    from pyiceberg.table.update.snapshot import ExpireSnapshots


class MaintenanceTable:
    tbl: Table

    def __init__(self, tbl: Table) -> None:
        self.tbl = tbl

    def expire_snapshots(self) -> ExpireSnapshots:
        """Return an ExpireSnapshots builder for snapshot expiration operations.

        Returns:
            ExpireSnapshots builder for configuring and executing snapshot expiration.
        """
        from pyiceberg.table import Transaction
        from pyiceberg.table.update.snapshot import ExpireSnapshots

        return ExpireSnapshots(transaction=Transaction(self.tbl, autocommit=True))
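
The shape introduced by this file (a `maintenance` property on the table that hands out a chainable builder) can be sketched with a self-contained toy model. Every `Toy*` name below is a hypothetical stand-in, not PyIceberg code. The toy commits only when the `with` block exits cleanly, which is a choice made for this sketch and not a claim about how PyIceberg's `__exit__` behaves.

```python
from typing import List


class ToyExpireSnapshots:
    """Stand-in builder: collects snapshot IDs and removes them on commit."""

    def __init__(self, table: "ToyTable") -> None:
        self._table = table
        self._pending: List[int] = []

    def by_id(self, snapshot_id: int) -> "ToyExpireSnapshots":
        self._pending.append(snapshot_id)
        return self  # returning self is what enables method chaining

    def commit(self) -> None:
        # Drop every pending ID from the toy table, then reset the builder
        self._table.snapshot_ids = [s for s in self._table.snapshot_ids if s not in self._pending]
        self._pending = []

    def __enter__(self) -> "ToyExpireSnapshots":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Sketch-only policy: commit when the block exits without an exception
        if exc_type is None:
            self.commit()


class ToyMaintenance:
    """Stand-in for the maintenance namespace object."""

    def __init__(self, tbl: "ToyTable") -> None:
        self.tbl = tbl

    def expire_snapshots(self) -> ToyExpireSnapshots:
        return ToyExpireSnapshots(self.tbl)


class ToyTable:
    def __init__(self, snapshot_ids: List[int]) -> None:
        self.snapshot_ids = list(snapshot_ids)

    @property
    def maintenance(self) -> ToyMaintenance:
        # A fresh namespace object on each access, mirroring the property in the diff
        return ToyMaintenance(self)
```

Grouping maintenance operations behind one property keeps the table class itself small, and later operations can be added to the namespace object without touching the table's public surface.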
11 changes: 5 additions & 6 deletions pyiceberg/table/update/snapshot.py
@@ -907,8 +907,7 @@ def remove_branch(self, branch_name: str) -> ManageSnapshots:


 class ExpireSnapshots(UpdateTableMetadata["ExpireSnapshots"]):
-    """
-    Expire snapshots by ID.
+    """Expire snapshots by ID.

     Use table.expire_snapshots().<operation>().commit() to run a specific operation.
     Use table.expire_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
@@ -953,7 +952,7 @@ def _get_protected_snapshot_ids(self) -> Set[int]:

         return protected_ids

-    def expire_snapshot_by_id(self, snapshot_id: int) -> ExpireSnapshots:
+    def by_id(self, snapshot_id: int) -> ExpireSnapshots:
         """
         Expire a snapshot by its ID.

@@ -974,7 +973,7 @@ def expire_snapshot_by_id(self, snapshot_id: int) -> ExpireSnapshots:

         return self

-    def expire_snapshots_by_ids(self, snapshot_ids: List[int]) -> "ExpireSnapshots":
+    def by_ids(self, snapshot_ids: List[int]) -> "ExpireSnapshots":
         """
         Expire multiple snapshots by their IDs.

@@ -986,10 +985,10 @@ def expire_snapshots_by_ids(self, snapshot_ids: List[int]) -> "ExpireSnapshots":
             This for method chaining.
         """
         for snapshot_id in snapshot_ids:
-            self.expire_snapshot_by_id(snapshot_id)
+            self.by_id(snapshot_id)
         return self

-    def expire_snapshots_older_than(self, timestamp_ms: int) -> "ExpireSnapshots":
+    def older_than(self, timestamp_ms: int) -> "ExpireSnapshots":
         """
         Expire all unprotected snapshots with a timestamp older than a given value.

12 changes: 6 additions & 6 deletions tests/table/test_expire_snapshots.py
@@ -43,7 +43,7 @@ def test_cannot_expire_protected_head_snapshot(table_v2: Table) -> None:

     # Attempt to expire the HEAD snapshot and expect a ValueError
     with pytest.raises(ValueError, match=f"Snapshot with ID {HEAD_SNAPSHOT} is protected and cannot be expired."):
-        table_v2.expire_snapshots().expire_snapshot_by_id(HEAD_SNAPSHOT).commit()
+        table_v2.maintenance.expire_snapshots().by_id(HEAD_SNAPSHOT).commit()

     table_v2.catalog.commit_table.assert_not_called()

@@ -66,7 +66,7 @@ def test_cannot_expire_tagged_snapshot(table_v2: Table) -> None:
     assert any(ref.snapshot_id == TAGGED_SNAPSHOT for ref in table_v2.metadata.refs.values())

     with pytest.raises(ValueError, match=f"Snapshot with ID {TAGGED_SNAPSHOT} is protected and cannot be expired."):
-        table_v2.expire_snapshots().expire_snapshot_by_id(TAGGED_SNAPSHOT).commit()
+        table_v2.maintenance.expire_snapshots().by_id(TAGGED_SNAPSHOT).commit()

     table_v2.catalog.commit_table.assert_not_called()

@@ -98,7 +98,7 @@ def test_expire_unprotected_snapshot(table_v2: Table) -> None:
     assert all(ref.snapshot_id != EXPIRE_SNAPSHOT for ref in table_v2.metadata.refs.values())

     # Expire the snapshot
-    table_v2.expire_snapshots().expire_snapshot_by_id(EXPIRE_SNAPSHOT).commit()
+    table_v2.maintenance.expire_snapshots().by_id(EXPIRE_SNAPSHOT).commit()

     table_v2.catalog.commit_table.assert_called_once()
     remaining_snapshots = table_v2.metadata.snapshots
@@ -114,7 +114,7 @@ def test_expire_nonexistent_snapshot_raises(table_v2: Table) -> None:
     table_v2.metadata = table_v2.metadata.model_copy(update={"refs": {}})

     with pytest.raises(ValueError, match=f"Snapshot with ID {NONEXISTENT_SNAPSHOT} does not exist."):
-        table_v2.expire_snapshots().expire_snapshot_by_id(NONEXISTENT_SNAPSHOT).commit()
+        table_v2.maintenance.expire_snapshots().by_id(NONEXISTENT_SNAPSHOT).commit()

     table_v2.catalog.commit_table.assert_not_called()

@@ -152,7 +152,7 @@ def test_expire_snapshots_by_timestamp_skips_protected(table_v2: Table) -> None:
     )
     table_v2.catalog.commit_table.return_value = mock_response

-    table_v2.expire_snapshots().expire_snapshots_older_than(future_timestamp).commit()
+    table_v2.maintenance.expire_snapshots().older_than(future_timestamp).commit()
     # Update metadata to reflect the commit (as in other tests)
     table_v2.metadata = mock_response.metadata

@@ -215,7 +215,7 @@ def test_expire_snapshots_by_ids(table_v2: Table) -> None:
     assert all(ref.snapshot_id not in (EXPIRE_SNAPSHOT_1, EXPIRE_SNAPSHOT_2) for ref in table_v2.metadata.refs.values())

     # Expire the snapshots
-    table_v2.expire_snapshots().expire_snapshots_by_ids([EXPIRE_SNAPSHOT_1, EXPIRE_SNAPSHOT_2]).commit()
+    table_v2.maintenance.expire_snapshots().by_ids([EXPIRE_SNAPSHOT_1, EXPIRE_SNAPSHOT_2]).commit()

     table_v2.catalog.commit_table.assert_called_once()
     remaining_snapshots = table_v2.metadata.snapshots
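
The behavior these tests pin down (protected snapshots are rejected by `by_id`, missing IDs raise, and `older_than` skips anything protected) can be sketched without PyIceberg. The class below is a hypothetical toy. It assumes that "protected" means "referenced by any branch or tag ref", which matches how the tests set up `refs` but is not confirmed against the real `_get_protected_snapshot_ids`. The error strings are copied from the test expectations above.

```python
from typing import Dict, List, Set


class ToySnapshotExpirer:
    """Toy model: snapshots map ID -> timestamp_ms, refs map ref name -> snapshot ID."""

    def __init__(self, snapshots: Dict[int, int], refs: Dict[str, int]) -> None:
        self.snapshots = dict(snapshots)
        self.refs = dict(refs)
        self.to_expire: Set[int] = set()

    def _protected_ids(self) -> Set[int]:
        # Assumption for this sketch: any snapshot a ref points at is protected
        return set(self.refs.values())

    def by_id(self, snapshot_id: int) -> "ToySnapshotExpirer":
        if snapshot_id not in self.snapshots:
            raise ValueError(f"Snapshot with ID {snapshot_id} does not exist.")
        if snapshot_id in self._protected_ids():
            raise ValueError(f"Snapshot with ID {snapshot_id} is protected and cannot be expired.")
        self.to_expire.add(snapshot_id)
        return self

    def by_ids(self, snapshot_ids: List[int]) -> "ToySnapshotExpirer":
        for snapshot_id in snapshot_ids:
            self.by_id(snapshot_id)
        return self

    def older_than(self, timestamp_ms: int) -> "ToySnapshotExpirer":
        # Protected snapshots are skipped silently rather than raising
        protected = self._protected_ids()
        for snapshot_id, ts in self.snapshots.items():
            if ts < timestamp_ms and snapshot_id not in protected:
                self.to_expire.add(snapshot_id)
        return self

    def commit(self) -> List[int]:
        # Apply the pending expirations and return the remaining IDs, sorted
        self.snapshots = {s: ts for s, ts in self.snapshots.items() if s not in self.to_expire}
        self.to_expire = set()
        return sorted(self.snapshots)
```

Raising in `by_id` but skipping in `older_than` reflects the difference in intent: an explicit ID that cannot be expired is likely a caller mistake, while a time-based sweep is expected to pass over snapshots that are still referenced.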