Skip to content

Commit f7a7a87

Browse files
author
Yingjian Wu
committed
abort the whole transaction if any update on the chain has failed
1 parent ff3a249 commit f7a7a87

File tree

2 files changed

+32
-3
lines changed

2 files changed

+32
-3
lines changed

pyiceberg/table/__init__.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from dataclasses import dataclass
2424
from functools import cached_property
2525
from itertools import chain
26+
from types import TracebackType
2627
from typing import (
2728
TYPE_CHECKING,
2829
Any,
@@ -33,6 +34,7 @@
3334
Optional,
3435
Set,
3536
Tuple,
37+
Type,
3638
TypeVar,
3739
Union,
3840
)
@@ -231,9 +233,13 @@ def __enter__(self) -> Transaction:
231233
"""Start a transaction to update the table."""
232234
return self
233235

234-
def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
235-
"""Close and commit the transaction."""
236-
self.commit_transaction()
236+
def __exit__(
237+
self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
238+
) -> None:
239+
"""Close and commit the transaction, or handle exceptions."""
240+
# Only commit the full transaction, if there is no exception in all updates on the chain
241+
if exctype is None and excinst is None and exctb is None:
242+
self.commit_transaction()
237243

238244
def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...] = ()) -> Transaction:
239245
"""Check if the requirements are met, and applies the updates to the metadata."""

tests/catalog/test_base.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -766,3 +766,26 @@ def test_table_properties_raise_for_none_value(catalog: InMemoryCatalog) -> None
766766
with pytest.raises(ValidationError) as exc_info:
767767
_ = given_catalog_has_a_table(catalog, properties=property_with_none)
768768
assert "None type is not a supported value in properties: property_name" in str(exc_info.value)
769+
770+
771+
def test_abort_table_transaction_on_exception(catalog: InMemoryCatalog) -> None:
772+
tbl = given_catalog_has_a_table(catalog)
773+
# Populate some initial data
774+
data = pa.Table.from_pylist(
775+
[{"x": 1, "y": 2, "z": 3}, {"x": 4, "y": 5, "z": 6}],
776+
schema=TEST_TABLE_SCHEMA.as_arrow(),
777+
)
778+
tbl.append(data)
779+
780+
# Data to overwrite
781+
data = pa.Table.from_pylist(
782+
[{"x": 7, "y": 8, "z": 9}, {"x": 7, "y": 8, "z": 9}, {"x": 7, "y": 8, "z": 9}],
783+
schema=TEST_TABLE_SCHEMA.as_arrow(),
784+
)
785+
786+
with pytest.raises(ValueError):
787+
with tbl.transaction() as txn:
788+
txn.overwrite(data)
789+
raise ValueError
790+
791+
assert len(tbl.scan().to_pandas()) == 2 # type: ignore

0 commit comments

Comments
 (0)