Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pyiceberg/table/delete_file_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ def filter_by_seq(self, seq: int) -> list[DataFile]:
start_idx = bisect_left(self._seqs, seq)
return [delete_file for delete_file, _ in self._files[start_idx:]]

def referenced_delete_files(self) -> list[DataFile]:
    """Return every delete file held by this index, regardless of sequence number."""
    # Make sure the backing list is fully built/sorted before reading it.
    self._ensure_indexed()
    return [delete_file for delete_file, _ in self._files]


def _has_path_bounds(delete_file: DataFile) -> bool:
lower = delete_file.lower_bounds
Expand Down Expand Up @@ -140,3 +144,14 @@ def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record
deletes.update(path_deletes.filter_by_seq(seq_num))

return deletes

def referenced_delete_files(self) -> list[DataFile]:
    """Return all delete files referenced by this index.

    Collects from both the partition-scoped and the path-scoped sub-indexes.
    """
    collected: list[DataFile] = []
    # Each sub-index may contribute its own delete files; gather from both in turn.
    for sub_index in (*self._by_partition.values(), *self._by_path.values()):
        collected.extend(sub_index.referenced_delete_files())
    return collected
122 changes: 121 additions & 1 deletion pyiceberg/table/update/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,23 @@
from pyiceberg.exceptions import ValidationException
from pyiceberg.expressions import BooleanExpression
from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
from pyiceberg.manifest import (
INITIAL_SEQUENCE_NUMBER,
DataFile,
ManifestContent,
ManifestEntry,
ManifestEntryStatus,
ManifestFile,
)
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.table.delete_file_index import DeleteFileIndex
from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
from pyiceberg.typedef import Record

VALIDATE_DATA_FILES_EXIST_OPERATIONS: set[Operation] = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE}
VALIDATE_ADDED_DATA_FILES_OPERATIONS: set[Operation] = {Operation.APPEND, Operation.OVERWRITE}
VALIDATE_ADDED_DELETE_FILES_OPERATIONS: set[Operation] = {Operation.DELETE, Operation.OVERWRITE}


def _validation_history(
Expand Down Expand Up @@ -216,6 +225,60 @@ def _added_data_files(
yield entry


def _added_delete_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: BooleanExpression | None,
    partition_set: dict[int, set[Record]] | None,
    parent_snapshot: Snapshot | None,
) -> DeleteFileIndex:
    """Return matching delete files that have been added to the table since a starting snapshot.

    Args:
        table: Table to get the history from
        starting_snapshot: Starting snapshot to get the history from
        data_filter: Optional filter to match data files
        partition_set: Optional set of partitions to match data files
        parent_snapshot: Parent snapshot to get the history from

    Returns:
        DeleteFileIndex
    """
    index = DeleteFileIndex()

    # Delete files only exist from format v2 onwards; with no parent snapshot
    # there is no history to scan, so return an empty index either way.
    if parent_snapshot is None or table.format_version < 2:
        return index

    manifests, snapshot_ids = _validation_history(
        table, parent_snapshot, starting_snapshot, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES
    )

    schema = table.schema()  # invariant across the loop; fetch once
    for manifest in manifests:
        matching_entries = (
            entry
            for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=True)
            if _filter_manifest_entries(entry, snapshot_ids, data_filter, partition_set, ManifestEntryStatus.ADDED, schema)
        )
        for entry in matching_entries:
            index.add_delete_file(entry, entry.data_file.partition)

    return index


def _starting_sequence_number(table: Table, starting_snapshot: Snapshot | None) -> int:
    """Find the starting sequence number from a snapshot.

    Args:
        table: Table to find snapshot from (unused; kept for interface parity)
        starting_snapshot: Snapshot from where to start looking

    Returns:
        The snapshot's sequence number, or INITIAL_SEQUENCE_NUMBER when the
        snapshot is missing or has no sequence number.
    """
    # Explicit `is not None` check: the previous truthiness test
    # (`if seq := starting_snapshot.sequence_number:`) also rejected a
    # legitimate sequence number of 0, which only gave the right answer by
    # coincidence because INITIAL_SEQUENCE_NUMBER is itself 0.
    if starting_snapshot is not None and starting_snapshot.sequence_number is not None:
        return starting_snapshot.sequence_number
    return INITIAL_SEQUENCE_NUMBER


