From 32b2428ca48e5b2ec817c415115dfb84aa16cc36 Mon Sep 17 00:00:00 2001
From: Gabriel Igliozzi
Date: Tue, 17 Feb 2026 00:49:57 +0100
Subject: [PATCH 1/2] Add referenced files DFI, validate_no_new_added_delete_files, _validate_no_new_deletes_for_data_file

---
 pyiceberg/table/delete_file_index.py |  15 +++
 pyiceberg/table/update/validate.py   | 123 +++++++++++++++++++-
 tests/table/test_validate.py         | 161 ++++++++++++++++++++++++++-
 3 files changed, 297 insertions(+), 2 deletions(-)

diff --git a/pyiceberg/table/delete_file_index.py b/pyiceberg/table/delete_file_index.py
index 5c4323ea8f..26bb380365 100644
--- a/pyiceberg/table/delete_file_index.py
+++ b/pyiceberg/table/delete_file_index.py
@@ -54,6 +54,10 @@ def filter_by_seq(self, seq: int) -> list[DataFile]:
         start_idx = bisect_left(self._seqs, seq)
         return [delete_file for delete_file, _ in self._files[start_idx:]]
 
+    def referenced_delete_files(self) -> list[DataFile]:
+        self._ensure_indexed()
+        return [data_file for data_file, _ in self._files]
+
 
 def _has_path_bounds(delete_file: DataFile) -> bool:
     lower = delete_file.lower_bounds
@@ -140,3 +144,14 @@ def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record
             deletes.update(path_deletes.filter_by_seq(seq_num))
 
         return deletes
+
+    def referenced_data_files(self) -> list[DataFile]:
+        data_files: list[DataFile] = []
+
+        for deletes in self._by_partition.values():
+            data_files.extend(deletes.referenced_delete_files())
+
+        for deletes in self._by_path.values():
+            data_files.extend(deletes.referenced_delete_files())
+
+        return data_files
diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py
index 0cda688f2a..267fdde1d8 100644
--- a/pyiceberg/table/update/validate.py
+++ b/pyiceberg/table/update/validate.py
@@ -19,14 +19,23 @@
 from pyiceberg.exceptions import ValidationException
 from pyiceberg.expressions import BooleanExpression
 from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator
-from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
+from pyiceberg.manifest import (
+    INITIAL_SEQUENCE_NUMBER,
+    DataFile,
+    ManifestContent,
+    ManifestEntry,
+    ManifestEntryStatus,
+    ManifestFile,
+)
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
+from pyiceberg.table.delete_file_index import DeleteFileIndex
 from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
 from pyiceberg.typedef import Record
 
 VALIDATE_DATA_FILES_EXIST_OPERATIONS: set[Operation] = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE}
 VALIDATE_ADDED_DATA_FILES_OPERATIONS: set[Operation] = {Operation.APPEND, Operation.OVERWRITE}
