@@ -695,6 +695,122 @@ def delete(
695695 if not delete_snapshot .files_affected and not delete_snapshot .rewrites_needed :
696696 warnings .warn ("Delete operation did not match any records" )
697697
def upsert(
    self,
    df: pa.Table,
    join_cols: Optional[List[str]] = None,
    when_matched_update_all: bool = True,
    when_not_matched_insert_all: bool = True,
    case_sensitive: bool = True,
) -> UpsertResult:
    """Shorthand API for performing an upsert to an iceberg table.

    Args:
        df: The input dataframe to upsert with the table's data.
        join_cols: Columns to join on, if not provided, it will use the identifier-field-ids.
        when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing
        when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
        case_sensitive: Bool indicating if the match should be case-sensitive

    To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids

    Example Use Cases:
        Case 1: Both Parameters = True (Full Upsert)
            Existing row found → Update it
            New row found → Insert it

        Case 2: when_matched_update_all = False, when_not_matched_insert_all = True
            Existing row found → Do nothing (no updates)
            New row found → Insert it

        Case 3: when_matched_update_all = True, when_not_matched_insert_all = False
            Existing row found → Update it
            New row found → Do nothing (no inserts)

        Case 4: Both Parameters = False
            Rejected: a ValueError is raised because no upsert action was selected.

    Returns:
        An UpsertResult class (contains details of rows updated and inserted)

    Raises:
        ModuleNotFoundError: If PyArrow is not installed.
        ValueError: If an identifier field id cannot be resolved to a column name,
            if no join columns are available, if both ``when_matched_update_all`` and
            ``when_not_matched_insert_all`` are False, or if ``df`` contains duplicate
            rows with respect to the join columns.
    """
    try:
        import pyarrow as pa  # noqa: F401
    except ModuleNotFoundError as e:
        raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

    from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, expression_to_pyarrow
    from pyiceberg.table import upsert_util

    if join_cols is None:
        # Fall back to the identifier fields declared on the schema.
        schema = self.table_metadata.schema()
        join_cols = []
        for field_id in schema.identifier_field_ids:
            col = schema.find_column_name(field_id)
            if col is None:
                # Report the id that failed to resolve, not the partially built column list.
                raise ValueError(f"Field-ID could not be found: {field_id}")
            join_cols.append(col)

    if len(join_cols) == 0:
        raise ValueError("Join columns could not be found, please set identifier-field-ids or pass in explicitly.")

    if not when_matched_update_all and not when_not_matched_insert_all:
        raise ValueError("no upsert options selected...exiting")

    if upsert_util.has_duplicate_rows(df, join_cols):
        raise ValueError("Duplicate rows found in source dataset based on the key columns. No upsert executed")

    downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
    _check_pyarrow_schema_compatible(
        self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
    )

    # get list of rows that exist so we don't have to load the entire target table
    matched_predicate = upsert_util.create_match_filter(df, join_cols)

    # We must use Transaction.table_metadata for the scan. This includes all uncommitted - but relevant - changes.
    matched_iceberg_table = DataScan(
        table_metadata=self.table_metadata,
        io=self._table.io,
        row_filter=matched_predicate,
        case_sensitive=case_sensitive,
    ).to_arrow()

    update_row_cnt = 0
    insert_row_cnt = 0

    if when_matched_update_all:
        # function get_rows_to_update is doing a check on non-key columns to see if any of the values have actually changed
        # we don't want to do just a blanket overwrite for matched rows if the actual non-key column data hasn't changed
        # this extra step avoids unnecessary IO and writes
        rows_to_update = upsert_util.get_rows_to_update(df, matched_iceberg_table, join_cols)

        update_row_cnt = len(rows_to_update)

        if update_row_cnt > 0:
            # build the match predicate filter
            overwrite_mask_predicate = upsert_util.create_match_filter(rows_to_update, join_cols)

            self.overwrite(rows_to_update, overwrite_filter=overwrite_mask_predicate)

    if when_not_matched_insert_all:
        # Build a filter from the keys already present in the table, then keep only
        # the rows of df that do NOT satisfy it.
        expr_match = upsert_util.create_match_filter(matched_iceberg_table, join_cols)
        expr_match_bound = bind(self.table_metadata.schema(), expr_match, case_sensitive=case_sensitive)
        expr_match_arrow = expression_to_pyarrow(expr_match_bound)
        rows_to_insert = df.filter(~expr_match_arrow)

        insert_row_cnt = len(rows_to_insert)

        if insert_row_cnt > 0:
            self.append(rows_to_insert)

    return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)
