Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pyiceberg/table/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,10 @@ class SnapshotSummaryCollector:
partition_metrics: DefaultDict[str, UpdateMetrics]
max_changed_partitions_for_summaries: int

def __init__(self) -> None:
def __init__(self, partition_summary_limit: int = 0) -> None:
self.metrics = UpdateMetrics()
self.partition_metrics = defaultdict(UpdateMetrics)
self.max_changed_partitions_for_summaries = 0
self.max_changed_partitions_for_summaries = partition_summary_limit
Copy link
Contributor

@Fokko Fokko Apr 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also keep the method below so people that actually use this, won't get any errors:

Suggested change
self.max_changed_partitions_for_summaries = partition_summary_limit
self.max_changed_partitions_for_summaries = partition_summary_limit
def set_partition_summary_limit(self, limit: int) -> None:
self.max_changed_partitions_for_summaries = limit


def set_partition_summary_limit(self, limit: int) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can deprecate this in favor of the init function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, I was thinking about this.
And I think @Fokko makes a good point that people might be using this function

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deprecation means that we first signal the user, see: https://py.iceberg.apache.org/contributing/#api-compatibility, and in the next release, we can remove it. I don't have strong feelings around removing this one.

self.max_changed_partitions_for_summaries = limit
Expand Down
3 changes: 1 addition & 2 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,12 @@ def _write_delete_manifest() -> List[ManifestFile]:
def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
from pyiceberg.table import TableProperties

ssc = SnapshotSummaryCollector()
partition_summary_limit = int(
self._transaction.table_metadata.properties.get(
TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
)
)
ssc.set_partition_summary_limit(partition_summary_limit)
ssc = SnapshotSummaryCollector(partition_summary_limit=partition_summary_limit)

for data_file in self._added_data_files:
ssc.add_file(
Expand Down
35 changes: 35 additions & 0 deletions tests/table/test_snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,41 @@ def test_snapshot_summary_collector_with_partition() -> None:
}


@pytest.mark.integration
def test_snapshot_summary_collector_with_partition_limit_in_constructor() -> None:
# Given
partition_summary_limit = 10
ssc = SnapshotSummaryCollector(partition_summary_limit=partition_summary_limit)

assert ssc.build() == {}
schema = Schema(
NestedField(field_id=1, name="bool_field", field_type=BooleanType(), required=False),
NestedField(field_id=2, name="string_field", field_type=StringType(), required=False),
NestedField(field_id=3, name="int_field", field_type=IntegerType(), required=False),
)
spec = PartitionSpec(PartitionField(source_id=3, field_id=1001, transform=IdentityTransform(), name="int_field"))
data_file_1 = DataFile.from_args(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record(1))
data_file_2 = DataFile.from_args(content=DataFileContent.DATA, record_count=200, file_size_in_bytes=4321, partition=Record(2))

# When
ssc.add_file(data_file=data_file_1, schema=schema, partition_spec=spec)
ssc.remove_file(data_file=data_file_1, schema=schema, partition_spec=spec)
ssc.remove_file(data_file=data_file_2, schema=schema, partition_spec=spec)

# Then
assert ssc.build() == {
"added-files-size": "1234",
"removed-files-size": "5555",
"added-data-files": "1",
"deleted-data-files": "2",
"added-records": "100",
"deleted-records": "300",
"changed-partition-count": "2",
"partitions.int_field=1": "added-files-size=1234,removed-files-size=1234,added-data-files=1,deleted-data-files=1,added-records=100,deleted-records=100",
"partitions.int_field=2": "removed-files-size=4321,deleted-data-files=1,deleted-records=200",
}


def test_merge_snapshot_summaries_empty() -> None:
assert update_snapshot_summaries(Summary(Operation.APPEND)) == Summary(
operation=Operation.APPEND,
Expand Down