Commit cad0ad7

Add all_manifests metadata table with tests (#1241)
* Add `all_manifests` metadata table with tests
* Move `_get_manifests_schema` and `_get_all_manifests_schema` into the `InspectTable` class
* Update tests for the `all_manifests` table
* Apply linter fixes in `inspect.py`
1 parent c68b9b1 commit cad0ad7
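The new `all_manifests` table unions the manifest lists of every snapshot in the table's metadata, tagging each row with the `reference_snapshot_id` of the snapshot that references it, whereas `manifests` covers only the current snapshot. A minimal usage sketch (the catalog and table names are illustrative, not from this commit; only the `inspect` calls are):

from pyiceberg.catalog import load_catalog

# Hypothetical catalog/table names for illustration.
catalog = load_catalog("default")
tbl = catalog.load_table("default.my_table")

current_only = tbl.inspect.manifests()          # manifests of the current snapshot
across_snapshots = tbl.inspect.all_manifests()  # manifests of every snapshot

# all_manifests carries one extra column identifying the referencing snapshot.
assert across_snapshots.column_names[-1] == "reference_snapshot_id"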

File tree

2 files changed: +143 -24 lines


pyiceberg/table/inspect.py

Lines changed: 51 additions & 24 deletions
@@ -17,13 +17,14 @@
 from __future__ import annotations
 
 from datetime import datetime, timezone
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple
 
 from pyiceberg.conversions import from_bytes
 from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
 from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.table.snapshots import Snapshot, ancestors_of
 from pyiceberg.types import PrimitiveType
+from pyiceberg.utils.concurrent import ExecutorFactory
 from pyiceberg.utils.singleton import _convert_to_hashable_type
 
 if TYPE_CHECKING:

@@ -346,7 +347,7 @@ def update_partitions_map(
             schema=table_schema,
         )
 
-    def manifests(self) -> "pa.Table":
+    def _get_manifests_schema(self) -> "pa.Schema":
         import pyarrow as pa
 
         partition_summary_schema = pa.struct(

@@ -374,6 +375,17 @@ def manifests(self) -> "pa.Table":
                 pa.field("partition_summaries", pa.list_(partition_summary_schema), nullable=False),
             ]
         )
+        return manifest_schema
+
+    def _get_all_manifests_schema(self) -> "pa.Schema":
+        import pyarrow as pa
+
+        all_manifests_schema = self._get_manifests_schema()
+        all_manifests_schema = all_manifests_schema.append(pa.field("reference_snapshot_id", pa.int64(), nullable=False))
+        return all_manifests_schema
+
+    def _generate_manifests_table(self, snapshot: Optional[Snapshot], is_all_manifests_table: bool = False) -> "pa.Table":
+        import pyarrow as pa
 
         def _partition_summaries_to_rows(
             spec: PartitionSpec, partition_summaries: List[PartitionFieldSummary]

@@ -412,36 +424,38 @@ def _partition_summaries_to_rows(
 
         specs = self.tbl.metadata.specs()
         manifests = []
-        if snapshot := self.tbl.metadata.current_snapshot():
+        if snapshot:
             for manifest in snapshot.manifests(self.tbl.io):
                 is_data_file = manifest.content == ManifestContent.DATA
                 is_delete_file = manifest.content == ManifestContent.DELETES
-                manifests.append(
-                    {
-                        "content": manifest.content,
-                        "path": manifest.manifest_path,
-                        "length": manifest.manifest_length,
-                        "partition_spec_id": manifest.partition_spec_id,
-                        "added_snapshot_id": manifest.added_snapshot_id,
-                        "added_data_files_count": manifest.added_files_count if is_data_file else 0,
-                        "existing_data_files_count": manifest.existing_files_count if is_data_file else 0,
-                        "deleted_data_files_count": manifest.deleted_files_count if is_data_file else 0,
-                        "added_delete_files_count": manifest.added_files_count if is_delete_file else 0,
-                        "existing_delete_files_count": manifest.existing_files_count if is_delete_file else 0,
-                        "deleted_delete_files_count": manifest.deleted_files_count if is_delete_file else 0,
-                        "partition_summaries": _partition_summaries_to_rows(
-                            specs[manifest.partition_spec_id], manifest.partitions
-                        )
-                        if manifest.partitions
-                        else [],
-                    }
-                )
+                manifest_row = {
+                    "content": manifest.content,
+                    "path": manifest.manifest_path,
+                    "length": manifest.manifest_length,
+                    "partition_spec_id": manifest.partition_spec_id,
+                    "added_snapshot_id": manifest.added_snapshot_id,
+                    "added_data_files_count": manifest.added_files_count if is_data_file else 0,
+                    "existing_data_files_count": manifest.existing_files_count if is_data_file else 0,
+                    "deleted_data_files_count": manifest.deleted_files_count if is_data_file else 0,
+                    "added_delete_files_count": manifest.added_files_count if is_delete_file else 0,
+                    "existing_delete_files_count": manifest.existing_files_count if is_delete_file else 0,
+                    "deleted_delete_files_count": manifest.deleted_files_count if is_delete_file else 0,
+                    "partition_summaries": _partition_summaries_to_rows(specs[manifest.partition_spec_id], manifest.partitions)
+                    if manifest.partitions
+                    else [],
+                }
+                if is_all_manifests_table:
+                    manifest_row["reference_snapshot_id"] = snapshot.snapshot_id
+                manifests.append(manifest_row)
 
         return pa.Table.from_pylist(
             manifests,
-            schema=manifest_schema,
+            schema=self._get_all_manifests_schema() if is_all_manifests_table else self._get_manifests_schema(),
         )
 
+    def manifests(self) -> "pa.Table":
+        return self._generate_manifests_table(self.tbl.current_snapshot())
+
     def metadata_log_entries(self) -> "pa.Table":
         import pyarrow as pa

@@ -630,3 +644,16 @@ def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
 
     def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
         return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})
+
+    def all_manifests(self) -> "pa.Table":
+        import pyarrow as pa
+
+        snapshots = self.tbl.snapshots()
+        if not snapshots:
+            return pa.Table.from_pylist([], schema=self._get_all_manifests_schema())
+
+        executor = ExecutorFactory.get_or_create()
+        manifests_by_snapshots: Iterator["pa.Table"] = executor.map(
+            lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
+        )
+        return pa.concat_tables(manifests_by_snapshots)
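`all_manifests` builds one Arrow table per snapshot in parallel, then concatenates the results. A standalone sketch of that map-then-concat pattern, using the stdlib `ThreadPoolExecutor` in place of pyiceberg's `ExecutorFactory` (the helper function and snapshot ids below are made up for illustration):

from concurrent.futures import ThreadPoolExecutor

import pyarrow as pa

schema = pa.schema([pa.field("reference_snapshot_id", pa.int64(), nullable=False)])

def table_for_snapshot(snapshot_id: int) -> pa.Table:
    # Stand-in for self._generate_manifests_table(snapshot, True): one table
    # per snapshot, all sharing the same schema so they can be concatenated.
    return pa.Table.from_pylist([{"reference_snapshot_id": snapshot_id}], schema=schema)

with ThreadPoolExecutor() as executor:
    # executor.map preserves input order, so rows stay grouped by snapshot.
    combined = pa.concat_tables(executor.map(table_for_snapshot, [1, 2, 3]))

print(combined.num_rows)  # 3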

tests/integration/test_inspect_table.py

Lines changed: 92 additions & 0 deletions
@@ -846,3 +846,95 @@ def inspect_files_asserts(df: pa.Table) -> None:
     inspect_files_asserts(files_df)
     inspect_files_asserts(data_files_df)
     inspect_files_asserts(delete_files_df)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_inspect_all_manifests(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    from pandas.testing import assert_frame_equal
+
+    identifier = "default.table_metadata_all_manifests"
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    spark.sql(
+        f"""
+        CREATE TABLE {identifier} (
+            id int,
+            data string
+        )
+        PARTITIONED BY (data)
+        TBLPROPERTIES ('write.update.mode'='merge-on-read',
+                       'write.delete.mode'='merge-on-read')
+    """
+    )
+    tbl = session_catalog.load_table(identifier)
+
+    # check all_manifests when there are no snapshots
+    lhs = tbl.inspect.all_manifests().to_pandas()
+    rhs = spark.table(f"{identifier}.all_manifests").toPandas()
+    assert_frame_equal(lhs, rhs, check_dtype=False)
+
+    spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a')")
+
+    spark.sql(f"INSERT INTO {identifier} VALUES (2, 'b')")
+
+    spark.sql(f"UPDATE {identifier} SET data = 'c' WHERE id = 1")
+
+    spark.sql(f"DELETE FROM {identifier} WHERE id = 2")
+
+    spark.sql(f"INSERT OVERWRITE {identifier} VALUES (1, 'a')")
+
+    tbl.refresh()
+    df = tbl.inspect.all_manifests()
+
+    assert df.column_names == [
+        "content",
+        "path",
+        "length",
+        "partition_spec_id",
+        "added_snapshot_id",
+        "added_data_files_count",
+        "existing_data_files_count",
+        "deleted_data_files_count",
+        "added_delete_files_count",
+        "existing_delete_files_count",
+        "deleted_delete_files_count",
+        "partition_summaries",
+        "reference_snapshot_id",
+    ]
+
+    int_cols = [
+        "content",
+        "length",
+        "partition_spec_id",
+        "added_snapshot_id",
+        "added_data_files_count",
+        "existing_data_files_count",
+        "deleted_data_files_count",
+        "added_delete_files_count",
+        "existing_delete_files_count",
+        "deleted_delete_files_count",
+        "reference_snapshot_id",
+    ]
+
+    for column in int_cols:
+        for value in df[column]:
+            assert isinstance(value.as_py(), int)
+
+    for value in df["path"]:
+        assert isinstance(value.as_py(), str)
+
+    for value in df["partition_summaries"]:
+        assert isinstance(value.as_py(), list)
+        for row in value:
+            assert isinstance(row["contains_null"].as_py(), bool)
+            assert isinstance(row["contains_nan"].as_py(), (bool, type(None)))
+            assert isinstance(row["lower_bound"].as_py(), (str, type(None)))
+            assert isinstance(row["upper_bound"].as_py(), (str, type(None)))
+
+    lhs = spark.table(f"{identifier}.all_manifests").toPandas()
+    rhs = df.to_pandas()
+    assert_frame_equal(lhs, rhs, check_dtype=False)
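The test expects `reference_snapshot_id` as the last column, which follows from how `_get_all_manifests_schema` builds its schema: `pyarrow.Schema.append` returns a new schema with the extra field at the end. A small sketch (the two starting fields are abbreviated stand-ins; the real manifests schema has twelve):

import pyarrow as pa

manifests_schema = pa.schema([
    pa.field("path", pa.string(), nullable=False),
    pa.field("length", pa.int64(), nullable=False),
])

# Schemas are immutable: append() returns a new schema, leaving the original intact.
all_manifests_schema = manifests_schema.append(
    pa.field("reference_snapshot_id", pa.int64(), nullable=False)
)

print(all_manifests_schema.names)  # ['path', 'length', 'reference_snapshot_id']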
