diff --git a/pyiceberg/table/delete_file_index.py b/pyiceberg/table/delete_file_index.py
index 5c4323ea8f..3f513aabe5 100644
--- a/pyiceberg/table/delete_file_index.py
+++ b/pyiceberg/table/delete_file_index.py
@@ -54,6 +54,10 @@ def filter_by_seq(self, seq: int) -> list[DataFile]:
         start_idx = bisect_left(self._seqs, seq)
         return [delete_file for delete_file, _ in self._files[start_idx:]]
 
+    def referenced_delete_files(self) -> list[DataFile]:
+        self._ensure_indexed()
+        return [data_file for data_file, _ in self._files]
+
 
 def _has_path_bounds(delete_file: DataFile) -> bool:
     lower = delete_file.lower_bounds
@@ -140,3 +144,14 @@ def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record
             deletes.update(path_deletes.filter_by_seq(seq_num))
 
         return deletes
+
+    def referenced_delete_files(self) -> list[DataFile]:
+        data_files: list[DataFile] = []
+
+        for deletes in self._by_partition.values():
+            data_files.extend(deletes.referenced_delete_files())
+
+        for deletes in self._by_path.values():
+            data_files.extend(deletes.referenced_delete_files())
+
+        return data_files
diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py
index 0cda688f2a..66ec2e98ca 100644
--- a/pyiceberg/table/update/validate.py
+++ b/pyiceberg/table/update/validate.py
@@ -19,14 +19,23 @@
 from pyiceberg.exceptions import ValidationException
 from pyiceberg.expressions import BooleanExpression
 from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator
-from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
+from pyiceberg.manifest import (
+    INITIAL_SEQUENCE_NUMBER,
+    DataFile,
+    ManifestContent,
+    ManifestEntry,
+    ManifestEntryStatus,
+    ManifestFile,
+)
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
+from pyiceberg.table.delete_file_index import DeleteFileIndex
 from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
 from pyiceberg.typedef import Record
 
 VALIDATE_DATA_FILES_EXIST_OPERATIONS: set[Operation] = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE}
 VALIDATE_ADDED_DATA_FILES_OPERATIONS: set[Operation] = {Operation.APPEND, Operation.OVERWRITE}
