From be914fd58d2bbf1c5421aabd3abaaa4efb0482d3 Mon Sep 17 00:00:00 2001 From: geruh Date: Thu, 16 Oct 2025 20:56:16 -0700 Subject: [PATCH 1/4] Add support for v3 snapshot metadata fields --- pyiceberg/table/snapshots.py | 43 +++++++++++++++++++- tests/table/test_snapshots.py | 75 +++++++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 60ad7219e1..ad47a2435a 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -22,7 +22,7 @@ from enum import Enum from typing import TYPE_CHECKING, Any, DefaultDict, Dict, Iterable, List, Mapping, Optional -from pydantic import Field, PrivateAttr, model_serializer +from pydantic import Field, PrivateAttr, field_validator, model_serializer, model_validator from pyiceberg.io import FileIO from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, _manifests @@ -237,14 +237,55 @@ def __eq__(self, other: Any) -> bool: class Snapshot(IcebergBaseModel): + """Represents a snapshot of an Iceberg table at a specific point in time. + + A snapshot tracks the state of a table, including all data and delete files, + at the time the snapshot was created. + """ + snapshot_id: int = Field(alias="snapshot-id") parent_snapshot_id: Optional[int] = Field(alias="parent-snapshot-id", default=None) sequence_number: Optional[int] = Field(alias="sequence-number", default=INITIAL_SEQUENCE_NUMBER) timestamp_ms: int = Field(alias="timestamp-ms", default_factory=lambda: int(time.time() * 1000)) manifest_list: str = Field(alias="manifest-list", description="Location of the snapshot's manifest list file") + first_row_id: Optional[int] = Field( + alias="first-row-id", + default=None, + description="The row-id of the first newly added row in this snapshot. Returns None when row lineage is not supported.", + ) + added_rows: Optional[int] = Field( + alias="added-rows", + default=None, + description=( + "The upper bound of number of rows with assigned row IDs in this snapshot. Returns None if the value was not stored." + ), + ) summary: Optional[Summary] = Field(default=None) schema_id: Optional[int] = Field(alias="schema-id", default=None) + @field_validator("first_row_id") + @classmethod + def validate_first_row_id(cls, v: Optional[int]) -> Optional[int]: + """Validate that first_row_id is non-negative if provided.""" + if v is not None and v < 0: + raise ValueError(f"Invalid first-row-id (cannot be negative): {v}") + return v + + @field_validator("added_rows") + @classmethod + def validate_added_rows(cls, v: Optional[int]) -> Optional[int]: + """Validate that added_rows is non-negative if provided.""" + if v is not None and v < 0: + raise ValueError(f"Invalid added-rows (cannot be negative): {v}") + return v + + @model_validator(mode="after") + def validate_row_lineage_fields(self) -> "Snapshot": + """Validate that added_rows is required when first_row_id is set.""" + if self.first_row_id is not None and self.added_rows is None: + raise ValueError("Invalid added-rows (required when first-row-id is set): None") + return self + def __str__(self) -> str: """Return the string representation of the Snapshot class.""" operation = f"{self.summary.operation}: " if self.summary else "" diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index d26562ad8f..91a743820f 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -456,3 +456,78 @@ def test_ancestors_between(table_v2_with_extensive_snapshots: Table) -> None: ) == 2000 ) + + +def test_snapshot_v3_fields() -> None: + snapshot = Snapshot( + **{ + "snapshot-id": 1, + "timestamp-ms": 1234567890, + "manifest-list": "s3:/a/b/c.avro", + "first-row-id": 100, + "added-rows": 1000, + "summary": {"operation": "append"}, + } + ) + + assert snapshot.first_row_id == 100 + assert snapshot.added_rows == 1000 + + actual = snapshot.model_dump_json() + expected = """{"snapshot-id":1,"sequence-number":0,"timestamp-ms":1234567890,"manifest-list":"s3:/a/b/c.avro","first-row-id":100,"added-rows":1000,"summary":{"operation":"append"}}""" + assert actual == expected + + +def test_snapshot_v3_fields_validation_negative_first_row_id() -> None: + with pytest.raises(ValueError, match="Invalid first-row-id \\(cannot be negative\\): -1"): + Snapshot( + **{ + "snapshot-id": 1, + "timestamp-ms": 1234567890, + "manifest-list": "s3:/a/b/c.avro", + "first-row-id": -1, + "added-rows": 1000, + "summary": {"operation": "append"}, + } + ) + + +def test_snapshot_v3_fields_validation_negative_added_rows() -> None: + with pytest.raises(ValueError, match="Invalid added-rows \\(cannot be negative\\): -1"): + Snapshot( + **{ + "snapshot-id": 1, + "timestamp-ms": 1234567890, + "manifest-list": "s3:/a/b/c.avro", + "first-row-id": 100, + "added-rows": -1, + "summary": {"operation": "append"}, + } + ) + + +def test_snapshot_v3_fields_validation_first_row_id_requires_added_rows() -> None: + with pytest.raises(ValueError, match="Invalid added-rows \\(required when first-row-id is set\\): None"): + Snapshot( + **{ + "snapshot-id": 1, + "timestamp-ms": 1234567890, + "manifest-list": "s3:/a/b/c.avro", + "first-row-id": 100, + "summary": {"operation": "append"}, + } + ) + + +def test_snapshot_v3_fields_added_rows_without_first_row_id() -> None: + snapshot = Snapshot( + **{ + "snapshot-id": 1, + "timestamp-ms": 1234567890, + "manifest-list": "s3:/a/b/c.avro", + "added-rows": 1000, + "summary": {"operation": "append"}, + } + ) + assert snapshot.first_row_id is None + assert snapshot.added_rows == 1000 From cc90fa250d42e90f3e5583a8d7addf939892fc80 Mon Sep 17 00:00:00 2001 From: geruh Date: Thu, 16 Oct 2025 23:28:07 -0700 Subject: [PATCH 2/4] remove comment --- tests/catalog/test_drew.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 tests/catalog/test_drew.py diff --git a/tests/catalog/test_drew.py b/tests/catalog/test_drew.py new file mode 100644 index 0000000000..fa707d38f1 --- /dev/null +++ b/tests/catalog/test_drew.py @@ -0,0 +1,36 @@ +import pyarrow as pa + +from pyiceberg.catalog.glue import GlueCatalog + +# pd.set_option('display.max_rows', None) + + +def test(): + catalog = GlueCatalog("drew") + table_name = "demo1.vec" + # + # catalog = load_catalog( + # "catalog_name", + # **{ + # "type": "rest", + # "warehouse":"arn:aws:s3tables:us-east-1:580974493829:bucket/drew", + # "uri": "https://s3tables.us-east-1.amazonaws.com/iceberg", + # "rest.sigv4-enabled": "true", + # "rest.signing-name": "s3tables", + # "rest.signing-region": "us-east-1" + # } + # ) + + df = pa.Table.from_pylist( + [ + {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029}, + {"city": "San Francisco", "lat": 37.773972, "long": -122.431297}, + {"city": "Drachten", "lat": 53.11254, "long": 6.0989}, + {"city": "Drachten", "lat": 53.11254, "long": 6.0989}, + {"city": "Paris", "lat": 48.864716, "long": 2.349014}, + ], + ) + + # tbl = catalog.create_table("demo1.cities2", schema=df.schema, location="s3://dru-iad/demo1/cities2") + table = catalog.load_table(("yeehaw", "lmfao")) + print(table) From acae3012241fa8fb0d29847e4ad57e7ba22fb91c Mon Sep 17 00:00:00 2001 From: geruh Date: Fri, 17 Oct 2025 10:48:03 -0700 Subject: [PATCH 3/4] accidentally pushed up my test class --- pyiceberg/table/snapshots.py | 6 ------ tests/catalog/test_drew.py | 36 ------------------------------------ 2 files changed, 42 deletions(-) delete mode 100644 tests/catalog/test_drew.py diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index ad47a2435a..5b01712c61 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -237,12 +237,6 @@ def __eq__(self, other: Any) -> bool: class Snapshot(IcebergBaseModel): - """Represents a snapshot of an Iceberg table at a specific point in time. - - A snapshot tracks the state of a table, including all data and delete files, - at the time the snapshot was created. - """ - snapshot_id: int = Field(alias="snapshot-id") parent_snapshot_id: Optional[int] = Field(alias="parent-snapshot-id", default=None) sequence_number: Optional[int] = Field(alias="sequence-number", default=INITIAL_SEQUENCE_NUMBER) diff --git a/tests/catalog/test_drew.py b/tests/catalog/test_drew.py deleted file mode 100644 index fa707d38f1..0000000000 --- a/tests/catalog/test_drew.py +++ /dev/null @@ -1,36 +0,0 @@ -import pyarrow as pa - -from pyiceberg.catalog.glue import GlueCatalog - -# pd.set_option('display.max_rows', None) - - -def test(): - catalog = GlueCatalog("drew") - table_name = "demo1.vec" - # - # catalog = load_catalog( - # "catalog_name", - # **{ - # "type": "rest", - # "warehouse":"arn:aws:s3tables:us-east-1:580974493829:bucket/drew", - # "uri": "https://s3tables.us-east-1.amazonaws.com/iceberg", - # "rest.sigv4-enabled": "true", - # "rest.signing-name": "s3tables", - # "rest.signing-region": "us-east-1" - # } - # ) - - df = pa.Table.from_pylist( - [ - {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029}, - {"city": "San Francisco", "lat": 37.773972, "long": -122.431297}, - {"city": "Drachten", "lat": 53.11254, "long": 6.0989}, - {"city": "Drachten", "lat": 53.11254, "long": 6.0989}, - {"city": "Paris", "lat": 48.864716, "long": 2.349014}, - ], - ) - - # tbl = catalog.create_table("demo1.cities2", schema=df.schema, location="s3://dru-iad/demo1/cities2") - table = catalog.load_table(("yeehaw", "lmfao")) - print(table) From 027be3be9bb122126fbb6d86f9dc6ae7c81553df Mon Sep 17 00:00:00 2001 From: geruh Date: Sat, 18 Oct 2025 18:35:24 -0700 Subject: [PATCH 4/4] fix serialization test --- tests/table/test_snapshots.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index 91a743820f..776aaac838 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -139,7 +139,7 @@ def test_deserialize_snapshot_with_properties(snapshot_with_properties: Snapshot def test_snapshot_repr(snapshot: Snapshot) -> None: assert ( repr(snapshot) - == """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND), schema_id=3)""" + == """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', first_row_id=None, added_rows=None, summary=Summary(Operation.APPEND), schema_id=3)""" ) assert snapshot == eval(repr(snapshot)) @@ -147,7 +147,7 @@ def test_snapshot_repr(snapshot: Snapshot) -> None: def test_snapshot_with_properties_repr(snapshot_with_properties: Snapshot) -> None: assert ( repr(snapshot_with_properties) - == """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND, **{'foo': 'bar'}), schema_id=3)""" + == """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', first_row_id=None, added_rows=None, summary=Summary(Operation.APPEND, **{'foo': 'bar'}), schema_id=3)""" ) assert snapshot_with_properties == eval(repr(snapshot_with_properties))