Skip to content

Commit 5506fd0

Browse files
Gowthami03BFokko
andauthored
Change Append/Overwrite API to accept snapshot properties (#419)
* added test for snapshot properties * change append/overwrite to accept snapshot_properties * Update tests/catalog/test_glue.py Co-authored-by: Fokko Driesprong <fokko@apache.org> * Update pyiceberg/table/__init__.py Co-authored-by: Fokko Driesprong <fokko@apache.org> * updated docs,docstrings * fix linting * Update mkdocs/docs/api.md Co-authored-by: Fokko Driesprong <fokko@apache.org> * Update pyiceberg/table/__init__.py Co-authored-by: Fokko Driesprong <fokko@apache.org> --------- Co-authored-by: Fokko Driesprong <fokko@apache.org>
1 parent d3db840 commit 5506fd0

File tree

3 files changed

+102
-11
lines changed

3 files changed

+102
-11
lines changed

mkdocs/docs/api.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,20 @@ table = table.transaction().remove_properties("abc").commit_transaction()
563563
assert table.properties == {}
564564
```
565565

566+
## Snapshot properties
567+
568+
Optionally, Snapshot properties can be set while writing to a table using `append` or `overwrite` API:
569+
570+
```python
571+
tbl.append(df, snapshot_properties={"abc": "def"})
572+
573+
# or
574+
575+
tbl.overwrite(df, snapshot_properties={"abc": "def"})
576+
577+
assert tbl.metadata.snapshots[-1].summary["abc"] == "def"
578+
```
579+
566580
## Query the data
567581

568582
To query a table, a table scan is needed. A table scan accepts a filter, columns, optionally a limit and a snapshot ID:

pyiceberg/table/__init__.py

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -332,13 +332,13 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive
332332
name_mapping=self._table.name_mapping(),
333333
)
334334

335-
def update_snapshot(self) -> UpdateSnapshot:
335+
def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> UpdateSnapshot:
336336
"""Create a new UpdateSnapshot to produce a new snapshot for the table.
337337
338338
Returns:
339339
A new UpdateSnapshot
340340
"""
341-
return UpdateSnapshot(self, io=self._table.io)
341+
return UpdateSnapshot(self, io=self._table.io, snapshot_properties=snapshot_properties)
342342

343343
def update_spec(self) -> UpdateSpec:
344344
"""Create a new UpdateSpec to update the partitioning of the table.
@@ -1095,12 +1095,13 @@ def name_mapping(self) -> Optional[NameMapping]:
10951095
else:
10961096
return None
10971097

1098-
def append(self, df: pa.Table) -> None:
1098+
def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
10991099
"""
11001100
Shorthand API for appending a PyArrow table to the table.
11011101
11021102
Args:
11031103
df: The Arrow dataframe that will be appended to overwrite the table
1104+
snapshot_properties: Custom properties to be added to the snapshot summary
11041105
"""
11051106
try:
11061107
import pyarrow as pa
@@ -1116,7 +1117,7 @@ def append(self, df: pa.Table) -> None:
11161117
_check_schema(self.schema(), other_schema=df.schema)
11171118

11181119
with self.transaction() as txn:
1119-
with txn.update_snapshot().fast_append() as update_snapshot:
1120+
with txn.update_snapshot(snapshot_properties=snapshot_properties).fast_append() as update_snapshot:
11201121
# skip writing data files if the dataframe is empty
11211122
if df.shape[0] > 0:
11221123
data_files = _dataframe_to_data_files(
@@ -1125,14 +1126,17 @@ def append(self, df: pa.Table) -> None:
11251126
for data_file in data_files:
11261127
update_snapshot.append_data_file(data_file)
11271128

1128-
def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE) -> None:
1129+
def overwrite(
1130+
self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_TRUE, snapshot_properties: Dict[str, str] = EMPTY_DICT
1131+
) -> None:
11291132
"""
11301133
Shorthand for overwriting the table with a PyArrow table.
11311134
11321135
Args:
11331136
df: The Arrow dataframe that will be used to overwrite the table
11341137
overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
11351138
or a boolean expression in case of a partial overwrite
1139+
snapshot_properties: Custom properties to be added to the snapshot summary
11361140
"""
11371141
try:
11381142
import pyarrow as pa
@@ -1151,7 +1155,7 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_T
11511155
_check_schema(self.schema(), other_schema=df.schema)
11521156

11531157
with self.transaction() as txn:
1154-
with txn.update_snapshot().overwrite() as update_snapshot:
1158+
with txn.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as update_snapshot:
11551159
# skip writing data files if the dataframe is empty
11561160
if df.shape[0] > 0:
11571161
data_files = _dataframe_to_data_files(
@@ -2551,6 +2555,7 @@ def __init__(
25512555
transaction: Transaction,
25522556
io: FileIO,
25532557
commit_uuid: Optional[uuid.UUID] = None,
2558+
snapshot_properties: Dict[str, str] = EMPTY_DICT,
25542559
) -> None:
25552560
super().__init__(transaction)
25562561
self.commit_uuid = commit_uuid or uuid.uuid4()
@@ -2562,6 +2567,7 @@ def __init__(
25622567
snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.current_snapshot()) else None
25632568
)
25642569
self._added_data_files = []
2570+
self.snapshot_properties = snapshot_properties
25652571

25662572
def append_data_file(self, data_file: DataFile) -> _MergingSnapshotProducer:
25672573
self._added_data_files.append(data_file)
@@ -2629,7 +2635,7 @@ def _write_delete_manifest() -> List[ManifestFile]:
26292635

26302636
return added_manifests.result() + delete_manifests.result() + existing_manifests.result()
26312637

2632-
def _summary(self) -> Summary:
2638+
def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
26332639
ssc = SnapshotSummaryCollector()
26342640
partition_summary_limit = int(
26352641
self._transaction.table_metadata.properties.get(
@@ -2652,7 +2658,7 @@ def _summary(self) -> Summary:
26522658
)
26532659

26542660
return update_snapshot_summaries(
2655-
summary=Summary(operation=self._operation, **ssc.build()),
2661+
summary=Summary(operation=self._operation, **ssc.build(), **snapshot_properties),
26562662
previous_summary=previous_snapshot.summary if previous_snapshot is not None else None,
26572663
truncate_full_table=self._operation == Operation.OVERWRITE,
26582664
)
@@ -2661,7 +2667,7 @@ def _commit(self) -> UpdatesAndRequirements:
26612667
new_manifests = self._manifests()
26622668
next_sequence_number = self._transaction.table_metadata.next_sequence_number()
26632669

2664-
summary = self._summary()
2670+
summary = self._summary(self.snapshot_properties)
26652671

26662672
manifest_list_file_path = _generate_manifest_list_path(
26672673
location=self._transaction.table_metadata.location,
@@ -2776,13 +2782,17 @@ def _get_entries(manifest: ManifestFile) -> List[ManifestEntry]:
27762782
class UpdateSnapshot:
27772783
_transaction: Transaction
27782784
_io: FileIO
2785+
_snapshot_properties: Dict[str, str]
27792786

2780-
def __init__(self, transaction: Transaction, io: FileIO) -> None:
2787+
def __init__(self, transaction: Transaction, io: FileIO, snapshot_properties: Dict[str, str]) -> None:
27812788
self._transaction = transaction
27822789
self._io = io
2790+
self._snapshot_properties = snapshot_properties
27832791

27842792
def fast_append(self) -> FastAppendFiles:
2785-
return FastAppendFiles(operation=Operation.APPEND, transaction=self._transaction, io=self._io)
2793+
return FastAppendFiles(
2794+
operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
2795+
)
27862796

27872797
def overwrite(self) -> OverwriteFiles:
27882798
return OverwriteFiles(
@@ -2791,6 +2801,7 @@ def overwrite(self) -> OverwriteFiles:
27912801
else Operation.APPEND,
27922802
transaction=self._transaction,
27932803
io=self._io,
2804+
snapshot_properties=self._snapshot_properties,
27942805
)
27952806

27962807

tests/catalog/test_glue.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
NoSuchTableError,
3333
TableAlreadyExistsError,
3434
)
35+
from pyiceberg.io.pyarrow import schema_to_pyarrow
3536
from pyiceberg.schema import Schema
3637
from pyiceberg.types import IntegerType
3738
from tests.conftest import BUCKET_NAME, TABLE_METADATA_LOCATION_REGEX
@@ -692,3 +693,68 @@ def test_commit_table_properties(
692693
updated_table_metadata = table.metadata
693694
assert test_catalog._parse_metadata_version(table.metadata_location) == 1
694695
assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"}
696+
697+
698+
@mock_aws
699+
def test_commit_append_table_snapshot_properties(
700+
_bucket_initialize: None, moto_endpoint_url: str, table_schema_simple: Schema, database_name: str, table_name: str
701+
) -> None:
702+
catalog_name = "glue"
703+
identifier = (database_name, table_name)
704+
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"})
705+
test_catalog.create_namespace(namespace=database_name)
706+
table = test_catalog.create_table(identifier=identifier, schema=table_schema_simple)
707+
708+
assert test_catalog._parse_metadata_version(table.metadata_location) == 0
709+
710+
table.append(
711+
pa.Table.from_pylist(
712+
[{"foo": "foo_val", "bar": 1, "baz": False}],
713+
schema=schema_to_pyarrow(table_schema_simple),
714+
),
715+
snapshot_properties={"snapshot_prop_a": "test_prop_a"},
716+
)
717+
718+
updated_table_metadata = table.metadata
719+
summary = updated_table_metadata.snapshots[-1].summary
720+
assert test_catalog._parse_metadata_version(table.metadata_location) == 1
721+
assert summary is not None
722+
assert summary["snapshot_prop_a"] == "test_prop_a"
723+
724+
725+
@mock_aws
726+
def test_commit_overwrite_table_snapshot_properties(
727+
_bucket_initialize: None, moto_endpoint_url: str, table_schema_simple: Schema, database_name: str, table_name: str
728+
) -> None:
729+
catalog_name = "glue"
730+
identifier = (database_name, table_name)
731+
test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"})
732+
test_catalog.create_namespace(namespace=database_name)
733+
table = test_catalog.create_table(identifier=identifier, schema=table_schema_simple)
734+
735+
assert test_catalog._parse_metadata_version(table.metadata_location) == 0
736+
737+
table.append(
738+
pa.Table.from_pylist(
739+
[{"foo": "foo_val", "bar": 1, "baz": False}],
740+
schema=schema_to_pyarrow(table_schema_simple),
741+
),
742+
snapshot_properties={"snapshot_prop_a": "test_prop_a"},
743+
)
744+
745+
assert test_catalog._parse_metadata_version(table.metadata_location) == 1
746+
747+
table.overwrite(
748+
pa.Table.from_pylist(
749+
[{"foo": "foo_val", "bar": 2, "baz": True}],
750+
schema=schema_to_pyarrow(table_schema_simple),
751+
),
752+
snapshot_properties={"snapshot_prop_b": "test_prop_b"},
753+
)
754+
755+
updated_table_metadata = table.metadata
756+
summary = updated_table_metadata.snapshots[-1].summary
757+
assert test_catalog._parse_metadata_version(table.metadata_location) == 2
758+
assert summary is not None
759+
assert summary["snapshot_prop_a"] is None
760+
assert summary["snapshot_prop_b"] == "test_prop_b"

0 commit comments

Comments
 (0)