|
80 | 80 | from pyiceberg.utils.bin_packing import ListPacker |
81 | 81 | from pyiceberg.utils.concurrent import ExecutorFactory |
82 | 82 | from pyiceberg.utils.properties import property_as_bool, property_as_int |
| 83 | +from pyiceberg.utils.snapshot import ancestors_between |
83 | 84 |
|
84 | 85 | if TYPE_CHECKING: |
85 | 86 | from pyiceberg.table import Transaction |
@@ -251,6 +252,12 @@ def _commit(self) -> UpdatesAndRequirements: |
251 | 252 | ) |
252 | 253 | location_provider = self._transaction._table.location_provider() |
253 | 254 | manifest_list_file_path = location_provider.new_metadata_location(file_name) |
| 255 | + |
| 256 | + # get current snapshot id and starting snapshot id, and validate that there are no conflicts |
| 257 | + starting_snapshot_id = self._parent_snapshot_id |
| 258 | + current_snapshot_id = self._transaction._table.refresh().metadata.current_snapshot_id |
| 259 | + self._validate(starting_snapshot_id, current_snapshot_id) |
| 260 | + |
254 | 261 | with write_manifest_list( |
255 | 262 | format_version=self._transaction.table_metadata.format_version, |
256 | 263 | output_file=self._io.new_output(manifest_list_file_path), |
@@ -279,6 +286,27 @@ def _commit(self) -> UpdatesAndRequirements: |
279 | 286 | (AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),), |
280 | 287 | ) |
281 | 288 |
|
| 289 | + def _validate(self, starting_snapshot_id: Optional[int], current_snapshot_id: Optional[int]) -> None: |
| 290 | + # get all the snapshots between the current snapshot id and the parent id |
| 291 | + snapshots = ancestors_between(starting_snapshot_id, current_snapshot_id, self._transaction._table.metadata.snapshot_by_id) |
| 292 | + |
| 293 | + # Define allowed operations for each type of operation |
| 294 | + allowed_operations = { |
| 295 | + Operation.APPEND: {Operation.APPEND, Operation.REPLACE, Operation.OVERWRITE, Operation.DELETE}, |
| 296 | + Operation.REPLACE: {Operation.APPEND}, |
| 297 | + Operation.OVERWRITE: set(), |
| 298 | + Operation.DELETE: set(), |
| 299 | + } |
| 300 | + |
| 301 | + for snapshot in snapshots: |
| 302 | + snapshot_operation = snapshot.summary.operation |
| 303 | + |
| 304 | + if snapshot_operation not in allowed_operations[self._operation]: |
| 305 | + raise ValueError( |
| 306 | + f"Operation {snapshot_operation} is not allowed when performing {self._operation}. " |
| 307 | + "Check for overlaps or conflicts." |
| 308 | + ) |
| 309 | + |
282 | 310 | @property |
283 | 311 | def snapshot_id(self) -> int: |
284 | 312 | return self._snapshot_id |
|
0 commit comments