Skip to content

Commit 32b2428

Browse files
committed
Add referenced files DFI, validate_no_new_added_delete_files, _validate_no_new_deletes_for_data_file
1 parent 89a129c commit 32b2428

File tree

3 files changed

+297
-2
lines changed

3 files changed

+297
-2
lines changed

pyiceberg/table/delete_file_index.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ def filter_by_seq(self, seq: int) -> list[DataFile]:
5454
start_idx = bisect_left(self._seqs, seq)
5555
return [delete_file for delete_file, _ in self._files[start_idx:]]
5656

57+
def referenced_delete_files(self) -> list[DataFile]:
58+
self._ensure_indexed()
59+
return [data_file for data_file, _ in self._files]
60+
5761

5862
def _has_path_bounds(delete_file: DataFile) -> bool:
5963
lower = delete_file.lower_bounds
@@ -140,3 +144,14 @@ def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record
140144
deletes.update(path_deletes.filter_by_seq(seq_num))
141145

142146
return deletes
147+
148+
def referenced_data_files(self) -> list[DataFile]:
149+
data_files: list[DataFile] = []
150+
151+
for deletes in self._by_partition.values():
152+
data_files.extend(deletes.referenced_delete_files())
153+
154+
for deletes in self._by_path.values():
155+
data_files.extend(deletes.referenced_delete_files())
156+
157+
return data_files

pyiceberg/table/update/validate.py

Lines changed: 122 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,23 @@
1919
from pyiceberg.exceptions import ValidationException
2020
from pyiceberg.expressions import BooleanExpression
2121
from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator
22-
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
22+
from pyiceberg.manifest import (
23+
INITIAL_SEQUENCE_NUMBER,
24+
DataFile,
25+
ManifestContent,
26+
ManifestEntry,
27+
ManifestEntryStatus,
28+
ManifestFile,
29+
)
2330
from pyiceberg.schema import Schema
2431
from pyiceberg.table import Table
32+
from pyiceberg.table.delete_file_index import DeleteFileIndex
2533
from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
2634
from pyiceberg.typedef import Record
2735

2836
VALIDATE_DATA_FILES_EXIST_OPERATIONS: set[Operation] = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE}
2937
VALIDATE_ADDED_DATA_FILES_OPERATIONS: set[Operation] = {Operation.APPEND, Operation.OVERWRITE}
38+
VALIDATE_ADDED_DELETE_FILES_OPERATIONS: set[Operation] = {Operation.DELETE, Operation.OVERWRITE}
3039

3140