+VALIDATE_ADDED_DELETE_FILES_OPERATIONS: set[Operation] = {Operation.DELETE, Operation.OVERWRITE}
 
 
 def _validation_history(
@@ -216,6 +225,60 @@ def _added_data_files(
                 yield entry
 
 
+def _added_delete_files(
+    table: Table,
+    starting_snapshot: Snapshot,
+    data_filter: BooleanExpression | None,
+    partition_set: dict[int, set[Record]] | None,
+    parent_snapshot: Snapshot | None,
+) -> DeleteFileIndex:
+    """Return matching delete files that have been added to the table since a starting snapshot.
+
+    Args:
+        table: Table to get the history from
+        starting_snapshot: Starting snapshot to get the history from
+        data_filter: Optional filter to match data files
+        partition_set: Optional set of partitions to match data files
+        parent_snapshot: Parent snapshot to get the history from
+
+    Returns:
+        DeleteFileIndex
+    """
+    if parent_snapshot is None or table.format_version < 2:
+        return DeleteFileIndex()
+
+    manifests, snapshot_ids = _validation_history(
+        table, parent_snapshot, starting_snapshot, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES
+    )
+
+    dfi = DeleteFileIndex()
+
+    for manifest in manifests:
+        for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=True):
+            if _filter_manifest_entries(
+                entry, snapshot_ids, data_filter, partition_set, ManifestEntryStatus.ADDED, table.schema()
+            ):
+                dfi.add_delete_file(entry, entry.data_file.partition)
+
+    return dfi
+
+
+def _starting_sequence_number(table: Table, starting_snapshot: Snapshot | None) -> int:
+    """Find the starting sequence number from a snapshot.
+
+    Args:
+        table: Table to find snapshot from
+        starting_snapshot: Snapshot from where to start looking
+
+    Returns:
+        Sequence number as int
+    """
+    if starting_snapshot is not None:
+        if seq := starting_snapshot.sequence_number:
+            return seq
+    return INITIAL_SEQUENCE_NUMBER
+
+
 def _validate_added_data_files(
     table: Table,
     starting_snapshot: Snapshot,
@@ -235,3 +298,60 @@
     if any(conflicting_entries):
         conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries if entry.snapshot_id is not None}
         raise ValidationException(f"Added data files were found matching the filter for snapshots {conflicting_snapshots}!")
+
+
+def _validate_no_new_delete_files(
+    table: Table,
+    starting_snapshot: Snapshot,
+    data_filter: BooleanExpression | None,
+    partition_set: dict[int, set[Record]] | None,
+    parent_snapshot: Snapshot | None,
+) -> None:
+    """Validate no new delete files matching a filter have been added to the table since starting a snapshot.
+
+    Args:
+        table: Table to validate
+        starting_snapshot: Snapshot current at the start of the operation
+        data_filter: Expression used to find added data files
+        partition_set: Dictionary of partition spec to set of partition records
+        parent_snapshot: Ending snapshot on the branch being validated
+    """
+    deletes = _added_delete_files(table, starting_snapshot, data_filter, partition_set, parent_snapshot)
+
+    if deletes.is_empty():
+        return
+
+    conflicting_delete_paths = [file.file_path for file in deletes.referenced_delete_files()]
+    raise ValidationException(
+        f"Found new conflicting delete files that can apply to records matching {data_filter}: {conflicting_delete_paths}"
+    )
+
+
+def _validate_no_new_deletes_for_data_files(
+    table: Table,
+    starting_snapshot: Snapshot,
+    data_filter: BooleanExpression | None,
+    data_files: set[DataFile],
+    parent_snapshot: Snapshot | None,
+) -> None:
+    """Validate no new delete files must be applied for data files that have been added to the table since a starting snapshot.
+
+    Args:
+        table: Table to validate
+        starting_snapshot: Snapshot current at the start of the operation
+        data_filter: Expression used to find added data files
+        data_files: data files to validate have no new deletes
+        parent_snapshot: Ending snapshot on the branch being validated
+    """
+    # If there is no current state, or no files have been added
+    if parent_snapshot is None or table.format_version < 2:
+        return
+
+    deletes = _added_delete_files(table, starting_snapshot, data_filter, None, parent_snapshot)
+    seq_num = _starting_sequence_number(table, starting_snapshot)
+
+    # Fail if any delete file is found that applies to files written in or before the starting snapshot
+    for data_file in data_files:
+        delete_files = deletes.for_data_file(seq_num, data_file, data_file.partition)
+        if len(delete_files) > 0:
+            raise ValidationException(f"Cannot commit, found new delete for replace data file {data_file}")
diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py
index 570f680860..cb9d80e196 100644
--- a/tests/table/test_validate.py
+++ b/tests/table/test_validate.py
@@ -22,14 +22,17 @@
 
 from pyiceberg.exceptions import ValidationException
 from pyiceberg.io import FileIO
-from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
+from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
 from pyiceberg.table import Table
 from pyiceberg.table.snapshots import Operation, Snapshot, Summary
 from pyiceberg.table.update.validate import (
     _added_data_files,
+    _added_delete_files,
     _deleted_data_files,
     _validate_added_data_files,
     _validate_deleted_data_files,
+    _validate_no_new_delete_files,
+    _validate_no_new_deletes_for_data_files,
     _validation_history,
 )
 
@@ -350,3 +353,153 @@
             data_filter=None,
             parent_snapshot=oldest_snapshot,
         )