+VALIDATE_ADDED_DELETE_FILES_OPERATIONS: set[Operation] = {Operation.DELETE, Operation.OVERWRITE}
 
 
 def _validation_history(
@@ -216,6 +225,61 @@ def _added_data_files(
                 yield entry
 
 
+def _added_delete_files(
+    table: Table,
+    starting_snapshot: Snapshot,
+    data_filter: BooleanExpression | None,
+    partition_set: dict[int, set[Record]] | None,
+    parent_snapshot: Snapshot | None,
+) -> DeleteFileIndex:
+    """Return matching delete files that have been added to the table since a starting snapshot.
+
+    Args:
+        table: Table to get the history from
+        starting_snapshot: Starting snapshot to get the history from
+        data_filter: Optional filter to match data files
+        partition_set: Optional set of partitions to match data files
+        parent_snapshot: Parent snapshot to get the history from
+
+    Returns:
+        DeleteFileIndex
+    """
+    if parent_snapshot is None or table.format_version < 2:
+        return DeleteFileIndex()
+
+    manifests, snapshot_ids = _validation_history(
+        table, parent_snapshot, starting_snapshot, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES
+    )
+
+    dfi = DeleteFileIndex()
+
+    for manifest in manifests:
+        for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
+            if _filter_manifest_entries(
+                entry, snapshot_ids, data_filter, partition_set, ManifestEntryStatus.ADDED, table.schema()
+            ):
+                dfi.add_delete_file(entry, entry.data_file.partition)
+
+    return dfi
+
+
+def _starting_sequence_number(table: Table, starting_snapshot: Snapshot | None) -> int:
+    """Find the starting sequence number from a snapshot.
+
+    Args:
+        table: Table to find snapshot from
+        starting_snapshot: Snapshot from where to start looking
+
+    Returns:
+        Sequence number as int
+    """
+    if starting_snapshot is not None:
+        if snapshot := table.snapshot_by_id(starting_snapshot.snapshot_id):
+            if seq := snapshot.sequence_number:
+                return seq
+    return INITIAL_SEQUENCE_NUMBER
+
+
 def _validate_added_data_files(
     table: Table,
     starting_snapshot: Snapshot,
@@ -235,3 +299,60 @@ def _validate_added_data_files(
     if any(conflicting_entries):
         conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries if entry.snapshot_id is not None}
         raise ValidationException(f"Added data files were found matching the filter for snapshots {conflicting_snapshots}!")
+
+
+def _validate_no_new_delete_files(
+    table: Table,
+    starting_snapshot: Snapshot,
+    data_filter: BooleanExpression | None,
+    partition_set: dict[int, set[Record]] | None,
+    parent_snapshot: Snapshot | None,
+) -> None:
+    """Validate that no new delete files matching a filter have been added to the table since a starting snapshot.
+
+    Args:
+        table: Table to validate
+        starting_snapshot: Snapshot current at the start of the operation
+        data_filter: Expression used to find added data files
+        partition_set: Dictionary of partition spec to set of partition records
+        parent_snapshot: Ending snapshot on the branch being validated
+    """
+    deletes = _added_delete_files(table, starting_snapshot, data_filter, partition_set, parent_snapshot)
+
+    if deletes.is_empty():
+        return
+
+    conflicting_delete_files = deletes.referenced_data_files()
+    raise ValidationException(
+        f"Found new conflicting delete files that can apply to records matching {data_filter}: {conflicting_delete_files}"
+    )
+
+
+def _validate_no_new_delete_files_for_data_files(
+    table: Table,
+    starting_snapshot: Snapshot,
+    data_filter: BooleanExpression | None,
+    data_files: set[DataFile],
+    parent_snapshot: Snapshot | None,
+) -> None:
+    """Validate that no new delete files must be applied to data files that have been added to the table since a starting snapshot.
+
+    Args:
+        table: Table to validate
+        starting_snapshot: Snapshot current at the start of the operation
+        data_filter: Expression used to find added data files
+        data_files: Data files to validate have no new deletes
+        parent_snapshot: Ending snapshot on the branch being validated
+    """
+    # If there is no current state, or no files have been added
+    if parent_snapshot is None or table.format_version < 2:
+        return
+
+    deletes = _added_delete_files(table, starting_snapshot, data_filter, None, parent_snapshot)
+    seq_num = _starting_sequence_number(table, starting_snapshot)
+
+    # Fail if any delete file is found that applies to files written in or before the starting snapshot
+    for data_file in data_files:
+        delete_files = deletes.for_data_file(seq_num, data_file)
+        if len(delete_files) > 0:
+            raise ValidationException(f"Cannot commit, found new delete for replace data file {data_file}")
diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py
index 570f680860..49e42c83ea 100644
--- a/tests/table/test_validate.py
+++ b/tests/table/test_validate.py
@@ -22,14 +22,17 @@
 
 from pyiceberg.exceptions import ValidationException
 from pyiceberg.io import FileIO
-from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
+from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
 from pyiceberg.table import Table
 from pyiceberg.table.snapshots import Operation, Snapshot, Summary
 from pyiceberg.table.update.validate import (
     _added_data_files,
+    _added_delete_files,
     _deleted_data_files,
     _validate_added_data_files,
     _validate_deleted_data_files,
+    _validate_no_new_delete_files,
+    _validate_no_new_delete_files_for_data_files,
     _validation_history,
 )
 
@@ -350,3 +353,159 @@ class DummyEntry:
         data_filter=None,
         parent_snapshot=oldest_snapshot,
     )
+
+
+@pytest.mark.parametrize("operation", [Operation.APPEND, Operation.REPLACE])
+def test_validate_added_delete_files_non_conflicting_count(
+    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
+    operation: Operation,
+) -> None:
+    table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests
+
+    snapshot_history = 100
+    snapshots = table.snapshots()
+    for i in range(1, snapshot_history + 1):
+        altered_snapshot = snapshots[-i]
+        altered_snapshot = altered_snapshot.model_copy(update={"summary": Summary(operation=operation)})
+        snapshots[-i] = altered_snapshot
+
+    table.metadata = table.metadata.model_copy(
+        update={"snapshots": snapshots},
+    )
+
+    oldest_snapshot = table.snapshots()[-snapshot_history]
+    newest_snapshot = cast(Snapshot, table.current_snapshot())
+
+    def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
+        """Mock the manifests method to use the snapshot_id for lookup."""
+        snapshot_id = self.snapshot_id
+        if snapshot_id in mock_manifests:
+            return mock_manifests[snapshot_id]
+        return []
+
+    def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
+        return [
+            ManifestEntry.from_args(
+                status=ManifestEntryStatus.ADDED, snapshot_id=self.added_snapshot_id, sequence_number=self.sequence_number
+            )
+        ]
+
+    with (
+        patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect),
+        patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry),
+    ):
+        dfi = _added_delete_files(
+            table=table,
+            starting_snapshot=newest_snapshot,
+            data_filter=None,
+            parent_snapshot=oldest_snapshot,
+            partition_set=None,
+        )
+
+    assert dfi.is_empty()
+    assert len(dfi.referenced_data_files()) == 0
+
+
+@pytest.mark.parametrize("operation", [Operation.DELETE, Operation.OVERWRITE])
+def test_validate_added_delete_files_conflicting_count(
+    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
+    operation: Operation,
+) -> None:
+    table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests
+
+    snapshot_history = 100
+    snapshots = table.snapshots()
+    for i in range(1, snapshot_history + 1):
+        altered_snapshot = snapshots[-i]
+        altered_snapshot = altered_snapshot.model_copy(update={"summary": Summary(operation=operation)})
+        snapshots[-i] = altered_snapshot
+
+    table.metadata = table.metadata.model_copy(
+        update={"snapshots": snapshots},
+    )
+
+    oldest_snapshot = table.snapshots()[-snapshot_history]
+    newest_snapshot = cast(Snapshot, table.current_snapshot())
+
+    mock_delete_file = DataFile.from_args(
+        content=DataFileContent.POSITION_DELETES,
+        file_path="s3://dummy/path",
+    )
+
+    mock_delete_file.spec_id = 0
+
+    def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
+        """Mock the manifests method to use the snapshot_id for lookup."""
+        snapshot_id = self.snapshot_id
+        if snapshot_id in mock_manifests:
+            return mock_manifests[snapshot_id]
+        return []
+
+    def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
+        result = [
+            ManifestEntry.from_args(
+                status=ManifestEntryStatus.ADDED, snapshot_id=self.added_snapshot_id, sequence_number=self.min_sequence_number
+            )
+        ]
+
+        result[-1] = ManifestEntry.from_args(
+            status=ManifestEntryStatus.ADDED,
+            snapshot_id=self.added_snapshot_id,
+            sequence_number=10000,
+            data_file=mock_delete_file,
+        )
+
+        return result
+
+    with (
+        patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect),
+        patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry),
+    ):
+        dfi = _added_delete_files(
+            table=table,
+            starting_snapshot=newest_snapshot,
+            data_filter=None,
+            parent_snapshot=oldest_snapshot,
+            partition_set=None,
+        )
+
+    assert not dfi.is_empty()
+    assert dfi.referenced_data_files()[0] == mock_delete_file
+
+
+def test_validate_no_new_delete_files_raises_on_conflict(
+    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
+) -> None:
+    table, _ = table_v2_with_extensive_snapshots_and_manifests
+    oldest_snapshot = table.snapshots()[0]
+    newest_snapshot = cast(Snapshot, table.current_snapshot())
+
+    with patch("pyiceberg.table.update.validate.DeleteFileIndex.is_empty", return_value=False):
+        with pytest.raises(ValidationException):
+            _validate_no_new_delete_files(
+                table=table,
+                starting_snapshot=newest_snapshot,
+                data_filter=None,
+                partition_set=None,
+                parent_snapshot=oldest_snapshot,
+            )
+
+
+def test_validate_no_new_delete_files_for_data_files_raises_on_conflict(
+    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
+) -> None:
+    table, _ = table_v2_with_extensive_snapshots_and_manifests
+    oldest_snapshot = table.snapshots()[0]
+    newest_snapshot = cast(Snapshot, table.current_snapshot())
+
+    mocked_data_file = DataFile.from_args()
+
+    with patch("pyiceberg.table.update.validate.DeleteFileIndex.for_data_file", return_value=[mocked_data_file]):
+        with pytest.raises(ValidationException):
+            _validate_no_new_delete_files_for_data_files(
+                table=table,
+                starting_snapshot=newest_snapshot,
+                data_filter=None,
+                data_files={mocked_data_file},
+                parent_snapshot=oldest_snapshot,
+            )

