# Patch summary (free-form leading text, placed before the first "diff --git" line):
#
# * pyiceberg/table/snapshots.py
#     SnapshotSummaryCollector.__init__ gains an optional
#     `partition_summary_limit: int = 0` argument and uses it to initialise
#     `max_changed_partitions_for_summaries` (previously hard-coded to 0).
#     `set_partition_summary_limit` is left untouched.
#
# * pyiceberg/table/update/snapshot.py
#     `_summary` now reads TableProperties.WRITE_PARTITION_SUMMARY_LIMIT from
#     the table metadata properties first and passes the value to the
#     SnapshotSummaryCollector constructor, instead of constructing the
#     collector and then calling `set_partition_summary_limit`.
#
# * tests/table/test_snapshots.py
#     Adds `test_snapshot_summary_collector_with_partition_limit_in_constructor`,
#     which builds a collector with a limit of 10, adds one data file and
#     removes two across two partitions, and asserts that the built summary
#     contains the per-partition "partitions.int_field=..." entries.
#
# NOTE(review): the line structure below was reconstructed from a
# whitespace-collapsed copy; blank context lines match the hunk line counts,
# indentation is inferred from Python syntax - confirm against the original patch.
diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py
index af3f040482..1e48126cac 100644
--- a/pyiceberg/table/snapshots.py
+++ b/pyiceberg/table/snapshots.py
@@ -272,10 +272,10 @@ class SnapshotSummaryCollector:
     partition_metrics: DefaultDict[str, UpdateMetrics]
     max_changed_partitions_for_summaries: int
 
-    def __init__(self) -> None:
+    def __init__(self, partition_summary_limit: int = 0) -> None:
         self.metrics = UpdateMetrics()
         self.partition_metrics = defaultdict(UpdateMetrics)
-        self.max_changed_partitions_for_summaries = 0
+        self.max_changed_partitions_for_summaries = partition_summary_limit
 
     def set_partition_summary_limit(self, limit: int) -> None:
         self.max_changed_partitions_for_summaries = limit
diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py
index b53c331758..a82167744d 100644
--- a/pyiceberg/table/update/snapshot.py
+++ b/pyiceberg/table/update/snapshot.py
@@ -203,13 +203,12 @@ def _write_delete_manifest() -> List[ManifestFile]:
     def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
         from pyiceberg.table import TableProperties
 
-        ssc = SnapshotSummaryCollector()
         partition_summary_limit = int(
             self._transaction.table_metadata.properties.get(
                 TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
             )
         )
-        ssc.set_partition_summary_limit(partition_summary_limit)
+        ssc = SnapshotSummaryCollector(partition_summary_limit=partition_summary_limit)
 
         for data_file in self._added_data_files:
             ssc.add_file(
diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py
index 24d5f0ffff..3f0dae143b 100644
--- a/tests/table/test_snapshots.py
+++ b/tests/table/test_snapshots.py
@@ -218,6 +218,41 @@ def test_snapshot_summary_collector_with_partition() -> None:
     }
 
 
+@pytest.mark.integration
+def test_snapshot_summary_collector_with_partition_limit_in_constructor() -> None:
+    # Given
+    partition_summary_limit = 10
+    ssc = SnapshotSummaryCollector(partition_summary_limit=partition_summary_limit)
+
+    assert ssc.build() == {}
+    schema = Schema(
+        NestedField(field_id=1, name="bool_field", field_type=BooleanType(), required=False),
+        NestedField(field_id=2, name="string_field", field_type=StringType(), required=False),
+        NestedField(field_id=3, name="int_field", field_type=IntegerType(), required=False),
+    )
+    spec = PartitionSpec(PartitionField(source_id=3, field_id=1001, transform=IdentityTransform(), name="int_field"))
+    data_file_1 = DataFile.from_args(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record(1))
+    data_file_2 = DataFile.from_args(content=DataFileContent.DATA, record_count=200, file_size_in_bytes=4321, partition=Record(2))
+
+    # When
+    ssc.add_file(data_file=data_file_1, schema=schema, partition_spec=spec)
+    ssc.remove_file(data_file=data_file_1, schema=schema, partition_spec=spec)
+    ssc.remove_file(data_file=data_file_2, schema=schema, partition_spec=spec)
+
+    # Then
+    assert ssc.build() == {
+        "added-files-size": "1234",
+        "removed-files-size": "5555",
+        "added-data-files": "1",
+        "deleted-data-files": "2",
+        "added-records": "100",
+        "deleted-records": "300",
+        "changed-partition-count": "2",
+        "partitions.int_field=1": "added-files-size=1234,removed-files-size=1234,added-data-files=1,deleted-data-files=1,added-records=100,deleted-records=100",
+        "partitions.int_field=2": "removed-files-size=4321,deleted-data-files=1,deleted-records=200",
+    }
+
+
 def test_merge_snapshot_summaries_empty() -> None:
     assert update_snapshot_summaries(Summary(Operation.APPEND)) == Summary(
         operation=Operation.APPEND,