From 0bf0e8ba920ee7363a9f952b3b9ffad4ad7561c6 Mon Sep 17 00:00:00 2001 From: Yingjian Wu Date: Mon, 21 Apr 2025 10:06:19 -0700 Subject: [PATCH 1/3] refactor partition_summary_limit into SnapshotSummaryCollector constructor --- pyiceberg/table/snapshots.py | 7 ++----- pyiceberg/table/update/snapshot.py | 3 +-- tests/table/test_snapshots.py | 21 ++++++++++++++++++++- 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index af3f040482..a9d89c82bf 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -272,13 +272,10 @@ class SnapshotSummaryCollector: partition_metrics: DefaultDict[str, UpdateMetrics] max_changed_partitions_for_summaries: int - def __init__(self) -> None: + def __init__(self, partition_summary_limit: int = 0) -> None: self.metrics = UpdateMetrics() self.partition_metrics = defaultdict(UpdateMetrics) - self.max_changed_partitions_for_summaries = 0 - - def set_partition_summary_limit(self, limit: int) -> None: - self.max_changed_partitions_for_summaries = limit + self.max_changed_partitions_for_summaries = partition_summary_limit def add_file(self, data_file: DataFile, schema: Schema, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC) -> None: self.metrics.add_file(data_file) diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index b53c331758..a82167744d 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -203,13 +203,12 @@ def _write_delete_manifest() -> List[ManifestFile]: def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary: from pyiceberg.table import TableProperties - ssc = SnapshotSummaryCollector() partition_summary_limit = int( self._transaction.table_metadata.properties.get( TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT ) ) - ssc.set_partition_summary_limit(partition_summary_limit) + ssc = SnapshotSummaryCollector(partition_summary_limit=partition_summary_limit) for data_file in self._added_data_files: ssc.add_file( diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index 24d5f0ffff..8e3b6b31c9 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -201,8 +201,27 @@ def test_snapshot_summary_collector_with_partition() -> None: "changed-partition-count": "2", } + +@pytest.mark.integration +def test_snapshot_summary_collector_with_partition_limit_in_constructor() -> None: + # Given + partition_summary_limit = 10 + ssc = SnapshotSummaryCollector(partition_summary_limit=partition_summary_limit) + + assert ssc.build() == {} + schema = Schema( + NestedField(field_id=1, name="bool_field", field_type=BooleanType(), required=False), + NestedField(field_id=2, name="string_field", field_type=StringType(), required=False), + NestedField(field_id=3, name="int_field", field_type=IntegerType(), required=False), + ) + spec = PartitionSpec(PartitionField(source_id=3, field_id=1001, transform=IdentityTransform(), name="int_field")) + data_file_1 = DataFile(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record(int_field=1)) + data_file_2 = DataFile(content=DataFileContent.DATA, record_count=200, file_size_in_bytes=4321, partition=Record(int_field=2)) + # When - ssc.set_partition_summary_limit(10) + ssc.add_file(data_file=data_file_1, schema=schema, partition_spec=spec) + ssc.remove_file(data_file=data_file_1, schema=schema, partition_spec=spec) + ssc.remove_file(data_file=data_file_2, schema=schema, partition_spec=spec) # Then assert ssc.build() == { From f25f406ba5e06c5a6a616e701617b3774cd42e22 Mon Sep 17 00:00:00 2001 From: Yingjian Wu Date: Wed, 23 Apr 2025 08:09:11 -0700 Subject: [PATCH 2/3] address comments --- pyiceberg/table/snapshots.py | 3 +++ tests/table/test_snapshots.py | 16 ++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index a9d89c82bf..1e48126cac 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -277,6 +277,9 @@ def __init__(self, partition_summary_limit: int = 0) -> None: self.partition_metrics = defaultdict(UpdateMetrics) self.max_changed_partitions_for_summaries = partition_summary_limit + def set_partition_summary_limit(self, limit: int) -> None: + self.max_changed_partitions_for_summaries = limit + def add_file(self, data_file: DataFile, schema: Schema, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC) -> None: self.metrics.add_file(data_file) if len(data_file.partition) > 0: diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index 8e3b6b31c9..1997e9f3eb 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -201,6 +201,22 @@ def test_snapshot_summary_collector_with_partition() -> None: "changed-partition-count": "2", } + # When + ssc.set_partition_summary_limit(10) + + # Then + assert ssc.build() == { + "added-files-size": "1234", + "removed-files-size": "5555", + "added-data-files": "1", + "deleted-data-files": "2", + "added-records": "100", + "deleted-records": "300", + "changed-partition-count": "2", + "partitions.int_field=1": "added-files-size=1234,removed-files-size=1234,added-data-files=1,deleted-data-files=1,added-records=100,deleted-records=100", + "partitions.int_field=2": "removed-files-size=4321,deleted-data-files=1,deleted-records=200", + } + @pytest.mark.integration def test_snapshot_summary_collector_with_partition_limit_in_constructor() -> None: From e321bc419e61f1521da5ccac745943d9d5a232ef Mon Sep 17 00:00:00 2001 From: Yingjian Wu Date: Sat, 26 Apr 2025 07:56:48 -0700 Subject: [PATCH 3/3] rebase and fix test --- tests/table/test_snapshots.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index 1997e9f3eb..3f0dae143b 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -231,8 +231,8 @@ def test_snapshot_summary_collector_with_partition_limit_in_constructor() -> Non NestedField(field_id=3, name="int_field", field_type=IntegerType(), required=False), ) spec = PartitionSpec(PartitionField(source_id=3, field_id=1001, transform=IdentityTransform(), name="int_field")) - data_file_1 = DataFile(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record(int_field=1)) - data_file_2 = DataFile(content=DataFileContent.DATA, record_count=200, file_size_in_bytes=4321, partition=Record(int_field=2)) + data_file_1 = DataFile.from_args(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record(1)) + data_file_2 = DataFile.from_args(content=DataFileContent.DATA, record_count=200, file_size_in_bytes=4321, partition=Record(2)) # When ssc.add_file(data_file=data_file_1, schema=schema, partition_spec=spec)