From 61ae2eaf6291eae3d2190d76636b3562d24ec11f Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Fri, 20 Jun 2025 13:01:29 -0700 Subject: [PATCH 01/10] wip --- pyiceberg/table/metadata.py | 3 ++ pyiceberg/table/snapshots.py | 2 + pyiceberg/table/update/__init__.py | 9 +++- pyiceberg/table/update/snapshot.py | 22 ++++++++ tests/table/test_row_lineage.py | 85 ++++++++++++++++++++++++++++++ 5 files changed, 120 insertions(+), 1 deletion(-) create mode 100644 tests/table/test_row_lineage.py diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index 4fa2235fb9..e9b9821434 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -562,6 +562,9 @@ def construct_refs(self) -> TableMetadata: next_row_id: Optional[int] = Field(alias="next-row-id", default=None) """A long higher than all assigned row IDs; the next snapshot's `first-row-id`.""" + row_lineage: Optional[bool] = Field(alias="row-lineage", default=False) + """Setting whether or not to track the creation and updates to rows in the table.""" + def model_dump_json( self, exclude_none: bool = True, exclude: Optional[Any] = None, by_alias: bool = True, **kwargs: Any ) -> str: diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 60ad7219e1..da7e989090 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -244,6 +244,8 @@ class Snapshot(IcebergBaseModel): manifest_list: str = Field(alias="manifest-list", description="Location of the snapshot's manifest list file") summary: Optional[Summary] = Field(default=None) schema_id: Optional[int] = Field(alias="schema-id", default=None) + first_row_id: Optional[int] = Field(alias="first-row-id", default=None, description="assigned to the first row in the first data file in the first manifest") + added_rows: Optional[int] = Field(alias="added-rows", default=None, description="Sum of the `added_rows_count` from all manifests added in this snapshot.") def __str__(self) -> str: """Return the string representation of the Snapshot class.""" diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 038b952bb3..980871f3b2 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -437,6 +437,13 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe f"Cannot add snapshot with sequence number {update.snapshot.sequence_number} " f"older than last sequence number {base_metadata.last_sequence_number}" ) + elif (base_metadata.format_version >= 3 and update.snapshot.first_row_id is None): + raise ValueError("Cannot add snapshot without first row id") + elif (base_metadata.format_version >= 3 and update.snapshot.first_row_id is not None and update.snapshot.first_row_id < base_metadata.next_row_id): + raise ValueError(f"Cannot add a snapshot with first row id smaller than the table's next-row-id {update.snapshot.first_row_id} < {base_metadata.next_row_id}") + + + context.add_update(update) return base_metadata.model_copy( @@ -444,6 +451,7 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe "last_updated_ms": update.snapshot.timestamp_ms, "last_sequence_number": update.snapshot.sequence_number, "snapshots": base_metadata.snapshots + [update.snapshot], + "next-row-id": base_metadata.next_row_id + update.snapshot.added_rows } ) @@ -632,7 +640,6 @@ def _(update: RemoveSchemasUpdate, base_metadata: TableMetadata, context: _Table return base_metadata.model_copy(update={"schemas": schemas}) - @_apply_table_update.register(SetPartitionStatisticsUpdate) def _(update: SetPartitionStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata: partition_statistics = filter_statistics_by_snapshot_id( diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 148dacd22f..bcef006b24 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -157,6 +157,19 @@ def delete_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]: self._deleted_data_files.add(data_file) return self + def _calculate_added_rows(self, manifests: List[ManifestFile]) -> int: + """Calculate the number of added rows from a list of manifest files.""" + added_rows = 0 + for manifest in manifests: + if manifest.added_snapshot_id is None or manifest.added_snapshot_id == self._snapshot_id: + if manifest.added_rows_count is None: + raise ValueError( + "Cannot determine number of added rows in snapshot because " + f"the entry for manifest {manifest.manifest_path} is missing the field `added-rows-count`" + ) + added_rows += manifest.added_rows_count + return added_rows + @abstractmethod def _deleted_entries(self) -> List[ManifestEntry]: ... @@ -284,6 +297,13 @@ def _commit(self) -> UpdatesAndRequirements: ) as writer: writer.add_manifests(new_manifests) + added_rows: Optional[int] = None + first_row_id: Optional[int] = None + + if self._transaction.table_metadata.format_version >= 3: + first_row_id = self._transaction.table_metadata.next_row_id + added_rows = self._calculate_added_rows(new_manifests) + snapshot = Snapshot( snapshot_id=self._snapshot_id, parent_snapshot_id=self._parent_snapshot_id, @@ -291,6 +311,8 @@ def _commit(self) -> UpdatesAndRequirements: sequence_number=next_sequence_number, summary=summary, schema_id=self._transaction.table_metadata.current_schema_id, + added_rows=added_rows, + first_row_id=first_row_id, ) add_snapshot_update = AddSnapshotUpdate(snapshot=snapshot) diff --git a/tests/table/test_row_lineage.py b/tests/table/test_row_lineage.py new file mode 100644 index 0000000000..24b3d328b7 --- /dev/null +++ b/tests/table/test_row_lineage.py @@ -0,0 +1,85 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pytest +import pyarrow as pa +from pyiceberg.catalog import Catalog +from pyiceberg.schema import Schema, NestedField +from pyiceberg.types import LongType, StringType +from pyiceberg.partitioning import PartitionSpec, UNPARTITIONED_PARTITION_SPEC +from pyiceberg.table import Table + + +def _create_table( + catalog: Catalog, + identifier: str, + schema: Schema, + partition_spec: PartitionSpec, + properties: dict, +) -> Table: + try: + catalog.drop_table(identifier) + except Exception: + pass + return catalog.create_table(identifier, schema, partition_spec=partition_spec, properties=properties) + + +@pytest.mark.integration +def test_deletes(session_catalog: Catalog): + # Create a table. + identifier = "default.test_deletes_table" + schema = Schema( + NestedField(1, "id", LongType()), + NestedField(2, "data", StringType()), + ) + table = _create_table( + session_catalog, + identifier, + schema, + UNPARTITIONED_PARTITION_SPEC, + {"format-version": "3"} + ) + + assert table.metadata.next_row_id == 0 + + # Create 30 rows. + num_rows = 30 + pyarrow_table = pa.Table.from_pylist( + [{"id": i, "data": f"row_{i}"} for i in range(num_rows)], + schema=schema.as_arrow() + ) + + table.append(pyarrow_table) + + # Ensure that the current snapshot has claimed rows [0, 30) + current_snapshot = table.current_snapshot() + assert current_snapshot is not None + assert current_snapshot.first_row_id == 0 + assert table.metadata.next_row_id == num_rows + + table.delete("id >= 0") + + # Deleting a file should create a new snapshot which should inherit last-row-id from the + # previous metadata and not change last-row-id for this metadata. + current_snapshot_after_delete = table.current_snapshot() + assert current_snapshot_after_delete is not None + assert current_snapshot_after_delete.first_row_id == num_rows + assert current_snapshot.added_rows == 0 + assert table.metadata.next_row_id == num_rows + + # Clean up + session_catalog.drop_table(identifier) \ No newline at end of file From 0dc458cab7c9b10e15919ebc43deda59570a570e Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Fri, 20 Jun 2025 13:28:42 -0700 Subject: [PATCH 02/10] looks like I need to be able to write manifests to test this --- tests/table/test_row_lineage.py | 85 --------------------------------- 1 file changed, 85 deletions(-) delete mode 100644 tests/table/test_row_lineage.py diff --git a/tests/table/test_row_lineage.py b/tests/table/test_row_lineage.py deleted file mode 100644 index 24b3d328b7..0000000000 --- a/tests/table/test_row_lineage.py +++ /dev/null @@ -1,85 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import pytest -import pyarrow as pa -from pyiceberg.catalog import Catalog -from pyiceberg.schema import Schema, NestedField -from pyiceberg.types import LongType, StringType -from pyiceberg.partitioning import PartitionSpec, UNPARTITIONED_PARTITION_SPEC -from pyiceberg.table import Table - - -def _create_table( - catalog: Catalog, - identifier: str, - schema: Schema, - partition_spec: PartitionSpec, - properties: dict, -) -> Table: - try: - catalog.drop_table(identifier) - except Exception: - pass - return catalog.create_table(identifier, schema, partition_spec=partition_spec, properties=properties) - - -@pytest.mark.integration -def test_deletes(session_catalog: Catalog): - # Create a table. - identifier = "default.test_deletes_table" - schema = Schema( - NestedField(1, "id", LongType()), - NestedField(2, "data", StringType()), - ) - table = _create_table( - session_catalog, - identifier, - schema, - UNPARTITIONED_PARTITION_SPEC, - {"format-version": "3"} - ) - - assert table.metadata.next_row_id == 0 - - # Create 30 rows. - num_rows = 30 - pyarrow_table = pa.Table.from_pylist( - [{"id": i, "data": f"row_{i}"} for i in range(num_rows)], - schema=schema.as_arrow() - ) - - table.append(pyarrow_table) - - # Ensure that the current snapshot has claimed rows [0, 30) - current_snapshot = table.current_snapshot() - assert current_snapshot is not None - assert current_snapshot.first_row_id == 0 - assert table.metadata.next_row_id == num_rows - - table.delete("id >= 0") - - # Deleting a file should create a new snapshot which should inherit last-row-id from the - # previous metadata and not change last-row-id for this metadata. - current_snapshot_after_delete = table.current_snapshot() - assert current_snapshot_after_delete is not None - assert current_snapshot_after_delete.first_row_id == num_rows - assert current_snapshot.added_rows == 0 - assert table.metadata.next_row_id == num_rows - - # Clean up - session_catalog.drop_table(identifier) \ No newline at end of file From 130a96a4ad9ffb3c7f02049184d404f58ca4b8c3 Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Fri, 20 Jun 2025 13:41:56 -0700 Subject: [PATCH 03/10] linter --- pyiceberg/table/snapshots.py | 8 ++++++-- pyiceberg/table/update/__init__.py | 20 +++++++++++++------- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index da7e989090..cc8b597936 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -244,8 +244,12 @@ class Snapshot(IcebergBaseModel): manifest_list: str = Field(alias="manifest-list", description="Location of the snapshot's manifest list file") summary: Optional[Summary] = Field(default=None) schema_id: Optional[int] = Field(alias="schema-id", default=None) - first_row_id: Optional[int] = Field(alias="first-row-id", default=None, description="assigned to the first row in the first data file in the first manifest") - added_rows: Optional[int] = Field(alias="added-rows", default=None, description="Sum of the `added_rows_count` from all manifests added in this snapshot.") + first_row_id: Optional[int] = Field( + alias="first-row-id", default=None, description="assigned to the first row in the first data file in the first manifest" + ) + added_rows: Optional[int] = Field( + alias="added-rows", default=None, description="Sum of the `added_rows_count` from all manifests added in this snapshot." + ) def __str__(self) -> str: """Return the string representation of the Snapshot class.""" diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 980871f3b2..79edfbfc22 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -437,13 +437,17 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe f"Cannot add snapshot with sequence number {update.snapshot.sequence_number} " f"older than last sequence number {base_metadata.last_sequence_number}" ) - elif (base_metadata.format_version >= 3 and update.snapshot.first_row_id is None): + elif base_metadata.format_version >= 3 and update.snapshot.first_row_id is None: raise ValueError("Cannot add snapshot without first row id") - elif (base_metadata.format_version >= 3 and update.snapshot.first_row_id is not None and update.snapshot.first_row_id < base_metadata.next_row_id): - raise ValueError(f"Cannot add a snapshot with first row id smaller than the table's next-row-id {update.snapshot.first_row_id} < {base_metadata.next_row_id}") - - - + elif ( + base_metadata.format_version >= 3 + and update.snapshot.first_row_id is not None + and base_metadata.next_row_id is not None + and update.snapshot.first_row_id < base_metadata.next_row_id + ): + raise ValueError( + f"Cannot add a snapshot with first row id smaller than the table's next-row-id {update.snapshot.first_row_id} < {base_metadata.next_row_id}" + ) context.add_update(update) return base_metadata.model_copy( @@ -451,7 +455,9 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe "last_updated_ms": update.snapshot.timestamp_ms, "last_sequence_number": update.snapshot.sequence_number, "snapshots": base_metadata.snapshots + [update.snapshot], - "next-row-id": base_metadata.next_row_id + update.snapshot.added_rows + "next_row_id": base_metadata.next_row_id + update.snapshot.added_rows + if base_metadata.next_row_id is not None and update.snapshot.added_rows is not None + else None, } ) From 54ef773221017f77a3adfac1b3135d04194dd45a Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Fri, 20 Jun 2025 14:04:04 -0700 Subject: [PATCH 04/10] add tests --- tests/conftest.py | 15 +++++++++++- tests/table/test_init.py | 53 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index 21f33858d5..6734932993 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -72,7 +72,7 @@ from pyiceberg.schema import Accessor, Schema from pyiceberg.serializers import ToOutputFile from pyiceberg.table import FileScanTask, Table -from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2 +from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2, TableMetadataV3 from pyiceberg.transforms import DayTransform, IdentityTransform from pyiceberg.types import ( BinaryType, @@ -920,6 +920,7 @@ def generate_snapshot( "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", "location": "s3://bucket/test/location", "last-sequence-number": 34, + "next-row-id": 1, "last-updated-ms": 1602638573590, "last-column-id": 3, "current-schema-id": 1, @@ -2489,6 +2490,18 @@ def table_v2(example_table_metadata_v2: Dict[str, Any]) -> Table: ) +@pytest.fixture +def table_v3(example_table_metadata_v3: Dict[str, Any]) -> Table: + table_metadata = TableMetadataV3(**example_table_metadata_v3) + return Table( + identifier=("database", "table"), + metadata=table_metadata, + metadata_location=f"{table_metadata.location}/uuid.metadata.json", + io=load_file_io(), + catalog=NoopCatalog("NoopCatalog"), + ) + + @pytest.fixture def table_v2_orc(example_table_metadata_v2: Dict[str, Any]) -> Table: import copy diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 95c5d822aa..7fd6ee1f1e 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -1521,3 +1521,56 @@ def test_remove_partition_statistics_update_with_invalid_snapshot_id(table_v2_wi table_v2_with_statistics.metadata, (RemovePartitionStatisticsUpdate(snapshot_id=123456789),), ) + +def test_add_snapshot_update_fails_without_first_row_id(table_v3: Table) -> None: + new_snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=19, + sequence_number=200, + timestamp_ms=1602638593590, + manifest_list="s3:/a/b/c.avro", + summary=Summary(Operation.APPEND), + schema_id=3, + ) + + with pytest.raises( + ValueError, + match="Cannot add snapshot without first row id", + ): + update_table_metadata(table_v3.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),)) + + +def test_add_snapshot_update_fails_with_smaller_first_row_id(table_v3: Table) -> None: + new_snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=19, + sequence_number=200, + timestamp_ms=1602638593590, + manifest_list="s3:/a/b/c.avro", + summary=Summary(Operation.APPEND), + schema_id=3, + first_row_id=0, + ) + + with pytest.raises( + ValueError, + match="Cannot add a snapshot with first row id smaller than the table's next-row-id", + ): + update_table_metadata(table_v3.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),)) + + +def test_add_snapshot_update_updates_next_row_id(table_v3: Table) -> None: + new_snapshot = Snapshot( + snapshot_id=25, + parent_snapshot_id=19, + sequence_number=200, + timestamp_ms=1602638593590, + manifest_list="s3:/a/b/c.avro", + summary=Summary(Operation.APPEND), + schema_id=3, + first_row_id=2, + added_rows=10, + ) + + new_metadata = update_table_metadata(table_v3.metadata, (AddSnapshotUpdate(snapshot=new_snapshot),)) + assert new_metadata.next_row_id == 11 From 8f8af3ed3a31448f8fed9936ef26cba2176331dd Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Mon, 23 Jun 2025 15:46:24 -0700 Subject: [PATCH 05/10] test fix --- pyiceberg/table/update/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 79edfbfc22..b4dc957772 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -456,7 +456,9 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe "last_sequence_number": update.snapshot.sequence_number, "snapshots": base_metadata.snapshots + [update.snapshot], "next_row_id": base_metadata.next_row_id + update.snapshot.added_rows - if base_metadata.next_row_id is not None and update.snapshot.added_rows is not None + if base_metadata.format_version >= 3 + and base_metadata.next_row_id is not None + and update.snapshot.added_rows is not None else None, } ) From 281370b0f6428e189b55229b56395ebf4477e045 Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Wed, 25 Jun 2025 14:10:50 -0700 Subject: [PATCH 06/10] fix PR issues --- pyiceberg/table/metadata.py | 3 --- pyiceberg/table/snapshots.py | 8 +++++--- pyiceberg/table/update/snapshot.py | 3 --- tests/table/test_snapshots.py | 4 ++-- 4 files changed, 7 insertions(+), 11 deletions(-) diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index e9b9821434..4fa2235fb9 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -562,9 +562,6 @@ def construct_refs(self) -> TableMetadata: next_row_id: Optional[int] = Field(alias="next-row-id", default=None) """A long higher than all assigned row IDs; the next snapshot's `first-row-id`.""" - row_lineage: Optional[bool] = Field(alias="row-lineage", default=False) - """Setting whether or not to track the creation and updates to rows in the table.""" - def model_dump_json( self, exclude_none: bool = True, exclude: Optional[Any] = None, by_alias: bool = True, **kwargs: Any ) -> str: diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index cc8b597936..a60c41d829 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -247,9 +247,7 @@ class Snapshot(IcebergBaseModel): first_row_id: Optional[int] = Field( alias="first-row-id", default=None, description="assigned to the first row in the first data file in the first manifest" ) - added_rows: Optional[int] = Field( - alias="added-rows", default=None, description="Sum of the `added_rows_count` from all manifests added in this snapshot." - ) + _added_rows: Optional[int] = PrivateAttr() def __str__(self) -> str: """Return the string representation of the Snapshot class.""" @@ -263,6 +261,10 @@ def manifests(self, io: FileIO) -> List[ManifestFile]: """Return the manifests for the given snapshot.""" return list(_manifests(io, self.manifest_list)) + @property + def added_rows(self) -> Optional[int]: + return self._added_rows + class MetadataLogEntry(IcebergBaseModel): metadata_file: str = Field(alias="metadata-file") diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index bcef006b24..aed7ec0449 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -297,12 +297,10 @@ def _commit(self) -> UpdatesAndRequirements: ) as writer: writer.add_manifests(new_manifests) - added_rows: Optional[int] = None first_row_id: Optional[int] = None if self._transaction.table_metadata.format_version >= 3: first_row_id = self._transaction.table_metadata.next_row_id - added_rows = self._calculate_added_rows(new_manifests) snapshot = Snapshot( snapshot_id=self._snapshot_id, @@ -311,7 +309,6 @@ def _commit(self) -> UpdatesAndRequirements: sequence_number=next_sequence_number, summary=summary, schema_id=self._transaction.table_metadata.current_schema_id, - added_rows=added_rows, first_row_id=first_row_id, ) diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index d26562ad8f..391f96622e 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -139,7 +139,7 @@ def test_deserialize_snapshot_with_properties(snapshot_with_properties: Snapshot def test_snapshot_repr(snapshot: Snapshot) -> None: assert ( repr(snapshot) - == """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND), schema_id=3)""" + == """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND), schema_id=3, first_row_id=None)""" ) assert snapshot == eval(repr(snapshot)) @@ -147,7 +147,7 @@ def test_snapshot_repr(snapshot: Snapshot) -> None: def test_snapshot_with_properties_repr(snapshot_with_properties: Snapshot) -> None: assert ( repr(snapshot_with_properties) - == """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND, **{'foo': 'bar'}), schema_id=3)""" + == """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND, **{'foo': 'bar'}), schema_id=3, first_row_id=None)""" ) assert snapshot_with_properties == eval(repr(snapshot_with_properties)) From f73c8b762dd7c875f02e6c68bb2b5f213fa68aae Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Tue, 21 Oct 2025 13:21:42 -0700 Subject: [PATCH 07/10] add tests --- pyiceberg/table/snapshots.py | 6 +-- tests/integration/test_writes/test_writes.py | 43 +++++++++++++++++++- tests/table/test_snapshots.py | 4 +- 3 files changed, 45 insertions(+), 8 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index a60c41d829..56cd897e6d 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -247,7 +247,7 @@ class Snapshot(IcebergBaseModel): first_row_id: Optional[int] = Field( alias="first-row-id", default=None, description="assigned to the first row in the first data file in the first manifest" ) - _added_rows: Optional[int] = PrivateAttr() + added_rows: Optional[int] = Field(alias="added-rows", default=None, description="The upper bound of the number of rows with assigned row IDs") def __str__(self) -> str: """Return the string representation of the Snapshot class.""" @@ -261,10 +261,6 @@ def manifests(self, io: FileIO) -> List[ManifestFile]: """Return the manifests for the given snapshot.""" return list(_manifests(io, self.manifest_list)) - @property - def added_rows(self) -> Optional[int]: - return self._added_rows - class MetadataLogEntry(IcebergBaseModel): metadata_file: str = Field(alias="metadata-file") diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index c7d79f2c37..15d3398865 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -64,7 +64,7 @@ StringType, UUIDType, ) -from utils import _create_table +from utils import TABLE_SCHEMA, _create_table @pytest.fixture(scope="session", autouse=True) @@ -2490,3 +2490,44 @@ def test_stage_only_overwrite_files( assert operations == ["append", "append", "delete", "append", "append"] assert parent_snapshot_id == [None, first_snapshot, second_snapshot, second_snapshot, second_snapshot] + + +@pytest.mark.integration +def test_v3_write_and_read(spark: SparkSession, session_catalog: Catalog) -> None: + """Test writing to a v3 table and reading with Spark.""" + identifier = "default.test_v3_write_and_read" + tbl = _create_table(session_catalog, identifier, {"format-version": "3"}) + assert tbl.format_version == 3, f"Expected v3, got: v{tbl.format_version}" + assert tbl.metadata.next_row_id is not None, "Expected next_row_id to be initialized" + initial_next_row_id = tbl.metadata.next_row_id + + test_data = pa.Table.from_pydict( + { + "bool": [True, False, True], + "string": ["a", "b", "c"], + "string_long": ["a_long", "b_long", "c_long"], + "int": [1, 2, 3], + "long": [11, 22, 33], + "float": [1.1, 2.2, 3.3], + "double": [1.11, 2.22, 3.33], + "timestamp": [datetime(2023, 1, 1, 1, 1, 1), datetime(2023, 2, 2, 2, 2, 2), datetime(2023, 3, 3, 3, 3, 3)], + "timestamptz": [ + datetime(2023, 1, 1, 1, 1, 1, tzinfo=pytz.utc), + datetime(2023, 2, 2, 2, 2, 2, tzinfo=pytz.utc), + datetime(2023, 3, 3, 3, 3, 3, tzinfo=pytz.utc), + ], + "date": [date(2023, 1, 1), date(2023, 2, 2), date(2023, 3, 3)], + "binary": [b"\x01", b"\x02", b"\x03"], + "fixed": [b"1234567890123456", b"1234567890123456", b"1234567890123456"], + }, + schema=TABLE_SCHEMA.as_arrow(), + ) + + tbl.append(test_data) + + assert ( + tbl.metadata.next_row_id == initial_next_row_id + len(test_data) + ), "Expected next_row_id to be incremented by the number of added rows" + + df = spark.table(identifier) + assert df.count() == 3, "Expected 3 rows" diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index 391f96622e..d26562ad8f 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -139,7 +139,7 @@ def test_deserialize_snapshot_with_properties(snapshot_with_properties: Snapshot def test_snapshot_repr(snapshot: Snapshot) -> None: assert ( repr(snapshot) - == """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND), schema_id=3, first_row_id=None)""" + == """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND), schema_id=3)""" ) assert snapshot == eval(repr(snapshot)) @@ -147,7 +147,7 @@ def test_snapshot_repr(snapshot: Snapshot) -> None: def test_snapshot_with_properties_repr(snapshot_with_properties: Snapshot) -> None: assert ( repr(snapshot_with_properties) - == """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND, **{'foo': 'bar'}), schema_id=3, first_row_id=None)""" + == """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND, **{'foo': 'bar'}), schema_id=3)""" ) assert snapshot_with_properties == eval(repr(snapshot_with_properties)) From 8b9f467d2e6f545cef1eb817cfc9ea0210c3bea2 Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Tue, 21 Oct 2025 13:25:30 -0700 Subject: [PATCH 08/10] PR comments --- pyiceberg/table/snapshots.py | 20 +++++++++++++++++++- pyiceberg/table/update/__init__.py | 1 + tests/integration/test_writes/test_writes.py | 14 +++++--------- tests/table/test_init.py | 1 + 4 files changed, 26 insertions(+), 10 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 56cd897e6d..13ce52b7eb 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -247,7 +247,9 @@ class Snapshot(IcebergBaseModel): first_row_id: Optional[int] = Field( alias="first-row-id", default=None, description="assigned to the first row in the first data file in the first manifest" ) - added_rows: Optional[int] = Field(alias="added-rows", default=None, description="The upper bound of the number of rows with assigned row IDs") + added_rows: Optional[int] = Field( + alias="added-rows", default=None, description="The upper bound of the number of rows with assigned row IDs" + ) def __str__(self) -> str: """Return the string representation of the Snapshot class.""" @@ -257,6 +259,22 @@ def __str__(self) -> str: result_str = f"{operation}id={self.snapshot_id}{parent_id}{schema_id}" return result_str + def __repr__(self) -> str: + """Return the string representation of the Snapshot class.""" + fields = [ + f"snapshot_id={self.snapshot_id}", + f"parent_snapshot_id={self.parent_snapshot_id}", + f"sequence_number={self.sequence_number}", + f"timestamp_ms={self.timestamp_ms}", + f"manifest_list='{self.manifest_list}'", + f"summary={repr(self.summary)}" if self.summary else None, + f"schema_id={self.schema_id}" if self.schema_id is not None else None, + f"first_row_id={self.first_row_id}" if self.first_row_id is not None else None, + f"added_rows={self.added_rows}" if self.added_rows is not None else None, + ] + filtered_fields = [field for field in fields if field is not None] + return f"Snapshot({', '.join(filtered_fields)})" + def manifests(self, io: FileIO) -> List[ManifestFile]: """Return the manifests for the given snapshot.""" return list(_manifests(io, self.manifest_list)) diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index b4dc957772..bcbe429688 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -648,6 +648,7 @@ def _(update: RemoveSchemasUpdate, base_metadata: TableMetadata, context: _Table return base_metadata.model_copy(update={"schemas": schemas}) + @_apply_table_update.register(SetPartitionStatisticsUpdate) def _(update: SetPartitionStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata: partition_statistics = filter_statistics_by_snapshot_id( diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 15d3398865..0fa76ab84c 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -2493,13 +2493,12 @@ def test_stage_only_overwrite_files( @pytest.mark.integration -def test_v3_write_and_read(spark: SparkSession, session_catalog: Catalog) -> None: +def test_v3_write_and_read_row_lineage(spark: SparkSession, session_catalog: Catalog) -> None: """Test writing to a v3 table and reading with Spark.""" identifier = "default.test_v3_write_and_read" tbl = _create_table(session_catalog, identifier, {"format-version": "3"}) assert tbl.format_version == 3, f"Expected v3, got: v{tbl.format_version}" - assert tbl.metadata.next_row_id is not None, "Expected next_row_id to be initialized" - initial_next_row_id = tbl.metadata.next_row_id + initial_next_row_id = tbl.metadata.next_row_id or 0 test_data = pa.Table.from_pydict( { @@ -2525,9 +2524,6 @@ def test_v3_write_and_read(spark: SparkSession, session_catalog: Catalog) -> Non tbl.append(test_data) - assert ( - tbl.metadata.next_row_id == initial_next_row_id + len(test_data) - ), "Expected next_row_id to be incremented by the number of added rows" - - df = spark.table(identifier) - assert df.count() == 3, "Expected 3 rows" + assert tbl.metadata.next_row_id == initial_next_row_id + len(test_data), ( + "Expected next_row_id to be incremented by the number of added rows" + ) diff --git a/tests/table/test_init.py b/tests/table/test_init.py index 7fd6ee1f1e..5cc68b62a4 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -1522,6 +1522,7 @@ def test_remove_partition_statistics_update_with_invalid_snapshot_id(table_v2_wi (RemovePartitionStatisticsUpdate(snapshot_id=123456789),), ) + def test_add_snapshot_update_fails_without_first_row_id(table_v3: Table) -> None: new_snapshot = Snapshot( snapshot_id=25, From bc110539b58d9107f754f6804321cec6374e2171 Mon Sep 17 00:00:00 2001 From: Alex Stephen Date: Tue, 21 Oct 2025 14:05:05 -0700 Subject: [PATCH 09/10] add skip on tests --- tests/integration/test_writes/test_writes.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 0fa76ab84c..99a5f415d8 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -2492,6 +2492,7 @@ def test_stage_only_overwrite_files( assert parent_snapshot_id == [None, first_snapshot, second_snapshot, second_snapshot, second_snapshot] +@pytest.skip("V3 writer support is not enabled.") @pytest.mark.integration def test_v3_write_and_read_row_lineage(spark: SparkSession, session_catalog: Catalog) -> None: """Test writing to a v3 table and reading with Spark.""" From bd42a5a6e1da326e528ac06421aeb3116d1d24a6 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Wed, 22 Oct 2025 22:49:34 +0200 Subject: [PATCH 10/10] Add `mark` --- tests/integration/test_writes/test_writes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 99a5f415d8..dcd465a7ca 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -2492,7 +2492,7 @@ def test_stage_only_overwrite_files( assert parent_snapshot_id == [None, first_snapshot, second_snapshot, second_snapshot, second_snapshot] -@pytest.skip("V3 writer support is not enabled.") +@pytest.mark.skip("V3 writer support is not enabled.") @pytest.mark.integration def test_v3_write_and_read_row_lineage(spark: SparkSession, session_catalog: Catalog) -> None: """Test writing to a v3 table and reading with Spark."""