+
+
+@pytest.mark.parametrize("operation", [Operation.APPEND, Operation.REPLACE])
+def test_added_delete_files_non_conflicting_count(
+    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
+    operation: Operation,
+) -> None:
+    table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests
+
+    snapshot_history = 100
+    snapshots = table.snapshots()
+    for i in range(1, snapshot_history + 1):
+        altered_snapshot = snapshots[-i]
+        altered_snapshot = altered_snapshot.model_copy(update={"summary": Summary(operation=operation)})
+        snapshots[-i] = altered_snapshot
+
+    table.metadata = table.metadata.model_copy(
+        update={"snapshots": snapshots},
+    )
+
+    oldest_snapshot = table.snapshots()[-snapshot_history]
+    newest_snapshot = cast(Snapshot, table.current_snapshot())
+
+    def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
+        """Mock the manifests method to use the snapshot_id for lookup."""
+        snapshot_id = self.snapshot_id
+        if snapshot_id in mock_manifests:
+            return mock_manifests[snapshot_id]
+        return []
+
+    def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
+        return [
+            ManifestEntry.from_args(
+                status=ManifestEntryStatus.ADDED, snapshot_id=self.added_snapshot_id, sequence_number=self.sequence_number
+            )
+        ]
+
+    with (
+        patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect),
+        patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry),
+    ):
+        dfi = _added_delete_files(
+            table=table,
+            starting_snapshot=newest_snapshot,
+            data_filter=None,
+            parent_snapshot=oldest_snapshot,
+            partition_set=None,
+        )
+
+        assert dfi.is_empty()
+        assert len(dfi.referenced_delete_files()) == 0
+
+
+@pytest.mark.parametrize("operation", [Operation.DELETE, Operation.OVERWRITE])
+def test_added_delete_files_conflicting_count(
+    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
+    operation: Operation,
+) -> None:
+    table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests
+
+    snapshot_history = 100
+    snapshots = table.snapshots()
+    for i in range(1, snapshot_history + 1):
+        altered_snapshot = snapshots[-i]
+        altered_snapshot = altered_snapshot.model_copy(update={"summary": Summary(operation=operation)})
+        snapshots[-i] = altered_snapshot
+
+    table.metadata = table.metadata.model_copy(
+        update={"snapshots": snapshots},
+    )
+
+    oldest_snapshot = table.snapshots()[-snapshot_history]
+    newest_snapshot = cast(Snapshot, table.current_snapshot())
+
+    mock_delete_file = DataFile.from_args(
+        content=DataFileContent.POSITION_DELETES,
+        file_path="s3://dummy/path",
+    )
+
+    mock_delete_file.spec_id = 0
+
+    def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
+        """Mock the manifests method to use the snapshot_id for lookup."""
+        snapshot_id = self.snapshot_id
+        if snapshot_id in mock_manifests:
+            return mock_manifests[snapshot_id]
+        return []
+
+    def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
+        return [
+            ManifestEntry.from_args(
+                status=ManifestEntryStatus.ADDED,
+                snapshot_id=self.added_snapshot_id,
+                sequence_number=self.min_sequence_number,
+                data_file=mock_delete_file,
+            )
+        ]
+
+    with (
+        patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect),
+        patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry),
+    ):
+        dfi = _added_delete_files(
+            table=table,
+            starting_snapshot=newest_snapshot,
+            data_filter=None,
+            parent_snapshot=oldest_snapshot,
+            partition_set=None,
+        )
+
+        assert not dfi.is_empty()
+        assert dfi.referenced_delete_files()[0] == mock_delete_file
+
+
+def test_validate_no_new_delete_files_raises_on_conflict(
+    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
+) -> None:
+    table, _ = table_v2_with_extensive_snapshots_and_manifests
+    oldest_snapshot = table.snapshots()[0]
+    newest_snapshot = cast(Snapshot, table.current_snapshot())
+
+    with patch("pyiceberg.table.update.validate.DeleteFileIndex.is_empty", return_value=False):
+        with pytest.raises(ValidationException):
+            _validate_no_new_delete_files(
+                table=table,
+                starting_snapshot=newest_snapshot,
+                data_filter=None,
+                partition_set=None,
+                parent_snapshot=oldest_snapshot,
+            )
+
+
+def test_validate_no_new_delete_files_for_data_files_raises_on_conflict(
+    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
+) -> None:
+    table, _ = table_v2_with_extensive_snapshots_and_manifests
+    oldest_snapshot = table.snapshots()[0]
+    newest_snapshot = cast(Snapshot, table.current_snapshot())
+
+    mocked_data_file = DataFile.from_args()
+
+    with patch("pyiceberg.table.update.validate.DeleteFileIndex.for_data_file", return_value=[mocked_data_file]):
+        with pytest.raises(ValidationException):
+            _validate_no_new_deletes_for_data_files(
+                table=table,
+                starting_snapshot=newest_snapshot,
+                data_filter=None,
+                data_files={mocked_data_file},
+                parent_snapshot=oldest_snapshot,
+            )