Skip to content

Commit 2d17fc4

Browse files
committed
adding add_files_overwrite method
1 parent df69165 commit 2d17fc4

File tree

3 files changed

+442
-13
lines changed

3 files changed

+442
-13
lines changed

mkdocs/docs/api.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -672,7 +672,12 @@ file_paths = [
672672
673673
tbl.add_files(file_paths=file_paths)
674674
675+
# or if you want to overwrite
676+
677+
tbl.add_files_overwrite(file_paths=file_paths)
678+
675679
# A new snapshot is committed to the table with manifests pointing to the existing parquet files
680+
676681
```
677682

678683
<!-- prettier-ignore-start -->

pyiceberg/table/__init__.py

Lines changed: 52 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,26 @@ def add_files(self, file_paths: List[str], snapshot_properties: Dict[str, str] =
474474
for data_file in data_files:
475475
update_snapshot.append_data_file(data_file)
476476

477+
def add_files_overwrite(self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
478+
"""
479+
Shorthand API for adding files as data files and overwriting the table.
480+
481+
Args:
482+
file_paths: The list of full file paths to be added as data files to the table
483+
snapshot_properties: Custom properties to be added to the snapshot summary
484+
485+
Raises:
486+
FileNotFoundError: If the file does not exist.
487+
"""
488+
if self._table.name_mapping() is None:
489+
self.set_properties(**{TableProperties.DEFAULT_NAME_MAPPING: self._table.schema().name_mapping.model_dump_json()})
490+
with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as update_snapshot:
491+
data_files = _parquet_files_to_data_files(
492+
table_metadata=self._table.metadata, file_paths=file_paths, io=self._table.io
493+
)
494+
for data_file in data_files:
495+
update_snapshot.append_data_file(data_file)
496+
477497
def update_spec(self) -> UpdateSpec:
478498
"""Create a new UpdateSpec to update the partitioning of the table.
479499
@@ -1383,6 +1403,20 @@ def add_files(self, file_paths: List[str], snapshot_properties: Dict[str, str] =
13831403
with self.transaction() as tx:
13841404
tx.add_files(file_paths=file_paths, snapshot_properties=snapshot_properties)
13851405

1406+
def add_files_overwrite(self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
1407+
"""
1408+
Shorthand API for adding files as data files and overwriting the table.
1409+
1410+
Args:
1411+
file_paths: The list of full file paths to be added as data files to the table
1412+
snapshot_properties: Custom properties to be added to the snapshot summary
1413+
1414+
Raises:
1415+
FileNotFoundError: If the file does not exist.
1416+
"""
1417+
with self.transaction() as tx:
1418+
tx.add_files_overwrite(file_paths=file_paths, snapshot_properties=snapshot_properties)
1419+
13861420
def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
13871421
return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive)
13881422

@@ -3017,9 +3051,9 @@ def fast_append(self) -> FastAppendFiles:
30173051

30183052
def overwrite(self) -> OverwriteFiles:
30193053
return OverwriteFiles(
3020-
operation=Operation.OVERWRITE
3021-
if self._transaction.table_metadata.current_snapshot() is not None
3022-
else Operation.APPEND,
3054+
operation=(
3055+
Operation.OVERWRITE if self._transaction.table_metadata.current_snapshot() is not None else Operation.APPEND
3056+
),
30233057
transaction=self._transaction,
30243058
io=self._io,
30253059
snapshot_properties=self._snapshot_properties,
@@ -3401,12 +3435,16 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
34013435
"null_value_count": null_value_counts.get(field.field_id),
34023436
"nan_value_count": nan_value_counts.get(field.field_id),
34033437
# Makes them readable
3404-
"lower_bound": from_bytes(field.field_type, lower_bound)
3405-
if (lower_bound := lower_bounds.get(field.field_id))
3406-
else None,
3407-
"upper_bound": from_bytes(field.field_type, upper_bound)
3408-
if (upper_bound := upper_bounds.get(field.field_id))
3409-
else None,
3438+
"lower_bound": (
3439+
from_bytes(field.field_type, lower_bound)
3440+
if (lower_bound := lower_bounds.get(field.field_id))
3441+
else None
3442+
),
3443+
"upper_bound": (
3444+
from_bytes(field.field_type, upper_bound)
3445+
if (upper_bound := upper_bounds.get(field.field_id))
3446+
else None
3447+
),
34103448
}
34113449
for field in self.tbl.metadata.schema().fields
34123450
}
@@ -3641,9 +3679,11 @@ def _partition_summaries_to_rows(
36413679
"added_delete_files_count": manifest.added_files_count if is_delete_file else 0,
36423680
"existing_delete_files_count": manifest.existing_files_count if is_delete_file else 0,
36433681
"deleted_delete_files_count": manifest.deleted_files_count if is_delete_file else 0,
3644-
"partition_summaries": _partition_summaries_to_rows(specs[manifest.partition_spec_id], manifest.partitions)
3645-
if manifest.partitions
3646-
else [],
3682+
"partition_summaries": (
3683+
_partition_summaries_to_rows(specs[manifest.partition_spec_id], manifest.partitions)
3684+
if manifest.partitions
3685+
else []
3686+
),
36473687
})
36483688

36493689
return pa.Table.from_pylist(

0 commit comments

Comments
 (0)