From 8c0f0957d4569a5c333616a710e47a2cbb13d765 Mon Sep 17 00:00:00 2001
From: Gabriel Igliozzi
Date: Tue, 17 Feb 2026 14:23:24 +0100
Subject: [PATCH 2/2] comments, thanks geruh

---
 pyiceberg/table/delete_file_index.py |  2 +-
 pyiceberg/table/update/validate.py   | 15 +++++++--------
 tests/table/test_validate.py         | 28 +++++++++++-----------------
 3 files changed, 19 insertions(+), 26 deletions(-)

diff --git a/pyiceberg/table/delete_file_index.py b/pyiceberg/table/delete_file_index.py
index 26bb380365..3f513aabe5 100644
--- a/pyiceberg/table/delete_file_index.py
+++ b/pyiceberg/table/delete_file_index.py
@@ -145,7 +145,7 @@ def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record
 
         return deletes
 
-    def referenced_data_files(self) -> list[DataFile]:
+    def referenced_delete_files(self) -> list[DataFile]:
         data_files: list[DataFile] = []
 
         for deletes in self._by_partition.values():
diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py
index 267fdde1d8..66ec2e98ca 100644
--- a/pyiceberg/table/update/validate.py
+++ b/pyiceberg/table/update/validate.py
@@ -254,7 +254,7 @@ def _added_delete_files(
     dfi = DeleteFileIndex()
 
     for manifest in manifests:
-        for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
+        for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=True):
             if _filter_manifest_entries(
                 entry, snapshot_ids, data_filter, partition_set, ManifestEntryStatus.ADDED, table.schema()
             ):
