@@ -737,8 +737,8 @@ def upsert(
737737
738738 if join_cols is None :
739739 join_cols = []
740- for field_id in df . schema .identifier_field_ids :
741- col = df . schema .find_column_name (field_id )
740+ for field_id in self . table_metadata . schema () .identifier_field_ids :
741+ col = self . table_metadata . schema () .find_column_name (field_id )
742742 if col is not None :
743743 join_cols .append (col )
744744 else :
@@ -757,12 +757,12 @@ def upsert(
757757
758758 downcast_ns_timestamp_to_us = Config ().get_bool (DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE ) or False
759759 _check_pyarrow_schema_compatible (
760- df . schema , provided_schema = df .schema , downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
760+ self . table_metadata . schema () , provided_schema = df .schema , downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
761761 )
762762
763763 # get list of rows that exist so we don't have to load the entire target table
764764 matched_predicate = upsert_util .create_match_filter (df , join_cols )
765- matched_iceberg_table = df .scan (row_filter = matched_predicate , case_sensitive = case_sensitive ).to_arrow ()
765+ matched_iceberg_table = self . _table .scan (row_filter = matched_predicate , case_sensitive = case_sensitive ).to_arrow ()
766766
767767 update_row_cnt = 0
768768 insert_row_cnt = 0
@@ -783,7 +783,7 @@ def upsert(
783783
784784 if when_not_matched_insert_all :
785785 expr_match = upsert_util .create_match_filter (matched_iceberg_table , join_cols )
786- expr_match_bound = bind (df . schema , expr_match , case_sensitive = case_sensitive )
786+ expr_match_bound = bind (self . table_metadata . schema () , expr_match , case_sensitive = case_sensitive )
787787 expr_match_arrow = expression_to_pyarrow (expr_match_bound )
788788 rows_to_insert = df .filter (~ expr_match_arrow )
789789
@@ -1260,8 +1260,11 @@ def upsert(
12601260 """
12611261 with self .transaction () as tx :
12621262 return tx .upsert (
1263- df = df , join_cols = join_cols , when_matched_update_all = when_matched_update_all , when_not_matched_insert_all = when_not_matched_insert_all ,
1264- case_sensitive = case_sensitive
1263+ df = df ,
1264+ join_cols = join_cols ,
1265+ when_matched_update_all = when_matched_update_all ,
1266+ when_not_matched_insert_all = when_not_matched_insert_all ,
1267+ case_sensitive = case_sensitive ,
12651268 )
12661269
12671270 def append (self , df : pa .Table , snapshot_properties : Dict [str , str ] = EMPTY_DICT ) -> None :
0 commit comments