813+
698814 def add_files (
699815 self , file_paths : List [str ], snapshot_properties : Dict [str , str ] = EMPTY_DICT , check_duplicate_files : bool = True
700816 ) -> None :
@@ -1159,73 +1275,14 @@ def upsert(
11591275 Returns:
11601276 An UpsertResult class (contains details of rows updated and inserted)
11611277 """
1162- try :
1163- import pyarrow as pa # noqa: F401
1164- except ModuleNotFoundError as e :
1165- raise ModuleNotFoundError ("For writes PyArrow needs to be installed" ) from e
1166-
1167- from pyiceberg .io .pyarrow import expression_to_pyarrow
1168- from pyiceberg .table import upsert_util
1169-
1170- if join_cols is None :
1171- join_cols = []
1172- for field_id in self .schema ().identifier_field_ids :
1173- col = self .schema ().find_column_name (field_id )
1174- if col is not None :
1175- join_cols .append (col )
1176- else :
1177- raise ValueError (f"Field-ID could not be found: { join_cols } " )
1178-
1179- if len (join_cols ) == 0 :
1180- raise ValueError ("Join columns could not be found, please set identifier-field-ids or pass in explicitly." )
1181-
1182- if not when_matched_update_all and not when_not_matched_insert_all :
1183- raise ValueError ("no upsert options selected...exiting" )
1184-
1185- if upsert_util .has_duplicate_rows (df , join_cols ):
1186- raise ValueError ("Duplicate rows found in source dataset based on the key columns. No upsert executed" )
1187-
1188- from pyiceberg .io .pyarrow import _check_pyarrow_schema_compatible
1189-
1190- downcast_ns_timestamp_to_us = Config ().get_bool (DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE ) or False
1191- _check_pyarrow_schema_compatible (
1192- self .schema (), provided_schema = df .schema , downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
1193- )
1194-
1195- # get list of rows that exist so we don't have to load the entire target table
1196- matched_predicate = upsert_util .create_match_filter (df , join_cols )
1197- matched_iceberg_table = self .scan (row_filter = matched_predicate , case_sensitive = case_sensitive ).to_arrow ()
1198-
1199- update_row_cnt = 0
1200- insert_row_cnt = 0
1201-
12021278 with self .transaction () as tx :
1203- if when_matched_update_all :
1204- # function get_rows_to_update is doing a check on non-key columns to see if any of the values have actually changed
1205- # we don't want to do just a blanket overwrite for matched rows if the actual non-key column data hasn't changed
1206- # this extra step avoids unnecessary IO and writes
1207- rows_to_update = upsert_util .get_rows_to_update (df , matched_iceberg_table , join_cols )
1208-
1209- update_row_cnt = len (rows_to_update )
1210-
1211- if len (rows_to_update ) > 0 :
1212- # build the match predicate filter
1213- overwrite_mask_predicate = upsert_util .create_match_filter (rows_to_update , join_cols )
1214-
1215- tx .overwrite (rows_to_update , overwrite_filter = overwrite_mask_predicate )
1216-
1217- if when_not_matched_insert_all :
1218- expr_match = upsert_util .create_match_filter (matched_iceberg_table , join_cols )
1219- expr_match_bound = bind (self .schema (), expr_match , case_sensitive = case_sensitive )
1220- expr_match_arrow = expression_to_pyarrow (expr_match_bound )
1221- rows_to_insert = df .filter (~ expr_match_arrow )
1222-
1223- insert_row_cnt = len (rows_to_insert )
1224-
1225- if insert_row_cnt > 0 :
1226- tx .append (rows_to_insert )
1227-
1228- return UpsertResult (rows_updated = update_row_cnt , rows_inserted = insert_row_cnt )
1279+ return tx .upsert (
1280+ df = df ,
1281+ join_cols = join_cols ,
1282+ when_matched_update_all = when_matched_update_all ,
1283+ when_not_matched_insert_all = when_not_matched_insert_all ,
1284+ case_sensitive = case_sensitive ,
1285+ )
12291286
12301287 def append (self , df : pa .Table , snapshot_properties : Dict [str , str ] = EMPTY_DICT ) -> None :
12311288 """
0 commit comments