Skip to content

Commit 6eb24c9

Browse files
committed
fix: raise error when creating a table during a transaction
1 parent eb71a1f commit 6eb24c9

File tree

2 files changed

+86
-29
lines changed

2 files changed

+86
-29
lines changed

pyiceberg/catalog/s3tables.py

Lines changed: 50 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
3636
from pyiceberg.schema import Schema
3737
from pyiceberg.serializers import FromInputFile
38-
from pyiceberg.table import CommitTableResponse, Table
38+
from pyiceberg.table import CommitTableResponse, CreateTableTransaction, Table
3939
from pyiceberg.table.metadata import new_table_metadata
4040
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
4141
from pyiceberg.table.update import TableRequirement, TableUpdate
@@ -92,36 +92,57 @@ def commit_table(
9292
table_identifier = table.name()
9393
database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError)
9494

95-
current_table, version_token = self._load_table_and_version(identifier=table_identifier)
96-
97-
updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
98-
if current_table and updated_staged_table.metadata == current_table.metadata:
99-
# no changes, do nothing
100-
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
101-
102-
self._write_metadata(
103-
metadata=updated_staged_table.metadata,
104-
io=updated_staged_table.io,
105-
metadata_path=updated_staged_table.metadata_location,
106-
overwrite=True,
107-
)
108-
109-
# try to update metadata location which will fail if the versionToken changed meanwhile
95+
current_table: Optional[Table]
96+
version_token: Optional[str]
11097
try:
111-
self.s3tables.update_table_metadata_location(
112-
tableBucketARN=self.table_bucket_arn,
113-
namespace=database_name,
114-
name=table_name,
115-
versionToken=version_token,
116-
metadataLocation=updated_staged_table.metadata_location,
98+
current_table, version_token = self._load_table_and_version(identifier=table_identifier)
99+
except NoSuchTableError:
100+
current_table = None
101+
version_token = None
102+
103+
if current_table:
104+
updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
105+
if updated_staged_table.metadata == current_table.metadata:
106+
# no changes, do nothing
107+
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
108+
109+
self._write_metadata(
110+
metadata=updated_staged_table.metadata,
111+
io=updated_staged_table.io,
112+
metadata_path=updated_staged_table.metadata_location,
113+
overwrite=True,
117114
)
118-
except self.s3tables.exceptions.ConflictException as e:
119-
raise CommitFailedException(
120-
f"Cannot commit {database_name}.{table_name} because of a concurrent update to the table version {version_token}."
121-
) from e
122-
return CommitTableResponse(
123-
metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
124-
)
115+
116+
# try to update metadata location which will fail if the versionToken changed meanwhile
117+
try:
118+
self.s3tables.update_table_metadata_location(
119+
tableBucketARN=self.table_bucket_arn,
120+
namespace=database_name,
121+
name=table_name,
122+
versionToken=version_token,
123+
metadataLocation=updated_staged_table.metadata_location,
124+
)
125+
except self.s3tables.exceptions.ConflictException as e:
126+
raise CommitFailedException(
127+
f"Cannot commit {database_name}.{table_name} because of a concurrent update to the table version {version_token}."
128+
) from e
129+
return CommitTableResponse(
130+
metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
131+
)
132+
else:
133+
# table does not exist, create it
134+
raise NotImplementedError("Creating a table on commit is currently not supported.")
135+
136+
def create_table_transaction(
137+
self,
138+
identifier: Union[str, Identifier],
139+
schema: Union[Schema, "pa.Schema"],
140+
location: Optional[str] = None,
141+
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
142+
sort_order: SortOrder = UNSORTED_SORT_ORDER,
143+
properties: Properties = EMPTY_DICT,
144+
) -> CreateTableTransaction:
145+
raise NotImplementedError("create_table_transaction currently not supported.")
125146

126147
def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
127148
if properties:

tests/catalog/integration_test_s3tables.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
from pyiceberg.catalog.s3tables import S3TablesCatalog
2020
from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, TableBucketNotFound
21+
from pyiceberg.partitioning import PartitionField, PartitionSpec
2122
from pyiceberg.schema import Schema
23+
from pyiceberg.transforms import IdentityTransform
2224
from pyiceberg.types import IntegerType
2325

2426

@@ -247,3 +249,37 @@ def test_commit_new_data_to_table(catalog: S3TablesCatalog, database_name: str,
247249
assert updated_table_metadata.metadata_log[-1].metadata_file == original_table_metadata_location
248250
assert updated_table_metadata.metadata_log[-1].timestamp_ms == original_table_last_updated_ms
249251
assert table.scan().to_arrow().num_rows == 2 * row_count
252+
253+
254+
def test_create_table_transaction(
255+
catalog: S3TablesCatalog, database_name: str, table_name: str, table_schema_nested: str
256+
) -> None:
257+
identifier = (database_name, table_name)
258+
catalog.create_namespace(namespace=database_name)
259+
260+
with catalog.create_table_transaction(
261+
identifier,
262+
table_schema_nested,
263+
partition_spec=PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="foo")),
264+
) as txn:
265+
last_updated_metadata = txn.table_metadata.last_updated_ms
266+
with txn.update_schema() as update_schema:
267+
update_schema.add_column(path="b", field_type=IntegerType())
268+
269+
with txn.update_spec() as update_spec:
270+
update_spec.add_identity("bar")
271+
272+
txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
273+
274+
table = catalog.load_table(identifier)
275+
276+
assert table.schema().find_field("b").field_type == IntegerType()
277+
assert table.properties == {"test_a": "test_aa", "test_b": "test_b", "test_c": "test_c"}
278+
assert table.spec().last_assigned_field_id == 1001
279+
assert table.spec().fields_by_source_id(1)[0].name == "foo"
280+
assert table.spec().fields_by_source_id(1)[0].field_id == 1000
281+
assert table.spec().fields_by_source_id(1)[0].transform == IdentityTransform()
282+
assert table.spec().fields_by_source_id(2)[0].name == "bar"
283+
assert table.spec().fields_by_source_id(2)[0].field_id == 1001
284+
assert table.spec().fields_by_source_id(2)[0].transform == IdentityTransform()
285+
assert table.metadata.last_updated_ms > last_updated_metadata

0 commit comments

Comments
 (0)