diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py
index df07f94342..7b93080676 100644
--- a/pyiceberg/partitioning.py
+++ b/pyiceberg/partitioning.py
@@ -21,7 +21,7 @@
 from dataclasses import dataclass
 from datetime import date, datetime, time
 from functools import cached_property, singledispatch
-from typing import Annotated, Any, Dict, Generic, List, Optional, Tuple, TypeVar, Union
+from typing import Annotated, Any, Callable, Dict, Generic, List, Optional, Tuple, TypeVar, Union
 from urllib.parse import quote_plus
 
 from pydantic import (
@@ -272,6 +272,60 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec, old_schema: Schema, fre
 T = TypeVar("T")
 
 
+class PartitionMap(Generic[T]):
+    """Map values by (spec id, partition tuple), mirroring Java's PartitionMap."""
+
+    _specs: dict[int, PartitionSpec]
+    _partition_maps: dict[int, dict[Optional[Record], T]]
+
+    def __init__(self, specs: dict[int, PartitionSpec]):
+        self._specs = specs
+        self._partition_maps = {}
+
+    def __len__(self) -> int:
+        """Return the number of values stored across all specs."""
+        return sum(len(partition_map) for partition_map in self._partition_maps.values())
+
+    def is_empty(self) -> bool:
+        return len(self) == 0
+
+    def contains_key(self, spec_id: int, struct: Optional[Record]) -> bool:
+        return struct in self._partition_maps.get(spec_id, {})
+
+    def contains_value(self, value: T) -> bool:
+        return value in self.values()
+
+    def get(self, spec_id: int, struct: Optional[Record]) -> Optional[T]:
+        # Look the value up directly so that falsy values (e.g. "" or 0) are still returned.
+        if (partition_map := self._partition_maps.get(spec_id)) is not None:
+            return partition_map.get(struct)
+        return None
+
+    def put(self, spec_id: int, struct: Optional[Record], value: T) -> None:
+        if spec_id not in self._specs:
+            raise ValueError(f"Cannot find spec with id {spec_id} in {self._specs.keys()}")
+        self._partition_maps.setdefault(spec_id, {})[struct] = value
+
+    def compute_if_absent(self, spec_id: int, struct: Optional[Record], value_factory: Callable[[], T]) -> T:
+        partition_map = self._partition_maps.setdefault(spec_id, {})
+        if struct in partition_map:
+            return partition_map[struct]
+
+        value = value_factory()
+        partition_map[struct] = value
+        return value
+
+    def values(self) -> list[T]:
+        result: list[T] = []
+        for partition_map in self._partition_maps.values():
+            result.extend(partition_map.values())
+        return result
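+
+
+# A rough usage sketch for PartitionMap (illustrative comment only; the Record
+# values are made up and are not validated against the spec's fields):
+#
+#     pm: PartitionMap[str] = PartitionMap({0: UNPARTITIONED_PARTITION_SPEC})
+#     pm.put(0, None, "v1")            # unpartitioned specs use a None key
+#     assert pm.get(0, None) == "v1"
+#     assert pm.contains_key(0, None) and len(pm) == 1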
+
+
 class PartitionSpecVisitor(Generic[T], ABC):
     @abstractmethod
     def identity(self, field_id: int, source_name: str, source_id: int) -> T:
diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py
index b49c4abe07..cd9de608ff 100644
--- a/pyiceberg/table/update/validate.py
+++ b/pyiceberg/table/update/validate.py
@@ -14,12 +14,22 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from bisect import bisect_left
 from typing import Iterator, Optional, Set
 
 from pyiceberg.exceptions import ValidationException
 from pyiceberg.expressions import BooleanExpression
 from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator
-from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
+from pyiceberg.manifest import (
+    INITIAL_SEQUENCE_NUMBER,
+    DataFile,
+    DataFileContent,
+    ManifestContent,
+    ManifestEntry,
+    ManifestEntryStatus,
+    ManifestFile,
+)
+from pyiceberg.partitioning import PartitionMap, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
 from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
+from pyiceberg.typedef import Record
@@ -27,6 +37,244 @@
 
 VALIDATE_DATA_FILES_EXIST_OPERATIONS: Set[Operation] = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE}
 VALIDATE_ADDED_DATA_FILES_OPERATIONS: Set[Operation] = {Operation.APPEND, Operation.OVERWRITE}
+VALIDATE_ADDED_DELETE_FILES_OPERATIONS: Set[Operation] = {Operation.DELETE, Operation.OVERWRITE}
+
+
+class _PositionDeletes:
+    def __init__(self) -> None:
+        # Indexed state
+        self._seqs: list[int] = []
+        self._entries: list[ManifestEntry] = []
+
+        # Buffer used to hold entries before indexing
+        self._buffer: list[ManifestEntry] = []
+        self._indexed: bool = False
+
+    def _index_if_needed(self) -> None:
+        if not self._indexed:
+            self._entries = sorted(self._buffer, key=_get_sequence_number_or_raise)
+            self._seqs = [_get_sequence_number_or_raise(entry) for entry in self._entries]
+            self._indexed = True
+
+    def add_entry(self, entry: ManifestEntry) -> None:
+        if self._indexed:
+            raise ValueError("Can't add entries after indexing")
+        self._buffer.append(entry)
+
+    def filter(self, seq: int) -> list[ManifestEntry]:
+        """Return the entries whose sequence number is greater than or equal to ``seq``."""
+        self._index_if_needed()
+        start = _find_start_index(self._seqs, seq)
+
+        if start >= len(self._entries):
+            return []
+
+        if start == 0:
+            return self.referenced_delete_files()
+
+        return self._entries[start:]
+
+    def referenced_delete_files(self) -> list[ManifestEntry]:
+        self._index_if_needed()
+        return self._entries
+
+    def is_empty(self) -> bool:
+        self._index_if_needed()
+        return len(self._entries) == 0
+
+
+class _EqualityDeletes:
+    def __init__(self) -> None:
+        # Indexed state
+        self._seqs: list[int] = []
+        self._entries: list[ManifestEntry] = []
+
+        # Buffer used to hold entries before indexing
+        self._buffer: list[ManifestEntry] = []
+        self._indexed: bool = False
+
+    def _index_if_needed(self) -> None:
+        if not self._indexed:
+            self._entries = sorted(self._buffer, key=_get_sequence_number_or_raise)
+            self._seqs = [_get_sequence_number_or_raise(entry) for entry in self._entries]
+            self._indexed = True
+
+    def add_entry(self, spec: PartitionSpec, entry: ManifestEntry) -> None:
+        # TODO: Equality deletes should consider the spec to get the equality fields
+        if self._indexed:
+            raise ValueError("Can't add entries after indexing")
+        self._buffer.append(entry)
+
+    def filter(self, seq: int, entry: ManifestEntry) -> list[ManifestEntry]:
+        """Return the entries whose sequence number is greater than or equal to ``seq``.
+
+        ``entry`` is currently unused; see the TODO on ``add_entry``.
+        """
+        self._index_if_needed()
+        start = _find_start_index(self._seqs, seq)
+
+        if start >= len(self._entries):
+            return []
+
+        if start == 0:
+            return self.referenced_delete_files()
+
+        return self._entries[start:]
+
+    def referenced_delete_files(self) -> list[ManifestEntry]:
+        self._index_if_needed()
+        return self._entries
+
+    def is_empty(self) -> bool:
+        self._index_if_needed()
+        return len(self._entries) == 0
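+
+
+# Both classes above follow the same two-phase pattern: entries are appended to a
+# buffer, then sorted by sequence number on first read so later lookups can use
+# binary search. A rough sketch of the lifecycle (illustrative comment only):
+#
+#     deletes = _PositionDeletes()
+#     deletes.add_entry(entry)          # buffered, not yet sorted
+#     deletes.filter(seq=5)             # triggers indexing, then bisects
+#     deletes.add_entry(other_entry)    # raises: the index is already built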
+
+
+def _find_start_index(seqs: list[int], seq: int) -> int:
+    """Return the index of the first entry with a sequence number >= ``seq``."""
+    return bisect_left(seqs, seq)
+
+
+def _get_sequence_number_or_raise(entry: ManifestEntry) -> int:
+    # Explicit ``is not None`` check: sequence number 0 is a valid value.
+    if (seq := entry.sequence_number) is not None:
+        return seq
+    raise ValidationException("ManifestEntry does not have a sequence number")
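+
+
+# Worked example of the start-index search (illustrative comment only):
+# with indexed sequence numbers [3, 5, 5, 9],
+#
+#     _find_start_index([3, 5, 5, 9], 5)   # -> 1: entries[1:] may apply
+#     _find_start_index([3, 5, 5, 9], 10)  # -> 4: past the end, nothing applies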
+
+
+class DeleteFileIndex:
+    """An index of delete files by sequence number.
+
+    Use for_data_file(int, ManifestEntry) or for_entry(ManifestEntry) to get the
+    delete files to apply to a given data file.
+    """
+
+    _spec_by_id: dict[int, PartitionSpec]
+    _global_deletes: _EqualityDeletes
+    _pos_deletes_by_path: dict[str, _PositionDeletes]
+    _pos_deletes_by_partition: PartitionMap[_PositionDeletes]
+    _eq_deletes_by_partition: PartitionMap[_EqualityDeletes]
+    _has_eq_deletes: bool
+    _has_pos_deletes: bool
+    _is_empty: bool
+
+    def __init__(
+        self,
+        delete_entries: list[ManifestEntry],
+        spec_by_id: dict[int, PartitionSpec],
+        min_sequence_number: int = INITIAL_SEQUENCE_NUMBER,
+    ) -> None:
+        global_deletes: _EqualityDeletes = _EqualityDeletes()
+        eq_deletes_by_partition: PartitionMap[_EqualityDeletes] = PartitionMap(spec_by_id)
+        pos_deletes_by_partition: PartitionMap[_PositionDeletes] = PartitionMap(spec_by_id)
+        pos_deletes_by_path: dict[str, _PositionDeletes] = {}
+
+        for entry in delete_entries:
+            if entry.sequence_number is None:
+                continue
+
+            if entry.sequence_number <= min_sequence_number:
+                continue
+
+            file: DataFile = entry.data_file
+            content: DataFileContent = file.content
+
+            if content == DataFileContent.POSITION_DELETES:
+                self._add_pos_deletes(pos_deletes_by_path, pos_deletes_by_partition, entry)
+            elif content == DataFileContent.EQUALITY_DELETES:
+                self._add_eq_deletes(global_deletes, eq_deletes_by_partition, spec_by_id, entry)
+            else:
+                raise NotImplementedError(f"Unsupported content: {file.content}")
+
+        # Store the indexed state on the instance
+        self._spec_by_id = spec_by_id
+        self._global_deletes = global_deletes
+        self._eq_deletes_by_partition = eq_deletes_by_partition
+        self._pos_deletes_by_partition = pos_deletes_by_partition
+        self._pos_deletes_by_path = pos_deletes_by_path
+
+        self._has_eq_deletes = not global_deletes.is_empty() or len(eq_deletes_by_partition) > 0
+        self._has_pos_deletes = len(pos_deletes_by_partition) > 0 or len(pos_deletes_by_path) > 0
+        self._is_empty = not self._has_eq_deletes and not self._has_pos_deletes
+
+    def _add_pos_deletes(
+        self,
+        pos_deletes_by_path: dict[str, _PositionDeletes],
+        pos_deletes_by_partition: PartitionMap[_PositionDeletes],
+        entry: ManifestEntry,
+    ) -> None:
+        # TODO: Fallback method to get file_path from lower_bounds
+        if file_path := entry.data_file.file_path:
+            deletes = pos_deletes_by_path.setdefault(file_path, _PositionDeletes())
+        else:
+            spec_id: int = entry.data_file.spec_id
+            partition: Record = entry.data_file.partition
+            # Reuse the existing bucket for this partition if there is one
+            deletes = pos_deletes_by_partition.compute_if_absent(spec_id, partition, _PositionDeletes)
+
+        deletes.add_entry(entry)
+
+    def _add_eq_deletes(
+        self,
+        global_deletes: _EqualityDeletes,
+        eq_deletes_by_partition: PartitionMap[_EqualityDeletes],
+        spec_by_id: dict[int, PartitionSpec],
+        entry: ManifestEntry,
+    ) -> None:
+        if spec := spec_by_id.get(entry.data_file.spec_id):
+            if spec.is_unpartitioned():
+                deletes = global_deletes
+            else:
+                # Reuse the existing bucket for this partition if there is one
+                deletes = eq_deletes_by_partition.compute_if_absent(
+                    spec.spec_id, entry.data_file.partition, _EqualityDeletes
+                )
+
+            deletes.add_entry(spec, entry)
+
+    def is_empty(self) -> bool:
+        return self._is_empty
+
+    def has_equality_deletes(self) -> bool:
+        return self._has_eq_deletes
+
+    def has_position_deletes(self) -> bool:
+        return self._has_pos_deletes
+
+    def for_entry(self, entry: ManifestEntry) -> list[ManifestEntry]:
+        sequence_number = _get_sequence_number_or_raise(entry)
+        return self.for_data_file(sequence_number, entry)
+
+    def for_data_file(self, sequence_number: int, entry: ManifestEntry) -> list[ManifestEntry]:
+        if self.is_empty():
+            return []
+
+        global_deletes = self._global_deletes.filter(sequence_number, entry)
+
+        pos_path_deletes: list[ManifestEntry] = []
+        if path_deletes := self._pos_deletes_by_path.get(entry.data_file.file_path):
+            pos_path_deletes = path_deletes.filter(sequence_number)
+
+        spec_id = entry.data_file.spec_id
+        partition = entry.data_file.partition
+
+        eq_deletes_by_partition: list[ManifestEntry] = []
+        if eq_deletes := self._eq_deletes_by_partition.get(spec_id, partition):
+            eq_deletes_by_partition = eq_deletes.filter(sequence_number, entry)
+
+        pos_deletes_by_partition: list[ManifestEntry] = []
+        if pos_deletes := self._pos_deletes_by_partition.get(spec_id, partition):
+            pos_deletes_by_partition = pos_deletes.filter(sequence_number)
+
+        return global_deletes + eq_deletes_by_partition + pos_path_deletes + pos_deletes_by_partition
+
+    def referenced_delete_files(self) -> list[DataFile]:
+        entries: list[ManifestEntry] = []
+
+        for pos_deletes in self._pos_deletes_by_partition.values():
+            entries.extend(pos_deletes.referenced_delete_files())
+
+        for pos_deletes in self._pos_deletes_by_path.values():
+            entries.extend(pos_deletes.referenced_delete_files())
+
+        # Also report equality deletes, so has_equality_deletes() and the
+        # referenced files stay consistent
+        for eq_deletes in self._eq_deletes_by_partition.values():
+            entries.extend(eq_deletes.referenced_delete_files())
+        entries.extend(self._global_deletes.referenced_delete_files())
+
+        return [entry.data_file for entry in entries]
 
 
 def _validation_history(
@@ -159,6 +407,96 @@ def _deleted_data_files(
         yield entry
 
 
+def _build_delete_file_index(
+    table: Table,
+    snapshot_ids: set[int],
+    delete_manifests: list[ManifestFile],
+    min_sequence_number: int,
+    specs_by_id: dict[int, PartitionSpec],
+    partition_set: Optional[dict[int, set[Record]]],
+    data_filter: Optional[BooleanExpression],
+) -> DeleteFileIndex:
+    """Filter manifest entries to build a DeleteFileIndex.
+
+    Args:
+        table: Table to read manifest entries from
+        snapshot_ids: Snapshot ids from the table
+        delete_manifests: Manifests to fetch the entries from
+        min_sequence_number: Sequence number of the starting snapshot
+        specs_by_id: dict of {spec_id: PartitionSpec} to build the DeleteFileIndex with
+        partition_set: dict of {spec_id: set[partition]} to filter on
+        data_filter: Expression used to find data files
+
+    Returns:
+        DeleteFileIndex
+    """
+    delete_entries = []
+
+    for manifest in delete_manifests:
+        for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
+            if _filter_manifest_entries(
+                entry, snapshot_ids, data_filter, partition_set, ManifestEntryStatus.ADDED, table.schema()
+            ):
+                delete_entries.append(entry)
+
+    return DeleteFileIndex(delete_entries=delete_entries, spec_by_id=specs_by_id, min_sequence_number=min_sequence_number)
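+
+
+# Sketch of how the index is meant to be consumed (illustrative comment only;
+# ``data_entry`` stands for any ManifestEntry wrapping a data file):
+#
+#     index = _build_delete_file_index(table, ids, manifests, min_seq, specs, None, None)
+#     if not index.is_empty():
+#         applicable = index.for_entry(data_entry)  # deletes with seq >= the entry's
+#         paths = [f.file_path for f in index.referenced_delete_files()]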
+
+
+def _starting_sequence_number(table: Table, starting_snapshot: Optional[Snapshot]) -> int:
+    """Find the starting sequence number from a snapshot.
+
+    Args:
+        table: Table to find the snapshot in
+        starting_snapshot: Snapshot from where to start looking
+
+    Returns:
+        Sequence number as int
+    """
+    if starting_snapshot is not None:
+        if snapshot := table.snapshot_by_id(starting_snapshot.snapshot_id):
+            if (seq := snapshot.sequence_number) is not None:
+                return seq
+    return INITIAL_SEQUENCE_NUMBER
+
+
+def _added_delete_files(
+    table: Table,
+    starting_snapshot: Snapshot,
+    data_filter: Optional[BooleanExpression],
+    partition_set: Optional[dict[int, set[Record]]],
+    parent_snapshot: Optional[Snapshot],
+) -> DeleteFileIndex:
+    """Return the matching delete files that have been added to the table since a starting snapshot.
+
+    Args:
+        table: Table to validate
+        starting_snapshot: Snapshot current at the start of the operation
+        data_filter: Expression used to find deleted data files
+        partition_set: dict of {spec_id: set[partition]} to filter on
+        parent_snapshot: Ending snapshot on the branch being validated
+
+    Returns:
+        DeleteFileIndex
+    """
+    if parent_snapshot is None or table.format_version < 2:
+        return DeleteFileIndex([], {}, INITIAL_SEQUENCE_NUMBER)
+
+    manifests, snapshot_ids = _validation_history(
+        table, parent_snapshot, starting_snapshot, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES
+    )
+
+    starting_sequence_number = _starting_sequence_number(table, starting_snapshot)
+    return _build_delete_file_index(
+        table=table,
+        snapshot_ids=snapshot_ids,
+        delete_manifests=manifests,
+        min_sequence_number=starting_sequence_number,
+        specs_by_id=table.specs(),
+        partition_set=partition_set,
+        data_filter=data_filter,
+    )
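+
+
+# The ``partition_set`` filter used above maps a spec id to the partition tuples
+# to keep (a sketch; the day ordinal below is an arbitrary made-up value):
+#
+#     partition_set: dict[int, set[Record]] = {
+#         1: {Record(19_000)},  # spec 1, e.g. a DayTransform partition value
+#     }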
+
+
 def _validate_deleted_data_files(
     table: Table,
     starting_snapshot: Snapshot,
@@ -235,3 +573,27 @@ def _validate_added_data_files(
     if any(conflicting_entries):
         conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries if entry.snapshot_id is not None}
         raise ValidationException(f"Added data files were found matching the filter for snapshots {conflicting_snapshots}!")
+
+
+def _validate_no_new_delete_files(
+    table: Table,
+    starting_snapshot: Snapshot,
+    data_filter: Optional[BooleanExpression],
+    partition_set: Optional[dict[int, set[Record]]],
+    parent_snapshot: Snapshot,
+) -> None:
+    """Validate that no new delete files matching a filter have been added since the starting snapshot.
+
+    Args:
+        table: Table to validate
+        starting_snapshot: Snapshot current at the start of the operation
+        data_filter: Expression used to find new conflicting delete files
+        partition_set: dict of {spec_id: set[partition]} to filter on
+        parent_snapshot: Ending snapshot on the branch being validated
+    """
+    deletes = _added_delete_files(table, starting_snapshot, data_filter, partition_set, parent_snapshot)
+    if not deletes.is_empty():
+        conflicting_delete_files = [file.file_path for file in deletes.referenced_delete_files()]
+        raise ValidationException(
+            f"Found new conflicting delete files that can apply to records matching {data_filter}: {conflicting_delete_files}"
+        )
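+
+
+# Expected call-site sketch (illustrative comment only; a commit path would pass
+# its own snapshots and filter):
+#
+#     _validate_no_new_delete_files(
+#         table=table,
+#         starting_snapshot=snapshot_at_operation_start,
+#         data_filter=EqualTo("id", 1),
+#         partition_set=None,
+#         parent_snapshot=current_branch_head,
+#     )  # raises ValidationException on conflicts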
diff --git a/tests/table/test_partitioning.py b/tests/table/test_partitioning.py
index 0fe22391c0..11952f26c8 100644
--- a/tests/table/test_partitioning.py
+++ b/tests/table/test_partitioning.py
@@ -21,7 +21,7 @@
 
 import pytest
 
-from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
+from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionMap, PartitionSpec
 from pyiceberg.schema import Schema
+from pyiceberg.typedef import Record
 from pyiceberg.transforms import (
     BucketTransform,
@@ -223,3 +223,64 @@ def test_deserialize_partition_field_v3() -> None:
     field = PartitionField.model_validate_json(json_partition_spec)
 
     assert field == PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate")
+
+
+@pytest.fixture
+def specs_set() -> dict[int, PartitionSpec]:
+    return {
+        0: UNPARTITIONED_PARTITION_SPEC,
+        1: PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=DayTransform(), name="dayPartition"), spec_id=1),
+        2: PartitionSpec(
+            PartitionField(source_id=1, field_id=1000, transform=DayTransform(), name="dayPartition"),
+            PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="identityPartition"),
+            spec_id=2,
+        ),
+    }
+
+
+def test_empty_partition_map() -> None:
+    specs: dict[int, PartitionSpec] = {UNPARTITIONED_PARTITION_SPEC.spec_id: UNPARTITIONED_PARTITION_SPEC}
+    partition_map: PartitionMap[str] = PartitionMap(specs)
+    assert partition_map.is_empty()
+    assert len(partition_map) == 0
+    assert not partition_map.contains_key(1, Record(1))
+    assert len(partition_map.values()) == 0
+
+
+def test_size_partition_map(specs_set: dict[int, PartitionSpec]) -> None:
+    partition_map: PartitionMap[str] = PartitionMap(specs_set)
+    partition_map.put(UNPARTITIONED_PARTITION_SPEC.spec_id, None, "v1")
+    partition_map.put(specs_set[1].spec_id, Record("aaa"), "v2")
+    partition_map.put(specs_set[1].spec_id, Record("bbb"), "v3")
+    partition_map.put(specs_set[2].spec_id, Record("ccc", 2), "v4")
+    assert not partition_map.is_empty()
+    assert len(partition_map) == 4
+
+
+def test_put_and_get_partition_map(specs_set: dict[int, PartitionSpec]) -> None:
+    partition_map: PartitionMap[str] = PartitionMap(specs_set)
+    partition_map.put(UNPARTITIONED_PARTITION_SPEC.spec_id, None, "v1")
+    partition_map.put(specs_set[1].spec_id, Record("aaa"), "v2")
+    assert partition_map.get(UNPARTITIONED_PARTITION_SPEC.spec_id, None) == "v1"
+    assert partition_map.get(specs_set[1].spec_id, Record("aaa")) == "v2"
+
+
+def test_values_partition_map(specs_set: dict[int, PartitionSpec]) -> None:
+    partition_map: PartitionMap[str] = PartitionMap(specs_set)
+    partition_map.put(UNPARTITIONED_PARTITION_SPEC.spec_id, None, "v1")
+    partition_map.put(specs_set[1].spec_id, Record("aaa"), "v2")
+    partition_map.put(specs_set[1].spec_id, Record("bbb"), "v3")
+    partition_map.put(specs_set[2].spec_id, Record("ccc", 2), "v4")
+    assert partition_map.values() == ["v1", "v2", "v3", "v4"]
+
+
+def test_compute_if_absent_partition_map(specs_set: dict[int, PartitionSpec]) -> None:
+    partition_map: PartitionMap[str] = PartitionMap(specs_set)
+
+    result1 = partition_map.compute_if_absent(specs_set[1].spec_id, Record("a"), lambda: "v1")
+    assert result1 == "v1"
+    assert partition_map.get(specs_set[1].spec_id, Record("a")) == "v1"
+
+    result2 = partition_map.compute_if_absent(specs_set[1].spec_id, Record("a"), lambda: "v2")
+    assert result2 == "v1"
+    assert partition_map.get(specs_set[1].spec_id, Record("a")) == "v1"
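+
+
+# A sketch of a guard-rail test, assuming ``put`` raises for spec ids that are
+# not registered in the map (hypothetical addition, not part of the original suite):
+def test_put_unknown_spec_raises(specs_set: dict[int, PartitionSpec]) -> None:
+    partition_map: PartitionMap[str] = PartitionMap(specs_set)
+    with pytest.raises(ValueError):
+        partition_map.put(99, Record("zzz"), "v1")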
patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect), + patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry), + ): + dfi = _added_delete_files( + table=table, + starting_snapshot=newest_snapshot, + data_filter=None, + parent_snapshot=oldest_snapshot, + partition_set=None, + ) + + assert dfi.is_empty() + assert not dfi.has_position_deletes() + assert not dfi.has_equality_deletes() + assert len(dfi.referenced_delete_files()) == 0 + + +@pytest.mark.parametrize("operation", [Operation.DELETE, Operation.OVERWRITE]) +def test_validate_added_delete_files_conflicting_count( + table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]], + operation: Operation, +) -> None: + table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests + + snapshot_history = 100 + snapshots = table.snapshots() + for i in range(1, snapshot_history + 1): + altered_snapshot = snapshots[-i] + altered_snapshot = altered_snapshot.model_copy(update={"summary": Summary(operation=operation)}) + snapshots[-i] = altered_snapshot + + table.metadata = table.metadata.model_copy( + update={"snapshots": snapshots}, + ) + + oldest_snapshot = table.snapshots()[-snapshot_history] + newest_snapshot = cast(Snapshot, table.current_snapshot()) + + mock_delete_file = DataFile.from_args( + content=DataFileContent.POSITION_DELETES, + file_path="s3://dummy/path", + ) + + def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]: + """Mock the manifests method to use the snapshot_id for lookup.""" + snapshot_id = self.snapshot_id + if snapshot_id in mock_manifests: + return mock_manifests[snapshot_id] + return [] + + def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]: + result = [ + ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, snapshot_id=self.added_snapshot_id, sequence_number=self.min_sequence_number + ) + ] + + result[-1] = ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + snapshot_id=self.added_snapshot_id, + sequence_number=10000, + data_file=mock_delete_file, + ) + + return result + + with ( + patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect), + patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry), + ): + dfi = _added_delete_files( + table=table, + starting_snapshot=newest_snapshot, + data_filter=None, + parent_snapshot=oldest_snapshot, + partition_set=None, + ) + + assert not dfi.is_empty() + assert dfi.has_position_deletes() + assert not dfi.has_equality_deletes() + assert dfi.referenced_delete_files()[0] == mock_delete_file