Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
beff92b
feat: validation history
jayceslesar Apr 18, 2025
c369720
format
jayceslesar Apr 18, 2025
41bb8a4
almost a working test
jayceslesar Apr 18, 2025
763e9f4
allow content_filter in snapshot.manifests
jayceslesar Apr 18, 2025
f200beb
simplify order of arguments to validation_history
jayceslesar Apr 18, 2025
7f6bf9d
simplify return in snapshot.manifests
jayceslesar Apr 18, 2025
c63cc55
tests passing
jayceslesar Apr 19, 2025
f2f3a88
correct ancestors_between
jayceslesar Apr 19, 2025
74d5569
fix to/from logic and allow optional `to_snapshot` arg in `validation…
jayceslesar Apr 19, 2025
167f9e4
remove a level of nesting with smarter clause
jayceslesar Apr 19, 2025
efe50b4
fix bad accessor
jayceslesar Apr 19, 2025
0793713
fix docstring
jayceslesar Apr 19, 2025
89120aa
[wip] feat: `validate_deleted_data_files`
jayceslesar Apr 19, 2025
a39abd2
first dummy test case
jayceslesar Apr 19, 2025
cf5b061
first working tests (needs cleanup)
jayceslesar Apr 19, 2025
645e8df
Merge branch 'main' into feat/validate-deleted-data-files
jayceslesar May 1, 2025
caf7e36
bring back all code from silly merge
jayceslesar May 1, 2025
a52c422
last tweaks
jayceslesar May 1, 2025
d3df4a7
Merge branch 'main' into feat/validate-deleted-data-files
jayceslesar May 10, 2025
9eca29f
fix order of args in deleted_data_files
jayceslesar May 10, 2025
fc83d42
make `deleted_data_files` private
jayceslesar May 10, 2025
ee3959a
Update pyiceberg/table/update/validate.py
jayceslesar May 10, 2025
a51c2da
ise inclusive metrics evaluator and add rows cannot match check
jayceslesar May 10, 2025
ec86695
show what snapshot IDs conflict
jayceslesar May 10, 2025
0a6b781
maybe correct partition_spec impl?
jayceslesar May 10, 2025
a96da01
fix ordering to match
jayceslesar May 10, 2025
03b2913
make private and add test
jayceslesar May 15, 2025
aae22e6
Merge branch 'main' into feat/validate-deleted-data-files
jayceslesar May 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 78 additions & 1 deletion pyiceberg/table/update/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Iterator, Optional

from pyiceberg.exceptions import ValidationException
from pyiceberg.manifest import ManifestContent, ManifestFile
from pyiceberg.expressions import BooleanExpression
from pyiceberg.expressions.visitors import _StrictMetricsEvaluator
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
from pyiceberg.table import Table
from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
from pyiceberg.typedef import Record

VALIDATE_DATA_FILES_EXIST_OPERATIONS = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE}


def validation_history(
Expand Down Expand Up @@ -69,3 +75,74 @@ def validation_history(
raise ValidationException("No matching snapshot found.")

return manifests_files, snapshots


def deleted_data_files(
table: Table,
starting_snapshot: Snapshot,
data_filter: Optional[BooleanExpression],
parent_snapshot: Optional[Snapshot],
partition_set: Optional[set[Record]],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks out of order:

Suggested change
parent_snapshot: Optional[Snapshot],
partition_set: Optional[set[Record]],
partition_set: Optional[set[Record]],
parent_snapshot: Optional[Snapshot],

It looks like the function call in validate_deleted_data_files is assuming that parent_snapshot is the last argument

    conflicting_entries = deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs some more work. Next to @sungwy's comment, we in the code below:

(entry.data_file.spec_id, entry.data_file.partition) not in partition_set

Where it checks if a tuple is in a partition_set, but the partition_set only contains the Record according to the signature.

This triggered me, because if you do:

ALTER TABLE prod.db.taxis REPLACE PARTITION FIELD pickup_timestamp WITH day(pickup_timestamp);

-- and then
ALTER TABLE prod.db.taxis REPLACE PARTITION FIELD pickup_timestamp WITH day(dropoff_timestamp);

Both of the partitioning strategies will produce a Record[int] because it will contain the number of days since epoch. But the meaning is completely different.

) -> Iterator[ManifestEntry]:
"""Find deleted data files matching a filter since a starting snapshot.

Args:
table: Table to validate
starting_snapshot: Snapshot current at the start of the operation
data_filter: Expression used to find deleted data files
partition_set: a set of partitions to find deleted data files
parent_snapshot: Ending snapshot on the branch being validated

Returns:
List of deleted data files matching the filter
"""
# if there is no current table state, no files have been deleted
if parent_snapshot is None:
return

manifests, snapshot_ids = validation_history(
table,
starting_snapshot,
parent_snapshot,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks out of order to, because validation_history has to_snapshot as the second argument, and from_snapshot as the third argument.

Suggested change
starting_snapshot,
parent_snapshot,
starting_snapshot,
parent_snapshot,

Do you think it would be better to update validation_history function to use the following function signature instead? I think it's a lot more expected to have from_snapshot then to_snapshot

def validation_history(
    table: Table,
    from_snapshot: Snapshot,
    to_snapshot: Snapshot,
    matching_operations: set[Operation],
    manifest_content_filter: ManifestContent,
)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you clarify a little more? Are you saying we should move to the terms starting_snapshot (which is from_snapshot) and parent_snapshot (to_snapshot)

VALIDATE_DATA_FILES_EXIST_OPERATIONS,
ManifestContent.DATA,
)

if data_filter is not None:
evaluator = _StrictMetricsEvaluator(table.schema(), data_filter).eval
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not too sure if this is correct, because ManifestGroup.entries seems to be using inclusive projection.

Should we be using inclusive_projection here instead?

Summoning @Fokko for a second review

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree that this should be inclusive projection, since we want to know if there are any matches. Inclusive projection returns rows_might_match and rows_cannot_match. If they cannot be matched, then we can skip it :)


for manifest in manifests:
for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
if entry.snapshot_id not in snapshot_ids:
continue

if entry.status != ManifestEntryStatus.DELETED:
continue

if data_filter is not None and not evaluator(entry.data_file):
continue

if partition_set is not None and (entry.data_file.spec_id, entry.data_file.partition) not in partition_set:
continue

yield entry


def validate_deleted_data_files(
table: Table,
starting_snapshot: Snapshot,
data_filter: Optional[BooleanExpression],
parent_snapshot: Snapshot,
) -> None:
"""Validate that no files matching a filter have been deleted from the table since a starting snapshot.

Args:
table: Table to validate
starting_snapshot: Snapshot current at the start of the operation
data_filter: Expression used to find deleted data files
parent_snapshot: Ending snapshot on the branch being validated

"""
conflicting_entries = deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot)
if any(conflicting_entries):
raise ValidationException("Deleted data files were found matching the filter.")
67 changes: 64 additions & 3 deletions tests/table/test_validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@

from pyiceberg.exceptions import ValidationException
from pyiceberg.io import FileIO
from pyiceberg.manifest import ManifestContent, ManifestFile
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
from pyiceberg.table import Table
from pyiceberg.table.snapshots import Operation, Snapshot
from pyiceberg.table.update.validate import validation_history
from pyiceberg.table.snapshots import Operation, Snapshot, Summary
from pyiceberg.table.update.validate import deleted_data_files, validation_history


@pytest.fixture
Expand Down Expand Up @@ -136,3 +136,64 @@ def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestF
{Operation.APPEND},
ManifestContent.DATA,
)


def test_deleted_data_files(
table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]],
) -> None:
table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests

oldest_snapshot = table.snapshots()[0]
newest_snapshot = cast(Snapshot, table.current_snapshot())

def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]:
"""Mock the manifests method to use the snapshot_id for lookup."""
snapshot_id = self.snapshot_id
if snapshot_id in mock_manifests:
return mock_manifests[snapshot_id]
return []

# every snapshot is an append, so we should get nothing!
with patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect):
result = list(
deleted_data_files(
table=table,
starting_snapshot=newest_snapshot,
data_filter=None,
parent_snapshot=oldest_snapshot,
partition_set=None,
)
)

assert result == []

# modify second to last snapshot to be a delete
snapshots = table.snapshots()
altered_snapshot = snapshots[-2]
altered_snapshot = altered_snapshot.model_copy(update={"summary": Summary(operation=Operation.DELETE)})
snapshots[-2] = altered_snapshot

table.metadata = table.metadata.model_copy(
update={"snapshots": snapshots},
)

my_entry = ManifestEntry.from_args(
status=ManifestEntryStatus.DELETED,
snapshot_id=altered_snapshot.snapshot_id,
)

with (
patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect),
patch("pyiceberg.manifest.ManifestFile.fetch_manifest_entry", return_value=[my_entry]),
):
result = list(
deleted_data_files(
table=table,
starting_snapshot=newest_snapshot,
data_filter=None,
parent_snapshot=oldest_snapshot,
partition_set=None,
)
)

assert result == [my_entry]