def _validate_added_data_files(
table: Table,
starting_snapshot: Snapshot,
Expand All @@ -235,3 +298,60 @@ def _validate_added_data_files(
if any(conflicting_entries):
conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries if entry.snapshot_id is not None}
raise ValidationException(f"Added data files were found matching the filter for snapshots {conflicting_snapshots}!")


def _validate_no_new_delete_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: BooleanExpression | None,
    partition_set: dict[int, set[Record]] | None,
    parent_snapshot: Snapshot | None,
) -> None:
    """Validate no new delete files matching a filter have been added to the table since starting a snapshot.

    Args:
        table: Table to validate
        starting_snapshot: Snapshot current at the start of the operation
        data_filter: Expression used to find added data files
        partition_set: Dictionary of partition spec to set of partition records
        parent_snapshot: Ending snapshot on the branch being validated

    Raises:
        ValidationException: If any conflicting delete file was added in the meantime.
    """
    added = _added_delete_files(table, starting_snapshot, data_filter, partition_set, parent_snapshot)

    if not added.is_empty():
        # Surface the offending file paths to make the conflict actionable.
        conflicting_delete_paths = [delete_file.file_path for delete_file in added.referenced_delete_files()]
        raise ValidationException(
            f"Found new conflicting delete files that can apply to records matching {data_filter}: {conflicting_delete_paths}"
        )


def _validate_no_new_deletes_for_data_files(
    table: Table,
    starting_snapshot: Snapshot,
    data_filter: BooleanExpression | None,
    data_files: set[DataFile],
    parent_snapshot: Snapshot | None,
) -> None:
    """Validate no new delete files must be applied for data files that have been added to the table since a starting snapshot.

    Args:
        table: Table to validate
        starting_snapshot: Snapshot current at the start of the operation
        data_filter: Expression used to find added data files
        data_files: data files to validate have no new deletes
        parent_snapshot: Ending snapshot on the branch being validated

    Raises:
        ValidationException: If a new delete file applies to any of the given data files.
    """
    # Without a current table state there is nothing to conflict with, and
    # delete files only exist from format v2 onwards.
    if parent_snapshot is None or table.format_version < 2:
        return

    added_deletes = _added_delete_files(table, starting_snapshot, data_filter, None, parent_snapshot)
    start_seq = _starting_sequence_number(table, starting_snapshot)

    # Fail if any newly added delete file applies to a file written in or
    # before the starting snapshot.
    for data_file in data_files:
        if added_deletes.for_data_file(start_seq, data_file, data_file.partition):
            raise ValidationException(f"Cannot commit, found new delete for replace data file {data_file}")
155 changes: 154 additions & 1 deletion tests/table/test_validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@

from pyiceberg.exceptions import ValidationException
from pyiceberg.io import FileIO
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
from pyiceberg.table import Table
from pyiceberg.table.snapshots import Operation, Snapshot, Summary
from pyiceberg.table.update.validate import (
_added_data_files,
_added_delete_files,
_deleted_data_files,
_validate_added_data_files,
_validate_deleted_data_files,
_validate_no_new_delete_files,
_validate_no_new_deletes_for_data_files,
_validation_history,
)

Expand Down Expand Up @@ -350,3 +353,153 @@ class DummyEntry:
data_filter=None,
parent_snapshot=oldest_snapshot,
)


@pytest.mark.parametrize("operation", [Operation.APPEND, Operation.REPLACE])
def test_added_delete_files_non_conflicting_count(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
    operation: Operation,
) -> None:
    """Non-delete operations (APPEND/REPLACE) must leave the delete-file index empty."""
    table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests

    # Rewrite the summaries of the most recent 100 snapshots to the parametrized
    # operation, so the validation history contains only non-conflicting operations.
    snapshot_history = 100
    snapshots = table.snapshots()
    for i in range(1, snapshot_history + 1):
        altered_snapshot = snapshots[-i]
        altered_snapshot = altered_snapshot.model_copy(update={"summary": Summary(operation=operation)})
        snapshots[-i] = altered_snapshot

    table.metadata = table.metadata.model_copy(
        update={"snapshots": snapshots},
    )

    # Validation window: from the oldest rewritten snapshot up to the current one.
    oldest_snapshot = table.snapshots()[-snapshot_history]
    newest_snapshot = cast(Snapshot, table.current_snapshot())

    def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
        """Mock the manifests method to use the snapshot_id for lookup."""
        snapshot_id = self.snapshot_id
        if snapshot_id in mock_manifests:
            return mock_manifests[snapshot_id]
        return []

    def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
        # One ADDED entry per manifest. NOTE(review): no data_file is attached;
        # presumably these entries are filtered out before the index reads it — confirm.
        return [
            ManifestEntry.from_args(
                status=ManifestEntryStatus.ADDED, snapshot_id=self.added_snapshot_id, sequence_number=self.sequence_number
            )
        ]

    with (
        patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect),
        patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry),
    ):
        dfi = _added_delete_files(
            table=table,
            starting_snapshot=newest_snapshot,
            data_filter=None,
            parent_snapshot=oldest_snapshot,
            partition_set=None,
        )

        # No DELETE/OVERWRITE snapshots in the history, so nothing was indexed.
        assert dfi.is_empty()
        assert len(dfi.referenced_delete_files()) == 0


