Skip to content
Closed
Show file tree
Hide file tree
Changes from 48 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
ccfcbcd
chore: scaffolding
JasperHG90 Jan 5, 2025
95da8f3
chore: scaffolding
JasperHG90 Jan 5, 2025
253967d
chore: add skeleton for asc/desc methods
JasperHG90 Jan 5, 2025
7b5a98e
chore: scaffolding
JasperHG90 Jan 5, 2025
304a806
chore: change method names
JasperHG90 Jan 5, 2025
48ac5c0
chore: update methods
JasperHG90 Jan 7, 2025
a47067c
chore: update methods
JasperHG90 Jan 7, 2025
c1ab2ec
chore: update imports
JasperHG90 Jan 7, 2025
8f27d14
chore: stupid renames
JasperHG90 Jan 7, 2025
d8720f2
chore: lint
JasperHG90 Jan 7, 2025
90db60a
chore: docstrings
JasperHG90 Jan 7, 2025
1cd2729
test: add integration test for replace sort order
JasperHG90 Jan 8, 2025
0a1e781
test: add test for lookup
JasperHG90 Jan 8, 2025
a550ccb
refactor: add last sort order id
JasperHG90 Jan 8, 2025
8b09255
refactor: add last sort order id and increment
JasperHG90 Jan 8, 2025
67b9e52
chore: add imports
JasperHG90 Jan 8, 2025
dcaa63f
feat: add apply and commit methods
JasperHG90 Jan 8, 2025
ced6a4b
test: remove spec stuff
JasperHG90 Jan 8, 2025
43e09a3
chore: remove unused import
JasperHG90 Jan 8, 2025
b460c34
chore: add ReplaceSortOrder to the Transaction class
JasperHG90 Jan 10, 2025
190071f
chore: lint
JasperHG90 Jan 10, 2025
e9475de
Merge branch 'apache:main' into feat/update-sort-order
JasperHG90 Jan 10, 2025
eafafaf
Merge branch 'apache:main' into feat/update-sort-order
JasperHG90 Jan 24, 2025
ec5f711
chore: renames (replace to update)
JasperHG90 Jan 24, 2025
d69a071
chore: renames (replace to update)
JasperHG90 Jan 24, 2025
b5a5bd8
test: add test updating sort order
JasperHG90 Jan 24, 2025
8080fa5
refactor: remove the sort order builder
JasperHG90 Jan 24, 2025
e77a2c1
chore: remove sort order builder
JasperHG90 Jan 24, 2025
fc32b28
chore: lint
JasperHG90 Jan 24, 2025
fa1aa50
chore: update comment
JasperHG90 Jan 25, 2025
1aa6270
Merge branch 'apache:main' into feat/update-sort-order
JasperHG90 Feb 16, 2025
2e9cd3f
test: parametrize over iceberg format versions and remove unnused code
JasperHG90 Feb 16, 2025
137dbd9
chore: fmt
JasperHG90 Feb 16, 2025
d8b9001
Update pyiceberg/table/update/sorting.py
JasperHG90 Feb 16, 2025
58d302d
Update pyiceberg/table/__init__.py
JasperHG90 Feb 16, 2025
0b543b8
Merge branch 'feat/update-sort-order' of https://github.com/JasperHG9…
JasperHG90 Feb 16, 2025
fd0e287
chore: add arg
JasperHG90 Feb 16, 2025
5e57697
chore: fmt
JasperHG90 Feb 16, 2025
5131903
docs: update docs
JasperHG90 Feb 16, 2025
cc1ae1c
chore: set default
JasperHG90 Feb 16, 2025
2ae47e3
Merge branch 'apache:main' into feat/update-sort-order
JasperHG90 Mar 23, 2025
d99dfdd
chore: lint and update names
JasperHG90 Mar 23, 2025
9d77f3f
chore: determine if a sort order is newly added. If so, set the last …
JasperHG90 Mar 24, 2025
3f6a953
test: add test for re-using previously defineed sort order
JasperHG90 Mar 24, 2025
7223fdd
chore: fmt
JasperHG90 Mar 24, 2025
18ced3f
chore: only set last assigned order id when sure that a new sort orde…
JasperHG90 Mar 24, 2025
8f425fb
test: add test for reverting back to unsorted sort order
JasperHG90 Mar 24, 2025
f9efab3
chore: fmt
JasperHG90 Mar 24, 2025
03f0a1b
Merge branch 'apache:main' into feat/update-sort-order
JasperHG90 Apr 24, 2025
46fd88b
Merge branch 'apache:main' into feat/update-sort-order
JasperHG90 May 6, 2025
353178f
Make the CI happy
Fokko Sep 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1259,6 +1259,24 @@ with table.update_spec() as update:
update.rename_field("bucketed_id", "sharded_id")
```

## Sort order updates

Users can update the sort order on existing tables for new data. See [sorting](https://iceberg.apache.org/spec/#sorting) for more details.

The API to use when updating a sort order is the `update_sort_order` API on the table.

Sort orders can only be updated by adding a new sort order. They cannot be deleted or modified.

### Updating a sort order on a table

To create a new sort order, you can use either the `asc` or `desc` API depending on whether you want you data sorted in ascending or descending order. Both take the name of the field, the sort order transform, and a null order that describes the order of null values when sorted.

```python
with table.update_sort_order() as update:
update.desc("event_ts", DayTransform(), NullOrder.NULLS_FIRST)
update.asc("some_field", IdentityTransform(), NullOrder.NULLS_LAST)
```

## Table properties

Set and remove properties through the `Transaction` API:
Expand Down
23 changes: 23 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
UpdateSnapshot,
_FastAppendFiles,
)
from pyiceberg.table.update.sorting import UpdateSortOrder
from pyiceberg.table.update.spec import UpdateSpec
from pyiceberg.table.update.statistics import UpdateStatistics
from pyiceberg.transforms import IdentityTransform
Expand Down Expand Up @@ -430,6 +431,20 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive
name_mapping=self.table_metadata.name_mapping(),
)

def update_sort_order(self, case_sensitive: bool = True) -> UpdateSortOrder:
"""Create a new UpdateSortOrder to update the sort order of this table.

Args:
case_sensitive: If field names are case-sensitive.

Returns:
A new UpdateSortOrder.
"""
return UpdateSortOrder(
self,
case_sensitive=case_sensitive,
)

def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> UpdateSnapshot:
"""Create a new UpdateSnapshot to produce a new snapshot for the table.

Expand Down Expand Up @@ -1103,6 +1118,14 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive
name_mapping=self.name_mapping(),
)

def update_sort_order(self, case_sensitive: bool = True) -> UpdateSortOrder:
"""Create a new UpdateSortOrder to update the sort order of this table.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
Args:
case_sensitive: If field names are case-sensitive.

Returns:
A new UpdateSortOrder.
"""
return UpdateSortOrder(transaction=Transaction(self, autocommit=True), case_sensitive=case_sensitive)

def name_mapping(self) -> Optional[NameMapping]:
"""Return the table's field-id NameMapping."""
return self.metadata.name_mapping()
Expand Down
136 changes: 136 additions & 0 deletions pyiceberg/table/update/sorting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Any, List, Optional, Tuple

from pyiceberg.table.sorting import INITIAL_SORT_ORDER_ID, UNSORTED_SORT_ORDER, NullOrder, SortDirection, SortField, SortOrder
from pyiceberg.table.update import (
AddSortOrderUpdate,
AssertDefaultSortOrderId,
SetDefaultSortOrderUpdate,
TableRequirement,
TableUpdate,
UpdatesAndRequirements,
UpdateTableMetadata,
)
from pyiceberg.transforms import Transform

if TYPE_CHECKING:
from pyiceberg.table import Transaction


class UpdateSortOrder(UpdateTableMetadata["UpdateSortOrder"]):
_transaction: Transaction
_last_assigned_order_id: Optional[int]
_case_sensitive: bool
_fields: List[SortField]

def __init__(self, transaction: Transaction, case_sensitive: bool = True) -> None:
super().__init__(transaction)
self._fields: List[SortField] = []
self._case_sensitive: bool = case_sensitive
self._last_assigned_order_id: Optional[int] = None
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kevinjqliu this is now None by default


def _column_name_to_id(self, column_name: str) -> int:
"""Map the column name to the column field id."""
return (
self._transaction.table_metadata.schema()
.find_field(
name_or_id=column_name,
case_sensitive=self._case_sensitive,
)
.field_id
)

def _add_sort_field(
self,
source_id: int,
transform: Transform[Any, Any],
direction: SortDirection,
null_order: NullOrder,
) -> UpdateSortOrder:
"""Add a sort field to the sort order list."""
self._fields.append(
SortField(
source_id=source_id,
transform=transform,
direction=direction,
null_order=null_order,
)
)
return self

def _reuse_or_create_sort_order_id(self) -> int:
"""Return the last assigned sort order id or create a new one."""
new_sort_order_id = INITIAL_SORT_ORDER_ID
for sort_order in self._transaction.table_metadata.sort_orders:
new_sort_order_id = max(new_sort_order_id, sort_order.order_id)
if sort_order.fields == self._fields:
return sort_order.order_id
elif new_sort_order_id <= sort_order.order_id:
new_sort_order_id = sort_order.order_id + 1
return new_sort_order_id
Comment on lines +78 to +87
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kevinjqliu added this to reuse an existing sort order instead of creating a new one


def asc(
self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder = NullOrder.NULLS_LAST
) -> UpdateSortOrder:
"""Add a sort field with ascending order."""
return self._add_sort_field(
source_id=self._column_name_to_id(source_column_name),
transform=transform,
direction=SortDirection.ASC,
null_order=null_order,
)

def desc(
self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder = NullOrder.NULLS_LAST
) -> UpdateSortOrder:
"""Add a sort field with descending order."""
return self._add_sort_field(
source_id=self._column_name_to_id(source_column_name),
transform=transform,
direction=SortDirection.DESC,
null_order=null_order,
)
Comment on lines +89 to +109
Copy link
Contributor

@jayceslesar jayceslesar Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are these separate functions instead of letting the user specify via constructing and passing in sort field objects? We already have an API for describing a sort field so why abstract it? If the sort order arguments change now we have to take care of it here too

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    def _add_sort_field(
        self,
        source_id: int,
        transform: Transform[Any, Any],
        direction: SortDirection,
        null_order: NullOrder,
    ) -> UpdateSortOrder:
        """Add a sort field to the sort order list."""
        self._fields.append(
            SortField(
                source_id=source_id,
                transform=transform,
                direction=direction,
                null_order=null_order,
            )
        )
        return self

Why do we have this wrapper is my question, can we not acheive the same functionality by letting the user pass in a list of SortField objects, which would mimic the API for creating a table:

sort_order = SortOrder(SortField(source_id=2, transform='identity'))

catalog.create_table(
    identifier="docs_example.bids",
    schema=schema,
    partition_spec=partition_spec,
    sort_order=sort_order,
)


def _apply(self) -> SortOrder:
"""Return the sort order."""
if next(iter(self._fields), None) is None:
return UNSORTED_SORT_ORDER
else:
return SortOrder(*self._fields, order_id=self._reuse_or_create_sort_order_id())

def _commit(self) -> UpdatesAndRequirements:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this implementation defers from the java one. heres the code path for the java implementation 1, 2

In particular, the java implementation tries to retry sort order whenever possible.
i dont think self._last_sort_order_id: int = transaction.table_metadata.default_sort_order_id and then using order_id=self._last_sort_order_id + 1 is correct since default_sort_order_id might be always be the highest sort order.
_last_sort_order_id should default to null and only changed when a new sort order is added.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @kevinjqliu . I'm not following you:

  • In particular, the java implementation tries to retry sort order whenever possible. : I looked at the Java code, and with my limited ability to parse it I'm not sure what you mean here. If no sort order has been added, it throws an exception. Else it appears to do an incrementation based on all possible sort orders (see here). When would a user add a new sort order that matches an existing sort order exactly?
  • is correct since default_sort_order_id might be always be the highest sort order: The 'highest' meaning the latest? Why is it a problem to take the value of the default one as the value to be incremented? This would be a problem if the default sort order wasn't the highest one, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the code to return a sort order with an existing sort order id if the sort order has been previously used.

"""Apply the pending changes and commit."""
new_sort_order = self._apply()
requirements: Tuple[TableRequirement, ...] = ()
updates: Tuple[TableUpdate, ...] = ()

if (
self._transaction.table_metadata.default_sort_order_id != new_sort_order.order_id
and self._transaction.table_metadata.sort_order_by_id(new_sort_order.order_id) is None
):
self._last_assigned_order_id = new_sort_order.order_id
updates = (AddSortOrderUpdate(sort_order=new_sort_order), SetDefaultSortOrderUpdate(sort_order_id=-1))
Comment on lines +124 to +129
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kevinjqliu last assigned sort order only set if new sort order is created.

else:
updates = (SetDefaultSortOrderUpdate(sort_order_id=new_sort_order.order_id),)

required_last_assigned_sort_order_id = self._transaction.table_metadata.default_sort_order_id
requirements = (AssertDefaultSortOrderId(default_sort_order_id=required_last_assigned_sort_order_id),)

return updates, requirements
166 changes: 166 additions & 0 deletions tests/integration/test_sort_order_update.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint:disable=redefined-outer-name

import pytest

from pyiceberg.catalog import Catalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.schema import Schema
from pyiceberg.table import Table
from pyiceberg.table.sorting import NullOrder, SortDirection, SortField, SortOrder
from pyiceberg.transforms import (
IdentityTransform,
)


def _simple_table(catalog: Catalog, table_schema_simple: Schema, format_version: str) -> Table:
return _create_table_with_schema(catalog, table_schema_simple, format_version)


def _create_table_with_schema(catalog: Catalog, schema: Schema, format_version: str) -> Table:
tbl_name = "default.test_schema_evolution"
try:
catalog.drop_table(tbl_name)
except NoSuchTableError:
pass
return catalog.create_table(identifier=tbl_name, schema=schema, properties={"format-version": format_version})


@pytest.mark.integration
@pytest.mark.parametrize(
"catalog, format_version",
[
(pytest.lazy_fixture("session_catalog"), "1"),
(pytest.lazy_fixture("session_catalog_hive"), "1"),
(pytest.lazy_fixture("session_catalog"), "2"),
(pytest.lazy_fixture("session_catalog_hive"), "2"),
],
)
def test_map_column_name_to_id(catalog: Catalog, format_version: str, table_schema_simple: Schema) -> None:
simple_table = _simple_table(catalog, table_schema_simple, format_version)
for col_name, col_id in {"foo": 1, "bar": 2, "baz": 3}.items():
assert col_id == simple_table.update_sort_order()._column_name_to_id(col_name)


@pytest.mark.integration
@pytest.mark.parametrize(
"catalog, format_version",
[
(pytest.lazy_fixture("session_catalog"), "1"),
(pytest.lazy_fixture("session_catalog_hive"), "1"),
(pytest.lazy_fixture("session_catalog"), "2"),
(pytest.lazy_fixture("session_catalog_hive"), "2"),
],
)
def test_update_sort_order(catalog: Catalog, format_version: str, table_schema_simple: Schema) -> None:
simple_table = _simple_table(catalog, table_schema_simple, format_version)
simple_table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_FIRST).desc(
"bar", IdentityTransform(), NullOrder.NULLS_LAST
).commit()
assert simple_table.sort_order() == SortOrder(
SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST),
SortField(source_id=2, transform=IdentityTransform(), direction=SortDirection.DESC, null_order=NullOrder.NULLS_LAST),
order_id=1,
)


@pytest.mark.integration
@pytest.mark.parametrize(
"catalog, format_version",
[
(pytest.lazy_fixture("session_catalog"), "1"),
(pytest.lazy_fixture("session_catalog_hive"), "1"),
(pytest.lazy_fixture("session_catalog"), "2"),
(pytest.lazy_fixture("session_catalog_hive"), "2"),
],
)
def test_increment_existing_sort_order_id(catalog: Catalog, format_version: str, table_schema_simple: Schema) -> None:
simple_table = _simple_table(catalog, table_schema_simple, format_version)
simple_table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_FIRST).commit()
assert simple_table.sort_order() == SortOrder(
SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST),
order_id=1,
)
simple_table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_LAST).desc(
"bar", IdentityTransform(), NullOrder.NULLS_FIRST
).commit()
assert (
len(simple_table.sort_orders()) == 3
) # 0: empty sort order from creating tables, 1: first sort order, 2: second sort order
assert simple_table.sort_order() == SortOrder(
SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_LAST),
SortField(source_id=2, transform=IdentityTransform(), direction=SortDirection.DESC, null_order=NullOrder.NULLS_FIRST),
order_id=2,
)


@pytest.mark.integration
@pytest.mark.parametrize(
"catalog, format_version",
[
(pytest.lazy_fixture("session_catalog"), "1"),
(pytest.lazy_fixture("session_catalog_hive"), "1"),
(pytest.lazy_fixture("session_catalog"), "2"),
(pytest.lazy_fixture("session_catalog_hive"), "2"),
],
)
def test_update_existing_sort_order(catalog: Catalog, format_version: str, table_schema_simple: Schema) -> None:
simple_table = _simple_table(catalog, table_schema_simple, format_version)
simple_table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_FIRST).commit()
assert simple_table.sort_order() == SortOrder(
SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST),
order_id=1,
)
simple_table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_LAST).desc(
"bar", IdentityTransform(), NullOrder.NULLS_FIRST
).commit()
# Go back to the first sort order
simple_table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_FIRST).commit()
assert (
len(simple_table.sort_orders()) == 3
) # line 133 should not create a new sort order since it is the same as the first one
assert simple_table.sort_order() == SortOrder(
SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST),
order_id=1,
)


@pytest.mark.integration
@pytest.mark.parametrize(
"catalog, format_version",
[
(pytest.lazy_fixture("session_catalog"), "1"),
(pytest.lazy_fixture("session_catalog_hive"), "1"),
(pytest.lazy_fixture("session_catalog"), "2"),
(pytest.lazy_fixture("session_catalog_hive"), "2"),
],
)
def test_update_existing_sort_order_with_unsorted_sort_order(
catalog: Catalog, format_version: str, table_schema_simple: Schema
) -> None:
simple_table = _simple_table(catalog, table_schema_simple, format_version)
simple_table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_FIRST).commit()
assert simple_table.sort_order() == SortOrder(
SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST),
order_id=1,
)
# Table should now be unsorted
simple_table.update_sort_order().commit()
# Go back to the first sort order
assert len(simple_table.sort_orders()) == 2
assert simple_table.sort_order() == SortOrder(order_id=0)