From 5a63517ca949fee94876815c098db769f4d853db Mon Sep 17 00:00:00 2001 From: Hanzhi Wang Date: Tue, 19 Aug 2025 16:36:06 -0700 Subject: [PATCH 1/3] perf: optimize `inspect.partitions` Parallelizes manifest processing to improve performance for large tables with many manifest files. After parallel processing, merges the resulting partition maps to produce the final aggregated result. --- pyiceberg/table/inspect.py | 33 +++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 3bb0268a05..7f25db525e 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -330,9 +330,8 @@ def update_partitions_map( else: raise ValueError(f"Unknown DataFileContent ({file.content})") - partitions_map: Dict[Tuple[str, Any], Any] = {} - snapshot = self._get_snapshot(snapshot_id) - for manifest in snapshot.manifests(self.tbl.io): + def process_manifest(manifest: ManifestFile) -> Dict[Tuple[str, Any], Any]: + local_partitions_map: Dict[Tuple[str, Any], Any] = {} for entry in manifest.fetch_manifest_entry(io=self.tbl.io): partition = entry.data_file.partition partition_record_dict = { @@ -340,7 +339,33 @@ def update_partitions_map( for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields) } entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None - update_partitions_map(partitions_map, entry.data_file, partition_record_dict, entry_snapshot) + update_partitions_map(local_partitions_map, entry.data_file, partition_record_dict, entry_snapshot) + return local_partitions_map + + snapshot = self._get_snapshot(snapshot_id) + executor = ExecutorFactory.get_or_create() + local_partitions_maps = list(executor.map(process_manifest, snapshot.manifests(self.tbl.io))) + + partitions_map: Dict[Tuple[str, Any], Any] = {} + for local_map in local_partitions_maps: + for partition_record_key, partition_row in 
local_map.items(): + if partition_record_key not in partitions_map: + partitions_map[partition_record_key] = partition_row + else: + existing = partitions_map[partition_record_key] + existing["record_count"] += partition_row["record_count"] + existing["file_count"] += partition_row["file_count"] + existing["total_data_file_size_in_bytes"] += partition_row["total_data_file_size_in_bytes"] + existing["position_delete_record_count"] += partition_row["position_delete_record_count"] + existing["position_delete_file_count"] += partition_row["position_delete_file_count"] + existing["equality_delete_record_count"] += partition_row["equality_delete_record_count"] + existing["equality_delete_file_count"] += partition_row["equality_delete_file_count"] + + if partition_row["last_updated_at"] and ( + not existing["last_updated_at"] or partition_row["last_updated_at"] > existing["last_updated_at"] + ): + existing["last_updated_at"] = partition_row["last_updated_at"] + existing["last_updated_snapshot_id"] = partition_row["last_updated_snapshot_id"] return pa.Table.from_pylist( partitions_map.values(), From 3696901588750e483e6d40af5b0c1f974f6e39cb Mon Sep 17 00:00:00 2001 From: Hanzhi Wang Date: Wed, 20 Aug 2025 10:26:54 -0700 Subject: [PATCH 2/3] Code refactor based on reviews --- pyiceberg/table/inspect.py | 109 ++++++++++++++++++------------------- 1 file changed, 53 insertions(+), 56 deletions(-) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 7f25db525e..98b1155777 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -20,7 +20,7 @@ from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple from pyiceberg.conversions import from_bytes -from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary +from pyiceberg.manifest import DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary from pyiceberg.partitioning import PartitionSpec from 
pyiceberg.table.snapshots import Snapshot, ancestors_of from pyiceberg.types import PrimitiveType @@ -288,63 +288,9 @@ def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table": table_schema = pa.unify_schemas([partitions_schema, table_schema]) - def update_partitions_map( - partitions_map: Dict[Tuple[str, Any], Any], - file: DataFile, - partition_record_dict: Dict[str, Any], - snapshot: Optional[Snapshot], - ) -> None: - partition_record_key = _convert_to_hashable_type(partition_record_dict) - if partition_record_key not in partitions_map: - partitions_map[partition_record_key] = { - "partition": partition_record_dict, - "spec_id": file.spec_id, - "record_count": 0, - "file_count": 0, - "total_data_file_size_in_bytes": 0, - "position_delete_record_count": 0, - "position_delete_file_count": 0, - "equality_delete_record_count": 0, - "equality_delete_file_count": 0, - "last_updated_at": snapshot.timestamp_ms if snapshot else None, - "last_updated_snapshot_id": snapshot.snapshot_id if snapshot else None, - } - - partition_row = partitions_map[partition_record_key] - - if snapshot is not None: - if partition_row["last_updated_at"] is None or partition_row["last_updated_snapshot_id"] < snapshot.timestamp_ms: - partition_row["last_updated_at"] = snapshot.timestamp_ms - partition_row["last_updated_snapshot_id"] = snapshot.snapshot_id - - if file.content == DataFileContent.DATA: - partition_row["record_count"] += file.record_count - partition_row["file_count"] += 1 - partition_row["total_data_file_size_in_bytes"] += file.file_size_in_bytes - elif file.content == DataFileContent.POSITION_DELETES: - partition_row["position_delete_record_count"] += file.record_count - partition_row["position_delete_file_count"] += 1 - elif file.content == DataFileContent.EQUALITY_DELETES: - partition_row["equality_delete_record_count"] += file.record_count - partition_row["equality_delete_file_count"] += 1 - else: - raise ValueError(f"Unknown DataFileContent ({file.content})") - - 
def process_manifest(manifest: ManifestFile) -> Dict[Tuple[str, Any], Any]: - local_partitions_map: Dict[Tuple[str, Any], Any] = {} - for entry in manifest.fetch_manifest_entry(io=self.tbl.io): - partition = entry.data_file.partition - partition_record_dict = { - field.name: partition[pos] - for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields) - } - entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None - update_partitions_map(local_partitions_map, entry.data_file, partition_record_dict, entry_snapshot) - return local_partitions_map - snapshot = self._get_snapshot(snapshot_id) executor = ExecutorFactory.get_or_create() - local_partitions_maps = list(executor.map(process_manifest, snapshot.manifests(self.tbl.io))) + local_partitions_maps = list(executor.map(self._process_manifest, snapshot.manifests(self.tbl.io))) partitions_map: Dict[Tuple[str, Any], Any] = {} for local_map in local_partitions_maps: @@ -372,6 +318,57 @@ def process_manifest(manifest: ManifestFile) -> Dict[Tuple[str, Any], Any]: schema=table_schema, ) + def _process_manifest(self, manifest: ManifestFile) -> Dict[Tuple[str, Any], Any]: + partitions_map: Dict[Tuple[str, Any], Any] = {} + for entry in manifest.fetch_manifest_entry(io=self.tbl.io): + partition = entry.data_file.partition + partition_record_dict = { + field.name: partition[pos] + for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields) + } + entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None + + partition_record_key = _convert_to_hashable_type(partition_record_dict) + if partition_record_key not in partitions_map: + partitions_map[partition_record_key] = { + "partition": partition_record_dict, + "spec_id": entry.data_file.spec_id, + "record_count": 0, + "file_count": 0, + "total_data_file_size_in_bytes": 0, + "position_delete_record_count": 0, + 
"position_delete_file_count": 0, + "equality_delete_record_count": 0, + "equality_delete_file_count": 0, + "last_updated_at": entry_snapshot.timestamp_ms if entry_snapshot else None, + "last_updated_snapshot_id": entry_snapshot.snapshot_id if entry_snapshot else None, + } + + partition_row = partitions_map[partition_record_key] + + if entry_snapshot is not None: + if ( + partition_row["last_updated_at"] is None + or partition_row["last_updated_at"] < entry_snapshot.timestamp_ms  # compare timestamps, not snapshot id vs. timestamp + ): + partition_row["last_updated_at"] = entry_snapshot.timestamp_ms + partition_row["last_updated_snapshot_id"] = entry_snapshot.snapshot_id + + if entry.data_file.content == DataFileContent.DATA: + partition_row["record_count"] += entry.data_file.record_count + partition_row["file_count"] += 1 + partition_row["total_data_file_size_in_bytes"] += entry.data_file.file_size_in_bytes + elif entry.data_file.content == DataFileContent.POSITION_DELETES: + partition_row["position_delete_record_count"] += entry.data_file.record_count + partition_row["position_delete_file_count"] += 1 + elif entry.data_file.content == DataFileContent.EQUALITY_DELETES: + partition_row["equality_delete_record_count"] += entry.data_file.record_count + partition_row["equality_delete_file_count"] += 1 + else: + raise ValueError(f"Unknown DataFileContent ({entry.data_file.content})") + + return partitions_map + def _get_manifests_schema(self) -> "pa.Schema": import pyarrow as pa From 62b7cfc5b3519cd16664e3f7267b408e83e95b83 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Wed, 20 Aug 2025 23:24:44 +0200 Subject: [PATCH 3/3] perf --- pyiceberg/table/inspect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 98b1155777..c3aa870977 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -290,7 +290,7 @@ def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table": snapshot = 
self._get_snapshot(snapshot_id) executor = ExecutorFactory.get_or_create() - local_partitions_maps = list(executor.map(self._process_manifest, snapshot.manifests(self.tbl.io))) + local_partitions_maps = executor.map(self._process_manifest, snapshot.manifests(self.tbl.io)) partitions_map: Dict[Tuple[str, Any], Any] = {} for local_map in local_partitions_maps: