Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 87 additions & 22 deletions pyiceberg/table/update/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,95 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Iterator, Optional
from typing import Callable, Iterator, Optional

from pyiceberg.exceptions import ValidationException
from pyiceberg.expressions import BooleanExpression
from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator
from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
from pyiceberg.manifest import DataFile, ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile
from pyiceberg.table import Table
from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between
from pyiceberg.typedef import Record

VALIDATE_DATA_FILES_EXIST_OPERATIONS = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE}


class ManifestEntryEvaluator:
"""Evaluate manifests against given conditions."""

def __init__(
self,
manifests: list[ManifestFile],
table: Table,
snapshot_ids: Optional[set[int]] = None,
status: Optional[ManifestEntryStatus] = None,
data_filter: Optional[BooleanExpression] = None,
partition_set: Optional[dict[int, set[Record]]] = None,
):
self.manifests = manifests
self.table = table
self.snapshot_ids = snapshot_ids if snapshot_ids is not None else []
self.status = status
self.partition_set = partition_set

if data_filter is not None:
self.metrics_evaluator: Optional[Callable[[DataFile], bool]] = _InclusiveMetricsEvaluator(
self.table.schema(), data_filter
).eval
else:
self.metrics_evaluator = None

def _evaluate_snapshot_ids(self, entry: ManifestEntry) -> bool:
"""Check if the entry's snapshot ID is in the filter set."""
if entry.snapshot_id in self.snapshot_ids:
return True
else:
return False

def _evaluate_status(self, entry: ManifestEntry) -> bool:
"""Check if the entry's status matches the filter."""
if self.status is None or entry.status == self.status:
return True
else:
return False

def _evaluate_data_filter(self, entry: ManifestEntry) -> bool:
"""Check if the entry's data file matches the data filter."""
if self.metrics_evaluator is None:
return True
if self.metrics_evaluator(entry.data_file) is ROWS_CANNOT_MATCH:
return False
return True

def _evaluate_partition_set(self, entry: ManifestEntry) -> bool:
"""Check if the entry's partition matches the partition set."""
if self.partition_set is None:
return True
spec_id = entry.data_file.spec_id
partition = entry.data_file.partition
if spec_id not in self.partition_set or partition not in self.partition_set[spec_id]:
return False
return True

def evaluate(self) -> Iterator[ManifestEntry]:
"""Evaluate the manifests against the given conditions."""
for manifest in self.manifests:
for entry in manifest.fetch_manifest_entry(self.table.io, discard_deleted=False):
if not self._evaluate_snapshot_ids(entry):
continue

if not self._evaluate_status(entry):
continue

if not self._evaluate_data_filter(entry):
continue

if not self._evaluate_partition_set(entry):
continue

yield entry


def validation_history(
table: Table,
from_snapshot: Snapshot,
Expand Down Expand Up @@ -108,27 +184,16 @@ def _deleted_data_files(
ManifestContent.DATA,
)

if data_filter is not None:
evaluator = _InclusiveMetricsEvaluator(table.schema(), data_filter).eval

for manifest in manifests:
for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
if entry.snapshot_id not in snapshot_ids:
continue

if entry.status != ManifestEntryStatus.DELETED:
continue

if data_filter is not None and evaluator(entry.data_file) is ROWS_CANNOT_MATCH:
continue

if partition_set is not None:
spec_id = entry.data_file.spec_id
partition = entry.data_file.partition
if spec_id not in partition_set or partition not in partition_set[spec_id]:
continue
manifest_evaluator = ManifestEntryEvaluator(
manifests,
table,
snapshot_ids,
ManifestEntryStatus.DELETED,
data_filter,
partition_set,
)

yield entry
yield from manifest_evaluator.evaluate()


def _validate_deleted_data_files(
Expand Down