3241
def _validation_history(
@@ -216,6 +225,61 @@ def _added_data_files(
216225
yield entry
217226

218227

228+
def _added_delete_files(
229+
table: Table,
230+
starting_snapshot: Snapshot,
231+
data_filter: BooleanExpression | None,
232+
partition_set: dict[int, set[Record]] | None,
233+
parent_snapshot: Snapshot | None,
234+
) -> DeleteFileIndex:
235+
"""Return matching delete files that have been added to the table since a starting snapshot.
236+
237+
Args:
238+
table: Table to get the history from
239+
starting_snapshot: Starting snapshot to get the history from
240+
data_filter: Optional filter to match data files
241+
partition_set: Optional set of partitions to match data files
242+
parent_snapshot: Parent snapshot to get the history from
243+
244+
Returns:
245+
DeleteFileIndex
246+
"""
247+
if parent_snapshot is None or table.format_version < 2:
248+
return DeleteFileIndex()
249+
250+
manifests, snapshot_ids = _validation_history(
251+
table, parent_snapshot, starting_snapshot, VALIDATE_ADDED_DELETE_FILES_OPERATIONS, ManifestContent.DELETES
252+
)
253+
254+
dfi = DeleteFileIndex()
255+
256+
for manifest in manifests:
257+
for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
258+
if _filter_manifest_entries(
259+
entry, snapshot_ids, data_filter, partition_set, ManifestEntryStatus.ADDED, table.schema()
260+
):
261+
dfi.add_delete_file(entry, entry.data_file.partition)
262+
263+
return dfi
264+
265+
266+
def _starting_sequence_number(table: Table, starting_snapshot: Snapshot | None) -> int:
267+
"""Find the starting sequence number from a snapshot.
268+
269+
Args:
270+
table: Table to find snapshot from
271+
starting_snapshot: Snapshot from where to start looking
272+
273+
Returns
274+
Sequence number as int
275+
"""
276+
if starting_snapshot is not None:
277+
if snapshot := table.snapshot_by_id(starting_snapshot.snapshot_id):
278+
if seq := snapshot.sequence_number:
279+
return seq
280+
return INITIAL_SEQUENCE_NUMBER
281+
282+
219283
def _validate_added_data_files(
220284
table: Table,
221285
starting_snapshot: Snapshot,
@@ -235,3 +299,60 @@ def _validate_added_data_files(
235299
if any(conflicting_entries):
236300
conflicting_snapshots = {entry.snapshot_id for entry in conflicting_entries if entry.snapshot_id is not None}
237301
raise ValidationException(f"Added data files were found matching the filter for snapshots {conflicting_snapshots}!")
302+
303+
304+
def _validate_no_new_delete_files(
305+
table: Table,
306+
starting_snapshot: Snapshot,
307+
data_filter: BooleanExpression | None,
308+
partition_set: dict[int, set[Record]] | None,
309+
parent_snapshot: Snapshot | None,
310+
) -> None:
311+
"""Validate no new delete files matching a filter have been added to the table since starting a snapshot.
312+
313+
Args:
314+
table: Table to validate
315+
starting_snapshot: Snapshot current at the start of the operation
316+
data_filter: Expression used to find added data files
317+
partition_set: Dictionary of partition spec to set of partition records
318+
parent_snapshot: Ending snapshot on the branch being validated
319+
"""
320+
deletes = _added_delete_files(table, starting_snapshot, data_filter, partition_set, parent_snapshot)
321+
322+
if deletes.is_empty():
323+
return
324+
325+
conflicting_delete_files = deletes.referenced_data_files()
326+
raise ValidationException(
327+
f"Found new conflicting delete files that can apply to records matching {data_filter}: {conflicting_delete_files}"
328+
)
329+
330+
331+
def _validate_no_new_delete_files_for_data_files(
332+
table: Table,
333+
starting_snapshot: Snapshot,
334+
data_filter: BooleanExpression | None,
335+
data_files: set[DataFile],
336+
parent_snapshot: Snapshot | None,
337+
) -> None:
338+
"""Validate no new delete files must be applied for data files that have been added to the table since a starting snapshot.
339+
340+
Args:
341+
table: Table to validate
342+
starting_snapshot: Snapshot current at the start of the operation
343+
data_filter: Expression used to find added data files
344+
data_files: data files to validate have no new deletes
345+
parent_snapshot: Ending snapshot on the branch being validated
346+
"""
347+
# If there is no current state, or no files has been added
348+
if parent_snapshot is None or table.format_version < 2:
349+
return
350+
351+
deletes = _added_delete_files(table, starting_snapshot, data_filter, None, parent_snapshot)
352+
seq_num = _starting_sequence_number(table, starting_snapshot)
353+
354+
# Fail to any delete file found that applies to files written in or before the starting snapshot
355+
for data_file in data_files:
356+
delete_files = deletes.for_data_file(seq_num, data_file)
357+
if len(delete_files) > 0:
358+
raise ValidationException(f"Cannot commit, found new delete for replace data file {data_file}")

tests/table/test_validate.py

Lines changed: 160 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,17 @@
2222

2323
from pyiceberg.exceptions import ValidationException
2424
from pyiceberg.io import FileIO
25-
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
25+
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
2626
from pyiceberg.table import Table
2727
from pyiceberg.table.snapshots import Operation, Snapshot, Summary
2828
from pyiceberg.table.update.validate import (
2929
_added_data_files,
30+
_added_delete_files,
3031
_deleted_data_files,
3132
_validate_added_data_files,
3233
_validate_deleted_data_files,
34+
_validate_no_new_delete_files,
35+
_validate_no_new_delete_files_for_data_files,
3336
_validation_history,
3437
)
3538

@@ -350,3 +353,159 @@ class DummyEntry:
350353
data_filter=None,
351354
parent_snapshot=oldest_snapshot,
352355
)
356+
357+
358+
@pytest.mark.parametrize("operation", [Operation.APPEND, Operation.REPLACE])
359+
def test_validate_added_delete_files_non_conflicting_count(
360+
table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
361+
operation: Operation,
362+
) -> None:
363+
table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests
364+
365+
snapshot_history = 100
366+
snapshots = table.snapshots()
367+
for i in range(1, snapshot_history + 1):
368+
altered_snapshot = snapshots[-i]
369+
altered_snapshot = altered_snapshot.model_copy(update={"summary": Summary(operation=operation)})
370+
snapshots[-i] = altered_snapshot
371+
372+
table.metadata = table.metadata.model_copy(
373+
update={"snapshots": snapshots},
374+
)
375+
376+
oldest_snapshot = table.snapshots()[-snapshot_history]
377+
newest_snapshot = cast(Snapshot, table.current_snapshot())
378+
379+
def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
380+
"""Mock the manifests method to use the snapshot_id for lookup."""
381+
snapshot_id = self.snapshot_id
382+
if snapshot_id in mock_manifests:
383+
return mock_manifests[snapshot_id]
384+
return []
385+
386+
def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
387+
return [
388+
ManifestEntry.from_args(
389+
status=ManifestEntryStatus.ADDED, snapshot_id=self.added_snapshot_id, sequence_number=self.sequence_number
390+
)
391+
]
392+
393+
with (
394+
patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect),
395+
patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry),
396+
):
397+
dfi = _added_delete_files(
398+
table=table,
399+
starting_snapshot=newest_snapshot,
400+
data_filter=None,
401+
parent_snapshot=oldest_snapshot,
402+
partition_set=None,
403+
)
404+
405+
assert dfi.is_empty()
406+
assert len(dfi.referenced_data_files()) == 0
407+
408+
409+
@pytest.mark.parametrize("operation", [Operation.DELETE, Operation.OVERWRITE])
410+
def test_validate_added_delete_files_conflicting_count(
411+
table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
412+
operation: Operation,
413+
) -> None:
414+
table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests
415+
416+
snapshot_history = 100
417+
snapshots = table.snapshots()
418+
for i in range(1, snapshot_history + 1):
419+
altered_snapshot = snapshots[-i]
420+
altered_snapshot = altered_snapshot.model_copy(update={"summary": Summary(operation=operation)})
421+
snapshots[-i] = altered_snapshot
422+
423+
table.metadata = table.metadata.model_copy(
424+
update={"snapshots": snapshots},
425+
)
426+
427+
oldest_snapshot = table.snapshots()[-snapshot_history]
428+
newest_snapshot = cast(Snapshot, table.current_snapshot())
429+
430+
mock_delete_file = DataFile.from_args(
431+
content=DataFileContent.POSITION_DELETES,
432+
file_path="s3://dummy/path",
433+
)
434+
435+
mock_delete_file.spec_id = 0
436+
437+
def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
438+
"""Mock the manifests method to use the snapshot_id for lookup."""
439+
snapshot_id = self.snapshot_id
440+
if snapshot_id in mock_manifests:
441+
return mock_manifests[snapshot_id]
442+
return []
443+
444+
def mock_fetch_manifest_entry(self: ManifestFile, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
445+
result = [
446+
ManifestEntry.from_args(
447+
status=ManifestEntryStatus.ADDED, snapshot_id=self.added_snapshot_id, sequence_number=self.min_sequence_number
448+
)
449+
]
450+
451+
result[-1] = ManifestEntry.from_args(
452+
status=ManifestEntryStatus.ADDED,
453+
snapshot_id=self.added_snapshot_id,
454+
sequence_number=10000,
455+
data_file=mock_delete_file,
456+
)
457+
458+
return result
459+
460+
with (
461+
patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect),
462+
patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", new=mock_fetch_manifest_entry),
463+
):
464+
dfi = _added_delete_files(
465+
table=table,
466+
starting_snapshot=newest_snapshot,
467+
data_filter=None,
468+
parent_snapshot=oldest_snapshot,
469+
partition_set=None,
470+
)
471+
472+
assert not dfi.is_empty()
473+
assert dfi.referenced_data_files()[0] == mock_delete_file
474+
475+
476+
def test_validate_no_new_delete_files_raises_on_conflict(
477+
table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
478+
) -> None:
479+
table, _ = table_v2_with_extensive_snapshots_and_manifests
480+
oldest_snapshot = table.snapshots()[0]
481+
newest_snapshot = cast(Snapshot, table.current_snapshot())
482+
483+
with patch("pyiceberg.table.update.validate.DeleteFileIndex.is_empty", return_value=False):
484+
with pytest.raises(ValidationException):
485+
_validate_no_new_delete_files(
486+
table=table,
487+
starting_snapshot=newest_snapshot,
488+
data_filter=None,
489+
partition_set=None,
490+
parent_snapshot=oldest_snapshot,
491+
)
492+
493+
494+
def test_validate_no_new_delete_files_for_data_files_raises_on_conflict(
495+
table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
496+
) -> None:
497+
table, _ = table_v2_with_extensive_snapshots_and_manifests
498+
oldest_snapshot = table.snapshots()[0]
499+
newest_snapshot = cast(Snapshot, table.current_snapshot())
500+
501+
mocked_data_file = DataFile.from_args()
502+
503+
with patch("pyiceberg.table.update.validate.DeleteFileIndex.for_data_file", return_value=[mocked_data_file]):
504+
with pytest.raises(ValidationException):
505+
_validate_no_new_delete_files_for_data_files(
506+
table=table,
507+
starting_snapshot=newest_snapshot,
508+
data_filter=None,
509+
data_files={mocked_data_file},
510+
parent_snapshot=oldest_snapshot,
511+
)

0 commit comments

Comments
 (0)