Commit aae22e6

Merge branch 'main' into feat/validate-deleted-data-files
2 parents: 03b2913 + ff7bc62

File tree: 16 files changed (+925, -388 lines)

.github/ISSUE_TEMPLATE/iceberg_bug_report.yml

Lines changed: 2 additions & 1 deletion

@@ -28,7 +28,8 @@ body:
       description: What Apache Iceberg version are you using?
       multiple: false
       options:
-        - "0.9.0 (latest release)"
+        - "0.9.1 (latest release)"
+        - "0.9.0"
         - "0.8.1"
         - "0.8.0"
         - "0.7.1"
mkdocs/docs/configuration.md

Lines changed: 1 addition & 0 deletions

@@ -332,6 +332,7 @@ catalog:
 | rest.signing-region   | us-east-1                 | The region to use when SigV4 signing a request |
 | rest.signing-name     | execute-api               | The service signing name to use when SigV4 signing a request |
 | oauth2-server-uri     | <https://auth-service/cc> | Authentication URL to use for client credentials authentication (default: uri + 'v1/oauth/tokens') |
+| snapshot-loading-mode | refs                      | The snapshots to return in the body of the metadata. Setting the value to `all` would return the full set of snapshots currently valid for the table. Setting the value to `refs` would load all snapshots referenced by branches or tags. |

 <!-- markdown-link-check-enable-->
poetry.lock

Lines changed: 165 additions & 121 deletions
(Generated file; diff not rendered.)

pyiceberg/catalog/rest/__init__.py

Lines changed: 11 additions & 1 deletion

@@ -138,6 +138,7 @@ class IdentifierKind(Enum):
 SIGV4_REGION = "rest.signing-region"
 SIGV4_SERVICE = "rest.signing-name"
 OAUTH2_SERVER_URI = "oauth2-server-uri"
+SNAPSHOT_LOADING_MODE = "snapshot-loading-mode"

 NAMESPACE_SEPARATOR = b"\x1f".decode(UTF8)

@@ -678,7 +679,16 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:

     @retry(**_RETRY_ARGS)
     def load_table(self, identifier: Union[str, Identifier]) -> Table:
-        response = self._session.get(self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier)))
+        params = {}
+        if mode := self.properties.get(SNAPSHOT_LOADING_MODE):
+            if mode in {"all", "refs"}:
+                params["snapshots"] = mode
+            else:
+                raise ValueError("Invalid snapshot-loading-mode: {}")
+
+        response = self._session.get(
+            self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier)), params=params
+        )
         try:
             response.raise_for_status()
         except HTTPError as exc:
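Read in isolation, the parameter handling added to load_table is a small validate-and-forward step. A standalone sketch of that logic (the helper name snapshot_params is made up for illustration): only "all" and "refs" are accepted, and the chosen mode is sent as a `snapshots` query parameter on the loadTable request.

```python
from typing import Dict


def snapshot_params(properties: Dict[str, str]) -> Dict[str, str]:
    """Mirror of the branch added to load_table above (illustrative helper, not part of the commit)."""
    params: Dict[str, str] = {}
    if mode := properties.get("snapshot-loading-mode"):
        if mode in {"all", "refs"}:
            params["snapshots"] = mode  # forwarded as ?snapshots=refs or ?snapshots=all
        else:
            raise ValueError(f"Invalid snapshot-loading-mode: {mode}")
    return params


assert snapshot_params({"snapshot-loading-mode": "refs"}) == {"snapshots": "refs"}
assert snapshot_params({}) == {}
```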

pyiceberg/io/fsspec.py

Lines changed: 1 addition & 0 deletions

@@ -163,6 +163,7 @@ def _s3(properties: Properties) -> AbstractFileSystem:
     fs = S3FileSystem(client_kwargs=client_kwargs, config_kwargs=config_kwargs)

     for event_name, event_function in register_events.items():
+        fs.s3.meta.events.unregister(event_name, unique_id=1925)
         fs.s3.meta.events.register_last(event_name, event_function, unique_id=1925)

     return fs
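The extra `unregister` call appears to be about idempotence: botocore event emitters track handlers by `unique_id`, so a handler registered earlier under the same id could otherwise stay in effect when `_s3` runs again. A minimal sketch of the pattern with botocore's `HierarchicalEmitter` (the event name and handlers are illustrative, not PyIceberg's actual hooks):

```python
from botocore.hooks import HierarchicalEmitter


def old_hook(**kwargs):
    return "old"


def new_hook(**kwargs):
    return "new"


emitter = HierarchicalEmitter()
emitter.register_last("before-sign.s3", old_hook, unique_id=1925)

# Clear whatever was registered under unique_id=1925 before registering the
# replacement, mirroring the change above; repeated setup calls then cannot
# leave a stale handler in place.
emitter.unregister("before-sign.s3", unique_id=1925)
emitter.register_last("before-sign.s3", new_hook, unique_id=1925)

print([response for _, response in emitter.emit("before-sign.s3")])  # ['new']
```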

pyiceberg/io/pyarrow.py

Lines changed: 9 additions & 1 deletion

@@ -636,6 +636,12 @@ def visit_fixed(self, fixed_type: FixedType) -> pa.DataType:
         return pa.binary(len(fixed_type))

     def visit_decimal(self, decimal_type: DecimalType) -> pa.DataType:
+        # It looks like decimal{32,64} is not fully implemented:
+        # https://github.com/apache/arrow/issues/25483
+        # https://github.com/apache/arrow/issues/43956
+        # However, if we keep it as 128 in memory, and based on the
+        # precision/scale Arrow will map it to INT{32,64}
+        # https://github.com/apache/arrow/blob/598938711a8376cbfdceaf5c77ab0fd5057e6c02/cpp/src/parquet/arrow/schema.cc#L380-L392
         return pa.decimal128(decimal_type.precision, decimal_type.scale)

     def visit_boolean(self, _: BooleanType) -> pa.DataType:

@@ -2442,7 +2448,9 @@ def write_parquet(task: WriteTask) -> DataFile:
         )
         fo = io.new_output(file_path)
         with fo.create(overwrite=True) as fos:
-            with pq.ParquetWriter(fos, schema=arrow_table.schema, **parquet_writer_kwargs) as writer:
+            with pq.ParquetWriter(
+                fos, schema=arrow_table.schema, store_decimal_as_integer=True, **parquet_writer_kwargs
+            ) as writer:
                 writer.write(arrow_table, row_group_size=row_group_size)
         statistics = data_file_statistics_from_parquet_metadata(
             parquet_metadata=writer.writer.metadata,
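A small illustration of what the new comment and the `store_decimal_as_integer=True` writer flag imply: values stay `decimal128` in memory, and the Parquet writer picks the physical type from the precision (precision up to 9 fits INT32, up to 18 fits INT64). The file path and data here are made up:

```python
from decimal import Decimal

import pyarrow as pa
import pyarrow.parquet as pq

# decimal128(4, 2) in memory; precision 4 can be stored as INT32 in the Parquet file.
table = pa.table({"price": pa.array([Decimal("1.23"), Decimal("4.56")], type=pa.decimal128(4, 2))})

with pq.ParquetWriter("/tmp/decimals.parquet", schema=table.schema, store_decimal_as_integer=True) as writer:
    writer.write(table)

# The logical type stays DECIMAL(4, 2); only the physical storage changes.
print(pq.read_metadata("/tmp/decimals.parquet").schema)
```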

pyiceberg/table/__init__.py

Lines changed: 123 additions & 66 deletions

@@ -695,6 +695,122 @@ def delete(
         if not delete_snapshot.files_affected and not delete_snapshot.rewrites_needed:
             warnings.warn("Delete operation did not match any records")
+
+    def upsert(
+        self,
+        df: pa.Table,
+        join_cols: Optional[List[str]] = None,
+        when_matched_update_all: bool = True,
+        when_not_matched_insert_all: bool = True,
+        case_sensitive: bool = True,
+    ) -> UpsertResult:
+        """Shorthand API for performing an upsert to an iceberg table.
+
+        Args:
+            df: The input dataframe to upsert with the table's data.
+            join_cols: Columns to join on, if not provided, it will use the identifier-field-ids.
+            when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing
+            when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
+            case_sensitive: Bool indicating if the match should be case-sensitive
+
+        To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids
+
+        Example Use Cases:
+            Case 1: Both Parameters = True (Full Upsert)
+                Existing row found → Update it
+                New row found → Insert it
+
+            Case 2: when_matched_update_all = False, when_not_matched_insert_all = True
+                Existing row found → Do nothing (no updates)
+                New row found → Insert it
+
+            Case 3: when_matched_update_all = True, when_not_matched_insert_all = False
+                Existing row found → Update it
+                New row found → Do nothing (no inserts)
+
+            Case 4: Both Parameters = False (No Merge Effect)
+                Existing row found → Do nothing
+                New row found → Do nothing
+                (Function effectively does nothing)
+
+        Returns:
+            An UpsertResult class (contains details of rows updated and inserted)
+        """
+        try:
+            import pyarrow as pa  # noqa: F401
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e
+
+        from pyiceberg.io.pyarrow import expression_to_pyarrow
+        from pyiceberg.table import upsert_util
+
+        if join_cols is None:
+            join_cols = []
+            for field_id in self.table_metadata.schema().identifier_field_ids:
+                col = self.table_metadata.schema().find_column_name(field_id)
+                if col is not None:
+                    join_cols.append(col)
+                else:
+                    raise ValueError(f"Field-ID could not be found: {join_cols}")
+
+        if len(join_cols) == 0:
+            raise ValueError("Join columns could not be found, please set identifier-field-ids or pass in explicitly.")
+
+        if not when_matched_update_all and not when_not_matched_insert_all:
+            raise ValueError("no upsert options selected...exiting")
+
+        if upsert_util.has_duplicate_rows(df, join_cols):
+            raise ValueError("Duplicate rows found in source dataset based on the key columns. No upsert executed")
+
+        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible
+
+        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
+        _check_pyarrow_schema_compatible(
+            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+        )
+
+        # get list of rows that exist so we don't have to load the entire target table
+        matched_predicate = upsert_util.create_match_filter(df, join_cols)
+
+        # We must use Transaction.table_metadata for the scan. This includes all uncommitted - but relevant - changes.
+        matched_iceberg_table = DataScan(
+            table_metadata=self.table_metadata,
+            io=self._table.io,
+            row_filter=matched_predicate,
+            case_sensitive=case_sensitive,
+        ).to_arrow()
+
+        update_row_cnt = 0
+        insert_row_cnt = 0
+
+        if when_matched_update_all:
+            # function get_rows_to_update is doing a check on non-key columns to see if any of the values have actually changed
+            # we don't want to do just a blanket overwrite for matched rows if the actual non-key column data hasn't changed
+            # this extra step avoids unnecessary IO and writes
+            rows_to_update = upsert_util.get_rows_to_update(df, matched_iceberg_table, join_cols)
+
+            update_row_cnt = len(rows_to_update)
+
+            if len(rows_to_update) > 0:
+                # build the match predicate filter
+                overwrite_mask_predicate = upsert_util.create_match_filter(rows_to_update, join_cols)
+
+                self.overwrite(rows_to_update, overwrite_filter=overwrite_mask_predicate)
+
+        if when_not_matched_insert_all:
+            expr_match = upsert_util.create_match_filter(matched_iceberg_table, join_cols)
+            expr_match_bound = bind(self.table_metadata.schema(), expr_match, case_sensitive=case_sensitive)
+            expr_match_arrow = expression_to_pyarrow(expr_match_bound)
+            rows_to_insert = df.filter(~expr_match_arrow)
+
+            insert_row_cnt = len(rows_to_insert)
+
+            if insert_row_cnt > 0:
+                self.append(rows_to_insert)
+
+        return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)
+
     def add_files(
         self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
     ) -> None:

@@ -1159,73 +1275,14 @@ def upsert(
         Returns:
             An UpsertResult class (contains details of rows updated and inserted)
         """
-        try:
-            import pyarrow as pa  # noqa: F401
-        except ModuleNotFoundError as e:
-            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e
-
-        from pyiceberg.io.pyarrow import expression_to_pyarrow
-        from pyiceberg.table import upsert_util
-
-        if join_cols is None:
-            join_cols = []
-            for field_id in self.schema().identifier_field_ids:
-                col = self.schema().find_column_name(field_id)
-                if col is not None:
-                    join_cols.append(col)
-                else:
-                    raise ValueError(f"Field-ID could not be found: {join_cols}")
-
-        if len(join_cols) == 0:
-            raise ValueError("Join columns could not be found, please set identifier-field-ids or pass in explicitly.")
-
-        if not when_matched_update_all and not when_not_matched_insert_all:
-            raise ValueError("no upsert options selected...exiting")
-
-        if upsert_util.has_duplicate_rows(df, join_cols):
-            raise ValueError("Duplicate rows found in source dataset based on the key columns. No upsert executed")
-
-        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible
-
-        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
-        _check_pyarrow_schema_compatible(
-            self.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
-        )
-
-        # get list of rows that exist so we don't have to load the entire target table
-        matched_predicate = upsert_util.create_match_filter(df, join_cols)
-        matched_iceberg_table = self.scan(row_filter=matched_predicate, case_sensitive=case_sensitive).to_arrow()
-
-        update_row_cnt = 0
-        insert_row_cnt = 0
-
         with self.transaction() as tx:
-            if when_matched_update_all:
-                # function get_rows_to_update is doing a check on non-key columns to see if any of the values have actually changed
-                # we don't want to do just a blanket overwrite for matched rows if the actual non-key column data hasn't changed
-                # this extra step avoids unnecessary IO and writes
-                rows_to_update = upsert_util.get_rows_to_update(df, matched_iceberg_table, join_cols)
-
-                update_row_cnt = len(rows_to_update)
-
-                if len(rows_to_update) > 0:
-                    # build the match predicate filter
-                    overwrite_mask_predicate = upsert_util.create_match_filter(rows_to_update, join_cols)
-
-                    tx.overwrite(rows_to_update, overwrite_filter=overwrite_mask_predicate)
-
-            if when_not_matched_insert_all:
-                expr_match = upsert_util.create_match_filter(matched_iceberg_table, join_cols)
-                expr_match_bound = bind(self.schema(), expr_match, case_sensitive=case_sensitive)
-                expr_match_arrow = expression_to_pyarrow(expr_match_bound)
-                rows_to_insert = df.filter(~expr_match_arrow)
-
-                insert_row_cnt = len(rows_to_insert)
-
-                if insert_row_cnt > 0:
-                    tx.append(rows_to_insert)
-
-        return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)
+            return tx.upsert(
+                df=df,
+                join_cols=join_cols,
+                when_matched_update_all=when_matched_update_all,
+                when_not_matched_insert_all=when_not_matched_insert_all,
+                case_sensitive=case_sensitive,
+            )

     def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
         """
