From 218066d19cf6079c59a29ac1e55ab15e1834d9e7 Mon Sep 17 00:00:00 2001 From: amitgilad Date: Mon, 3 Feb 2025 15:30:06 +0200 Subject: [PATCH 1/9] initial implementation of position_deletes --- pyiceberg/io/pyarrow.py | 14 +++++ pyiceberg/table/inspect.py | 71 +++++++++++++++++++++++++ tests/integration/test_inspect_table.py | 67 +++++++++++++++++++++++ 3 files changed, 152 insertions(+) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 1aaab32dbe..5007966418 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -921,6 +921,20 @@ def _construct_fragment(fs: FileSystem, data_file: DataFile, file_format_kwargs: return _get_file_format(data_file.file_format, **file_format_kwargs).make_fragment(path, fs) +def _read_delete_file(fs: FileSystem, data_file: DataFile) -> pa.Table: + positinal_delete_schema = pa.schema( + [ + pa.field("file_path", pa.string(), nullable=False), + pa.field("pos", pa.int64(), nullable=False), + pa.field("row", pa.int64(), nullable=True), + ] + ) + + delete_fragment = _construct_fragment(fs, data_file, file_format_kwargs={"pre_buffer": True, "buffer_size": ONE_MEGABYTE}) + table = ds.Scanner.from_fragment(fragment=delete_fragment, schema=positinal_delete_schema).to_table() + return table + + def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedArray]: if data_file.file_format == FileFormat.PARQUET: delete_fragment = _construct_fragment( diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index cce5250ad5..d9a15d683d 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -384,6 +384,20 @@ def _get_all_manifests_schema(self) -> "pa.Schema": all_manifests_schema = all_manifests_schema.append(pa.field("reference_snapshot_id", pa.int64(), nullable=False)) return all_manifests_schema + def _get_positional_deletes_schema(self) -> "pa.Schema": + import pyarrow as pa + + positinal_delete_schema = pa.schema( + [ + pa.field("file_path", pa.string(), nullable=False), + pa.field("pos", pa.int64(), nullable=False), + pa.field("row", pa.int64(), nullable=True), + pa.field("spec_id", pa.int64(), nullable=True), + pa.field("delete_file_path", pa.string(), nullable=False), + ] + ) + return positinal_delete_schema + def _generate_manifests_table(self, snapshot: Optional[Snapshot], is_all_manifests_table: bool = False) -> "pa.Table": import pyarrow as pa @@ -453,6 +467,24 @@ def _partition_summaries_to_rows( schema=self._get_all_manifests_schema() if is_all_manifests_table else self._get_manifests_schema(), ) + # def _generate_positional_delete_table(self, manifest_list: ManifestFile) -> "pa.Table": + # import pyarrow as pa + # all_deletes = [] + # if manifest_list.content == ManifestContent.DELETES: + # for manifest_entry in manifest_list.fetch_manifest_entry(self.tbl.io): + # if manifest_entry.data_file.content == DataFileContent.POSITION_DELETES: + # from pyiceberg.io.pyarrow import _read_delete_file + # from pyiceberg.io.pyarrow import _fs_from_file_path + # positional_delete = _read_delete_file( + # _fs_from_file_path(self.tbl.io, manifest_entry.data_file.file_path), + # manifest_entry.data_file) + # + # positional_delete = positional_delete.append_column("spec_id", pa.array( + # [manifest_list.partition_spec_id] * len(positional_delete))).append_column("delete_file_path",pa.array([manifest_entry.data_file.file_path] * len(positional_delete))) + # + # all_deletes.append(positional_delete) + # return pa.concat_tables(all_deletes) + def manifests(self) -> "pa.Table": return 
self._generate_manifests_table(self.tbl.current_snapshot()) @@ -704,3 +736,42 @@ def all_data_files(self) -> "pa.Table": def all_delete_files(self) -> "pa.Table": return self._all_files({DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}) + + def position_deletes(self) -> "pa.Table": + import pyarrow as pa + + snapshots = self.tbl.snapshots() + if not snapshots: + return pa.Table.from_pylist([], schema=self._get_positional_deletes_schema()) + current_snapshot = self.tbl.current_snapshot() + + # + # executor = ExecutorFactory.get_or_create() + # positonal_deletes: Iterator["pa.Table"] = executor.map( + # lambda manifest_list: self._generate_positional_delete_table(manifest_list),current_snapshot.manifests(self.tbl.io) + # ) + # all_deletes = [] + positional_deletes = [] + + for manifest_list in current_snapshot.manifests(self.tbl.io): + import pyarrow as pa + if manifest_list.content == ManifestContent.DELETES: + defaultSpecId = self.tbl.spec().spec_id + for manifest_entry in manifest_list.fetch_manifest_entry(self.tbl.io): + + if manifest_entry.data_file.content == DataFileContent.POSITION_DELETES: + from pyiceberg.io.pyarrow import _read_delete_file + from pyiceberg.io.pyarrow import _fs_from_file_path + positional_delete = _read_delete_file( + _fs_from_file_path(self.tbl.io, manifest_entry.data_file.file_path), + manifest_entry.data_file) + + positional_delete = positional_delete.append_column("spec_id", pa.array( + [manifest_list.partition_spec_id] * len(positional_delete))).append_column("partition", pa.array( + [self.] * len(positional_delete))).append_column( + "delete_file_path", pa.array([manifest_entry.data_file.file_path] * len(positional_delete))) + + positional_deletes.append(positional_delete) + # return pa.concat_tables(all_deletes) + + return pa.concat_tables(positional_deletes) diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index e81050a81c..115d0bbd13 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -1101,3 +1101,70 @@ def test_inspect_files_partitioned(spark: SparkSession, session_catalog: Catalog .reset_index() ) assert_frame_equal(lhs, rhs, check_dtype=False) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [2]) +def test_inspect_positional_deletes(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: + from pandas.testing import assert_frame_equal + + identifier = "default.table_metadata_position_deletes" + try: + session_catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + spark.sql( + f""" + CREATE TABLE {identifier} ( + id int, + data string + ) + PARTITIONED BY (data) + TBLPROPERTIES ('write.update.mode'='merge-on-read', + 'write.delete.mode'='merge-on-read') + """ + ) + tbl = session_catalog.load_table(identifier) + + # check all_manifests when there are no snapshots + + spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a')") + + spark.sql(f"INSERT INTO {identifier} VALUES (2, 'b')") + + spark.sql(f"UPDATE {identifier} SET data = 'c' WHERE id = 1") + + spark.sql(f"DELETE FROM {identifier} WHERE id = 2") + + tbl.refresh() + df = tbl.inspect.position_deletes() + + assert df.column_names == [ + "file_path", + "pos", + "row", + "spec_id", + "delete_file_path" + ] + + int_cols = [ + "pos" + ] + string_cols = [ + "file_path", + "delete_file_path" + ] + + for column in int_cols: + for value in df[column]: + assert isinstance(value.as_py(), int) + + for column in string_cols: + for 
value in df[column]: + assert isinstance(value.as_py(), str) + + new_df = spark.sql(f"select * from {identifier}.position_deletes").toPandas() + lhs = spark.table(f"{identifier}.position_deletes").toPandas() + rhs = df.to_pandas() + assert_frame_equal(lhs, rhs, check_dtype=False) From e9ae3813bc7bd27b9f75bd67849bfef0eacec98d Mon Sep 17 00:00:00 2001 From: amitgilad Date: Wed, 5 Feb 2025 23:46:31 +0200 Subject: [PATCH 2/9] final version of position_deletes --- pyiceberg/io/pyarrow.py | 12 +-- pyiceberg/table/inspect.py | 103 +++++++++++++----------- tests/integration/test_inspect_table.py | 22 +---- 3 files changed, 60 insertions(+), 77 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 5007966418..81a92011f7 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -921,17 +921,9 @@ def _construct_fragment(fs: FileSystem, data_file: DataFile, file_format_kwargs: return _get_file_format(data_file.file_format, **file_format_kwargs).make_fragment(path, fs) -def _read_delete_file(fs: FileSystem, data_file: DataFile) -> pa.Table: - positinal_delete_schema = pa.schema( - [ - pa.field("file_path", pa.string(), nullable=False), - pa.field("pos", pa.int64(), nullable=False), - pa.field("row", pa.int64(), nullable=True), - ] - ) - +def _read_delete_file(fs: FileSystem, data_file: DataFile, schema: "pa.Schema") -> pa.Table: delete_fragment = _construct_fragment(fs, data_file, file_format_kwargs={"pre_buffer": True, "buffer_size": ONE_MEGABYTE}) - table = ds.Scanner.from_fragment(fragment=delete_fragment, schema=positinal_delete_schema).to_table() + table = ds.Scanner.from_fragment(fragment=delete_fragment, schema=schema).to_table() return table diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index d9a15d683d..4f25370d82 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -384,14 +384,35 @@ def _get_all_manifests_schema(self) -> "pa.Schema": all_manifests_schema = all_manifests_schema.append(pa.field("reference_snapshot_id", pa.int64(), nullable=False)) return all_manifests_schema + def _get_positional_file_schema(self) -> "pa.Schema": + import pyarrow as pa + + from pyiceberg.io.pyarrow import schema_to_pyarrow + + pa_row_struct = schema_to_pyarrow(self.tbl.schema().as_struct()) + positinal_delete_schema = pa.schema( + [ + pa.field("file_path", pa.string(), nullable=False), + pa.field("pos", pa.int64(), nullable=False), + pa.field("row", pa_row_struct, nullable=True), + ] + ) + return positinal_delete_schema + def _get_positional_deletes_schema(self) -> "pa.Schema": import pyarrow as pa + from pyiceberg.io.pyarrow import schema_to_pyarrow + + partition_record = self.tbl.metadata.specs_struct() + pa_partition_struct = schema_to_pyarrow(partition_record) + pa_row_struct = schema_to_pyarrow(self.tbl.schema().as_struct()) positinal_delete_schema = pa.schema( [ pa.field("file_path", pa.string(), nullable=False), pa.field("pos", pa.int64(), nullable=False), - pa.field("row", pa.int64(), nullable=True), + pa.field("row", pa_row_struct, nullable=True), + pa.field("partition", pa_partition_struct, nullable=False), pa.field("spec_id", pa.int64(), nullable=True), pa.field("delete_file_path", pa.string(), nullable=False), ] @@ -467,23 +488,30 @@ def _partition_summaries_to_rows( schema=self._get_all_manifests_schema() if is_all_manifests_table else self._get_manifests_schema(), ) - # def _generate_positional_delete_table(self, manifest_list: ManifestFile) -> "pa.Table": - # import pyarrow as pa - # all_deletes = [] - # if 
manifest_list.content == ManifestContent.DELETES: - # for manifest_entry in manifest_list.fetch_manifest_entry(self.tbl.io): - # if manifest_entry.data_file.content == DataFileContent.POSITION_DELETES: - # from pyiceberg.io.pyarrow import _read_delete_file - # from pyiceberg.io.pyarrow import _fs_from_file_path - # positional_delete = _read_delete_file( - # _fs_from_file_path(self.tbl.io, manifest_entry.data_file.file_path), - # manifest_entry.data_file) - # - # positional_delete = positional_delete.append_column("spec_id", pa.array( - # [manifest_list.partition_spec_id] * len(positional_delete))).append_column("delete_file_path",pa.array([manifest_entry.data_file.file_path] * len(positional_delete))) - # - # all_deletes.append(positional_delete) - # return pa.concat_tables(all_deletes) + def _generate_positional_delete_table(self, manifest: ManifestFile, position_deletes_schema: "pa.Schema") -> "pa.Table": + import pyarrow as pa + + positional_deletes: List["pa.Table"] = [] + if manifest.content == ManifestContent.DELETES: + for entry in manifest.fetch_manifest_entry(self.tbl.io): + if entry.data_file.content == DataFileContent.POSITION_DELETES: + from pyiceberg.io.pyarrow import _fs_from_file_path, _read_delete_file + + positional_delete_file = _read_delete_file( + _fs_from_file_path(self.tbl.io, entry.data_file.file_path), + entry.data_file, + self._get_positional_file_schema(), + ).to_pylist() + for record in positional_delete_file: + record["partition"] = entry.data_file.partition.__dict__ + record["spec_id"] = manifest.partition_spec_id + record["delete_file_path"] = entry.data_file.file_path + + positional_deletes.append(pa.Table.from_pylist(positional_delete_file, position_deletes_schema)) + + if not positional_deletes: + return pa.Table.from_pylist([], position_deletes_schema) + return pa.concat_tables(positional_deletes) def manifests(self) -> "pa.Table": return self._generate_manifests_table(self.tbl.current_snapshot()) @@ -740,38 +768,15 @@ def all_delete_files(self) -> "pa.Table": def position_deletes(self) -> "pa.Table": import pyarrow as pa - snapshots = self.tbl.snapshots() - if not snapshots: - return pa.Table.from_pylist([], schema=self._get_positional_deletes_schema()) + position_deletes_schema = self._get_positional_deletes_schema() current_snapshot = self.tbl.current_snapshot() - # - # executor = ExecutorFactory.get_or_create() - # positonal_deletes: Iterator["pa.Table"] = executor.map( - # lambda manifest_list: self._generate_positional_delete_table(manifest_list),current_snapshot.manifests(self.tbl.io) - # ) - # all_deletes = [] - positional_deletes = [] - - for manifest_list in current_snapshot.manifests(self.tbl.io): - import pyarrow as pa - if manifest_list.content == ManifestContent.DELETES: - defaultSpecId = self.tbl.spec().spec_id - for manifest_entry in manifest_list.fetch_manifest_entry(self.tbl.io): - - if manifest_entry.data_file.content == DataFileContent.POSITION_DELETES: - from pyiceberg.io.pyarrow import _read_delete_file - from pyiceberg.io.pyarrow import _fs_from_file_path - positional_delete = _read_delete_file( - _fs_from_file_path(self.tbl.io, manifest_entry.data_file.file_path), - manifest_entry.data_file) - - positional_delete = positional_delete.append_column("spec_id", pa.array( - [manifest_list.partition_spec_id] * len(positional_delete))).append_column("partition", pa.array( - [self.] 
* len(positional_delete))).append_column( - "delete_file_path", pa.array([manifest_entry.data_file.file_path] * len(positional_delete))) - - positional_deletes.append(positional_delete) - # return pa.concat_tables(all_deletes) + if not current_snapshot: + return pa.Table.from_pylist([], schema=position_deletes_schema) + executor = ExecutorFactory.get_or_create() + positional_deletes: Iterator["pa.Table"] = executor.map( + lambda manifest: self._generate_positional_delete_table(manifest, position_deletes_schema), + current_snapshot.manifests(self.tbl.io), + ) return pa.concat_tables(positional_deletes) diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index 115d0bbd13..47efda47d7 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -1104,7 +1104,7 @@ def test_inspect_files_partitioned(spark: SparkSession, session_catalog: Catalog @pytest.mark.integration -@pytest.mark.parametrize("format_version", [2]) +@pytest.mark.parametrize("format_version", [1, 2]) def test_inspect_positional_deletes(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: from pandas.testing import assert_frame_equal @@ -1127,8 +1127,6 @@ def test_inspect_positional_deletes(spark: SparkSession, session_catalog: Catalo ) tbl = session_catalog.load_table(identifier) - # check all_manifests when there are no snapshots - spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a')") spark.sql(f"INSERT INTO {identifier} VALUES (2, 'b')") @@ -1140,21 +1138,10 @@ def test_inspect_positional_deletes(spark: SparkSession, session_catalog: Catalo tbl.refresh() df = tbl.inspect.position_deletes() - assert df.column_names == [ - "file_path", - "pos", - "row", - "spec_id", - "delete_file_path" - ] + assert df.column_names == ["file_path", "pos", "row", "partition", "spec_id", "delete_file_path"] - int_cols = [ - "pos" - ] - string_cols = [ - "file_path", - "delete_file_path" - ] + int_cols = ["pos"] + string_cols = ["file_path", "delete_file_path"] for column in int_cols: for value in df[column]: @@ -1164,7 +1151,6 @@ def test_inspect_positional_deletes(spark: SparkSession, session_catalog: Catalo for value in df[column]: assert isinstance(value.as_py(), str) - new_df = spark.sql(f"select * from {identifier}.position_deletes").toPandas() lhs = spark.table(f"{identifier}.position_deletes").toPandas() rhs = df.to_pandas() assert_frame_equal(lhs, rhs, check_dtype=False) From 2df77353c7fd414b7f3e8a0ce68c4b6ef81d3532 Mon Sep 17 00:00:00 2001 From: amitgilad Date: Sun, 9 Feb 2025 00:03:52 +0200 Subject: [PATCH 3/9] fix comments of pr --- mkdocs/docs/api.md | 61 +++++++++++++++++++++++++++++++++++++ pyiceberg/io/pyarrow.py | 14 +++++++-- pyiceberg/manifest.py | 28 +++++++++++++++++ pyiceberg/table/inspect.py | 55 ++++++++++++++------------------- pyiceberg/table/metadata.py | 31 +++++++++++++++++++ 5 files changed, 154 insertions(+), 35 deletions(-) diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index d84c82ec2a..d9affe5a48 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -1047,6 +1047,67 @@ readable_metrics: [ To show only data files or delete files in the current snapshot, use `table.inspect.data_files()` and `table.inspect.delete_files()` respectively. 
+### Position deletes + +Inspect the positional delete files in the current snapshot of the table: + +```python +table.inspect.position_deletes() +``` + +```python +pyarrow.Table +file_path: string not null +pos: int64 not null +row: struct + child 0, id: int32 + child 1, data: large_string +partition: struct not null + child 0, data: large_string +spec_id: int64 +delete_file_path: string not null +---- +file_path: [[],[],[],["s3://warehouse/default/table_metadata_position_deletes/data/data=a/00000-1-acbf93b7-f760-4517-aa84-b9240902d3d2-0-00001.parquet"]] +pos: [[],[],[],[0]] +row: [ + -- is_valid: all not null + -- child 0 type: int32 +[] + -- child 1 type: large_string +[], + -- is_valid: all not null + -- child 0 type: int32 +[] + -- child 1 type: large_string +[], + -- is_valid: all not null + -- child 0 type: int32 +[] + -- child 1 type: large_string +[], + -- is_valid: [false] + -- child 0 type: int32 +[0] + -- child 1 type: large_string +[""]] +partition: [ + -- is_valid: all not null + -- child 0 type: large_string +[], + -- is_valid: all not null + -- child 0 type: large_string +[], + -- is_valid: all not null + -- child 0 type: large_string +[], + -- is_valid: all not null + -- child 0 type: large_string +["a"]] +spec_id: [[],[],[],[0]] +delete_file_path: [[],[],[],["s3://warehouse/default/table_metadata_position_deletes/data/data=a/00000-5-bc7a1d8a-fefe-4277-b4ac-8f1dd7badb7a-00001-deletes.parquet"]] + +``` + ## Add Files Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them. diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 81a92011f7..1dd4a96a13 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -123,6 +123,7 @@ DataFile, DataFileContent, FileFormat, + PositionDelete, ) from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec, partition_record_value from pyiceberg.schema import ( @@ -921,10 +922,17 @@ def _construct_fragment(fs: FileSystem, data_file: DataFile, file_format_kwargs: return _get_file_format(data_file.file_format, **file_format_kwargs).make_fragment(path, fs) -def _read_delete_file(fs: FileSystem, data_file: DataFile, schema: "pa.Schema") -> pa.Table: +def _read_delete_file(fs: FileSystem, data_file: DataFile) -> Iterator[PositionDelete]: delete_fragment = _construct_fragment(fs, data_file, file_format_kwargs={"pre_buffer": True, "buffer_size": ONE_MEGABYTE}) - table = ds.Scanner.from_fragment(fragment=delete_fragment, schema=schema).to_table() - return table + table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table() + for batch in table.to_batches(): + for i in range(len(batch)): + row = batch.column("row")[i].as_py() if "row" in batch.schema.names else None + yield PositionDelete( + file_path=batch.column("file_path")[i].as_py(), + pos=batch.column("pos")[i].as_py(), + row=row, # Setting row as None since it's optional and not needed for position deletes + ) def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedArray]: diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 61cb87e3d8..fba7f27b3a 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -321,6 +321,34 @@ def data_file_with_partition(partition_type: StructType, format_version: TableVe ) +class PositionDelete(Record): + __slots__ = ("file_path", "pos", "row") + file_path: str + pos: int + row: Optional[Record] + + def __setattr__(self, name: str, value: Any) -> None: + """Assign a key/value to a 
PositionDelete.""" + super().__setattr__(name, value) + + def __init__(self, file_path: str, pos: int, row: Optional[Record], *data: Any, **named_data: Any) -> None: + super().__init__(*data, **named_data) + self.file_path = file_path + self.pos = pos + self.row = row + + def __hash__(self) -> int: + """Return the hash of the file path.""" + return hash(self.file_path) + + def __eq__(self, other: Any) -> bool: + """Compare the PositionDelete with another object. + + If it is a PositionDelete, it will compare based on the file_path. + """ + return self.file_path == other.file_path if isinstance(other, PositionDelete) else False + + class DataFile(Record): @classmethod def from_args(cls, _table_format_version: TableVersion = DEFAULT_READ_VERSION, **arguments: Any) -> DataFile: diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 4f25370d82..de78934b4e 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -384,30 +384,15 @@ def _get_all_manifests_schema(self) -> "pa.Schema": all_manifests_schema = all_manifests_schema.append(pa.field("reference_snapshot_id", pa.int64(), nullable=False)) return all_manifests_schema - def _get_positional_file_schema(self) -> "pa.Schema": - import pyarrow as pa - - from pyiceberg.io.pyarrow import schema_to_pyarrow - - pa_row_struct = schema_to_pyarrow(self.tbl.schema().as_struct()) - positinal_delete_schema = pa.schema( - [ - pa.field("file_path", pa.string(), nullable=False), - pa.field("pos", pa.int64(), nullable=False), - pa.field("row", pa_row_struct, nullable=True), - ] - ) - return positinal_delete_schema - def _get_positional_deletes_schema(self) -> "pa.Schema": import pyarrow as pa from pyiceberg.io.pyarrow import schema_to_pyarrow - partition_record = self.tbl.metadata.specs_struct() - pa_partition_struct = schema_to_pyarrow(partition_record) + partition_struct = self.tbl.metadata.spec_struct() + pa_partition_struct = schema_to_pyarrow(partition_struct) pa_row_struct = schema_to_pyarrow(self.tbl.schema().as_struct()) - positinal_delete_schema = pa.schema( + positional_delete_schema = pa.schema( [ pa.field("file_path", pa.string(), nullable=False), pa.field("pos", pa.int64(), nullable=False), @@ -417,7 +402,7 @@ def _get_positional_deletes_schema(self) -> "pa.Schema": pa.field("delete_file_path", pa.string(), nullable=False), ] ) - return positinal_delete_schema + return positional_delete_schema def _generate_manifests_table(self, snapshot: Optional[Snapshot], is_all_manifests_table: bool = False) -> "pa.Table": import pyarrow as pa @@ -492,22 +477,28 @@ def _generate_positional_delete_table(self, manifest: ManifestFile, position_del import pyarrow as pa positional_deletes: List["pa.Table"] = [] + if manifest.content == ManifestContent.DELETES: for entry in manifest.fetch_manifest_entry(self.tbl.io): if entry.data_file.content == DataFileContent.POSITION_DELETES: from pyiceberg.io.pyarrow import _fs_from_file_path, _read_delete_file positional_delete_file = _read_delete_file( - _fs_from_file_path(self.tbl.io, entry.data_file.file_path), - entry.data_file, - self._get_positional_file_schema(), - ).to_pylist() + _fs_from_file_path(self.tbl.io, entry.data_file.file_path), entry.data_file + ) + positional_deletes_records = [] for record in positional_delete_file: - record["partition"] = entry.data_file.partition.__dict__ - record["spec_id"] = manifest.partition_spec_id - record["delete_file_path"] = entry.data_file.file_path - - positional_deletes.append(pa.Table.from_pylist(positional_delete_file, 
position_deletes_schema)) + row = { + "file_path": record.file_path, + "pos": record.pos, + "row": record.row, + "partition": entry.data_file.partition.__dict__, + "spec_id": manifest.partition_spec_id, + "delete_file_path": entry.data_file.file_path, + } + positional_deletes_records.append(row) + + positional_deletes.append(pa.Table.from_pylist(positional_deletes_records, position_deletes_schema)) if not positional_deletes: return pa.Table.from_pylist([], position_deletes_schema) @@ -765,18 +756,18 @@ def all_data_files(self) -> "pa.Table": def all_delete_files(self) -> "pa.Table": return self._all_files({DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}) - def position_deletes(self) -> "pa.Table": + def position_deletes(self, snapshot_id: Optional[int] = None) -> "pa.Table": import pyarrow as pa + snapshot = self._get_snapshot(snapshot_id) if snapshot_id else self.tbl.current_snapshot() position_deletes_schema = self._get_positional_deletes_schema() - current_snapshot = self.tbl.current_snapshot() - if not current_snapshot: + if not snapshot: return pa.Table.from_pylist([], schema=position_deletes_schema) executor = ExecutorFactory.get_or_create() positional_deletes: Iterator["pa.Table"] = executor.map( lambda manifest: self._generate_positional_delete_table(manifest, position_deletes_schema), - current_snapshot.manifests(self.tbl.io), + snapshot.manifests(self.tbl.io), ) return pa.concat_tables(positional_deletes) diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index f248700c02..795b0588c7 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -279,6 +279,37 @@ def specs_struct(self) -> StructType: return StructType(*nested_fields) + def spec_struct(self, spec_id: Optional[int] = None) -> StructType: + """Produce for a spec_id a struct of PartitionSpecs. + + The partition fields should be optional: Partition fields may be added later, + in which case not all files would have the result field, and it may be null. + + :return: A StructType that represents a PartitionSpec of the table for a specific spec_id or latest. 
+ """ + if spec_id is None: + spec = self.spec() + else: + specs = self.specs() + filtered_spec = list(filter(lambda spec: spec.spec_id == spec_id, specs.values())) + if not filtered_spec: + raise ValidationError(f"Spec with spec_id {spec_id} not found") + spec = filtered_spec[0] + # Collect all the fields + struct_fields = {field.field_id: field for field in spec.fields} + + schema = self.schema() + + nested_fields = [] + # Sort them by field_id in order to get a deterministic output + for field_id in sorted(struct_fields): + field = struct_fields[field_id] + source_type = schema.find_type(field.source_id) + result_type = field.transform.result_type(source_type) + nested_fields.append(NestedField(field_id=field.field_id, name=field.name, type=result_type, required=False)) + + return StructType(*nested_fields) + def new_snapshot_id(self) -> int: """Generate a new snapshot-id that's not in use.""" snapshot_id = _generate_snapshot_id() From 58b7fa59615779460e876511f190c738f2a22e8a Mon Sep 17 00:00:00 2001 From: amitgilad Date: Tue, 18 Feb 2025 21:36:33 +0200 Subject: [PATCH 4/9] fix schemas and partition specs to be according the snapshot and not latest and make --- pyiceberg/io/pyarrow.py | 18 ++++++++---------- pyiceberg/table/inspect.py | 21 +++++++++++++-------- pyiceberg/table/metadata.py | 2 +- 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 1dd4a96a13..19c03bffee 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -937,16 +937,15 @@ def _read_delete_file(fs: FileSystem, data_file: DataFile) -> Iterator[PositionD def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedArray]: if data_file.file_format == FileFormat.PARQUET: - delete_fragment = _construct_fragment( - fs, - data_file, - file_format_kwargs={"dictionary_columns": ("file_path",), "pre_buffer": True, "buffer_size": ONE_MEGABYTE}, - ) - table = ds.Scanner.from_fragment(fragment=delete_fragment).to_table() - table = table.unify_dictionaries() + deletes_by_file: Dict[str, List[int]] = {} + for delete in _read_delete_file(fs, data_file): + if delete.file_path not in deletes_by_file: + deletes_by_file[delete.file_path] = [] + deletes_by_file[delete.file_path].append(delete.pos) + + # Convert lists of positions to ChunkedArrays return { - file.as_py(): table.filter(pc.field("file_path") == file).column("pos") - for file in table.column("file_path").chunks[0].dictionary + file_path: pa.chunked_array([pa.array(positions, type=pa.int64())]) for file_path, positions in deletes_by_file.items() } elif data_file.file_format == FileFormat.PUFFIN: _, _, path = PyArrowFileIO.parse_location(data_file.file_path) @@ -957,7 +956,6 @@ def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedAr else: raise ValueError(f"Delete file format not supported: {data_file.file_format}") - def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], start_index: int, end_index: int) -> pa.Array: if len(positional_deletes) == 1: all_chunks = positional_deletes[0] diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index de78934b4e..b2b50a941f 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -22,6 +22,7 @@ from pyiceberg.conversions import from_bytes from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary from pyiceberg.partitioning import PartitionSpec +from pyiceberg.schema import Schema from 
pyiceberg.table.snapshots import Snapshot, ancestors_of from pyiceberg.types import PrimitiveType from pyiceberg.utils.concurrent import ExecutorFactory @@ -384,14 +385,16 @@ def _get_all_manifests_schema(self) -> "pa.Schema": all_manifests_schema = all_manifests_schema.append(pa.field("reference_snapshot_id", pa.int64(), nullable=False)) return all_manifests_schema - def _get_positional_deletes_schema(self) -> "pa.Schema": + def _get_positional_deletes_schema(self, schema: Optional[Schema] = None, spec_id: Optional[int] = None) -> "pa.Schema": import pyarrow as pa from pyiceberg.io.pyarrow import schema_to_pyarrow - partition_struct = self.tbl.metadata.spec_struct() + schema = schema or self.tbl.metadata.schema() + + partition_struct = self.tbl.metadata.spec_struct(spec_id=spec_id) pa_partition_struct = schema_to_pyarrow(partition_struct) - pa_row_struct = schema_to_pyarrow(self.tbl.schema().as_struct()) + pa_row_struct = schema_to_pyarrow(schema.as_struct()) positional_delete_schema = pa.schema( [ pa.field("file_path", pa.string(), nullable=False), @@ -473,11 +476,13 @@ def _partition_summaries_to_rows( schema=self._get_all_manifests_schema() if is_all_manifests_table else self._get_manifests_schema(), ) - def _generate_positional_delete_table(self, manifest: ManifestFile, position_deletes_schema: "pa.Schema") -> "pa.Table": + def _generate_positional_delete_table(self, manifest: ManifestFile, schema: Schema) -> "pa.Table": import pyarrow as pa positional_deletes: List["pa.Table"] = [] + position_deletes_schema = self._get_positional_deletes_schema(schema=schema, spec_id=manifest.partition_spec_id) + if manifest.content == ManifestContent.DELETES: for entry in manifest.fetch_manifest_entry(self.tbl.io): if entry.data_file.content == DataFileContent.POSITION_DELETES: @@ -760,14 +765,14 @@ def position_deletes(self, snapshot_id: Optional[int] = None) -> "pa.Table": import pyarrow as pa snapshot = self._get_snapshot(snapshot_id) if snapshot_id else self.tbl.current_snapshot() - position_deletes_schema = self._get_positional_deletes_schema() - if not snapshot: - return pa.Table.from_pylist([], schema=position_deletes_schema) + schema = self._get_positional_deletes_schema() + return pa.Table.from_pylist([], schema=schema) + schemas = self.tbl.schemas() executor = ExecutorFactory.get_or_create() positional_deletes: Iterator["pa.Table"] = executor.map( - lambda manifest: self._generate_positional_delete_table(manifest, position_deletes_schema), + lambda manifest: self._generate_positional_delete_table(manifest, schema=schemas[snapshot.schema_id]), snapshot.manifests(self.tbl.io), ) return pa.concat_tables(positional_deletes) diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index 795b0588c7..59ef39fbec 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -280,7 +280,7 @@ def specs_struct(self) -> StructType: return StructType(*nested_fields) def spec_struct(self, spec_id: Optional[int] = None) -> StructType: - """Produce for a spec_id a struct of PartitionSpecs. + """Produce for a spec_id a struct of PartitionSpecs. The partition fields should be optional: Partition fields may be added later, in which case not all files would have the result field, and it may be null. 
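A minimal usage sketch of the metadata table added in this series, assuming a configured catalog named `default` and the table identifier from the integration test; both names are illustrative, and only `inspect.position_deletes()` itself comes from these patches.

```python
# Illustrative usage, assuming a configured catalog and a merge-on-read table
# like the one created in tests/integration/test_inspect_table.py above.
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
tbl = catalog.load_table("default.table_metadata_position_deletes")

# One row per position delete in the current snapshot; columns are
# file_path, pos, row, partition, spec_id and delete_file_path.
deletes = tbl.inspect.position_deletes()
print(deletes.column_names)
print(deletes.to_pandas())

# Later revisions in this series also accept an optional snapshot_id,
# so older snapshots can be inspected as well.
current = tbl.current_snapshot()
if current is not None:
    historical = tbl.inspect.position_deletes(snapshot_id=current.snapshot_id)
```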
From a43877fa534775811f8d36fca82a7d7ff157369e Mon Sep 17 00:00:00 2001 From: amitgilad Date: Tue, 18 Feb 2025 21:45:02 +0200 Subject: [PATCH 5/9] work with correct schema --- pyiceberg/table/inspect.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index b2b50a941f..cedb455bfa 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -769,10 +769,17 @@ def position_deletes(self, snapshot_id: Optional[int] = None) -> "pa.Table": schema = self._get_positional_deletes_schema() return pa.Table.from_pylist([], schema=schema) + if not snapshot.schema_id: + raise ValueError(f"Snapshot {snapshot.snapshot_id} does not have a schema id") + schemas = self.tbl.schemas() + schema = schemas.get(snapshot.schema_id, None) + if not schema: + raise ValueError(f"Cannot find schema with id: {snapshot.schema_id}") + executor = ExecutorFactory.get_or_create() positional_deletes: Iterator["pa.Table"] = executor.map( - lambda manifest: self._generate_positional_delete_table(manifest, schema=schemas[snapshot.schema_id]), + lambda manifest: self._generate_positional_delete_table(manifest, schema=schema), snapshot.manifests(self.tbl.io), ) return pa.concat_tables(positional_deletes) From 1c1533aea45000d37a9387f82b60d3473819e231 Mon Sep 17 00:00:00 2001 From: amitgilad Date: Tue, 18 Feb 2025 22:26:26 +0200 Subject: [PATCH 6/9] fix if statment --- pyiceberg/io/pyarrow.py | 8 ++++++++ pyiceberg/manifest.py | 8 ++++---- pyiceberg/table/inspect.py | 4 ++-- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 19c03bffee..5add453e52 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -936,12 +936,20 @@ def _read_delete_file(fs: FileSystem, data_file: DataFile) -> Iterator[PositionD def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedArray]: +<<<<<<< HEAD if data_file.file_format == FileFormat.PARQUET: deletes_by_file: Dict[str, List[int]] = {} for delete in _read_delete_file(fs, data_file): if delete.file_path not in deletes_by_file: deletes_by_file[delete.file_path] = [] deletes_by_file[delete.file_path].append(delete.pos) +======= + deletes_by_file: Dict[str, List[int]] = {} + for delete in _read_delete_file(fs, data_file): + if delete.path not in deletes_by_file: + deletes_by_file[delete.path] = [] + deletes_by_file[delete.path].append(delete.pos) +>>>>>>> e4ed25e (fix if statment) # Convert lists of positions to ChunkedArrays return { diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index fba7f27b3a..69c8f4931c 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -323,7 +323,7 @@ def data_file_with_partition(partition_type: StructType, format_version: TableVe class PositionDelete(Record): __slots__ = ("file_path", "pos", "row") - file_path: str + path: str pos: int row: Optional[Record] @@ -333,20 +333,20 @@ def __setattr__(self, name: str, value: Any) -> None: def __init__(self, file_path: str, pos: int, row: Optional[Record], *data: Any, **named_data: Any) -> None: super().__init__(*data, **named_data) - self.file_path = file_path + self.path = file_path self.pos = pos self.row = row def __hash__(self) -> int: """Return the hash of the file path.""" - return hash(self.file_path) + return hash(self.path) def __eq__(self, other: Any) -> bool: """Compare the PositionDelete with another object. If it is a PositionDelete, it will compare based on the file_path. 
""" - return self.file_path == other.file_path if isinstance(other, PositionDelete) else False + return self.path == other.path if isinstance(other, PositionDelete) else False class DataFile(Record): diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index cedb455bfa..0d5933c910 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -494,7 +494,7 @@ def _generate_positional_delete_table(self, manifest: ManifestFile, schema: Sche positional_deletes_records = [] for record in positional_delete_file: row = { - "file_path": record.file_path, + "file_path": record.path, "pos": record.pos, "row": record.row, "partition": entry.data_file.partition.__dict__, @@ -769,7 +769,7 @@ def position_deletes(self, snapshot_id: Optional[int] = None) -> "pa.Table": schema = self._get_positional_deletes_schema() return pa.Table.from_pylist([], schema=schema) - if not snapshot.schema_id: + if snapshot.schema_id == None: raise ValueError(f"Snapshot {snapshot.snapshot_id} does not have a schema id") schemas = self.tbl.schemas() From c8908aa8fac2079adb642679e558d8598accb40a Mon Sep 17 00:00:00 2001 From: amitgilad Date: Tue, 18 Feb 2025 22:27:02 +0200 Subject: [PATCH 7/9] fix if statment --- pyiceberg/table/inspect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 0d5933c910..fe1851ecc8 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -769,7 +769,7 @@ def position_deletes(self, snapshot_id: Optional[int] = None) -> "pa.Table": schema = self._get_positional_deletes_schema() return pa.Table.from_pylist([], schema=schema) - if snapshot.schema_id == None: + if snapshot.schema_id is None: raise ValueError(f"Snapshot {snapshot.snapshot_id} does not have a schema id") schemas = self.tbl.schemas() From 463300d0af8e9106043610ece20b7c963e3e9495 Mon Sep 17 00:00:00 2001 From: amitgilad Date: Thu, 15 May 2025 21:57:13 +0300 Subject: [PATCH 8/9] linting --- pyiceberg/io/pyarrow.py | 18 ++++++------------ pyiceberg/table/inspect.py | 2 +- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 5add453e52..a207758821 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -936,24 +936,17 @@ def _read_delete_file(fs: FileSystem, data_file: DataFile) -> Iterator[PositionD def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedArray]: -<<<<<<< HEAD if data_file.file_format == FileFormat.PARQUET: deletes_by_file: Dict[str, List[int]] = {} for delete in _read_delete_file(fs, data_file): - if delete.file_path not in deletes_by_file: - deletes_by_file[delete.file_path] = [] - deletes_by_file[delete.file_path].append(delete.pos) -======= - deletes_by_file: Dict[str, List[int]] = {} - for delete in _read_delete_file(fs, data_file): - if delete.path not in deletes_by_file: - deletes_by_file[delete.path] = [] - deletes_by_file[delete.path].append(delete.pos) ->>>>>>> e4ed25e (fix if statment) + if delete.path not in deletes_by_file: + deletes_by_file[delete.path] = [] + deletes_by_file[delete.path].append(delete.pos) # Convert lists of positions to ChunkedArrays return { - file_path: pa.chunked_array([pa.array(positions, type=pa.int64())]) for file_path, positions in deletes_by_file.items() + file_path: pa.chunked_array([pa.array(positions, type=pa.int64())]) + for file_path, positions in deletes_by_file.items() } elif data_file.file_format == FileFormat.PUFFIN: _, _, path = 
PyArrowFileIO.parse_location(data_file.file_path) @@ -964,6 +957,7 @@ def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedAr else: raise ValueError(f"Delete file format not supported: {data_file.file_format}") + def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], start_index: int, end_index: int) -> pa.Array: if len(positional_deletes) == 1: all_chunks = positional_deletes[0] diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index fe1851ecc8..cf351910ed 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -761,7 +761,7 @@ def all_data_files(self) -> "pa.Table": def all_delete_files(self) -> "pa.Table": return self._all_files({DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}) - def position_deletes(self, snapshot_id: Optional[int] = None) -> "pa.Table": + def position_deletes(self, snapshot_id: Optional[int] = None) -> "pa.Table": import pyarrow as pa snapshot = self._get_snapshot(snapshot_id) if snapshot_id else self.tbl.current_snapshot() From 776ac3dc25901ebb59cfd105c6dcfe151fe83d73 Mon Sep 17 00:00:00 2001 From: amitgilad Date: Sat, 17 May 2025 22:35:58 +0300 Subject: [PATCH 9/9] fix failing tests --- pyiceberg/table/inspect.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index cf351910ed..f580082ac1 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -497,7 +497,10 @@ def _generate_positional_delete_table(self, manifest: ManifestFile, schema: Sche "file_path": record.path, "pos": record.pos, "row": record.row, - "partition": entry.data_file.partition.__dict__, + "partition": { + field.name: entry.data_file.partition[pos] + for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields) + }, "spec_id": manifest.partition_spec_id, "delete_file_path": entry.data_file.file_path, }
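The final revision renders each delete file's partition as a name-to-value mapping derived from the spec the manifest was written with, instead of the raw Record `__dict__`. A small sketch of that mapping, with an illustrative helper name that is not part of the patch:

```python
# Sketch of the partition handling settled on in the last patch: pair the
# positional values in the partition Record with the spec's fields by index.
from typing import Any, Dict

from pyiceberg.manifest import DataFile
from pyiceberg.partitioning import PartitionSpec


def partition_as_dict(data_file: DataFile, spec: PartitionSpec) -> Dict[str, Any]:
    # Partition values are stored positionally in the Record, so zip them
    # with the spec's fields by enumeration, as the inspect code does.
    return {field.name: data_file.partition[pos] for pos, field in enumerate(spec.fields)}
```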