From f16f8b38f45d68aeaa1860c9d88ed01a485a3f4c Mon Sep 17 00:00:00 2001
From: Koen Vossen
Date: Wed, 2 Apr 2025 22:38:55 +0200
Subject: [PATCH 01/14] Fallback for upsert when arrow cannot compare source
 rows with target rows

---
 pyiceberg/table/upsert_util.py | 26 +++++++------
 tests/table/test_upsert.py    | 68 ++++++++++++++++++++++++++++++++++-
 2 files changed, 82 insertions(+), 12 deletions(-)

diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py
index d2bd48bc99..93ae81286b 100644
--- a/pyiceberg/table/upsert_util.py
+++ b/pyiceberg/table/upsert_util.py
@@ -67,14 +67,18 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
 
     diff_expr = functools.reduce(operator.or_, [pc.field(f"{col}-lhs") != pc.field(f"{col}-rhs") for col in non_key_cols])
 
-    return (
-        source_table
-        # We already know that the schema is compatible, this is to fix large_ types
-        .cast(target_table.schema)
-        .join(target_table, keys=list(join_cols_set), join_type="inner", left_suffix="-lhs", right_suffix="-rhs")
-        .filter(diff_expr)
-        .drop_columns([f"{col}-rhs" for col in non_key_cols])
-        .rename_columns({f"{col}-lhs" if col not in join_cols else col: col for col in source_table.column_names})
-        # Finally cast to the original schema since it doesn't carry nullability:
-        # https://github.com/apache/arrow/issues/45557
-    ).cast(target_table.schema)
+    try:
+        return (
+            source_table
+            # We already know that the schema is compatible, this is to fix large_ types
+            .cast(target_table.schema)
+            .join(target_table, keys=list(join_cols_set), join_type="inner", left_suffix="-lhs", right_suffix="-rhs")
+            .filter(diff_expr)
+            .drop_columns([f"{col}-rhs" for col in non_key_cols])
+            .rename_columns({f"{col}-lhs" if col not in join_cols else col: col for col in source_table.column_names})
+            # Finally cast to the original schema since it doesn't carry nullability:
+            # https://github.com/apache/arrow/issues/45557
+        ).cast(target_table.schema)
+    except pa.ArrowInvalid:
+        # When we are not able to compare, just update all rows from source table
+        return source_table.cast(target_table.schema)
diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py
index 19bfbc01de..a69e9676b4 100644
--- a/tests/table/test_upsert.py
+++ b/tests/table/test_upsert.py
@@ -30,7 +30,7 @@
 from pyiceberg.table import UpsertResult
 from pyiceberg.table.snapshots import Operation
 from pyiceberg.table.upsert_util import create_match_filter
-from pyiceberg.types import IntegerType, NestedField, StringType
+from pyiceberg.types import IntegerType, NestedField, StringType, StructType
 from tests.catalog.test_base import InMemoryCatalog, Table
 
 
@@ -509,3 +509,69 @@ def test_upsert_without_identifier_fields(catalog: Catalog) -> None:
         ValueError, match="Join columns could not be found, please set identifier-field-ids or pass in explicitly."
     ):
         tbl.upsert(df)
+
+
+def test_upsert_struct_field_fails_in_join(catalog: Catalog) -> None:
+    identifier = "default.test_upsert_struct_field_fails"
+    _drop_table(catalog, identifier)
+
+    schema = Schema(
+        NestedField(1, "id", IntegerType(), required=True),
+        NestedField(
+            2,
+            "nested_type",
+            # Struct<sub1: string, sub2: string>
+            StructType(
+                NestedField(3, "sub1", StringType(), required=True),
+                NestedField(4, "sub2", StringType(), required=True),
+            ),
+            required=False,
+        ),
+        identifier_field_ids=[1],
+    )
+
+    tbl = catalog.create_table(identifier, schema=schema)
+
+    arrow_schema = pa.schema(
+        [
+            pa.field("id", pa.int32(), nullable=False),
+            pa.field(
+                "nested_type",
+                pa.struct(
+                    [
+                        pa.field("sub1", pa.large_string(), nullable=False),
+                        pa.field("sub2", pa.large_string(), nullable=False),
+                    ]
+                ),
+                nullable=True,
+            ),
+        ]
+    )
+
+    initial_data = pa.Table.from_pylist(
+        [
+            {
+                "id": 1,
+                "nested_type": {"sub1": "bla1", "sub2": "bla"},
+            }
+        ],
+        schema=arrow_schema,
+    )
+    tbl.append(initial_data)
+
+    update_data = pa.Table.from_pylist(
+        [
+            {
+                "id": 1,
+                "nested_type": {"sub1": "bla1", "sub2": "bla"},
+            }
+        ],
+        schema=arrow_schema,
+    )
+
+    upd = tbl.upsert(update_data, join_cols=["id"])
+
+    # Row needs to be updated even though it's not changed.
+    # When pyarrow isn't able to compare rows, just update everything
+    assert upd.rows_updated == 1
+    assert upd.rows_inserted == 0
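The failure that patch 01's try/except guards against is easy to reproduce outside the upsert path. The sketch below is not part of the patch series: the table contents are made up and the exact error message varies across PyArrow versions, but it shows the relevant behavior, namely that Arrow's hash join rejects struct-typed non-key columns, which surfaces as the pa.ArrowInvalid the new except clause catches.

import pyarrow as pa

t = pa.table(
    {
        "id": [1],
        "payload": [{"a": "x", "b": "y"}],  # struct<a: string, b: string> non-key column
    }
)

try:
    # get_rows_to_update joins source and target on the key columns; with a
    # struct payload column this fails inside Acero before any comparison runs.
    t.join(t, keys=["id"], left_suffix="-lhs", right_suffix="-rhs")
except pa.lib.ArrowInvalid as exc:
    print(exc)  # e.g. "Data type struct<...> is not supported in join non-key field payload"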
From 06af05ab0ed5d659b6fd21e44cdd95c625405076 Mon Sep 17 00:00:00 2001
From: Koen Vossen
Date: Thu, 3 Apr 2025 08:57:13 +0200
Subject: [PATCH 02/14] Make upsert work for non-join complex column types -
 skip column comparison

---
 pyiceberg/table/upsert_util.py | 29 +++++++++++++++++++++++++++--
 tests/table/test_upsert.py    |  8 ++++++--
 2 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py
index 93ae81286b..33ec316f6a 100644
--- a/pyiceberg/table/upsert_util.py
+++ b/pyiceberg/table/upsert_util.py
@@ -80,5 +80,30 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
         # https://github.com/apache/arrow/issues/45557
         ).cast(target_table.schema)
     except pa.ArrowInvalid:
-        # When we are not able to compare, just update all rows from source table
-        return source_table.cast(target_table.schema)
+        # When we are not able to compare (e.g. due to unsupported types),
+        # fall back to updating all rows in the source table that already exist in the target.
+        # See: https://github.com/apache/arrow/issues/35785
+
+        MARKER_COLUMN_NAME = "__from_target"
+
+        assert MARKER_COLUMN_NAME not in join_cols_set
+
+        # Step 1: Prepare source index with join keys
+        # Cast to target table schema, so we can do the join
+        source_index = source_table.cast(target_table.schema).select(join_cols_set)
+
+        # Step 2: Prepare target index with join keys and a marker
+        target_index = target_table.select(join_cols_set).append_column(
+            MARKER_COLUMN_NAME, pa.array([True] * len(target_table), pa.bool_())
+        )
+
+        # Step 3: Perform a left outer join to find which rows from source exist in target
+        joined = source_index.join(target_index, keys=list(join_cols_set), join_type="left outer")
+
+        # Step 4: Create a boolean mask for rows that DO exist in the target,
+        # i.e., where 'from_target' is not null after the join
+        to_update_mask = pc.invert(pc.is_null(joined[MARKER_COLUMN_NAME]))
+
+        # Step 5: Filter the source table using the mask
+        # (keep only rows that should be updated)
+        return source_table.filter(to_update_mask)
diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py
index a69e9676b4..a62f7753a7 100644
--- a/tests/table/test_upsert.py
+++ b/tests/table/test_upsert.py
@@ -564,7 +564,11 @@ def test_upsert_struct_field_fails_in_join(catalog: Catalog) -> None:
             {
                 "id": 1,
                 "nested_type": {"sub1": "bla1", "sub2": "bla"},
-            }
+            },
+            {
+                "id": 2,
+                "nested_type": {"sub1": "bla1", "sub2": "bla"},
+            },
         ],
         schema=arrow_schema,
     )
@@ -574,4 +578,4 @@ def test_upsert_struct_field_fails_in_join(catalog: Catalog) -> None:
     # Row needs to be updated even though it's not changed.
     # When pyarrow isn't able to compare rows, just update everything
     assert upd.rows_updated == 1
-    assert upd.rows_inserted == 0
+    assert upd.rows_inserted == 1
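The fallback added in patch 02 leans on a standard trick for emulating a semi-join in PyArrow: tag every target key row with a constant marker column, left-outer-join, and test the marker for null. A self-contained sketch with made-up keys (the marker column name mirrors the patch, everything else is assumed):

import pyarrow as pa
import pyarrow.compute as pc

source_keys = pa.table({"id": [1, 2, 3]})
target_keys = pa.table({"id": [2, 3, 4]}).append_column(
    "__from_target", pa.array([True] * 3, pa.bool_())
)

# Keys missing from the target come back with a null marker.
joined = source_keys.join(target_keys, keys=["id"], join_type="left outer")
exists_in_target = pc.invert(pc.is_null(joined["__from_target"]))
print(joined.filter(exists_in_target)["id"].to_pylist())  # [2, 3], in some order

One caveat worth noting here: Acero joins make no ordering promise, so a mask computed from the joined table is only positionally valid against the source table if the join happens to preserve row order. Patch 07 below addresses exactly that.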
From 6b9ddf4d4ac5269ef1425f3453e0553c4c580d0c Mon Sep 17 00:00:00 2001
From: Koen Vossen
Date: Thu, 3 Apr 2025 09:00:43 +0200
Subject: [PATCH 03/14] minor

---
 pyiceberg/table/upsert_util.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py
index 9ff92c00a4..9a78f113e2 100644
--- a/pyiceberg/table/upsert_util.py
+++ b/pyiceberg/table/upsert_util.py
@@ -116,7 +116,7 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
         joined = source_index.join(target_index, keys=list(join_cols_set), join_type="left outer")
 
         # Step 4: Create a boolean mask for rows that DO exist in the target,
-        # i.e., where 'from_target' is not null after the join
+        # i.e., where the marker column is not null after the join
         to_update_mask = pc.invert(pc.is_null(joined[MARKER_COLUMN_NAME]))
 
         # Step 5: Filter the source table using the mask

From 7131dc0d582cb73138e34f12778d7b069cd9a7cc Mon Sep 17 00:00:00 2001
From: Koen Vossen
Date: Thu, 3 Apr 2025 09:42:32 +0200
Subject: [PATCH 04/14] Linting

---
 tests/table/test_upsert.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py
index 013323c247..755d19f8e8 100644
--- a/tests/table/test_upsert.py
+++ b/tests/table/test_upsert.py
@@ -580,7 +580,7 @@ def test_upsert_struct_field_fails_in_join(catalog: Catalog) -> None:
     assert upd.rows_updated == 1
     assert upd.rows_inserted == 1
 
-    
+
 def test_upsert_with_nulls(catalog: Catalog) -> None:
     identifier = "default.test_upsert_with_nulls"
     _drop_table(catalog, identifier)

From 0719ecf0cc57723080a52adc2f68ca82b6a24b02 Mon Sep 17 00:00:00 2001
From: Koen Vossen
Date: Tue, 8 Apr 2025 19:27:32 +0200
Subject: [PATCH 05/14] Replace assert with ValueError

---
 pyiceberg/table/upsert_util.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py
index 9a78f113e2..7e3d95bcc3 100644
--- a/pyiceberg/table/upsert_util.py
+++ b/pyiceberg/table/upsert_util.py
@@ -101,7 +101,10 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
 
         MARKER_COLUMN_NAME = "__from_target"
 
-        assert MARKER_COLUMN_NAME not in join_cols_set
+        if MARKER_COLUMN_NAME in join_cols_set:
+            raise ValueError(
+                f"{MARKER_COLUMN_NAME} is used for joining " f"DataFrames, and cannot be used as a column name"
+            ) from None
 
         # Step 1: Prepare source index with join keys
         # Cast to target table schema, so we can do the join
From a699602978762a6ee4dc9d301ee1edb815c0faa4 Mon Sep 17 00:00:00 2001
From: Koen Vossen
Date: Mon, 14 Apr 2025 22:46:47 +0200
Subject: [PATCH 06/14] Make complex-type upsert test fail

---
 tests/table/test_upsert.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py
index 755d19f8e8..6f5d079715 100644
--- a/tests/table/test_upsert.py
+++ b/tests/table/test_upsert.py
@@ -562,13 +562,13 @@ def test_upsert_struct_field_fails_in_join(catalog: Catalog) -> None:
     update_data = pa.Table.from_pylist(
         [
             {
-                "id": 1,
+                "id": 2,
                 "nested_type": {"sub1": "bla1", "sub2": "bla"},
             },
             {
-                "id": 2,
+                "id": 1,
                 "nested_type": {"sub1": "bla1", "sub2": "bla"},
-            },
+            }
         ],
         schema=arrow_schema,
    )

From 8e32e9c7134f7841d2d33aadc58621a21ff9e Mon Sep 17 00:00:00 2001
From: Koen Vossen
Date: Mon, 14 Apr 2025 22:54:12 +0200
Subject: [PATCH 07/14] Preserve order in get_rows_to_update for complex types

---
 pyiceberg/table/upsert_util.py | 30 ++++++++++++++++++------------
 tests/table/test_upsert.py    |  2 +-
 2 files changed, 21 insertions(+), 11 deletions(-)

diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py
index 7e3d95bcc3..1cb6145a7a 100644
--- a/pyiceberg/table/upsert_util.py
+++ b/pyiceberg/table/upsert_util.py
@@ -98,17 +98,23 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
         # When we are not able to compare (e.g. due to unsupported types),
         # fall back to updating all rows in the source table that already exist in the target.
         # See: https://github.com/apache/arrow/issues/35785
         MARKER_COLUMN_NAME = "__from_target"
+        INDEX_COLUMN_NAME = "__source_index"
 
-        if MARKER_COLUMN_NAME in join_cols_set:
+        if MARKER_COLUMN_NAME in join_cols_set or INDEX_COLUMN_NAME in join_cols_set:
             raise ValueError(
-                f"{MARKER_COLUMN_NAME} is used for joining " f"DataFrames, and cannot be used as a column name"
+                f"{MARKER_COLUMN_NAME} and {INDEX_COLUMN_NAME} are reserved for joining "
+                f"DataFrames, and cannot be used as column names"
             ) from None
 
-        # Step 1: Prepare source index with join keys
+        # Step 1: Prepare source index with join keys and a marker index
         # Cast to target table schema, so we can do the join
-        source_index = source_table.cast(target_table.schema).select(join_cols_set)
+        # See: https://github.com/apache/arrow/issues/37542
+        source_index = (
+            source_table.cast(target_table.schema)
+            .select(join_cols_set)
+            .append_column(INDEX_COLUMN_NAME, pa.array(range(len(source_table))))
+        )
 
         # Step 2: Prepare target index with join keys and a marker
         target_index = target_table.select(join_cols_set).append_column(
@@ -120,11 +126,15 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
 
         # Step 3: Perform a left outer join to find which rows from source exist in target
         joined = source_index.join(target_index, keys=list(join_cols_set), join_type="left outer")
 
-        # Step 4: Create a boolean mask for rows that DO exist in the target,
-        # i.e., where the marker column is not null after the join
+        # Step 4: Restore original source order
+        joined = joined.sort_by(INDEX_COLUMN_NAME)
+
+        # Step 5: Create a boolean mask for rows that do exist in the target,
+        # i.e., where the marker column is true after the join
         to_update_mask = pc.invert(pc.is_null(joined[MARKER_COLUMN_NAME]))
 
-        # Step 5: Filter the source table using the mask
-        # (keep only rows that should be updated)
-        return source_table.filter(to_update_mask)
+        # Step 6: Filter the source table using the mask
+        filtered = source_table.filter(to_update_mask)
+
+        return filtered
diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py
index 6f5d079715..8ae648f576 100644
--- a/tests/table/test_upsert.py
+++ b/tests/table/test_upsert.py
@@ -568,7 +568,7 @@ def test_upsert_struct_field_fails_in_join(catalog: Catalog) -> None:
             {
                 "id": 1,
                 "nested_type": {"sub1": "bla1", "sub2": "bla"},
-            }
+            },
         ],
         schema=arrow_schema,
     )
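Patch 07 exists because of that ordering caveat: the boolean mask from the joined table was being applied positionally to the source table, and a join is free to emit rows in any order. The toy sketch below (assumed data, not from the patch) isolates the attach-an-index-then-sort_by pattern the patch adopts.

import pyarrow as pa

source = pa.table({"id": [10, 20, 30]})
indexed = source.append_column("__source_index", pa.array(range(len(source))))

shuffled = indexed.take(pa.array([2, 0, 1]))   # stand-in for a join reordering rows
restored = shuffled.sort_by("__source_index")  # the fix: sort back into source order

assert restored["id"].to_pylist() == [10, 20, 30]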
From a088d6c6aa0f22855d8275568098288e76f56364 Mon Sep 17 00:00:00 2001
From: Koen Vossen
Date: Thu, 17 Apr 2025 22:27:16 +0200
Subject: [PATCH 08/14] Create marker column in pyarrow instead of Python list
 first

---
 pyiceberg/table/upsert_util.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py
index 1cb6145a7a..cb64f43ef7 100644
--- a/pyiceberg/table/upsert_util.py
+++ b/pyiceberg/table/upsert_util.py
@@ -117,9 +117,7 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
         )
 
         # Step 2: Prepare target index with join keys and a marker
-        target_index = target_table.select(join_cols_set).append_column(
-            MARKER_COLUMN_NAME, pa.array([True] * len(target_table), pa.bool_())
-        )
+        target_index = target_table.select(join_cols_set).append_column(MARKER_COLUMN_NAME, pa.repeat(True, len(target_table)))
 
         # Step 3: Perform a left outer join to find which rows from source exist in target
         joined = source_index.join(target_index, keys=list(join_cols_set), join_type="left outer")
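Patch 08 is a small idiom fix: pa.repeat builds the constant marker column natively in Arrow instead of first materializing a Python list of len(target_table) booleans. A quick equivalence check (toy length, assumed):

import pyarrow as pa

n = 4
assert pa.repeat(True, n).equals(pa.array([True] * n, pa.bool_()))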
From e9af36816481f273de4ce5b93f96c5094cbdde51 Mon Sep 17 00:00:00 2001
From: Koen Vossen
Date: Wed, 23 Apr 2025 13:43:19 +0200
Subject: [PATCH 09/14] Always update all rows in upsert, don't try to check
 if rows changed

---
 pyiceberg/table/upsert_util.py | 92 +++++++++++-----------------------
 tests/table/test_upsert.py    | 37 ++++++++++----
 2 files changed, 58 insertions(+), 71 deletions(-)

diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py
index cb64f43ef7..aef6efa98f 100644
--- a/pyiceberg/table/upsert_util.py
+++ b/pyiceberg/table/upsert_util.py
@@ -60,9 +60,7 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
     The table is joined on the identifier columns, and then checked if there are any updated rows.
     Those are selected and everything is renamed correctly.
     """
-    all_columns = set(source_table.column_names)
     join_cols_set = set(join_cols)
-    non_key_cols = all_columns - join_cols_set
 
     if has_duplicate_rows(target_table, join_cols):
         raise ValueError("Target table has duplicate rows, aborting upsert")
@@ -71,65 +69,35 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
         # When the target table is empty, there is nothing to update :)
         return source_table.schema.empty_table()
 
-    diff_expr = functools.reduce(
-        operator.or_,
-        [
-            pc.or_kleene(
-                pc.not_equal(pc.field(f"{col}-lhs"), pc.field(f"{col}-rhs")),
-                pc.is_null(pc.not_equal(pc.field(f"{col}-lhs"), pc.field(f"{col}-rhs"))),
-            )
-            for col in non_key_cols
-        ],
-    )
-
-    try:
-        return (
-            source_table
-            # We already know that the schema is compatible, this is to fix large_ types
-            .cast(target_table.schema)
-            .join(target_table, keys=list(join_cols_set), join_type="inner", left_suffix="-lhs", right_suffix="-rhs")
-            .filter(diff_expr)
-            .drop_columns([f"{col}-rhs" for col in non_key_cols])
-            .rename_columns({f"{col}-lhs" if col not in join_cols else col: col for col in source_table.column_names})
-            # Finally cast to the original schema since it doesn't carry nullability:
-            # https://github.com/apache/arrow/issues/45557
-        ).cast(target_table.schema)
-    except pa.ArrowInvalid:
-        # When we are not able to compare (e.g. due to unsupported types),
-        # fall back to updating all rows in the source table that already exist in the target.
-        # See: https://github.com/apache/arrow/issues/35785
-        MARKER_COLUMN_NAME = "__from_target"
-        INDEX_COLUMN_NAME = "__source_index"
-
-        if MARKER_COLUMN_NAME in join_cols_set or INDEX_COLUMN_NAME in join_cols_set:
-            raise ValueError(
-                f"{MARKER_COLUMN_NAME} and {INDEX_COLUMN_NAME} are reserved for joining "
-                f"DataFrames, and cannot be used as column names"
-            ) from None
-
-        # Step 1: Prepare source index with join keys and a marker index
-        # Cast to target table schema, so we can do the join
-        # See: https://github.com/apache/arrow/issues/37542
-        source_index = (
-            source_table.cast(target_table.schema)
-            .select(join_cols_set)
-            .append_column(INDEX_COLUMN_NAME, pa.array(range(len(source_table))))
-        )
-
-        # Step 2: Prepare target index with join keys and a marker
-        target_index = target_table.select(join_cols_set).append_column(MARKER_COLUMN_NAME, pa.repeat(True, len(target_table)))
-
-        # Step 3: Perform a left outer join to find which rows from source exist in target
-        joined = source_index.join(target_index, keys=list(join_cols_set), join_type="left outer")
-
-        # Step 4: Restore original source order
-        joined = joined.sort_by(INDEX_COLUMN_NAME)
-
-        # Step 5: Create a boolean mask for rows that do exist in the target,
-        # i.e., where the marker column is true after the join
-        to_update_mask = pc.invert(pc.is_null(joined[MARKER_COLUMN_NAME]))
-
-        # Step 6: Filter the source table using the mask
-        filtered = source_table.filter(to_update_mask)
-
-        return filtered
+    # When we are not able to compare (e.g. due to unsupported types),
+    # fall back to updating all rows in the source table that already exist in the target.
+    # See: https://github.com/apache/arrow/issues/35785
+    MARKER_COLUMN_NAME = "__from_target"
+    INDEX_COLUMN_NAME = "__source_index"
+
+    if MARKER_COLUMN_NAME in join_cols or INDEX_COLUMN_NAME in join_cols:
+        raise ValueError(
+            f"{MARKER_COLUMN_NAME} and {INDEX_COLUMN_NAME} are reserved for joining "
+            f"DataFrames, and cannot be used as column names"
+        ) from None
+
+    # Step 1: Prepare source index with join keys and a marker index
+    # Cast to target table schema, so we can do the join
+    # See: https://github.com/apache/arrow/issues/37542
+    source_index = (
+        source_table.cast(target_table.schema)
+        .select(join_cols_set)
+        .append_column(INDEX_COLUMN_NAME, pa.array(range(len(source_table))))
+    )
+
+    # Step 2: Prepare target index with join keys and a marker
+    target_index = target_table.select(join_cols_set).append_column(MARKER_COLUMN_NAME, pa.repeat(True, len(target_table)))
+
+    # Step 3: Perform a left outer join to find which rows from source exist in target
+    joined = source_index.join(target_index, keys=list(join_cols_set), join_type="left outer")
+
+    # Step 4: Create indices for rows that do exist in the target i.e., where the marker column is true after the join
+    to_update_indices = joined.filter(pc.field(MARKER_COLUMN_NAME))[INDEX_COLUMN_NAME]
+
+    # Step 5: Take rows from the source table using the indices
+    return source_table.take(to_update_indices)
diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py
index 8ae648f576..e2b63f400a 100644
--- a/tests/table/test_upsert.py
+++ b/tests/table/test_upsert.py
@@ -186,7 +186,7 @@ def test_merge_scenario_skip_upd_row(catalog: Catalog) -> None:
 
     res = table.upsert(df=source_df, join_cols=["order_id"])
 
-    expected_updated = 1
+    expected_updated = 2
     expected_inserted = 1
 
     assert_upsert_result(res, expected_updated, expected_inserted)
@@ -222,7 +222,7 @@ def test_merge_scenario_date_as_key(catalog: Catalog) -> None:
 
     res = table.upsert(df=source_df, join_cols=["order_date"])
 
-    expected_updated = 1
+    expected_updated = 2
     expected_inserted = 1
 
     assert_upsert_result(res, expected_updated, expected_inserted)
@@ -258,7 +258,7 @@ def test_merge_scenario_string_as_key(catalog: Catalog) -> None:
 
     res = table.upsert(df=source_df, join_cols=["order_id"])
 
-    expected_updated = 1
+    expected_updated = 2
     expected_inserted = 1
 
     assert_upsert_result(res, expected_updated, expected_inserted)
@@ -371,16 +371,25 @@ def test_upsert_with_identifier_fields(catalog: Catalog) -> None:
 
     expected_operations = [Operation.APPEND, Operation.OVERWRITE, Operation.APPEND, Operation.APPEND]
 
-    assert upd.rows_updated == 1
+    assert upd.rows_updated == 2
     assert upd.rows_inserted == 1
 
     assert [snap.summary.operation for snap in tbl.snapshots() if snap.summary is not None] == expected_operations
 
-    # This should be a no-op
+    # This will update all 3 rows
     upd = tbl.upsert(df)
 
-    assert upd.rows_updated == 0
+    assert upd.rows_updated == 3
     assert upd.rows_inserted == 0
 
+    expected_operations = [
+        Operation.APPEND,
+        Operation.OVERWRITE,
+        Operation.APPEND,
+        Operation.APPEND,
+        Operation.DELETE,
+        Operation.OVERWRITE,
+        Operation.APPEND,
+    ]
     assert [snap.summary.operation for snap in tbl.snapshots() if snap.summary is not None] == expected_operations
@@ -552,7 +561,7 @@ def test_upsert_struct_field_fails_in_join(catalog: Catalog) -> None:
         [
             {
                 "id": 1,
-                "nested_type": {"sub1": "bla1", "sub2": "bla"},
+                "nested_type": {"sub1": "1_sub1_init", "sub2": "1sub2_init"},
             }
         ],
         schema=arrow_schema,
@@ -563,12 +572,17 @@ def test_upsert_struct_field_fails_in_join(catalog: Catalog) -> None:
         [
             {
                 "id": 2,
-                "nested_type": {"sub1": "bla1", "sub2": "bla"},
+                "nested_type": {"sub1": "2_sub1_new", "sub2": "2_sub2_new"},
             },
             {
                 "id": 1,
-                "nested_type": {"sub1": "bla1", "sub2": "bla"},
+                "nested_type": {"sub1": "1sub1_init", "sub2": "1sub2_new"},
             },
+            # TODO: struct changes should cause _check_pyarrow_schema_compatible to fail. Introduce a new `sub3` attribute
+            # {
+            #     "id": 1,
+            #     "nested_type": {"sub3": "1sub3_init", "sub2": "1sub2_new"},
+            # },
         ],
         schema=arrow_schema,
     )
@@ -580,6 +594,11 @@ def test_upsert_struct_field_fails_in_join(catalog: Catalog) -> None:
     upd = tbl.upsert(update_data, join_cols=["id"])
 
     # Row needs to be updated even though it's not changed.
     # When pyarrow isn't able to compare rows, just update everything
     assert upd.rows_updated == 1
     assert upd.rows_inserted == 1
+
+    assert tbl.scan().to_arrow().to_pylist() == [
+        {"id": 2, "nested_type": {"sub1": "2_sub1_new", "sub2": "2_sub2_new"}},
+        {"id": 1, "nested_type": {"sub1": "1sub1_init", "sub2": "1sub2_new"}},
+    ]
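Patch 09 above stops trying to detect changed rows and instead returns every source row whose key exists in the target, collecting the surviving __source_index values and take()-ing them, which sidesteps the ordering problem rather than repairing it. A condensed, runnable sketch with assumed toy data:

import pyarrow as pa
import pyarrow.compute as pc

source = pa.table({"id": [1, 2, 3], "v": ["a", "b", "c"]})
target_keys = pa.table({"id": [2, 3]}).append_column("__from_target", pa.repeat(True, 2))

source_index = source.select(["id"]).append_column("__source_index", pa.array(range(len(source))))
joined = source_index.join(target_keys, keys=["id"], join_type="left outer")

# Indices of source rows whose key exists in the target, valid in any join order.
to_update_indices = joined.filter(pc.field("__from_target"))["__source_index"]
print(source.take(to_update_indices).to_pylist())  # the id=2 and id=3 rows

The cost, visible in the test changes above, is that unchanged rows are rewritten too; patch 10 wins that behavior back.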
From 79f61810e551704b95f6cc9636b36b8e22b5ec47 Mon Sep 17 00:00:00 2001
From: Koen Vossen
Date: Thu, 24 Apr 2025 21:59:53 +0200
Subject: [PATCH 10/14] Do row comparison in Python

---
 pyiceberg/table/upsert_util.py | 44 ++++++++++++++++++++++++----------
 tests/table/test_upsert.py    | 66 +++++++++++++++++-----------------
 2 files changed, 69 insertions(+), 41 deletions(-)

diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py
index aef6efa98f..442b1de5db 100644
--- a/pyiceberg/table/upsert_util.py
+++ b/pyiceberg/table/upsert_util.py
@@ -60,8 +60,11 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
     The table is joined on the identifier columns, and then checked if there are any updated rows.
     Those are selected and everything is renamed correctly.
     """
+    all_columns = set(source_table.column_names)
     join_cols_set = set(join_cols)
 
+    non_key_cols = list(all_columns - join_cols_set)
+
     if has_duplicate_rows(target_table, join_cols):
         raise ValueError("Target table has duplicate rows, aborting upsert")
 
@@ -73,11 +76,12 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
     # When we are not able to compare (e.g. due to unsupported types),
     # fall back to updating all rows in the source table that already exist in the target.
     # See: https://github.com/apache/arrow/issues/35785
     MARKER_COLUMN_NAME = "__from_target"
-    INDEX_COLUMN_NAME = "__source_index"
+    SOURCE_INDEX_COLUMN_NAME = "__source_index"
+    TARGET_INDEX_COLUMN_NAME = "__target_index"
 
-    if MARKER_COLUMN_NAME in join_cols or INDEX_COLUMN_NAME in join_cols:
+    if MARKER_COLUMN_NAME in join_cols or SOURCE_INDEX_COLUMN_NAME in join_cols or TARGET_INDEX_COLUMN_NAME in join_cols:
         raise ValueError(
-            f"{MARKER_COLUMN_NAME} and {INDEX_COLUMN_NAME} are reserved for joining "
+            f"{MARKER_COLUMN_NAME}, {SOURCE_INDEX_COLUMN_NAME} and {TARGET_INDEX_COLUMN_NAME} are reserved for joining "
             f"DataFrames, and cannot be used as column names"
         ) from None
 
@@ -85,20 +89,42 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
     # Step 1: Prepare source index with join keys and a marker index
     # Cast to target table schema, so we can do the join
     # See: https://github.com/apache/arrow/issues/37542
     source_index = (
         source_table.cast(target_table.schema)
         .select(join_cols_set)
-        .append_column(INDEX_COLUMN_NAME, pa.array(range(len(source_table))))
+        .append_column(SOURCE_INDEX_COLUMN_NAME, pa.array(range(len(source_table))))
     )
 
     # Step 2: Prepare target index with join keys and a marker
-    target_index = target_table.select(join_cols_set).append_column(MARKER_COLUMN_NAME, pa.repeat(True, len(target_table)))
+    target_index = (
+        target_table.select(join_cols_set)
+        .append_column(TARGET_INDEX_COLUMN_NAME, pa.array(range(len(target_table))))
+        .append_column(MARKER_COLUMN_NAME, pa.repeat(True, len(target_table)))
+    )
 
     # Step 3: Perform a left outer join to find which rows from source exist in target
     joined = source_index.join(target_index, keys=list(join_cols_set), join_type="left outer")
 
     # Step 4: Create indices for rows that do exist in the target i.e., where the marker column is true after the join
-    to_update_indices = joined.filter(pc.field(MARKER_COLUMN_NAME))[INDEX_COLUMN_NAME]
+    matching_indices = joined.filter(pc.field(MARKER_COLUMN_NAME))
 
-    # Step 5: Take rows from the source table using the indices
-    return source_table.take(to_update_indices)
+    # Step 5: Compare all rows using Python
+    to_update_indices = []
+    for source_idx, target_idx in zip(
+        matching_indices[SOURCE_INDEX_COLUMN_NAME].to_pylist(), matching_indices[TARGET_INDEX_COLUMN_NAME].to_pylist()
+    ):
+        source_row = source_table.slice(source_idx, 1)
+        target_row = target_table.slice(target_idx, 1)
+
+        for key in non_key_cols:
+            source_val = source_row.column(key)[0].as_py()
+            target_val = target_row.column(key)[0].as_py()
+            if source_val != target_val:
+                to_update_indices.append(source_idx)
+                break
+
+    # Step 6: Take rows from the source table using the indices
+    if to_update_indices:
+        return source_table.take(to_update_indices)
+    else:
+        return source_table.schema.empty_table()
diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py
index e2b63f400a..e591b40406 100644
--- a/tests/table/test_upsert.py
+++ b/tests/table/test_upsert.py
@@ -186,7 +186,7 @@ def test_merge_scenario_skip_upd_row(catalog: Catalog) -> None:
 
     res = table.upsert(df=source_df, join_cols=["order_id"])
 
-    expected_updated = 2
+    expected_updated = 1
     expected_inserted = 1
 
     assert_upsert_result(res, expected_updated, expected_inserted)
@@ -222,7 +222,7 @@ def test_merge_scenario_date_as_key(catalog: Catalog) -> None:
 
     res = table.upsert(df=source_df, join_cols=["order_date"])
 
-    expected_updated = 2
+    expected_updated = 1
     expected_inserted = 1
 
     assert_upsert_result(res, expected_updated, expected_inserted)
@@ -258,7 +258,7 @@ def test_merge_scenario_string_as_key(catalog: Catalog) -> None:
 
     res = table.upsert(df=source_df, join_cols=["order_id"])
 
-    expected_updated = 2
+    expected_updated = 1
     expected_inserted = 1
 
     assert_upsert_result(res, expected_updated, expected_inserted)
@@ -371,25 +371,16 @@ def test_upsert_with_identifier_fields(catalog: Catalog) -> None:
 
     expected_operations = [Operation.APPEND, Operation.OVERWRITE, Operation.APPEND, Operation.APPEND]
 
-    assert upd.rows_updated == 2
+    assert upd.rows_updated == 1
     assert upd.rows_inserted == 1
 
     assert [snap.summary.operation for snap in tbl.snapshots() if snap.summary is not None] == expected_operations
 
-    # This will update all 3 rows
+    # This should be a no-op
    upd = tbl.upsert(df)
 
-    assert upd.rows_updated == 3
+    assert upd.rows_updated == 0
     assert upd.rows_inserted == 0
 
-    expected_operations = [
-        Operation.APPEND,
-        Operation.OVERWRITE,
-        Operation.APPEND,
-        Operation.APPEND,
-        Operation.DELETE,
-        Operation.OVERWRITE,
-        Operation.APPEND,
-    ]
     assert [snap.summary.operation for snap in tbl.snapshots() if snap.summary is not None] == expected_operations
@@ -561,7 +552,7 @@ def test_upsert_struct_field_fails_in_join(catalog: Catalog) -> None:
         [
             {
                 "id": 1,
-                "nested_type": {"sub1": "1_sub1_init", "sub2": "1sub2_init"},
+                "nested_type": {"sub1": "bla1", "sub2": "bla"},
             }
         ],
         schema=arrow_schema,
@@ -572,29 +563,40 @@ def test_upsert_struct_field_fails_in_join(catalog: Catalog) -> None:
     update_data = pa.Table.from_pylist(
         [
             {
                 "id": 2,
-                "nested_type": {"sub1": "2_sub1_new", "sub2": "2_sub2_new"},
+                "nested_type": {"sub1": "bla1", "sub2": "bla"},
             },
             {
                 "id": 1,
-                "nested_type": {"sub1": "1sub1_init", "sub2": "1sub2_new"},
+                "nested_type": {"sub1": "bla1", "sub2": "bla2"},
             },
-            # TODO: struct changes should cause _check_pyarrow_schema_compatible to fail. Introduce a new `sub3` attribute
-            # {
-            #     "id": 1,
-            #     "nested_type": {"sub3": "1sub3_init", "sub2": "1sub2_new"},
-            # },
         ],
         schema=arrow_schema,
     )
 
-    upd = tbl.upsert(update_data, join_cols=["id"])
+    res = tbl.upsert(update_data, join_cols=["id"])
 
-    # Row needs to be updated even though it's not changed.
-    # When pyarrow isn't able to compare rows, just update everything
-    assert upd.rows_updated == 1
-    assert upd.rows_inserted == 1
+    expected_updated = 1
+    expected_inserted = 1
 
-    assert tbl.scan().to_arrow().to_pylist() == [
-        {"id": 2, "nested_type": {"sub1": "2_sub1_new", "sub2": "2_sub2_new"}},
-        {"id": 1, "nested_type": {"sub1": "1sub1_init", "sub2": "1sub2_new"}},
-    ]
+    assert_upsert_result(res, expected_updated, expected_inserted)
+
+    update_data = pa.Table.from_pylist(
+        [
+            {
+                "id": 2,
+                "nested_type": {"sub1": "bla1", "sub2": "bla"},
+            },
+            {
+                "id": 1,
+                "nested_type": {"sub1": "bla1", "sub2": "bla2"},
+            },
+        ],
+        schema=arrow_schema,
+    )
+
+    res = tbl.upsert(update_data, join_cols=["id"])
+
+    expected_updated = 0
+    expected_inserted = 0
+
+    assert_upsert_result(res, expected_updated, expected_inserted)
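The heart of patch 10 is doing the change detection in plain Python: the join now only pairs up row indices, and the value comparison happens on as_py() scalars, which works for structs, lists, and maps where Arrow compute kernels give up. A distilled sketch (toy data assumed):

import pyarrow as pa

source = pa.table({"id": [1, 2], "payload": [{"a": "x"}, {"a": "y"}]})
target = pa.table({"id": [1, 2], "payload": [{"a": "x"}, {"a": "z"}]})

to_update = []
for source_idx, target_idx in [(0, 0), (1, 1)]:  # index pairs as the key join would yield them
    source_val = source["payload"][source_idx].as_py()  # plain Python dict
    target_val = target["payload"][target_idx].as_py()
    if source_val != target_val:
        to_update.append(source_idx)

assert to_update == [1]  # only the id=2 row actually changed

The per-row as_py() conversion is slower than a vectorized kernel, but it restores the skip-unchanged-rows behavior that patch 09 gave up.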
From 6f50b4855d576964e2bfe2e14eccb125fbbe1358 Mon Sep 17 00:00:00 2001
From: Koen Vossen
Date: Thu, 24 Apr 2025 22:03:42 +0200
Subject: [PATCH 11/14] Update comment

---
 pyiceberg/table/upsert_util.py | 2 --
 tests/table/test_upsert.py    | 1 -
 2 files changed, 3 deletions(-)

diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py
index 442b1de5db..c10b72ad92 100644
--- a/pyiceberg/table/upsert_util.py
+++ b/pyiceberg/table/upsert_util.py
@@ -72,8 +72,6 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
         # When the target table is empty, there is nothing to update :)
         return source_table.schema.empty_table()
 
-    # When we are not able to compare (e.g. due to unsupported types),
-    # fall back to updating all rows in the source table that already exist in the target.
     # See: https://github.com/apache/arrow/issues/35785
     MARKER_COLUMN_NAME = "__from_target"
     SOURCE_INDEX_COLUMN_NAME = "__source_index"
diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py
index e591b40406..3a450e8b4e 100644
--- a/tests/table/test_upsert.py
+++ b/tests/table/test_upsert.py
@@ -520,7 +520,6 @@ def test_upsert_struct_field_fails_in_join(catalog: Catalog) -> None:
         NestedField(
             2,
             "nested_type",
-            # Struct<sub1: string, sub2: string>
             StructType(
                 NestedField(3, "sub1", StringType(), required=True),
                 NestedField(4, "sub2", StringType(), required=True),
From e9e94851b33426e49396f57d47f48b1ceb668596 Mon Sep 17 00:00:00 2001
From: Koen Vossen
Date: Thu, 24 Apr 2025 22:05:28 +0200
Subject: [PATCH 12/14] Update comment

---
 pyiceberg/table/upsert_util.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py
index c10b72ad92..241ecc6d4a 100644
--- a/pyiceberg/table/upsert_util.py
+++ b/pyiceberg/table/upsert_util.py
@@ -72,6 +72,9 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
         # When the target table is empty, there is nothing to update :)
         return source_table.schema.empty_table()
 
+    # We need to compare non_key_cols in Python because PyArrow
+    # 1. Cannot do a join when non-join columns have complex types
+    # 2. Cannot compare columns with complex types
     # See: https://github.com/apache/arrow/issues/35785
     MARKER_COLUMN_NAME = "__from_target"
     SOURCE_INDEX_COLUMN_NAME = "__source_index"

From 85424d82f85be27f5195156983bdf5029d942fd1 Mon Sep 17 00:00:00 2001
From: Koen Vossen
Date: Thu, 24 Apr 2025 22:35:52 +0200
Subject: [PATCH 13/14] Replace left outer join + filter with inner join

---
 pyiceberg/table/upsert_util.py | 22 +++++++---------------
 1 file changed, 7 insertions(+), 15 deletions(-)

diff --git a/pyiceberg/table/upsert_util.py b/pyiceberg/table/upsert_util.py
index 241ecc6d4a..cefdd101a0 100644
--- a/pyiceberg/table/upsert_util.py
+++ b/pyiceberg/table/upsert_util.py
@@ -76,13 +76,12 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
     # 1. Cannot do a join when non-join columns have complex types
     # 2. Cannot compare columns with complex types
     # See: https://github.com/apache/arrow/issues/35785
-    MARKER_COLUMN_NAME = "__from_target"
     SOURCE_INDEX_COLUMN_NAME = "__source_index"
     TARGET_INDEX_COLUMN_NAME = "__target_index"
 
-    if MARKER_COLUMN_NAME in join_cols or SOURCE_INDEX_COLUMN_NAME in join_cols or TARGET_INDEX_COLUMN_NAME in join_cols:
+    if SOURCE_INDEX_COLUMN_NAME in join_cols or TARGET_INDEX_COLUMN_NAME in join_cols:
         raise ValueError(
-            f"{MARKER_COLUMN_NAME}, {SOURCE_INDEX_COLUMN_NAME} and {TARGET_INDEX_COLUMN_NAME} are reserved for joining "
+            f"{SOURCE_INDEX_COLUMN_NAME} and {TARGET_INDEX_COLUMN_NAME} are reserved for joining "
             f"DataFrames, and cannot be used as column names"
         ) from None
 
@@ -96,19 +95,12 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
     )
 
     # Step 2: Prepare target index with join keys and a marker
-    target_index = (
-        target_table.select(join_cols_set)
-        .append_column(TARGET_INDEX_COLUMN_NAME, pa.array(range(len(target_table))))
-        .append_column(MARKER_COLUMN_NAME, pa.repeat(True, len(target_table)))
-    )
-
-    # Step 3: Perform a left outer join to find which rows from source exist in target
-    joined = source_index.join(target_index, keys=list(join_cols_set), join_type="left outer")
+    target_index = target_table.select(join_cols_set).append_column(TARGET_INDEX_COLUMN_NAME, pa.array(range(len(target_table))))
 
-    # Step 4: Create indices for rows that do exist in the target i.e., where the marker column is true after the join
-    matching_indices = joined.filter(pc.field(MARKER_COLUMN_NAME))
+    # Step 3: Perform an inner join to find which rows from source exist in target
+    matching_indices = source_index.join(target_index, keys=list(join_cols_set), join_type="inner")
 
-    # Step 5: Compare all rows using Python
+    # Step 4: Compare all rows using Python
     to_update_indices = []
     for source_idx, target_idx in zip(
         matching_indices[SOURCE_INDEX_COLUMN_NAME].to_pylist(), matching_indices[TARGET_INDEX_COLUMN_NAME].to_pylist()
@@ -120,8 +112,8 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
             to_update_indices.append(source_idx)
             break
 
-    # Step 6: Take rows from the source table using the indices
+    # Step 5: Take rows from the source table using the indices
     if to_update_indices:
         return source_table.take(to_update_indices)
     else:
         return source_table.schema.empty_table()
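Patch 13 rests on a small relational identity: a left outer join followed by filtering on a non-null constant marker keeps exactly the rows an inner join produces, so the marker machinery can be dropped. A toy check (assumed data, not from the patch):

import pyarrow as pa
import pyarrow.compute as pc

source_keys = pa.table({"id": [1, 2, 3], "__source_index": [0, 1, 2]})
target_keys = pa.table({"id": [2, 3, 4], "__target_index": [0, 1, 2]})

inner = source_keys.join(target_keys, keys=["id"], join_type="inner")

marked = target_keys.append_column("__from_target", pa.repeat(True, 3))
outer = source_keys.join(marked, keys=["id"], join_type="left outer")
filtered = outer.filter(pc.invert(pc.is_null(outer["__from_target"])))

assert sorted(inner["id"].to_pylist()) == sorted(filtered["id"].to_pylist())  # same matching keys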
From 7da6cca08dad7e0959230411c8f646c1f01dcd0a Mon Sep 17 00:00:00 2001
From: Koen Vossen
Date: Thu, 24 Apr 2025 22:40:28 +0200
Subject: [PATCH 14/14] Add test from Kevin's branch for upsert with struct
 field as join key + rename upsert test with struct field as non-join key

---
 tests/table/test_upsert.py | 69 +++++++++++++++++++++++++++++++++++++-
 1 file changed, 68 insertions(+), 1 deletion(-)

diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py
index 3a450e8b4e..70203fd162 100644
--- a/tests/table/test_upsert.py
+++ b/tests/table/test_upsert.py
@@ -511,7 +511,7 @@ def test_upsert_without_identifier_fields(catalog: Catalog) -> None:
         tbl.upsert(df)
 
 
-def test_upsert_struct_field_fails_in_join(catalog: Catalog) -> None:
+def test_upsert_with_struct_field_as_non_join_key(catalog: Catalog) -> None:
     identifier = "default.test_upsert_struct_field_fails"
     _drop_table(catalog, identifier)
 
@@ -601,6 +601,73 @@ def test_upsert_with_struct_field_as_non_join_key(catalog: Catalog) -> None:
     assert_upsert_result(res, expected_updated, expected_inserted)
 
 
+def test_upsert_with_struct_field_as_join_key(catalog: Catalog) -> None:
+    identifier = "default.test_upsert_with_struct_field_as_join_key"
+    _drop_table(catalog, identifier)
+
+    schema = Schema(
+        NestedField(1, "id", IntegerType(), required=True),
+        NestedField(
+            2,
+            "nested_type",
+            StructType(
+                NestedField(3, "sub1", StringType(), required=True),
+                NestedField(4, "sub2", StringType(), required=True),
+            ),
+            required=False,
+        ),
+        identifier_field_ids=[1],
+    )
+
+    tbl = catalog.create_table(identifier, schema=schema)
+
+    arrow_schema = pa.schema(
+        [
+            pa.field("id", pa.int32(), nullable=False),
+            pa.field(
+                "nested_type",
+                pa.struct(
+                    [
+                        pa.field("sub1", pa.large_string(), nullable=False),
+                        pa.field("sub2", pa.large_string(), nullable=False),
+                    ]
+                ),
+                nullable=True,
+            ),
+        ]
+    )
+
+    initial_data = pa.Table.from_pylist(
+        [
+            {
+                "id": 1,
+                "nested_type": {"sub1": "bla1", "sub2": "bla"},
+            }
+        ],
+        schema=arrow_schema,
+    )
+    tbl.append(initial_data)
+
+    update_data = pa.Table.from_pylist(
+        [
+            {
+                "id": 2,
+                "nested_type": {"sub1": "bla1", "sub2": "bla"},
+            },
+            {
+                "id": 1,
+                "nested_type": {"sub1": "bla1", "sub2": "bla"},
+            },
+        ],
+        schema=arrow_schema,
+    )
+
+    with pytest.raises(
+        pa.lib.ArrowNotImplementedError, match="Keys of type struct"
+    ):
+        _ = tbl.upsert(update_data, join_cols=["nested_type"])
+
+
 def test_upsert_with_nulls(catalog: Catalog) -> None:
     identifier = "default.test_upsert_with_nulls"
     _drop_table(catalog, identifier)