Commit 9b6c9ed

Merge branch 'main' into feat/orphan-files
2 parents 7c780d3 + d9f3a07

File tree

11 files changed: +786 -380 lines


poetry.lock

Lines changed: 156 additions & 113 deletions
(Generated file; diff not rendered by default.)

pyiceberg/io/pyarrow.py

Lines changed: 9 additions & 1 deletion
@@ -636,6 +636,12 @@ def visit_fixed(self, fixed_type: FixedType) -> pa.DataType:
         return pa.binary(len(fixed_type))
 
     def visit_decimal(self, decimal_type: DecimalType) -> pa.DataType:
+        # It looks like decimal{32,64} is not fully implemented:
+        # https://github.com/apache/arrow/issues/25483
+        # https://github.com/apache/arrow/issues/43956
+        # However, if we keep it as 128 in memory, and based on the
+        # precision/scale Arrow will map it to INT{32,64}
+        # https://github.com/apache/arrow/blob/598938711a8376cbfdceaf5c77ab0fd5057e6c02/cpp/src/parquet/arrow/schema.cc#L380-L392
         return pa.decimal128(decimal_type.precision, decimal_type.scale)
 
     def visit_boolean(self, _: BooleanType) -> pa.DataType:
@@ -2442,7 +2448,9 @@ def write_parquet(task: WriteTask) -> DataFile:
         )
         fo = io.new_output(file_path)
         with fo.create(overwrite=True) as fos:
-            with pq.ParquetWriter(fos, schema=arrow_table.schema, **parquet_writer_kwargs) as writer:
+            with pq.ParquetWriter(
+                fos, schema=arrow_table.schema, store_decimal_as_integer=True, **parquet_writer_kwargs
+            ) as writer:
                 writer.write(arrow_table, row_group_size=row_group_size)
         statistics = data_file_statistics_from_parquet_metadata(
             parquet_metadata=writer.writer.metadata,
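
Taken together, these two hunks keep decimals as decimal128 in memory while letting Parquet store low-precision values as plain integers on disk. A minimal standalone sketch of that effect, assuming a recent pyarrow with store_decimal_as_integer support (the file path and column name are illustrative, not from the commit):

from decimal import Decimal

import pyarrow as pa
import pyarrow.parquet as pq

# decimal128 in memory; precision 9 fits in an INT32 on disk
table = pa.table({"price": pa.array([Decimal("1.00"), Decimal("2.50")], type=pa.decimal128(9, 2))})
pq.write_table(table, "/tmp/decimals.parquet", store_decimal_as_integer=True)

# The logical type stays decimal(9, 2); only the physical encoding changes.
column_meta = pq.ParquetFile("/tmp/decimals.parquet").metadata.row_group(0).column(0)
print(column_meta.physical_type)  # expected: INT32 (INT64 for precision 10-18)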

pyiceberg/table/__init__.py

Lines changed: 123 additions & 66 deletions
@@ -696,6 +696,122 @@ def delete(
         if not delete_snapshot.files_affected and not delete_snapshot.rewrites_needed:
             warnings.warn("Delete operation did not match any records")
 
+    def upsert(
+        self,
+        df: pa.Table,
+        join_cols: Optional[List[str]] = None,
+        when_matched_update_all: bool = True,
+        when_not_matched_insert_all: bool = True,
+        case_sensitive: bool = True,
+    ) -> UpsertResult:
+        """Shorthand API for performing an upsert to an iceberg table.
+
+        Args:
+
+            df: The input dataframe to upsert with the table's data.
+            join_cols: Columns to join on, if not provided, it will use the identifier-field-ids.
+            when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing
+            when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
+            case_sensitive: Bool indicating if the match should be case-sensitive
+
+            To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids
+
+        Example Use Cases:
+            Case 1: Both Parameters = True (Full Upsert)
+            Existing row found → Update it
+            New row found → Insert it
+
+            Case 2: when_matched_update_all = False, when_not_matched_insert_all = True
+            Existing row found → Do nothing (no updates)
+            New row found → Insert it
+
+            Case 3: when_matched_update_all = True, when_not_matched_insert_all = False
+            Existing row found → Update it
+            New row found → Do nothing (no inserts)
+
+            Case 4: Both Parameters = False (No Merge Effect)
+            Existing row found → Do nothing
+            New row found → Do nothing
+            (Function effectively does nothing)
+
+        Returns:
+            An UpsertResult class (contains details of rows updated and inserted)
+        """
+        try:
+            import pyarrow as pa  # noqa: F401
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e
+
+        from pyiceberg.io.pyarrow import expression_to_pyarrow
+        from pyiceberg.table import upsert_util
+
+        if join_cols is None:
+            join_cols = []
+            for field_id in self.table_metadata.schema().identifier_field_ids:
+                col = self.table_metadata.schema().find_column_name(field_id)
+                if col is not None:
+                    join_cols.append(col)
+                else:
+                    raise ValueError(f"Field-ID could not be found: {join_cols}")
+
+        if len(join_cols) == 0:
+            raise ValueError("Join columns could not be found, please set identifier-field-ids or pass in explicitly.")
+
+        if not when_matched_update_all and not when_not_matched_insert_all:
+            raise ValueError("no upsert options selected...exiting")
+
+        if upsert_util.has_duplicate_rows(df, join_cols):
+            raise ValueError("Duplicate rows found in source dataset based on the key columns. No upsert executed")
+
+        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible
+
+        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
+        _check_pyarrow_schema_compatible(
+            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+        )
+
+        # get list of rows that exist so we don't have to load the entire target table
+        matched_predicate = upsert_util.create_match_filter(df, join_cols)
+
+        # We must use Transaction.table_metadata for the scan. This includes all uncommitted - but relevant - changes.
+        matched_iceberg_table = DataScan(
+            table_metadata=self.table_metadata,
+            io=self._table.io,
+            row_filter=matched_predicate,
+            case_sensitive=case_sensitive,
+        ).to_arrow()
+
+        update_row_cnt = 0
+        insert_row_cnt = 0
+
+        if when_matched_update_all:
+            # function get_rows_to_update is doing a check on non-key columns to see if any of the values have actually changed
+            # we don't want to do just a blanket overwrite for matched rows if the actual non-key column data hasn't changed
+            # this extra step avoids unnecessary IO and writes
+            rows_to_update = upsert_util.get_rows_to_update(df, matched_iceberg_table, join_cols)
+
+            update_row_cnt = len(rows_to_update)
+
+            if len(rows_to_update) > 0:
+                # build the match predicate filter
+                overwrite_mask_predicate = upsert_util.create_match_filter(rows_to_update, join_cols)
+
+                self.overwrite(rows_to_update, overwrite_filter=overwrite_mask_predicate)
+
+        if when_not_matched_insert_all:
+            expr_match = upsert_util.create_match_filter(matched_iceberg_table, join_cols)
+            expr_match_bound = bind(self.table_metadata.schema(), expr_match, case_sensitive=case_sensitive)
+            expr_match_arrow = expression_to_pyarrow(expr_match_bound)
+            rows_to_insert = df.filter(~expr_match_arrow)
+
+            insert_row_cnt = len(rows_to_insert)
+
+            if insert_row_cnt > 0:
+                self.append(rows_to_insert)
+
+        return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)
+
     def add_files(
         self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
     ) -> None:
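
Because the logic now lives on Transaction and the matched-row scan runs against Transaction.table_metadata, an upsert issued inside an open transaction also sees that transaction's uncommitted changes. A hedged usage sketch; the catalog and table names are hypothetical:

import pyarrow as pa

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")       # hypothetical catalog name
table = catalog.load_table("db.users")  # hypothetical table identifier

df = pa.table({"id": pa.array([1, 2], type=pa.int64()), "name": ["alice", "bob"]})

# upsert is now a Transaction method, so it composes with other
# uncommitted operations in the same transaction.
with table.transaction() as tx:
    result = tx.upsert(df, join_cols=["id"])

print(f"updated={result.rows_updated}, inserted={result.rows_inserted}")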
@@ -1169,73 +1285,14 @@ def upsert(
         Returns:
             An UpsertResult class (contains details of rows updated and inserted)
         """
-        try:
-            import pyarrow as pa  # noqa: F401
-        except ModuleNotFoundError as e:
-            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e
-
-        from pyiceberg.io.pyarrow import expression_to_pyarrow
-        from pyiceberg.table import upsert_util
-
-        if join_cols is None:
-            join_cols = []
-            for field_id in self.schema().identifier_field_ids:
-                col = self.schema().find_column_name(field_id)
-                if col is not None:
-                    join_cols.append(col)
-                else:
-                    raise ValueError(f"Field-ID could not be found: {join_cols}")
-
-        if len(join_cols) == 0:
-            raise ValueError("Join columns could not be found, please set identifier-field-ids or pass in explicitly.")
-
-        if not when_matched_update_all and not when_not_matched_insert_all:
-            raise ValueError("no upsert options selected...exiting")
-
-        if upsert_util.has_duplicate_rows(df, join_cols):
-            raise ValueError("Duplicate rows found in source dataset based on the key columns. No upsert executed")
-
-        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible
-
-        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
-        _check_pyarrow_schema_compatible(
-            self.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
-        )
-
-        # get list of rows that exist so we don't have to load the entire target table
-        matched_predicate = upsert_util.create_match_filter(df, join_cols)
-        matched_iceberg_table = self.scan(row_filter=matched_predicate, case_sensitive=case_sensitive).to_arrow()
-
-        update_row_cnt = 0
-        insert_row_cnt = 0
-
         with self.transaction() as tx:
-            if when_matched_update_all:
-                # function get_rows_to_update is doing a check on non-key columns to see if any of the values have actually changed
-                # we don't want to do just a blanket overwrite for matched rows if the actual non-key column data hasn't changed
-                # this extra step avoids unnecessary IO and writes
-                rows_to_update = upsert_util.get_rows_to_update(df, matched_iceberg_table, join_cols)
-
-                update_row_cnt = len(rows_to_update)
-
-                if len(rows_to_update) > 0:
-                    # build the match predicate filter
-                    overwrite_mask_predicate = upsert_util.create_match_filter(rows_to_update, join_cols)
-
-                    tx.overwrite(rows_to_update, overwrite_filter=overwrite_mask_predicate)
-
-            if when_not_matched_insert_all:
-                expr_match = upsert_util.create_match_filter(matched_iceberg_table, join_cols)
-                expr_match_bound = bind(self.schema(), expr_match, case_sensitive=case_sensitive)
-                expr_match_arrow = expression_to_pyarrow(expr_match_bound)
-                rows_to_insert = df.filter(~expr_match_arrow)
-
-                insert_row_cnt = len(rows_to_insert)
-
-                if insert_row_cnt > 0:
-                    tx.append(rows_to_insert)
-
-        return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)
+            return tx.upsert(
+                df=df,
+                join_cols=join_cols,
+                when_matched_update_all=when_matched_update_all,
+                when_not_matched_insert_all=when_not_matched_insert_all,
+                case_sensitive=case_sensitive,
+            )
 
     def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
         """