@@ -274,9 +274,8 @@ def _starting_sequence_number(table: Table, starting_snapshot: Snapshot | None)
     Returns:
         Sequence number as int
     """
     if starting_snapshot is not None:
-        if snapshot := table.snapshot_by_id(starting_snapshot.snapshot_id):
-            if seq := snapshot.sequence_number:
-                return seq
+        if seq := starting_snapshot.sequence_number:
+            return seq
     return INITIAL_SEQUENCE_NUMBER
 
@@ -322,13 +321,13 @@ def _validate_no_new_delete_files(
     if deletes.is_empty():
         return
 
-    conflicting_delete_files = deletes.referenced_data_files()
+    conflicting_delete_paths = [file.file_path for file in deletes.referenced_delete_files()]
     raise ValidationException(
-        f"Found new conflicting delete files that can apply to records matching {data_filter}: {conflicting_delete_files}"
+        f"Found new conflicting delete files that can apply to records matching {data_filter}: {conflicting_delete_paths}"
     )
 
 
-def _validate_no_new_delete_files_for_data_files(
+def _validate_no_new_deletes_for_data_files(
     table: Table,
     starting_snapshot: Snapshot,
     data_filter: BooleanExpression | None,
@@ -353,6 +352,6 @@ def _validate_no_new_delete_files_for_data_files(
 
     # Fail if any delete file is found that applies to files written in or before the starting snapshot
     for data_file in data_files:
-        delete_files = deletes.for_data_file(seq_num, data_file)
+        delete_files = deletes.for_data_file(seq_num, data_file, data_file.partition)
         if len(delete_files) > 0:
             raise ValidationException(f"Cannot commit, found new delete for replace data file {data_file}")
diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py
index 49e42c83ea..cb9d80e196 100644
--- a/tests/table/test_validate.py
+++ b/tests/table/test_validate.py
@@ -32,7 +32,7 @@
     _validate_added_data_files,
     _validate_deleted_data_files,
     _validate_no_new_delete_files,
-    _validate_no_new_delete_files_for_data_files,
+    _validate_no_new_deletes_for_data_files,
     _validation_history,
 )
 
@@ -356,7 +356,7 @@ class DummyEntry:
 
 
 @pytest.mark.parametrize("operation", [Operation.APPEND, Operation.REPLACE])
-def test_validate_added_delete_files_non_conflicting_count(
+def test_added_delete_files_non_conflicting_count(
     table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
     operation: Operation,
 ) -> None:
@@ -403,11 +403,11 @@ def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: b
         )
 
     assert dfi.is_empty()
-    assert len(dfi.referenced_data_files()) == 0
+    assert len(dfi.referenced_delete_files()) == 0
 
 
 @pytest.mark.parametrize("operation", [Operation.DELETE, Operation.OVERWRITE])
-def test_validate_added_delete_files_conflicting_count(
+def test_added_delete_files_conflicting_count(
     table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
     operation: Operation,
 ) -> None:
@@ -442,21 +442,15 @@ def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestF
         return []
 
     def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
-        result = [
+        return [
             ManifestEntry.from_args(
-                status=ManifestEntryStatus.ADDED, snapshot_id=self.added_snapshot_id, sequence_number=self.min_sequence_number
+                status=ManifestEntryStatus.ADDED,
+                snapshot_id=self.added_snapshot_id,
+                sequence_number=self.min_sequence_number,
+                data_file=mock_delete_file,
             )
         ]
 
-        result[-1] = ManifestEntry.from_args(
-            status=ManifestEntryStatus.ADDED,
-            snapshot_id=self.added_snapshot_id,
-            sequence_number=10000,
-            data_file=mock_delete_file,
-        )
-
-        return result
-
     with (
         patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect),
         patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry),
@@ -470,7 +464,7 @@ def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: b
         )
 
     assert not dfi.is_empty()
-    assert dfi.referenced_data_files()[0] == mock_delete_file
+    assert dfi.referenced_delete_files()[0] == mock_delete_file
 
 
 def test_validate_no_new_delete_files_raises_on_conflict(
@@ -502,7 +496,7 @@ def test_validate_no_new_delete_files_for_data_files_raises_on_conflict(
 
     with patch("pyiceberg.table.update.validate.DeleteFileIndex.for_data_file", return_value=[mocked_data_file]):
         with pytest.raises(ValidationException):
-            _validate_no_new_delete_files_for_data_files(
+            _validate_no_new_deletes_for_data_files(
                 table=table,
                 starting_snapshot=newest_snapshot,
                 data_filter=None,