@pytest.mark.parametrize("operation", [Operation.DELETE, Operation.OVERWRITE])
def test_added_delete_files_conflicting_count(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
    operation: Operation,
) -> None:
    """Delete-producing operations (DELETE/OVERWRITE) must surface their delete files in the index."""
    table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests

    # Rewrite the summaries of the most recent 100 snapshots to the parametrized
    # (conflicting) operation so the validation history scans their manifests.
    snapshot_history = 100
    snapshots = table.snapshots()
    for i in range(1, snapshot_history + 1):
        altered_snapshot = snapshots[-i]
        altered_snapshot = altered_snapshot.model_copy(update={"summary": Summary(operation=operation)})
        snapshots[-i] = altered_snapshot

    table.metadata = table.metadata.model_copy(
        update={"snapshots": snapshots},
    )

    # Validation window: from the oldest rewritten snapshot up to the current one.
    oldest_snapshot = table.snapshots()[-snapshot_history]
    newest_snapshot = cast(Snapshot, table.current_snapshot())

    # Positional-delete file that every mocked manifest entry will reference.
    mock_delete_file = DataFile.from_args(
        content=DataFileContent.POSITION_DELETES,
        file_path="s3://dummy/path",
    )

    # spec_id must be set so the index can resolve the partition spec.
    mock_delete_file.spec_id = 0

    def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
        """Mock the manifests method to use the snapshot_id for lookup."""
        snapshot_id = self.snapshot_id
        if snapshot_id in mock_manifests:
            return mock_manifests[snapshot_id]
        return []

    def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
        # One ADDED entry per manifest, carrying the shared mock delete file.
        return [
            ManifestEntry.from_args(
                status=ManifestEntryStatus.ADDED,
                snapshot_id=self.added_snapshot_id,
                sequence_number=self.min_sequence_number,
                data_file=mock_delete_file,
            )
        ]

    with (
        patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect),
        patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry),
    ):
        dfi = _added_delete_files(
            table=table,
            starting_snapshot=newest_snapshot,
            data_filter=None,
            parent_snapshot=oldest_snapshot,
            partition_set=None,
        )

        # Conflicting operations were scanned, so the mock delete file is indexed.
        assert not dfi.is_empty()
        assert dfi.referenced_delete_files()[0] == mock_delete_file


def test_validate_no_new_delete_files_raises_on_conflict(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
) -> None:
    """A non-empty delete-file index must make _validate_no_new_delete_files raise."""
    table, _ = table_v2_with_extensive_snapshots_and_manifests
    oldest_snapshot = table.snapshots()[0]
    newest_snapshot = cast(Snapshot, table.current_snapshot())

    # Force the index to report a conflict regardless of the actual manifests.
    with (
        patch("pyiceberg.table.update.validate.DeleteFileIndex.is_empty", return_value=False),
        pytest.raises(ValidationException),
    ):
        _validate_no_new_delete_files(
            table=table,
            starting_snapshot=newest_snapshot,
            data_filter=None,
            partition_set=None,
            parent_snapshot=oldest_snapshot,
        )


def test_validate_no_new_delete_files_for_data_files_raises_on_conflict(
    table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
) -> None:
    """A delete file applying to a given data file must make _validate_no_new_deletes_for_data_files raise."""
    table, _ = table_v2_with_extensive_snapshots_and_manifests
    oldest_snapshot = table.snapshots()[0]
    newest_snapshot = cast(Snapshot, table.current_snapshot())

    mocked_data_file = DataFile.from_args()

    # Force the index to report a matching delete for every data file.
    with (
        patch("pyiceberg.table.update.validate.DeleteFileIndex.for_data_file", return_value=[mocked_data_file]),
        pytest.raises(ValidationException),
    ):
        _validate_no_new_deletes_for_data_files(
            table=table,
            starting_snapshot=newest_snapshot,
            data_filter=None,
            data_files={mocked_data_file},
            parent_snapshot=oldest_snapshot,
        )