diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py
index a5515f12b0..af3f040482 100644
--- a/pyiceberg/table/snapshots.py
+++ b/pyiceberg/table/snapshots.py
@@ -28,6 +28,7 @@
 from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, _manifests
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
+from pyiceberg.utils.deprecated import deprecation_message
 
 if TYPE_CHECKING:
     from pyiceberg.table.metadata import TableMetadata
@@ -356,6 +357,11 @@ def update_snapshot_summaries(
         raise ValueError(f"Operation not implemented: {summary.operation}")
 
     if truncate_full_table and summary.operation == Operation.OVERWRITE and previous_summary is not None:
+        deprecation_message(
+            deprecated_in="0.10.0",
+            removed_in="0.11.0",
+            help_message="The truncate-full-table shouldn't be used.",
+        )
         summary = _truncate_table_summary(summary, previous_summary)
 
     if not previous_summary:
diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py
index c705f3b9fd..0aff68520b 100644
--- a/pyiceberg/table/update/snapshot.py
+++ b/pyiceberg/table/update/snapshot.py
@@ -236,7 +236,6 @@ def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
         return update_snapshot_summaries(
             summary=Summary(operation=self._operation, **ssc.build(), **snapshot_properties),
             previous_summary=previous_snapshot.summary if previous_snapshot is not None else None,
-            truncate_full_table=self._operation == Operation.OVERWRITE,
         )
 
     def _commit(self) -> UpdatesAndRequirements:
diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py
index ae03beea53..527f659640 100644
--- a/tests/integration/test_deletes.py
+++ b/tests/integration/test_deletes.py
@@ -467,21 +467,19 @@ def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSessio
     assert snapshots[2].summary == Summary(
         Operation.OVERWRITE,
         **{
-            "added-files-size": snapshots[2].summary["total-files-size"],
             "added-data-files": "1",
+            "added-files-size": snapshots[2].summary["added-files-size"],
             "added-records": "2",
             "changed-partition-count": "1",
-            "total-files-size": snapshots[2].summary["total-files-size"],
-            "total-delete-files": "0",
-            "total-data-files": "1",
-            "total-position-deletes": "0",
-            "total-records": "2",
-            "total-equality-deletes": "0",
-            "deleted-data-files": "2",
-            "removed-delete-files": "1",
-            "deleted-records": "5",
+            "deleted-data-files": "1",
+            "deleted-records": "3",
             "removed-files-size": snapshots[2].summary["removed-files-size"],
-            "removed-position-deletes": "1",
+            "total-data-files": "2",
+            "total-delete-files": "1",
+            "total-equality-deletes": "0",
+            "total-files-size": snapshots[2].summary["total-files-size"],
+            "total-position-deletes": "1",
+            "total-records": "4",
         },
     )
 
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index 8ea2e93cb1..c16a4d40b0 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -250,7 +250,7 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi
         "total-records": "0",
     }
 
-    # Overwrite
+    # Append
     assert summaries[3] == {
         "added-data-files": "1",
         "added-files-size": str(file_size),
@@ -264,6 +264,99 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi
         "total-records": "0",
     }
 
