From d0dd75aa04eb76997d6223d27ba9f5967fda8bac Mon Sep 17 00:00:00 2001 From: jayceslesar Date: Sun, 1 Jun 2025 14:15:28 -0400 Subject: [PATCH] feat: `ManifestEntryEvaluator` --- pyiceberg/table/update/validate.py | 109 +++++++++++++++++++++++------ 1 file changed, 87 insertions(+), 22 deletions(-) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py index 55c34676e3..c49a4292ca 100644 --- a/pyiceberg/table/update/validate.py +++ b/pyiceberg/table/update/validate.py @@ -14,12 +14,12 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from typing import Iterator, Optional +from typing import Callable, Iterator, Optional from pyiceberg.exceptions import ValidationException from pyiceberg.expressions import BooleanExpression from pyiceberg.expressions.visitors import ROWS_CANNOT_MATCH, _InclusiveMetricsEvaluator -from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile +from pyiceberg.manifest import DataFile, ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile from pyiceberg.table import Table from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between from pyiceberg.typedef import Record @@ -27,6 +27,82 @@ VALIDATE_DATA_FILES_EXIST_OPERATIONS = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE} +class ManifestEntryEvaluator: + """Evaluate manifests against given conditions.""" + + def __init__( + self, + manifests: list[ManifestFile], + table: Table, + snapshot_ids: Optional[set[int]] = None, + status: Optional[ManifestEntryStatus] = None, + data_filter: Optional[BooleanExpression] = None, + partition_set: Optional[dict[int, set[Record]]] = None, + ): + self.manifests = manifests + self.table = table + self.snapshot_ids = snapshot_ids if snapshot_ids is not None else [] + self.status = status + self.partition_set = partition_set + + if data_filter is not None: + self.metrics_evaluator: Optional[Callable[[DataFile], bool]] = _InclusiveMetricsEvaluator( + self.table.schema(), data_filter + ).eval + else: + self.metrics_evaluator = None + + def _evaluate_snapshot_ids(self, entry: ManifestEntry) -> bool: + """Check if the entry's snapshot ID is in the filter set.""" + if entry.snapshot_id in self.snapshot_ids: + return True + else: + return False + + def _evaluate_status(self, entry: ManifestEntry) -> bool: + """Check if the entry's status matches the filter.""" + if self.status is None or entry.status == self.status: + return True + else: + return False + + def _evaluate_data_filter(self, entry: ManifestEntry) -> bool: + """Check if the entry's data file matches the data filter.""" + if self.metrics_evaluator is None: + return True + if self.metrics_evaluator(entry.data_file) is ROWS_CANNOT_MATCH: + return False + return True + + def _evaluate_partition_set(self, entry: ManifestEntry) -> bool: + """Check if the entry's partition matches the partition set.""" + if self.partition_set is None: + return True + spec_id = entry.data_file.spec_id + partition = entry.data_file.partition + if spec_id not in self.partition_set or partition not in self.partition_set[spec_id]: + return False + return True + + def evaluate(self) -> Iterator[ManifestEntry]: + """Evaluate the manifests against the given conditions.""" + for manifest in self.manifests: + for entry in manifest.fetch_manifest_entry(self.table.io, discard_deleted=False): + if not self._evaluate_snapshot_ids(entry): + continue + + if not self._evaluate_status(entry): + continue + + if not self._evaluate_data_filter(entry): + continue + + if not self._evaluate_partition_set(entry): + continue + + yield entry + + def validation_history( table: Table, from_snapshot: Snapshot, @@ -108,27 +184,16 @@ def _deleted_data_files( ManifestContent.DATA, ) - if data_filter is not None: - evaluator = _InclusiveMetricsEvaluator(table.schema(), data_filter).eval - - for manifest in manifests: - for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False): - if entry.snapshot_id not in snapshot_ids: - continue - - if entry.status != ManifestEntryStatus.DELETED: - continue - - if data_filter is not None and evaluator(entry.data_file) is ROWS_CANNOT_MATCH: - continue - - if partition_set is not None: - spec_id = entry.data_file.spec_id - partition = entry.data_file.partition - if spec_id not in partition_set or partition not in partition_set[spec_id]: - continue + manifest_evaluator = ManifestEntryEvaluator( + manifests, + table, + snapshot_ids, + ManifestEntryStatus.DELETED, + data_filter, + partition_set, + ) - yield entry + yield from manifest_evaluator.evaluate() def _validate_deleted_data_files(