From fabbbd96c90ef58a08cc852a427d16700c12818e Mon Sep 17 00:00:00 2001
From: Kevin Liu
Date: Thu, 24 Apr 2025 11:32:12 -0700
Subject: [PATCH 1/6] Partial Revert "Use a join for upsert deduplication (#1685)"

This reverts commit b95e792d86f551d3705c3ea6b7e9985a2a0aaf3b.
---
 pyiceberg/table/upsert_util.py | 66 ++++++++++++++++++++--------------
 1 file changed, 39 insertions(+), 27 deletions(-)

diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py
index c2d554dfae..b7170bc5c6 100644
--- a/pyiceberg/table/upsert_util.py
+++ b/pyiceberg/table/upsert_util.py
@@ -57,39 +57,51 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
     """
     Return a table with rows that need to be updated in the target table based on the join columns.
 
-    The table is joined on the identifier columns, and then checked if there are any updated rows.
-    Those are selected and everything is renamed correctly.
+    When a row is matched, an additional scan is done to evaluate the non-key columns to detect if an actual change has occurred.
+    Only matched rows that have an actual change to a non-key column value will be returned in the final output.
     """
     all_columns = set(source_table.column_names)
     join_cols_set = set(join_cols)
-    non_key_cols = all_columns - join_cols_set
-
-    if has_duplicate_rows(target_table, join_cols):
-        raise ValueError("Target table has duplicate rows, aborting upsert")
+
+    non_key_cols = list(all_columns - join_cols_set)
 
     if len(target_table) == 0:
         # When the target table is empty, there is nothing to update :)
         return source_table.schema.empty_table()
 
-    diff_expr = functools.reduce(
-        operator.or_,
-        [
-            pc.or_kleene(
-                pc.not_equal(pc.field(f"{col}-lhs"), pc.field(f"{col}-rhs")),
-                pc.is_null(pc.not_equal(pc.field(f"{col}-lhs"), pc.field(f"{col}-rhs"))),
-            )
-            for col in non_key_cols
-        ],
-    )
-
-    return (
-        source_table
-        # We already know that the schema is compatible, this is to fix large_ types
-        .cast(target_table.schema)
-        .join(target_table, keys=list(join_cols_set), join_type="inner", left_suffix="-lhs", right_suffix="-rhs")
-        .filter(diff_expr)
-        .drop_columns([f"{col}-rhs" for col in non_key_cols])
-        .rename_columns({f"{col}-lhs" if col not in join_cols else col: col for col in source_table.column_names})
-        # Finally cast to the original schema since it doesn't carry nullability:
-        # https://github.com/apache/arrow/issues/45557
-    ).cast(target_table.schema)
+    match_expr = functools.reduce(operator.and_, [pc.field(col).isin(target_table.column(col).to_pylist()) for col in join_cols])
+
+    matching_source_rows = source_table.filter(match_expr)
+
+    rows_to_update = []
+
+    for index in range(matching_source_rows.num_rows):
+        source_row = matching_source_rows.slice(index, 1)
+
+        target_filter = functools.reduce(operator.and_, [pc.field(col) == source_row.column(col)[0].as_py() for col in join_cols])
+
+        matching_target_row = target_table.filter(target_filter)
+
+        if matching_target_row.num_rows > 0:
+            needs_update = False
+
+            for non_key_col in non_key_cols:
+                source_value = source_row.column(non_key_col)[0].as_py()
+                target_value = matching_target_row.column(non_key_col)[0].as_py()
+
+                if source_value != target_value:
+                    needs_update = True
+                    break
+
+            if needs_update:
+                rows_to_update.append(source_row)
+
+    if rows_to_update:
+        rows_to_update_table = pa.concat_tables(rows_to_update)
+    else:
+        rows_to_update_table = source_table.schema.empty_table()
+
+    common_columns = set(source_table.column_names).intersection(set(target_table.column_names))
+    rows_to_update_table = rows_to_update_table.select(list(common_columns))
+
+    return rows_to_update_table

From 1bb05a5b4e67164f7d594c1f0a89ee3ffb09989b Mon Sep 17 00:00:00 2001
From: Kevin Liu
Date: Thu, 24 Apr 2025 11:33:18 -0700
Subject: [PATCH 2/6] add test for upsert with complex type

---
 tests/table/test_upsert.py | 71 +++++++++++++++++++++++++++++++++++++-
 1 file changed, 70 insertions(+), 1 deletion(-)

diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py
index 5de4a61187..8d2e1a2481 100644
--- a/tests/table/test_upsert.py
+++ b/tests/table/test_upsert.py
@@ -30,7 +30,7 @@
 from pyiceberg.table import UpsertResult
 from pyiceberg.table.snapshots import Operation
 from pyiceberg.table.upsert_util import create_match_filter
-from pyiceberg.types import IntegerType, NestedField, StringType
+from pyiceberg.types import IntegerType, NestedField, StringType, StructType
 from tests.catalog.test_base import InMemoryCatalog, Table
 
 
@@ -511,6 +511,75 @@ def test_upsert_without_identifier_fields(catalog: Catalog) -> None:
         tbl.upsert(df)
 
 
+def test_upsert_struct_field(catalog: Catalog) -> None:
+    identifier = "default.test_upsert_struct_field"
+    _drop_table(catalog, identifier)
+
+    schema = Schema(
+        NestedField(1, "id", IntegerType(), required=True),
+        NestedField(
+            2,
+            "nested_type",
+            StructType(
+                NestedField(3, "sub1", StringType(), required=True),
+                NestedField(4, "sub2", StringType(), required=True),
+            ),
+            required=False,
+        ),
+        identifier_field_ids=[1],
+    )
+
+    tbl = catalog.create_table(identifier, schema=schema)
+
+    arrow_schema = pa.schema(
+        [
+            pa.field("id", pa.int32(), nullable=False),
+            pa.field(
+                "nested_type",
+                pa.struct(
+                    [
+                        pa.field("sub1", pa.large_string(), nullable=False),
+                        pa.field("sub2", pa.large_string(), nullable=False),
+                    ]
+                ),
+                nullable=True,
+            ),
+        ]
+    )
+
+    initial_data = pa.Table.from_pylist(
+        [
+            {
+                "id": 1,
+                "nested_type": {"sub1": "bla1", "sub2": "bla"},
+            }
+        ],
+        schema=arrow_schema,
+    )
+    tbl.append(initial_data)
+
+    update_data = pa.Table.from_pylist(
+        [
+            {
+                "id": 2,
+                "nested_type": {"sub1": "bla1", "sub2": "bla"},
+            },
+            {
+                "id": 1,
+                "nested_type": {"sub1": "bla1", "sub2": "bla"},
+            },
+        ],
+        schema=arrow_schema,
+    )
+
+    upd = tbl.upsert(update_data, join_cols=["id"])
+
+    # Row needs to be updated even tho it's not changed.
+    # When pyarrow isn't able to compare rows, just update everything
+    assert upd.rows_updated == 1
+    assert upd.rows_inserted == 1
+
+
 def test_upsert_with_nulls(catalog: Catalog) -> None:
     identifier = "default.test_upsert_with_nulls"
     _drop_table(catalog, identifier)

From 81679eb0210196cb53a529becabfa5ffd403ab1e Mon Sep 17 00:00:00 2001
From: Kevin Liu
Date: Thu, 24 Apr 2025 11:34:43 -0700
Subject: [PATCH 3/6] fix target has dup

---
 pyiceberg/table/upsert_util.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py
index b7170bc5c6..0b33459dfe 100644
--- a/pyiceberg/table/upsert_util.py
+++ b/pyiceberg/table/upsert_util.py
@@ -62,9 +62,11 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
     """
     all_columns = set(source_table.column_names)
     join_cols_set = set(join_cols)
-
     non_key_cols = list(all_columns - join_cols_set)
 
+    if has_duplicate_rows(target_table, join_cols):
+        raise ValueError("Target table has duplicate rows, aborting upsert")
+
     if len(target_table) == 0:
         # When the target table is empty, there is nothing to update :)
         return source_table.schema.empty_table()

From d05ebd22e4ffdd312aeb89d44b6991c8e7f8e36a Mon Sep 17 00:00:00 2001
From: Kevin Liu
Date: Thu, 24 Apr 2025 11:35:48 -0700
Subject: [PATCH 4/6] fix test

---
 tests/table/test_upsert.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py
index 8d2e1a2481..5bfa532315 100644
--- a/tests/table/test_upsert.py
+++ b/tests/table/test_upsert.py
@@ -574,9 +574,7 @@ def test_upsert_struct_field(catalog: Catalog) -> None:
 
     upd = tbl.upsert(update_data, join_cols=["id"])
 
-    # Row needs to be updated even tho it's not changed.
-    # When pyarrow isn't able to compare rows, just update everything
-    assert upd.rows_updated == 1
+    assert upd.rows_updated == 0
     assert upd.rows_inserted == 1
 
 

From 659ea08ff48bbca97e26c19923f6b96bcaba030e Mon Sep 17 00:00:00 2001
From: Kevin Liu
Date: Thu, 24 Apr 2025 12:01:37 -0700
Subject: [PATCH 5/6] add another test

---
 tests/table/test_upsert.py | 127 ++++++++++++++++++++++++++++---------
 1 file changed, 97 insertions(+), 30 deletions(-)

diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py
index 5bfa532315..9773f5f0b0 100644
--- a/tests/table/test_upsert.py
+++ b/tests/table/test_upsert.py
@@ -511,8 +511,51 @@ def test_upsert_without_identifier_fields(catalog: Catalog) -> None:
         tbl.upsert(df)
 
 
-def test_upsert_struct_field(catalog: Catalog) -> None:
-    identifier = "default.test_upsert_struct_field"
+def test_upsert_with_nulls(catalog: Catalog) -> None:
+    identifier = "default.test_upsert_with_nulls"
+    _drop_table(catalog, identifier)
+
+    schema = pa.schema(
+        [
+            ("foo", pa.string()),
+            ("bar", pa.int32()),
+            ("baz", pa.bool_()),
+        ]
+    )
+
+    # create table with null value
+    table = catalog.create_table(identifier, schema)
+    data_with_null = pa.Table.from_pylist(
+        [
+            {"foo": "apple", "bar": None, "baz": False},
+            {"foo": "banana", "bar": None, "baz": False},
+        ],
+        schema=schema,
+    )
+    table.append(data_with_null)
+    assert table.scan().to_arrow()["bar"].is_null()
+
+    # upsert table with non-null value
+    data_without_null = pa.Table.from_pylist(
+        [
+            {"foo": "apple", "bar": 7, "baz": False},
+        ],
+        schema=schema,
+    )
+    upd = table.upsert(data_without_null, join_cols=["foo"])
+    assert upd.rows_updated == 1
+    assert upd.rows_inserted == 0
+    assert table.scan().to_arrow() == pa.Table.from_pylist(
+        [
+            {"foo": "apple", "bar": 7, "baz": False},
+            {"foo": "banana", "bar": None, "baz": False},
+        ],
+        schema=schema,
+    )
+
+
+def test_upsert_with_struct_field(catalog: Catalog) -> None:
+    identifier = "default.test_upsert_with_struct_field"
     _drop_table(catalog, identifier)
 
     schema = Schema(
@@ -578,44 +621,68 @@ def test_upsert_struct_field(catalog: Catalog) -> None:
     assert upd.rows_inserted == 1
 
 
-def test_upsert_with_nulls(catalog: Catalog) -> None:
-    identifier = "default.test_upsert_with_nulls"
+def test_upsert_with_struct_field_as_join_key(catalog: Catalog) -> None:
+    identifier = "default.test_upsert_with_struct_field_as_join_key"
     _drop_table(catalog, identifier)
 
-    schema = pa.schema(
-        [
-            ("foo", pa.string()),
-            ("bar", pa.int32()),
-            ("baz", pa.bool_()),
-        ]
+    schema = Schema(
+        NestedField(1, "id", IntegerType(), required=True),
+        NestedField(
+            2,
+            "nested_type",
+            StructType(
+                NestedField(3, "sub1", StringType(), required=True),
+                NestedField(4, "sub2", StringType(), required=True),
+            ),
+            required=False,
+        ),
+        identifier_field_ids=[1],
     )
 
-    # create table with null value
-    table = catalog.create_table(identifier, schema)
-    data_with_null = pa.Table.from_pylist(
+    tbl = catalog.create_table(identifier, schema=schema)
+
+    arrow_schema = pa.schema(
         [
-            {"foo": "apple", "bar": None, "baz": False},
-            {"foo": "banana", "bar": None, "baz": False},
-        ],
-        schema=schema,
+            pa.field("id", pa.int32(), nullable=False),
+            pa.field(
+                "nested_type",
+                pa.struct(
+                    [
+                        pa.field("sub1", pa.large_string(), nullable=False),
+                        pa.field("sub2", pa.large_string(), nullable=False),
+                    ]
+                ),
+                nullable=True,
+            ),
+        ]
     )
-    table.append(data_with_null)
-    assert table.scan().to_arrow()["bar"].is_null()
 
-    # upsert table with non-null value
-    data_without_null = pa.Table.from_pylist(
+    initial_data = pa.Table.from_pylist(
         [
-            {"foo": "apple", "bar": 7, "baz": False},
+            {
+                "id": 1,
+                "nested_type": {"sub1": "bla1", "sub2": "bla"},
+            }
         ],
-        schema=schema,
+        schema=arrow_schema,
     )
-    upd = table.upsert(data_without_null, join_cols=["foo"])
-    assert upd.rows_updated == 1
-    assert upd.rows_inserted == 0
-    assert table.scan().to_arrow() == pa.Table.from_pylist(
+    tbl.append(initial_data)
+
+    update_data = pa.Table.from_pylist(
         [
-            {"foo": "apple", "bar": 7, "baz": False},
-            {"foo": "banana", "bar": None, "baz": False},
+            {
+                "id": 2,
+                "nested_type": {"sub1": "bla1", "sub2": "bla"},
+            },
+            {
+                "id": 1,
+                "nested_type": {"sub1": "bla1", "sub2": "bla"},
+            },
         ],
-        schema=schema,
+        schema=arrow_schema,
     )
+
+    with pytest.raises(
+        pa.lib.ArrowNotImplementedError, match="Keys of type struct"
+    ):
+        _ = tbl.upsert(update_data, join_cols=["nested_type"])

From 2420f7ad1ab6f0f700e683b8ba8c792fd121e982 Mon Sep 17 00:00:00 2001
From: Kevin Liu
Date: Thu, 24 Apr 2025 12:03:06 -0700
Subject: [PATCH 6/6] unnecessary change

---
 pyiceberg/table/upsert_util.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py
index 0b33459dfe..edec13c3e7 100644
--- a/pyiceberg/table/upsert_util.py
+++ b/pyiceberg/table/upsert_util.py
@@ -62,7 +62,7 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
     """
     all_columns = set(source_table.column_names)
     join_cols_set = set(join_cols)
-    non_key_cols = list(all_columns - join_cols_set)
+    non_key_cols = all_columns - join_cols_set
 
     if has_duplicate_rows(target_table, join_cols):
         raise ValueError("Target table has duplicate rows, aborting upsert")
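
Reviewer note (illustrative sketch, not part of the patch series): the row-by-row comparison reintroduced in PATCH 1 handles struct columns because it converts each value to a plain Python object with .as_py() before comparing, whereas the reverted join-based diff built a pc.not_equal expression per non-key column, a kernel Arrow's compute layer does not provide for struct types (hence the old test comment about pyarrow not being able to compare rows). The snippet below assumes pyarrow raises ArrowNotImplementedError for the struct comparison, mirroring the expectation in test_upsert_with_struct_field_as_join_key; the column values are borrowed from the tests above and are purely illustrative.

import pyarrow as pa
import pyarrow.compute as pc

# Struct values mirroring the "nested_type" column used in the tests.
struct_type = pa.struct([("sub1", pa.string()), ("sub2", pa.string())])
lhs = pa.array([{"sub1": "bla1", "sub2": "bla"}], type=struct_type)
rhs = pa.array([{"sub1": "bla1", "sub2": "changed"}], type=struct_type)

# Expression/kernel-based comparison, as used by the join approach: no not_equal
# kernel exists for struct types (assumed to raise ArrowNotImplementedError).
try:
    pc.not_equal(lhs, rhs)
except pa.lib.ArrowNotImplementedError as exc:
    print(f"struct comparison not supported by compute kernels: {exc}")

# Row-by-row path used by the reverted get_rows_to_update: convert each value to
# a Python dict with .as_py() and compare with plain ==, which handles nested data.
print(lhs[0].as_py() != rhs[0].as_py())  # True -> the row would be flagged for update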