Skip to content

Commit 8d31af0

Browse files
committed
Add RemovePartitionStatisticsUpdate and SetPartitionStatisticsUpdate
events. This allows us to add, update, and remove partition statistics files.
1 parent 479e663 commit 8d31af0

File tree

3 files changed

+127
-2
lines changed

3 files changed

+127
-2
lines changed

pyiceberg/table/statistics.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,10 @@ def filter_statistics_by_snapshot_id(
5252
reject_snapshot_id: int,
5353
) -> List[StatisticsFile]:
5454
return [stat for stat in statistics if stat.snapshot_id != reject_snapshot_id]
55+
56+
57+
def filter_partition_statistics_by_snapshot_id(
58+
statistics: List[PartitionStatisticsFile],
59+
reject_snapshot_id: int,
60+
) -> List[PartitionStatisticsFile]:
61+
return [stat for stat in statistics if stat.snapshot_id != reject_snapshot_id]

pyiceberg/table/update/__init__.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,12 @@
3636
SnapshotLogEntry,
3737
)
3838
from pyiceberg.table.sorting import SortOrder
39-
from pyiceberg.table.statistics import StatisticsFile, filter_statistics_by_snapshot_id
39+
from pyiceberg.table.statistics import (
40+
PartitionStatisticsFile,
41+
StatisticsFile,
42+
filter_partition_statistics_by_snapshot_id,
43+
filter_statistics_by_snapshot_id,
44+
)
4045
from pyiceberg.typedef import (
4146
IcebergBaseModel,
4247
Properties,
@@ -198,6 +203,16 @@ class RemoveStatisticsUpdate(IcebergBaseModel):
198203
snapshot_id: int = Field(alias="snapshot-id")
199204

200205

206+
class SetPartitionStatisticsUpdate(IcebergBaseModel):
207+
action: Literal["set-partition-statistics"] = Field(default="set-partition-statistics")
208+
partition_statistics: PartitionStatisticsFile
209+
210+
211+
class RemovePartitionStatisticsUpdate(IcebergBaseModel):
212+
action: Literal["remove-partition-statistics"] = Field(default="remove-partition-statistics")
213+
snapshot_id: int = Field(alias="snapshot-id")
214+
215+
201216
TableUpdate = Annotated[
202217
Union[
203218
AssignUUIDUpdate,
@@ -217,6 +232,8 @@ class RemoveStatisticsUpdate(IcebergBaseModel):
217232
RemovePropertiesUpdate,
218233
SetStatisticsUpdate,
219234
RemoveStatisticsUpdate,
235+
SetPartitionStatisticsUpdate,
236+
RemovePartitionStatisticsUpdate,
220237
],
221238
Field(discriminator="action"),
222239
]
@@ -582,6 +599,29 @@ def _(update: RemoveStatisticsUpdate, base_metadata: TableMetadata, context: _Ta
582599
return base_metadata.model_copy(update={"statistics": statistics})
583600

584601

602+
@_apply_table_update.register(SetPartitionStatisticsUpdate)
603+
def _(update: SetPartitionStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
604+
partition_statistics = filter_partition_statistics_by_snapshot_id(
605+
base_metadata.partition_statistics, update.partition_statistics.snapshot_id
606+
)
607+
context.add_update(update)
608+
609+
return base_metadata.model_copy(update={"partition_statistics": partition_statistics + [update.partition_statistics]})
610+
611+
612+
@_apply_table_update.register(RemovePartitionStatisticsUpdate)
613+
def _(
614+
update: RemovePartitionStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext
615+
) -> TableMetadata:
616+
if not any(part_stat.snapshot_id == update.snapshot_id for part_stat in base_metadata.partition_statistics):
617+
raise ValueError(f"Partition Statistics with snapshot id {update.snapshot_id} does not exist")
618+
619+
statistics = filter_partition_statistics_by_snapshot_id(base_metadata.partition_statistics, update.snapshot_id)
620+
context.add_update(update)
621+
622+
return base_metadata.model_copy(update={"partition_statistics": statistics})
623+
624+
585625
def update_table_metadata(
586626
base_metadata: TableMetadata,
587627
updates: Tuple[TableUpdate, ...],

tests/table/test_init.py

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
SortField,
6565
SortOrder,
6666
)
67-
from pyiceberg.table.statistics import BlobMetadata, StatisticsFile
67+
from pyiceberg.table.statistics import BlobMetadata, PartitionStatisticsFile, StatisticsFile
6868
from pyiceberg.table.update import (
6969
AddSnapshotUpdate,
7070
AddSortOrderUpdate,
@@ -76,11 +76,13 @@
7676
AssertLastAssignedPartitionId,
7777
AssertRefSnapshotId,
7878
AssertTableUUID,
79+
RemovePartitionStatisticsUpdate,
7980
RemovePropertiesUpdate,
8081
RemoveSnapshotRefUpdate,
8182
RemoveSnapshotsUpdate,
8283
RemoveStatisticsUpdate,
8384
SetDefaultSortOrderUpdate,
85+
SetPartitionStatisticsUpdate,
8486
SetPropertiesUpdate,
8587
SetSnapshotRefUpdate,
8688
SetStatisticsUpdate,
@@ -1359,3 +1361,79 @@ def test_remove_statistics_update(table_v2_with_statistics: Table) -> None:
13591361
table_v2_with_statistics.metadata,
13601362
(RemoveStatisticsUpdate(snapshot_id=123456789),),
13611363
)
1364+
1365+
1366+
def test_set_partition_statistics_update(table_v2_with_statistics: Table) -> None:
1367+
snapshot_id = table_v2_with_statistics.metadata.current_snapshot_id
1368+
1369+
partition_statistics_file = PartitionStatisticsFile(
1370+
snapshot_id=snapshot_id,
1371+
statistics_path="s3://bucket/warehouse/stats.puffin",
1372+
file_size_in_bytes=124,
1373+
)
1374+
1375+
update = SetPartitionStatisticsUpdate(
1376+
partition_statistics=partition_statistics_file,
1377+
)
1378+
1379+
new_metadata = update_table_metadata(
1380+
table_v2_with_statistics.metadata,
1381+
(update,),
1382+
)
1383+
1384+
expected = """
1385+
{
1386+
"snapshot-id": 3055729675574597004,
1387+
"statistics-path": "s3://bucket/warehouse/stats.puffin",
1388+
"file-size-in-bytes": 124
1389+
}"""
1390+
1391+
assert len(new_metadata.partition_statistics) == 1
1392+
1393+
updated_statistics = [stat for stat in new_metadata.partition_statistics if stat.snapshot_id == snapshot_id]
1394+
1395+
assert len(updated_statistics) == 1
1396+
assert json.loads(updated_statistics[0].model_dump_json()) == json.loads(expected)
1397+
1398+
1399+
def test_remove_partition_statistics_update(table_v2_with_statistics: Table) -> None:
1400+
# Add partition statistics file.
1401+
snapshot_id = table_v2_with_statistics.metadata.current_snapshot_id
1402+
1403+
partition_statistics_file = PartitionStatisticsFile(
1404+
snapshot_id=snapshot_id,
1405+
statistics_path="s3://bucket/warehouse/stats.puffin",
1406+
file_size_in_bytes=124,
1407+
)
1408+
1409+
update = SetPartitionStatisticsUpdate(
1410+
partition_statistics=partition_statistics_file,
1411+
)
1412+
1413+
new_metadata = update_table_metadata(
1414+
table_v2_with_statistics.metadata,
1415+
(update,),
1416+
)
1417+
assert len(new_metadata.partition_statistics) == 1
1418+
1419+
# Remove the same partition statistics file.
1420+
remove_update = RemovePartitionStatisticsUpdate(snapshot_id=snapshot_id)
1421+
1422+
remove_metadata = update_table_metadata(
1423+
new_metadata,
1424+
(remove_update,),
1425+
)
1426+
1427+
assert len(remove_metadata.partition_statistics) == 0
1428+
1429+
1430+
def test_remove_partition_statistics_update_with_invalid_snapshot_id(table_v2_with_statistics: Table) -> None:
1431+
# Removing partition statistics for a snapshot id that has none must raise.
1432+
with pytest.raises(
1433+
ValueError,
1434+
match="Partition Statistics with snapshot id 123456789 does not exist",
1435+
):
1436+
update_table_metadata(
1437+
table_v2_with_statistics.metadata,
1438+
(RemovePartitionStatisticsUpdate(snapshot_id=123456789),),
1439+
)

0 commit comments

Comments
 (0)