From 3b3356a8c4e4902bebed8a219c74116f919ca295 Mon Sep 17 00:00:00 2001
From: jayceslesar
Date: Wed, 17 Sep 2025 20:14:11 -0400
Subject: [PATCH] feat(transactions): sort orders

---
 pyiceberg/table/__init__.py          | 17 ++++++
 pyiceberg/table/update/__init__.py   | 23 ++++++++
 pyiceberg/table/update/sort_order.py | 86 ++++++++++++++++++++++++++++
 tests/integration/test_catalog.py    | 18 ++++++
 4 files changed, 144 insertions(+)
 create mode 100644 pyiceberg/table/update/sort_order.py

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 7c63aa79a1..04c8b0487e 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -117,6 +117,7 @@
 )
 from pyiceberg.table.update.schema import UpdateSchema
 from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, _FastAppendFiles
+from pyiceberg.table.update.sort_order import UpdateSortOrder
 from pyiceberg.table.update.spec import UpdateSpec
 from pyiceberg.table.update.statistics import UpdateStatistics
 from pyiceberg.transforms import IdentityTransform
@@ -433,6 +434,14 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive
             name_mapping=self.table_metadata.name_mapping(),
         )
 
+    def update_sort_order(self) -> UpdateSortOrder:
+        """Create a new UpdateSortOrder transaction to alter the sort order of this table.
+
+        Returns:
+            A new UpdateSortOrder.
+        """
+        return UpdateSortOrder(self)
+
     def update_snapshot(
         self, snapshot_properties: Dict[str, str] = EMPTY_DICT, branch: Optional[str] = MAIN_BRANCH
     ) -> UpdateSnapshot:
@@ -1291,6 +1300,14 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive
             name_mapping=self.name_mapping(),
         )
 
+    def update_sort_order(self) -> UpdateSortOrder:
+        """Create a new UpdateSortOrder transaction to alter the sort order of this table.
+
+        Returns:
+            A new UpdateSortOrder.
+        """
+        return UpdateSortOrder(Transaction(self, autocommit=True))
+
     def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py
index c30d960d38..5ec2208bcb 100644
--- a/pyiceberg/table/update/__init__.py
+++ b/pyiceberg/table/update/__init__.py
@@ -119,6 +119,15 @@ class AddSortOrderUpdate(IcebergBaseModel):
     sort_order: SortOrder = Field(alias="sort-order")
 
 
+# NOTE(review): confirm "set-sort-order" is registered in the TableUpdate union
+# and accepted by the target catalogs; neither is visible in this patch.
+class SetSortOrderUpdate(IcebergBaseModel):
+    """Table update that replaces the table's sort orders with a single sort order."""
+
+    action: Literal["set-sort-order"] = Field(default="set-sort-order")
+    sort_order: SortOrder = Field(alias="sort-order")
+
+
 class SetDefaultSortOrderUpdate(IcebergBaseModel):
     action: Literal["set-default-sort-order"] = Field(default="set-default-sort-order")
     sort_order_id: int = Field(
@@ -546,6 +555,20 @@ def _(update: AddSortOrderUpdate, base_metadata: TableMetadata, context: _TableM
     )
 
 
+@_apply_table_update.register(SetSortOrderUpdate)
+def _(update: SetSortOrderUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
+    """Replace every sort order on the table with the one carried by the update."""
+    context.add_update(update)
+    return base_metadata.model_copy(
+        update={
+            "sort_orders": [update.sort_order],
+            # Keep the default id pointing at the only sort order left in the
+            # list; the previous default id may no longer match any entry.
+            "default_sort_order_id": update.sort_order.order_id,
+        }
+    )
+
+
 @_apply_table_update.register(SetDefaultSortOrderUpdate)
 def _(
     update: SetDefaultSortOrderUpdate,
diff --git a/pyiceberg/table/update/sort_order.py b/pyiceberg/table/update/sort_order.py
new file mode 100644
index 0000000000..fd7676a118
--- /dev/null
+++ b/pyiceberg/table/update/sort_order.py
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import TYPE_CHECKING, Tuple
+
+from pyiceberg.table.sorting import SortOrder
+from pyiceberg.table.update import (
+    AddSortOrderUpdate,
+    SetSortOrderUpdate,
+    TableUpdate,
+    UpdatesAndRequirements,
+    UpdateTableMetadata,
+)
+
+if TYPE_CHECKING:
+    from pyiceberg.table import Transaction
+
+
+class UpdateSortOrder(UpdateTableMetadata["UpdateSortOrder"]):
+    """
+    Run sort order management operations using APIs.
+
+    APIs include the add_sort_order and set_sort_order operations.
+
+    Use table.update_sort_order().<operation>().commit() to run a specific operation.
+    Use table.update_sort_order().<operation-one>().<operation-two>().commit() to run multiple operations.
+
+    Pending changes are applied on commit.
+
+    We can also use context managers to make more changes. For example:
+
+    new_sort_order = SortOrder(SortField(source_id=2, transform='identity'))
+
+    with table.update_sort_order() as update:
+        update.set_sort_order(sort_order=new_sort_order)
+    """
+
+    # Tuples are immutable, so "+=" in the methods below rebinds the attribute on
+    # the instance and never mutates this class-level default.
+    _updates: Tuple[TableUpdate, ...] = ()
+
+    def __init__(self, transaction: "Transaction") -> None:
+        super().__init__(transaction)
+
+    def add_sort_order(self, sort_order: SortOrder) -> "UpdateSortOrder":
+        """Stage an AddSortOrderUpdate for the given sort order.
+
+        Args:
+            sort_order: The sort order to add.
+
+        Returns:
+            This UpdateSortOrder, so calls can be chained.
+        """
+        self._updates += (AddSortOrderUpdate(sort_order=sort_order),)
+
+        return self
+
+    def set_sort_order(self, sort_order: SortOrder) -> "UpdateSortOrder":
+        """Stage a SetSortOrderUpdate for the given sort order.
+
+        Args:
+            sort_order: The sort order to set.
+
+        Returns:
+            This UpdateSortOrder, so calls can be chained.
+        """
+        self._updates += (SetSortOrderUpdate(sort_order=sort_order),)
+
+        return self
+
+    def _commit(self) -> UpdatesAndRequirements:
+        """Return the staged updates along with an empty tuple of requirements."""
+        return self._updates, ()
diff --git a/tests/integration/test_catalog.py b/tests/integration/test_catalog.py
index be93382daa..a3719beab1 100644
--- a/tests/integration/test_catalog.py
+++ b/tests/integration/test_catalog.py
@@ -34,6 +34,7 @@
 )
 from pyiceberg.io import WAREHOUSE
 from pyiceberg.schema import Schema
+from pyiceberg.table.sorting import SortOrder
 from tests.conftest import clean_up
 
 
@@ -343,3 +344,20 @@ def test_update_namespace_properties(test_catalog: Catalog, database_name: str)
         else:
             assert k in update_report.removed
     assert "updated test description" == test_catalog.load_namespace_properties(database_name)["comment"]
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("test_catalog", CATALOGS)
+def test_update_sort_order(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None:
+    """Setting a new sort order makes it the table's current sort order after a refresh."""
+    test_catalog.create_namespace(database_name)
+
+    identifier = (database_name, table_name)
+    table = test_catalog.create_table(identifier, table_schema_nested)
+    new_sort_order = SortOrder(order_id=table.sort_order().order_id + 1)
+
+    table.update_sort_order().set_sort_order(new_sort_order).commit()
+
+    table = table.refresh()
+
+    assert table.sort_order() == new_sort_order