From 96c680be17476aea8981f881a08872696fa9ff3d Mon Sep 17 00:00:00 2001 From: Soumya Ghosh Date: Sat, 8 Feb 2025 16:17:47 +0530 Subject: [PATCH 1/5] Add metadata tables - all_files - all_data_files - all_delete_files --- pyiceberg/table/inspect.py | 148 +++++++----- tests/integration/test_inspect_table.py | 293 ++++++++++++++---------- 2 files changed, 264 insertions(+), 177 deletions(-) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 91bdb2f29d..f7293101ff 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -20,7 +20,7 @@ from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple from pyiceberg.conversions import from_bytes -from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary +from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary from pyiceberg.partitioning import PartitionSpec from pyiceberg.table.snapshots import Snapshot, ancestors_of from pyiceberg.types import PrimitiveType @@ -523,7 +523,62 @@ def history(self) -> "pa.Table": return pa.Table.from_pylist(history, schema=history_schema) - def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table": + def _files_by_manifest( + self, manifest_list: ManifestFile, data_file_filter: Optional[Set[DataFileContent]] = None + ) -> List[Dict[str, Any]]: + files: list[dict[str, Any]] = [] + schema = self.tbl.metadata.schema() + io = self.tbl.io + + for manifest_entry in manifest_list.fetch_manifest_entry(io): + data_file = manifest_entry.data_file + if data_file_filter and data_file.content not in data_file_filter: + continue + column_sizes = data_file.column_sizes or {} + value_counts = data_file.value_counts or {} + null_value_counts = data_file.null_value_counts or {} + nan_value_counts = data_file.nan_value_counts or {} + lower_bounds = data_file.lower_bounds or {} + upper_bounds = data_file.upper_bounds or {} + readable_metrics = { + schema.find_column_name(field.field_id): { + "column_size": column_sizes.get(field.field_id), + "value_count": value_counts.get(field.field_id), + "null_value_count": null_value_counts.get(field.field_id), + "nan_value_count": nan_value_counts.get(field.field_id), + "lower_bound": from_bytes(field.field_type, lower_bound) + if (lower_bound := lower_bounds.get(field.field_id)) + else None, + "upper_bound": from_bytes(field.field_type, upper_bound) + if (upper_bound := upper_bounds.get(field.field_id)) + else None, + } + for field in self.tbl.metadata.schema().fields + } + files.append( + { + "content": data_file.content, + "file_path": data_file.file_path, + "file_format": data_file.file_format, + "spec_id": data_file.spec_id, + "record_count": data_file.record_count, + "file_size_in_bytes": data_file.file_size_in_bytes, + "column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None, + "value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None, + "null_value_counts": dict(data_file.null_value_counts) if data_file.null_value_counts is not None else None, + "nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None, + "lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None, + "upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None, + "key_metadata": data_file.key_metadata, + "split_offsets": 
data_file.split_offsets, + "equality_ids": data_file.equality_ids, + "sort_order_id": data_file.sort_order_id, + "readable_metrics": readable_metrics, + } + ) + return files + + def _get_files_schema(self) -> "pa.Schema": import pyarrow as pa from pyiceberg.io.pyarrow import schema_to_pyarrow @@ -570,70 +625,27 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: pa.field("readable_metrics", pa.struct(readable_metrics_struct), nullable=True), ] ) + return files_schema + + def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table": + import pyarrow as pa files: list[dict[str, Any]] = [] if not snapshot_id and not self.tbl.metadata.current_snapshot(): return pa.Table.from_pylist( files, - schema=files_schema, + schema=self._get_files_schema(), ) snapshot = self._get_snapshot(snapshot_id) io = self.tbl.io for manifest_list in snapshot.manifests(io): - for manifest_entry in manifest_list.fetch_manifest_entry(io): - data_file = manifest_entry.data_file - if data_file_filter and data_file.content not in data_file_filter: - continue - column_sizes = data_file.column_sizes or {} - value_counts = data_file.value_counts or {} - null_value_counts = data_file.null_value_counts or {} - nan_value_counts = data_file.nan_value_counts or {} - lower_bounds = data_file.lower_bounds or {} - upper_bounds = data_file.upper_bounds or {} - readable_metrics = { - schema.find_column_name(field.field_id): { - "column_size": column_sizes.get(field.field_id), - "value_count": value_counts.get(field.field_id), - "null_value_count": null_value_counts.get(field.field_id), - "nan_value_count": nan_value_counts.get(field.field_id), - "lower_bound": from_bytes(field.field_type, lower_bound) - if (lower_bound := lower_bounds.get(field.field_id)) - else None, - "upper_bound": from_bytes(field.field_type, upper_bound) - if (upper_bound := upper_bounds.get(field.field_id)) - else None, - } - for field in self.tbl.metadata.schema().fields - } - files.append( - { - "content": data_file.content, - "file_path": data_file.file_path, - "file_format": data_file.file_format, - "spec_id": data_file.spec_id, - "record_count": data_file.record_count, - "file_size_in_bytes": data_file.file_size_in_bytes, - "column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None, - "value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None, - "null_value_counts": dict(data_file.null_value_counts) - if data_file.null_value_counts is not None - else None, - "nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None, - "lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None, - "upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None, - "key_metadata": data_file.key_metadata, - "split_offsets": data_file.split_offsets, - "equality_ids": data_file.equality_ids, - "sort_order_id": data_file.sort_order_id, - "readable_metrics": readable_metrics, - } - ) + files.extend(self._files_by_manifest(manifest_list, data_file_filter)) return pa.Table.from_pylist( files, - schema=files_schema, + schema=self._get_files_schema(), ) def files(self, snapshot_id: Optional[int] = None) -> "pa.Table": @@ -657,3 +669,35 @@ def all_manifests(self) -> "pa.Table": lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots] ) return pa.concat_tables(manifests_by_snapshots) + + 
def _all_files(self, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table": + import pyarrow as pa + + snapshots = self.tbl.snapshots() + if not snapshots: + return pa.Table.from_pylist([], schema=self._get_files_schema()) + + executor = ExecutorFactory.get_or_create() + all_manifest_files_by_snapshot: Iterator[List[ManifestFile]] = executor.map( + lambda args: args[0].manifests(self.tbl.io), [(snapshot,) for snapshot in snapshots] + ) + all_manifest_files = list( + {(manifest.manifest_path, manifest) for manifest_list in all_manifest_files_by_snapshot for manifest in manifest_list} + ) + all_files_by_manifest: Iterator[List[Dict[str, Any]]] = executor.map( + lambda args: self._files_by_manifest(*args), [(manifest, data_file_filter) for _, manifest in all_manifest_files] + ) + all_files_list = [file for files in all_files_by_manifest for file in files] + return pa.Table.from_pylist( + all_files_list, + schema=self._get_files_schema(), + ) + + def all_files(self) -> "pa.Table": + return self._all_files() + + def all_data_files(self) -> "pa.Table": + return self._all_files({DataFileContent.DATA}) + + def all_delete_files(self) -> "pa.Table": + return self._all_files({DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}) diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index 75fe92a69a..7457532d1d 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -71,6 +71,131 @@ def _create_table(session_catalog: Catalog, identifier: str, properties: Propert return session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties=properties) +def _inspect_files_asserts(df: pa.Table, spark_df: DataFrame) -> None: + from pandas.testing import assert_frame_equal + + assert df.column_names == [ + "content", + "file_path", + "file_format", + "spec_id", + "record_count", + "file_size_in_bytes", + "column_sizes", + "value_counts", + "null_value_counts", + "nan_value_counts", + "lower_bounds", + "upper_bounds", + "key_metadata", + "split_offsets", + "equality_ids", + "sort_order_id", + "readable_metrics", + ] + + # make sure the non-nullable fields are filled + for int_column in ["content", "spec_id", "record_count", "file_size_in_bytes"]: + for value in df[int_column]: + assert isinstance(value.as_py(), int) + + for split_offsets in df["split_offsets"]: + assert isinstance(split_offsets.as_py(), list) + + for file_format in df["file_format"]: + assert file_format.as_py() == "PARQUET" + + for file_path in df["file_path"]: + assert file_path.as_py().startswith("s3://") + + # sort the dataframes by content and file_path to compare them, + # as the order of the files is not guaranteed in case of all_files + lhs = df.to_pandas().sort_values(by=["content", "file_path"]).reset_index(drop=True) + rhs = spark_df.toPandas().sort_values(by=["content", "file_path"]).reset_index(drop=True) + + lhs_subset = lhs[ + [ + "content", + "file_path", + "file_format", + "spec_id", + "record_count", + "file_size_in_bytes", + "split_offsets", + "equality_ids", + "sort_order_id", + ] + ] + rhs_subset = rhs[ + [ + "content", + "file_path", + "file_format", + "spec_id", + "record_count", + "file_size_in_bytes", + "split_offsets", + "equality_ids", + "sort_order_id", + ] + ] + + assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False, check_categorical=False) + + for column in df.column_names: + for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): + if isinstance(left, float) 
and math.isnan(left) and isinstance(right, float) and math.isnan(right): + # NaN != NaN in Python + continue + if column in [ + "column_sizes", + "value_counts", + "null_value_counts", + "nan_value_counts", + "lower_bounds", + "upper_bounds", + ]: + if isinstance(right, dict): + left = dict(left) + assert left == right, f"Difference in column {column}: {left} != {right}" + + elif column == "readable_metrics": + assert list(left.keys()) == [ + "bool", + "string", + "string_long", + "int", + "long", + "float", + "double", + "timestamp", + "timestamptz", + "date", + "binary", + "fixed", + ] + assert left.keys() == right.keys() + + for rm_column in left.keys(): + rm_lhs = left[rm_column] + rm_rhs = right[rm_column] + + assert rm_lhs["column_size"] == rm_rhs["column_size"] + assert rm_lhs["value_count"] == rm_rhs["value_count"] + assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"] + assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"] + + if rm_column == "timestamptz" and rm_rhs["lower_bound"] and rm_rhs["upper_bound"]: + # PySpark does not correctly set the timstamptz + rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc) + rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc) + + assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"] + assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"] + else: + assert left == right, f"Difference in column {column}: {left} != {right}" + + @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) def test_inspect_snapshots( @@ -661,8 +786,6 @@ def test_inspect_history(spark: SparkSession, session_catalog: Catalog, format_v def test_inspect_files( spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int ) -> None: - from pandas.testing import assert_frame_equal - identifier = "default.table_metadata_files" tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) @@ -684,129 +807,9 @@ def test_inspect_files( delete_files_df = tbl.inspect.delete_files() - def inspect_files_asserts(df: pa.Table, spark_df: DataFrame) -> None: - assert df.column_names == [ - "content", - "file_path", - "file_format", - "spec_id", - "record_count", - "file_size_in_bytes", - "column_sizes", - "value_counts", - "null_value_counts", - "nan_value_counts", - "lower_bounds", - "upper_bounds", - "key_metadata", - "split_offsets", - "equality_ids", - "sort_order_id", - "readable_metrics", - ] - - # make sure the non-nullable fields are filled - for int_column in ["content", "spec_id", "record_count", "file_size_in_bytes"]: - for value in df[int_column]: - assert isinstance(value.as_py(), int) - - for split_offsets in df["split_offsets"]: - assert isinstance(split_offsets.as_py(), list) - - for file_format in df["file_format"]: - assert file_format.as_py() == "PARQUET" - - for file_path in df["file_path"]: - assert file_path.as_py().startswith("s3://") - - lhs = df.to_pandas() - rhs = spark_df.toPandas() - - lhs_subset = lhs[ - [ - "content", - "file_path", - "file_format", - "spec_id", - "record_count", - "file_size_in_bytes", - "split_offsets", - "equality_ids", - "sort_order_id", - ] - ] - rhs_subset = rhs[ - [ - "content", - "file_path", - "file_format", - "spec_id", - "record_count", - "file_size_in_bytes", - "split_offsets", - "equality_ids", - "sort_order_id", - ] - ] - - assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False, check_categorical=False) - - for column in df.column_names: - for left, right in 
zip(lhs[column].to_list(), rhs[column].to_list()): - if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right): - # NaN != NaN in Python - continue - if column in [ - "column_sizes", - "value_counts", - "null_value_counts", - "nan_value_counts", - "lower_bounds", - "upper_bounds", - ]: - if isinstance(right, dict): - left = dict(left) - assert left == right, f"Difference in column {column}: {left} != {right}" - - elif column == "readable_metrics": - assert list(left.keys()) == [ - "bool", - "string", - "string_long", - "int", - "long", - "float", - "double", - "timestamp", - "timestamptz", - "date", - "binary", - "fixed", - ] - assert left.keys() == right.keys() - - for rm_column in left.keys(): - rm_lhs = left[rm_column] - rm_rhs = right[rm_column] - - assert rm_lhs["column_size"] == rm_rhs["column_size"] - assert rm_lhs["value_count"] == rm_rhs["value_count"] - assert rm_lhs["null_value_count"] == rm_rhs["null_value_count"] - assert rm_lhs["nan_value_count"] == rm_rhs["nan_value_count"] - - if rm_column == "timestamptz" and rm_rhs["lower_bound"] and rm_rhs["upper_bound"]: - # PySpark does not correctly set the timstamptz - rm_rhs["lower_bound"] = rm_rhs["lower_bound"].replace(tzinfo=pytz.utc) - rm_rhs["upper_bound"] = rm_rhs["upper_bound"].replace(tzinfo=pytz.utc) - - assert rm_lhs["lower_bound"] == rm_rhs["lower_bound"] - assert rm_lhs["upper_bound"] == rm_rhs["upper_bound"] - else: - assert left == right, f"Difference in column {column}: {left} != {right}" - - inspect_files_asserts(files_df, spark.table(f"{identifier}.files")) - inspect_files_asserts(data_files_df, spark.table(f"{identifier}.data_files")) - inspect_files_asserts(delete_files_df, spark.table(f"{identifier}.delete_files")) + _inspect_files_asserts(files_df, spark.table(f"{identifier}.files")) + _inspect_files_asserts(data_files_df, spark.table(f"{identifier}.data_files")) + _inspect_files_asserts(delete_files_df, spark.table(f"{identifier}.delete_files")) @pytest.mark.integration @@ -819,6 +822,9 @@ def test_inspect_files_no_snapshot(spark: SparkSession, session_catalog: Catalog files_df = tbl.refresh().inspect.files() data_files_df = tbl.inspect.data_files() delete_files_df = tbl.inspect.delete_files() + all_files_df = tbl.inspect.all_files() + all_data_files_df = tbl.inspect.all_data_files() + all_delete_files_df = tbl.inspect.all_delete_files() def inspect_files_asserts(df: pa.Table) -> None: assert df.column_names == [ @@ -846,6 +852,9 @@ def inspect_files_asserts(df: pa.Table) -> None: inspect_files_asserts(files_df) inspect_files_asserts(data_files_df) inspect_files_asserts(delete_files_df) + inspect_files_asserts(all_files_df) + inspect_files_asserts(all_data_files_df) + inspect_files_asserts(all_delete_files_df) @pytest.mark.integration @@ -938,3 +947,37 @@ def test_inspect_all_manifests(spark: SparkSession, session_catalog: Catalog, fo lhs = spark.table(f"{identifier}.all_manifests").toPandas() rhs = df.to_pandas() assert_frame_equal(lhs, rhs, check_dtype=False) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_inspect_all_files( + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int +) -> None: + identifier = "default.table_metadata_files" + + tbl = _create_table(session_catalog, identifier, properties={"format-version": format_version}) + + # append three times + for _ in range(3): + tbl.append(arrow_table_with_null) + + # configure table properties + if format_version == 2: + with 
tbl.transaction() as txn: + txn.set_properties({"write.delete.mode": "merge-on-read"}) + txn.set_properties({"write.update.mode": "merge-on-read"}) + spark.sql(f"DELETE FROM {identifier} WHERE int = 1") + tbl.refresh() + tbl.append(arrow_table_with_null) + spark.sql(f"UPDATE {identifier} SET string = 'b' WHERE int = 9") + spark.sql(f"DELETE FROM {identifier} WHERE int = 1") + tbl.refresh() + + all_files_df = tbl.inspect.all_files() + all_data_files_df = tbl.inspect.all_data_files() + all_delete_files_df = tbl.inspect.all_delete_files() + + _inspect_files_asserts(all_files_df, spark.table(f"{identifier}.all_files")) + _inspect_files_asserts(all_data_files_df, spark.table(f"{identifier}.all_data_files")) + _inspect_files_asserts(all_delete_files_df, spark.table(f"{identifier}.all_delete_files")) From fb10185fd6acb695b1f63d6611bcae2dd95d9789 Mon Sep 17 00:00:00 2001 From: Soumya Ghosh Date: Sat, 26 Apr 2025 23:59:51 +0530 Subject: [PATCH 2/5] refactored _get_files_from_manifest and _all_files methods --- pyiceberg/table/inspect.py | 43 ++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index f7293101ff..078414b63c 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -523,9 +523,11 @@ def history(self) -> "pa.Table": return pa.Table.from_pylist(history, schema=history_schema) - def _files_by_manifest( + def _get_files_from_manifest( self, manifest_list: ManifestFile, data_file_filter: Optional[Set[DataFileContent]] = None - ) -> List[Dict[str, Any]]: + ) -> "pa.Table": + import pyarrow as pa + files: list[dict[str, Any]] = [] schema = self.tbl.metadata.schema() io = self.tbl.io @@ -576,7 +578,10 @@ def _files_by_manifest( "readable_metrics": readable_metrics, } ) - return files + return pa.Table.from_pylist( + files, + schema=self._get_files_schema(), + ) def _get_files_schema(self) -> "pa.Schema": import pyarrow as pa @@ -630,23 +635,20 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table": import pyarrow as pa - files: list[dict[str, Any]] = [] + files_table: list[pa.Table] = [] if not snapshot_id and not self.tbl.metadata.current_snapshot(): return pa.Table.from_pylist( - files, + [], schema=self._get_files_schema(), ) snapshot = self._get_snapshot(snapshot_id) io = self.tbl.io for manifest_list in snapshot.manifests(io): - files.extend(self._files_by_manifest(manifest_list, data_file_filter)) + files_table.append(self._get_files_from_manifest(manifest_list, data_file_filter)) - return pa.Table.from_pylist( - files, - schema=self._get_files_schema(), - ) + return pa.concat_tables(files_table) def files(self, snapshot_id: Optional[int] = None) -> "pa.Table": return self._files(snapshot_id) @@ -678,21 +680,16 @@ def _all_files(self, data_file_filter: Optional[Set[DataFileContent]] = None) -> return pa.Table.from_pylist([], schema=self._get_files_schema()) executor = ExecutorFactory.get_or_create() - all_manifest_files_by_snapshot: Iterator[List[ManifestFile]] = executor.map( - lambda args: args[0].manifests(self.tbl.io), [(snapshot,) for snapshot in snapshots] - ) - all_manifest_files = list( - {(manifest.manifest_path, manifest) for manifest_list in all_manifest_files_by_snapshot for manifest in manifest_list} - ) - all_files_by_manifest: Iterator[List[Dict[str, Any]]] = executor.map( - lambda args: 
self._files_by_manifest(*args), [(manifest, data_file_filter) for _, manifest in all_manifest_files] - ) - all_files_list = [file for files in all_files_by_manifest for file in files] - return pa.Table.from_pylist( - all_files_list, - schema=self._get_files_schema(), + manifest_lists = executor.map(lambda snapshot: snapshot.manifests(self.tbl.io), snapshots) + + unique_manifests = {(manifest.manifest_path, manifest) for manifest_list in manifest_lists for manifest in manifest_list} + + file_lists = executor.map( + lambda args: self._get_files_from_manifest(*args), [(manifest, data_file_filter) for _, manifest in unique_manifests] ) + return pa.concat_tables(file_lists) + def all_files(self) -> "pa.Table": return self._all_files() From 9fff025cfba8ff44d8eb779c6a039ac278340c75 Mon Sep 17 00:00:00 2001 From: Soumya Ghosh Date: Thu, 8 May 2025 03:31:01 +0530 Subject: [PATCH 3/5] Add integration tests format version 3 for files metadata tables --- pyiceberg/table/inspect.py | 10 ++--- tests/integration/test_inspect_table.py | 52 +++++++++++++++++++++++-- 2 files changed, 51 insertions(+), 11 deletions(-) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 0c46391b95..05b735c8a3 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -635,16 +635,12 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table": import pyarrow as pa - files_table: list[pa.Table] = [] - if not snapshot_id and not self.tbl.metadata.current_snapshot(): - return pa.Table.from_pylist( - [], - schema=self._get_files_schema(), - ) - snapshot = self._get_snapshot(snapshot_id) + return self._get_files_schema().empty_table() + snapshot = self._get_snapshot(snapshot_id) io = self.tbl.io + files_table: list[pa.Table] = [] for manifest_list in snapshot.manifests(io): files_table.append(self._get_files_from_manifest(manifest_list, data_file_filter)) diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index 96bf504ba1..9f2a65f539 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -100,10 +100,8 @@ def _inspect_files_asserts(df: pa.Table, spark_df: DataFrame) -> None: assert isinstance(value.as_py(), int) for split_offsets in df["split_offsets"]: - assert isinstance(split_offsets.as_py(), list) - - for file_format in df["file_format"]: - assert file_format.as_py() == "PARQUET" + if split_offsets.as_py() is not None: + assert isinstance(split_offsets.as_py(), list) for file_path in df["file_path"]: assert file_path.as_py().startswith("s3://") @@ -985,3 +983,49 @@ def test_inspect_all_files( _inspect_files_asserts(all_files_df, spark.table(f"{identifier}.all_files")) _inspect_files_asserts(all_data_files_df, spark.table(f"{identifier}.all_data_files")) _inspect_files_asserts(all_delete_files_df, spark.table(f"{identifier}.all_delete_files")) + + +@pytest.mark.integration +def test_inspect_files_format_version_3(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.table_metadata_files" + + tbl = _create_table( + session_catalog, + identifier, + properties={ + "format-version": "3", + "write.delete.mode": "merge-on-read", + "write.update.mode": "merge-on-read", + "write.merge.mode": "merge-on-read", + }, + ) + + insert_data_sql = f"""INSERT INTO {identifier} VALUES + (false, 'a', 
'aaaaaaaaaaaaaaaaaaaaaa', 1, 1, 0.0, 0.0, TIMESTAMP('2023-01-01 19:25:00'), TIMESTAMP('2023-01-01 19:25:00+00:00'), DATE('2023-01-01'), X'01', X'00000000000000000000000000000000'), + (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL), + (true, 'z', 'zzzzzzzzzzzzzzzzzzzzzz', 9, 9, 0.9, 0.9, TIMESTAMP('2023-03-01 19:25:00'), TIMESTAMP('2023-03-01 19:25:00+00:00'), DATE('2023-03-01'), X'12', X'11111111111111111111111111111111'); + """ + + spark.sql(insert_data_sql) + spark.sql(insert_data_sql) + spark.sql(f"UPDATE {identifier} SET int = 2 WHERE int = 1") + spark.sql(f"DELETE FROM {identifier} WHERE int = 9") + spark.table(identifier).show(20, False) + + tbl.refresh() + + files_df = tbl.inspect.files() + data_files_df = tbl.inspect.data_files() + delete_files_df = tbl.inspect.delete_files() + + all_files_df = tbl.inspect.all_files() + all_data_files_df = tbl.inspect.all_data_files() + all_delete_files_df = tbl.inspect.all_delete_files() + + _inspect_files_asserts(files_df, spark.table(f"{identifier}.files")) + _inspect_files_asserts(data_files_df, spark.table(f"{identifier}.data_files")) + _inspect_files_asserts(delete_files_df, spark.table(f"{identifier}.delete_files")) + + _inspect_files_asserts(all_files_df, spark.table(f"{identifier}.all_files")) + _inspect_files_asserts(all_data_files_df, spark.table(f"{identifier}.all_data_files")) + _inspect_files_asserts(all_delete_files_df, spark.table(f"{identifier}.all_delete_files")) From 2bce48499b9707f1629dfa37b8d7430e3b9936b1 Mon Sep 17 00:00:00 2001 From: Soumya Ghosh Date: Fri, 9 May 2025 04:07:31 +0530 Subject: [PATCH 4/5] Add partition field in files metadata table schema --- pyiceberg/table/inspect.py | 10 ++++ tests/integration/test_inspect_table.py | 74 ++++++++++++++++++++++++- 2 files changed, 83 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 05b735c8a3..bf50a812a2 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -557,11 +557,17 @@ def _get_files_from_manifest( } for field in self.tbl.metadata.schema().fields } + partition = data_file.partition + partition_record_dict = { + field.name: partition[pos] + for pos, field in enumerate(self.tbl.metadata.specs()[manifest_list.partition_spec_id].fields) + } files.append( { "content": data_file.content, "file_path": data_file.file_path, "file_format": data_file.file_format, + "partition": partition_record_dict, "spec_id": data_file.spec_id, "record_count": data_file.record_count, "file_size_in_bytes": data_file.file_size_in_bytes, @@ -604,6 +610,9 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: ] ) + partition_record = self.tbl.metadata.specs_struct() + pa_record_struct = schema_to_pyarrow(partition_record) + for field in self.tbl.metadata.schema().fields: readable_metrics_struct.append( pa.field(schema.find_column_name(field.field_id), _readable_metrics_struct(field.field_type), nullable=False) @@ -614,6 +623,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: pa.field("content", pa.int8(), nullable=False), pa.field("file_path", pa.string(), nullable=False), pa.field("file_format", pa.dictionary(pa.int32(), pa.string()), nullable=False), + pa.field("partition", pa_record_struct, nullable=False), pa.field("spec_id", pa.int32(), nullable=False), pa.field("record_count", pa.int64(), nullable=False), pa.field("file_size_in_bytes", pa.int64(), nullable=False), diff --git a/tests/integration/test_inspect_table.py 
b/tests/integration/test_inspect_table.py index 9f2a65f539..4adea968eb 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -78,6 +78,7 @@ def _inspect_files_asserts(df: pa.Table, spark_df: DataFrame) -> None: "content", "file_path", "file_format", + "partition", "spec_id", "record_count", "file_size_in_bytes", @@ -141,6 +142,9 @@ def _inspect_files_asserts(df: pa.Table, spark_df: DataFrame) -> None: assert_frame_equal(lhs_subset, rhs_subset, check_dtype=False, check_categorical=False) for column in df.column_names: + if column == "partition": + # Spark leaves out the partition if the table is unpartitioned + continue for left, right in zip(lhs[column].to_list(), rhs[column].to_list()): if isinstance(left, float) and math.isnan(left) and isinstance(right, float) and math.isnan(right): # NaN != NaN in Python @@ -833,6 +837,7 @@ def inspect_files_asserts(df: pa.Table) -> None: "content", "file_path", "file_format", + "partition", "spec_id", "record_count", "file_size_in_bytes", @@ -1010,7 +1015,6 @@ def test_inspect_files_format_version_3(spark: SparkSession, session_catalog: Ca spark.sql(insert_data_sql) spark.sql(f"UPDATE {identifier} SET int = 2 WHERE int = 1") spark.sql(f"DELETE FROM {identifier} WHERE int = 9") - spark.table(identifier).show(20, False) tbl.refresh() @@ -1029,3 +1033,71 @@ def test_inspect_files_format_version_3(spark: SparkSession, session_catalog: Ca _inspect_files_asserts(all_files_df, spark.table(f"{identifier}.all_files")) _inspect_files_asserts(all_data_files_df, spark.table(f"{identifier}.all_data_files")) _inspect_files_asserts(all_delete_files_df, spark.table(f"{identifier}.all_delete_files")) + + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2, 3]) +def test_inspect_files_partitioned(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: + from pandas.testing import assert_frame_equal + + identifier = "default.table_metadata_files_partitioned" + try: + session_catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + spark.sql( + f""" + CREATE TABLE {identifier} ( + dt date, + int_data int + ) + PARTITIONED BY (months(dt)) + TBLPROPERTIES ('format-version'='{format_version}') + """ + ) + + if format_version > 1: + spark.sql( + f""" + ALTER TABLE {identifier} SET TBLPROPERTIES( + 'write.update.mode' = 'merge-on-read', + 'write.delete.mode' = 'merge-on-read', + 'write.merge.mode' = 'merge-on-read') + """ + ) + + spark.sql(f""" + INSERT INTO {identifier} VALUES (CAST('2025-01-01' AS date), 1), (CAST('2025-01-01' AS date), 2) + """) + + spark.sql( + f""" + ALTER TABLE {identifier} + REPLACE PARTITION FIELD dt_month WITH days(dt) + """ + ) + + spark.sql( + f""" + INSERT INTO {identifier} VALUES (CAST('2025-01-02' AS date), 2) + """ + ) + + spark.sql( + f""" + DELETE FROM {identifier} WHERE int_data = 1 + """ + ) + + tbl = session_catalog.load_table(identifier) + files_df = tbl.inspect.files() + lhs = files_df.to_pandas()[["file_path", "partition"]].sort_values("file_path", ignore_index=True).reset_index() + rhs = ( + spark.table(f"{identifier}.files") + .select(["file_path", "partition"]) + .toPandas() + .sort_values("file_path", ignore_index=True) + .reset_index() + ) + assert_frame_equal(lhs, rhs, check_dtype=False) From 9bdf5c750e49f102065842eee9a259b4286a71fc Mon Sep 17 00:00:00 2001 From: Soumya Ghosh Date: Fri, 9 May 2025 15:41:08 +0530 Subject: [PATCH 5/5] Fix order of fields in files schema --- pyiceberg/table/inspect.py | 4 ++-- 
tests/integration/test_inspect_table.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index bf50a812a2..cce5250ad5 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -567,8 +567,8 @@ def _get_files_from_manifest( "content": data_file.content, "file_path": data_file.file_path, "file_format": data_file.file_format, - "partition": partition_record_dict, "spec_id": data_file.spec_id, + "partition": partition_record_dict, "record_count": data_file.record_count, "file_size_in_bytes": data_file.file_size_in_bytes, "column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None, @@ -623,8 +623,8 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType: pa.field("content", pa.int8(), nullable=False), pa.field("file_path", pa.string(), nullable=False), pa.field("file_format", pa.dictionary(pa.int32(), pa.string()), nullable=False), - pa.field("partition", pa_record_struct, nullable=False), pa.field("spec_id", pa.int32(), nullable=False), + pa.field("partition", pa_record_struct, nullable=False), pa.field("record_count", pa.int64(), nullable=False), pa.field("file_size_in_bytes", pa.int64(), nullable=False), pa.field("column_sizes", pa.map_(pa.int32(), pa.int64()), nullable=True), diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index 4adea968eb..e81050a81c 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -78,8 +78,8 @@ def _inspect_files_asserts(df: pa.Table, spark_df: DataFrame) -> None: "content", "file_path", "file_format", - "partition", "spec_id", + "partition", "record_count", "file_size_in_bytes", "column_sizes", @@ -837,8 +837,8 @@ def inspect_files_asserts(df: pa.Table) -> None: "content", "file_path", "file_format", - "partition", "spec_id", + "partition", "record_count", "file_size_in_bytes", "column_sizes",
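
Taken together, this patch series adds `all_files`, `all_data_files`, and `all_delete_files` to `Table.inspect`, mirroring the existing `files` / `data_files` / `delete_files` metadata tables but scanning the manifests of every snapshot instead of only the current one (and, after PATCH 4/5, including the `partition` column). The sketch below is purely illustrative and is not part of the patches; the catalog name and table identifier are assumptions, and only the inspect methods introduced or touched above are exercised.

    from pyiceberg.catalog import load_catalog

    # Assumed catalog name and table identifier -- adjust to your environment.
    catalog = load_catalog("default")
    tbl = catalog.load_table("default.table_metadata_files")

    # Pre-existing API: data/delete files reachable from the current snapshot only.
    current_files = tbl.inspect.files()

    # New in this series: files across every snapshot of the table.
    all_files = tbl.inspect.all_files()
    all_data_files = tbl.inspect.all_data_files()      # DataFileContent.DATA only
    all_delete_files = tbl.inspect.all_delete_files()  # position + equality deletes

    # Each call returns a pyarrow.Table; e.g. look at a few columns as pandas.
    print(
        all_files.select(["content", "file_path", "partition", "record_count"]).to_pandas()
    )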