Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion pyiceberg/table/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from enum import Enum
from typing import TYPE_CHECKING, Any, DefaultDict, Dict, Iterable, List, Mapping, Optional

from pydantic import Field, PrivateAttr, model_serializer
from pydantic import Field, PrivateAttr, field_validator, model_serializer, model_validator

from pyiceberg.io import FileIO
from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, _manifests
Expand Down Expand Up @@ -242,9 +242,44 @@ class Snapshot(IcebergBaseModel):
sequence_number: Optional[int] = Field(alias="sequence-number", default=INITIAL_SEQUENCE_NUMBER)
timestamp_ms: int = Field(alias="timestamp-ms", default_factory=lambda: int(time.time() * 1000))
manifest_list: str = Field(alias="manifest-list", description="Location of the snapshot's manifest list file")
first_row_id: Optional[int] = Field(
alias="first-row-id",
default=None,
description="The row-id of the first newly added row in this snapshot. Returns None when row lineage is not supported.",
)
added_rows: Optional[int] = Field(
alias="added-rows",
default=None,
description=(
"The upper bound of number of rows with assigned row IDs in this snapshot. Returns None if the value was not stored."
),
)
summary: Optional[Summary] = Field(default=None)
schema_id: Optional[int] = Field(alias="schema-id", default=None)

@field_validator("first_row_id")
@classmethod
def validate_first_row_id(cls, v: Optional[int]) -> Optional[int]:
Comment on lines +260 to +262
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pydantic also allows for annotating with PositiveInt which will do these checks internally: https://docs.pydantic.dev/2.3/api/types/#pydantic.types.PositiveInt

"""Validate that first_row_id is non-negative if provided."""
if v is not None and v < 0:
raise ValueError(f"Invalid first-row-id (cannot be negative): {v}")
return v

@field_validator("added_rows")
@classmethod
def validate_added_rows(cls, v: Optional[int]) -> Optional[int]:
"""Validate that added_rows is non-negative if provided."""
if v is not None and v < 0:
raise ValueError(f"Invalid added-rows (cannot be negative): {v}")
return v

@model_validator(mode="after")
def validate_row_lineage_fields(self) -> "Snapshot":
"""Validate that added_rows is required when first_row_id is set."""
if self.first_row_id is not None and self.added_rows is None:
raise ValueError("Invalid added-rows (required when first-row-id is set): None")
return self

def __str__(self) -> str:
"""Return the string representation of the Snapshot class."""
operation = f"{self.summary.operation}: " if self.summary else ""
Expand Down
79 changes: 77 additions & 2 deletions tests/table/test_snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,15 @@ def test_deserialize_snapshot_with_properties(snapshot_with_properties: Snapshot
def test_snapshot_repr(snapshot: Snapshot) -> None:
assert (
repr(snapshot)
== """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND), schema_id=3)"""
== """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', first_row_id=None, added_rows=None, summary=Summary(Operation.APPEND), schema_id=3)"""
)
assert snapshot == eval(repr(snapshot))


def test_snapshot_with_properties_repr(snapshot_with_properties: Snapshot) -> None:
assert (
repr(snapshot_with_properties)
== """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND, **{'foo': 'bar'}), schema_id=3)"""
== """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', first_row_id=None, added_rows=None, summary=Summary(Operation.APPEND, **{'foo': 'bar'}), schema_id=3)"""
)
assert snapshot_with_properties == eval(repr(snapshot_with_properties))

Expand Down Expand Up @@ -456,3 +456,78 @@ def test_ancestors_between(table_v2_with_extensive_snapshots: Table) -> None:
)
== 2000
)


def test_snapshot_v3_fields() -> None:
snapshot = Snapshot(
**{
"snapshot-id": 1,
"timestamp-ms": 1234567890,
"manifest-list": "s3:/a/b/c.avro",
"first-row-id": 100,
"added-rows": 1000,
"summary": {"operation": "append"},
}
)

assert snapshot.first_row_id == 100
assert snapshot.added_rows == 1000

actual = snapshot.model_dump_json()
expected = """{"snapshot-id":1,"sequence-number":0,"timestamp-ms":1234567890,"manifest-list":"s3:/a/b/c.avro","first-row-id":100,"added-rows":1000,"summary":{"operation":"append"}}"""
assert actual == expected


def test_snapshot_v3_fields_validation_negative_first_row_id() -> None:
with pytest.raises(ValueError, match="Invalid first-row-id \\(cannot be negative\\): -1"):
Snapshot(
**{
"snapshot-id": 1,
"timestamp-ms": 1234567890,
"manifest-list": "s3:/a/b/c.avro",
"first-row-id": -1,
"added-rows": 1000,
"summary": {"operation": "append"},
}
)


def test_snapshot_v3_fields_validation_negative_added_rows() -> None:
with pytest.raises(ValueError, match="Invalid added-rows \\(cannot be negative\\): -1"):
Snapshot(
**{
"snapshot-id": 1,
"timestamp-ms": 1234567890,
"manifest-list": "s3:/a/b/c.avro",
"first-row-id": 100,
"added-rows": -1,
"summary": {"operation": "append"},
}
)


def test_snapshot_v3_fields_validation_first_row_id_requires_added_rows() -> None:
with pytest.raises(ValueError, match="Invalid added-rows \\(required when first-row-id is set\\): None"):
Snapshot(
**{
"snapshot-id": 1,
"timestamp-ms": 1234567890,
"manifest-list": "s3:/a/b/c.avro",
"first-row-id": 100,
"summary": {"operation": "append"},
}
)


def test_snapshot_v3_fields_added_rows_without_first_row_id() -> None:
snapshot = Snapshot(
**{
"snapshot-id": 1,
"timestamp-ms": 1234567890,
"manifest-list": "s3:/a/b/c.avro",
"added-rows": 1000,
"summary": {"operation": "append"},
}
)
assert snapshot.first_row_id is None
assert snapshot.added_rows == 1000