+
+@pytest.mark.integration
+def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catalog) -> None:
+    identifier = "default.test_summaries_partial_overwrite"
+    TEST_DATA = {
+        "id": [1, 2, 3, 1, 1],
+        "name": ["AB", "CD", "EF", "CD", "EF"],
+    }
+    pa_schema = pa.schema(
+        [
+            pa.field("id", pa.int32()),
+            pa.field("name", pa.string()),
+        ]
+    )
+    arrow_table = pa.Table.from_pydict(TEST_DATA, schema=pa_schema)
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, schema=pa_schema)
+    with tbl.update_spec() as txn:
+        txn.add_identity("id")
+    tbl.append(arrow_table)
+
+    assert len(tbl.inspect.data_files()) == 3
+
+    tbl.delete(delete_filter="id == 1 and name = 'AB'")  # partial overwrite data from 1 data file
+
+    rows = spark.sql(
+        f"""
+        SELECT operation, summary
+        FROM {identifier}.snapshots
+        ORDER BY committed_at ASC
+    """
+    ).collect()
+
+    operations = [row.operation for row in rows]
+    assert operations == ["append", "overwrite"]
+
+    summaries = [row.summary for row in rows]
+
+    file_size = int(summaries[0]["added-files-size"])
+    assert file_size > 0
+
+    # APPEND
+    assert summaries[0] == {
+        "added-data-files": "3",
+        "added-files-size": "2570",
+        "added-records": "5",
+        "changed-partition-count": "3",
+        "total-data-files": "3",
+        "total-delete-files": "0",
+        "total-equality-deletes": "0",
+        "total-files-size": "2570",
+        "total-position-deletes": "0",
+        "total-records": "5",
+    }
+    # Java produces:
+    # {
+    #     "added-data-files": "1",
+    #     "added-files-size": "707",
+    #     "added-records": "2",
+    #     "app-id": "local-1743678304626",
+    #     "changed-partition-count": "1",
+    #     "deleted-data-files": "1",
+    #     "deleted-records": "3",
+    #     "engine-name": "spark",
+    #     "engine-version": "3.5.5",
+    #     "iceberg-version": "Apache Iceberg 1.8.1 (commit 9ce0fcf0af7becf25ad9fc996c3bad2afdcfd33d)",
+    #     "removed-files-size": "693",
+    #     "spark.app.id": "local-1743678304626",
+    #     "total-data-files": "3",
+    #     "total-delete-files": "0",
+    #     "total-equality-deletes": "0",
+    #     "total-files-size": "1993",
+    #     "total-position-deletes": "0",
+    #     "total-records": "4"
+    # }
+    files = tbl.inspect.data_files()
+    assert len(files) == 3
+    assert summaries[1] == {
+        "added-data-files": "1",
+        "added-files-size": "859",
+        "added-records": "2",
+        "changed-partition-count": "1",
+        "deleted-data-files": "1",
+        "deleted-records": "3",
+        "removed-files-size": "866",
+        "total-data-files": "3",
+        "total-delete-files": "0",
+        "total-equality-deletes": "0",
+        "total-files-size": "2563",
+        "total-position-deletes": "0",
+        "total-records": "4",
+    }
+    assert len(tbl.scan().to_pandas()) == 4
+
 @pytest.mark.integration
 def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_data_files"
diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py
index b4dde217d4..f9f7fb026e 100644
--- a/tests/table/test_snapshots.py
+++ b/tests/table/test_snapshots.py
@@ -289,7 +289,6 @@ def test_merge_snapshot_summaries_overwrite_summary() -> None:
             "total-position-deletes": "1",
             "total-records": "1",
         },
-        truncate_full_table=True,
     )
 
     expected = {
@@ -299,18 +298,12 @@ def test_merge_snapshot_summaries_overwrite_summary() -> None:
         "added-files-size": "4",
         "added-position-deletes": "5",
         "added-records": "6",
-        "total-data-files": "1",
-        "total-records": "6",
-        "total-delete-files": "2",
-        "total-equality-deletes": "3",
-        "total-files-size": "4",
-        "total-position-deletes": "5",
-        "deleted-data-files": "1",
-        "removed-delete-files": "1",
-        "deleted-records": "1",
-        "removed-files-size": "1",
-        "removed-position-deletes": "1",
-        "removed-equality-deletes": "1",
+        "total-data-files": "2",
+        "total-delete-files": "3",
+        "total-records": "7",
+        "total-files-size": "5",
+        "total-position-deletes": "6",
+        "total-equality-deletes": "4",
     }
 
     assert actual.additional_properties == expected
@@ -337,7 +330,6 @@ def test_invalid_type() -> None:
             },
         ),
         previous_summary={"total-data-files": "abc"},  # should be a number
-        truncate_full_table=True,
     )
 
     assert "Could not parse summary property total-data-files to an int: abc" in str(e.value)