Skip to content

Commit 5a63517

Browse files
author
Hanzhi Wang
committed
perf: optimize inspect.partitions
Parallelizes manifest processing to improve performance for large tables with many manifest files. After parallel processing, merges the resulting partition maps to produce the final aggregated result.
1 parent 8b43eb8 commit 5a63517

File tree

1 file changed

+29
-4
lines changed

1 file changed

+29
-4
lines changed

pyiceberg/table/inspect.py

Lines changed: 29 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -330,17 +330,42 @@ def update_partitions_map(
330330
else:
331331
raise ValueError(f"Unknown DataFileContent ({file.content})")
332332

333-
partitions_map: Dict[Tuple[str, Any], Any] = {}
334-
snapshot = self._get_snapshot(snapshot_id)
335-
for manifest in snapshot.manifests(self.tbl.io):
333+
def process_manifest(manifest: ManifestFile) -> Dict[Tuple[str, Any], Any]:
334+
local_partitions_map: Dict[Tuple[str, Any], Any] = {}
336335
for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
337336
partition = entry.data_file.partition
338337
partition_record_dict = {
339338
field.name: partition[pos]
340339
for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
341340
}
342341
entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
343-
update_partitions_map(partitions_map, entry.data_file, partition_record_dict, entry_snapshot)
342+
update_partitions_map(local_partitions_map, entry.data_file, partition_record_dict, entry_snapshot)
343+
return local_partitions_map
344+
345+
snapshot = self._get_snapshot(snapshot_id)
346+
executor = ExecutorFactory.get_or_create()
347+
local_partitions_maps = list(executor.map(process_manifest, snapshot.manifests(self.tbl.io)))
348+
349+
partitions_map: Dict[Tuple[str, Any], Any] = {}
350+
for local_map in local_partitions_maps:
351+
for partition_record_key, partition_row in local_map.items():
352+
if partition_record_key not in partitions_map:
353+
partitions_map[partition_record_key] = partition_row
354+
else:
355+
existing = partitions_map[partition_record_key]
356+
existing["record_count"] += partition_row["record_count"]
357+
existing["file_count"] += partition_row["file_count"]
358+
existing["total_data_file_size_in_bytes"] += partition_row["total_data_file_size_in_bytes"]
359+
existing["position_delete_record_count"] += partition_row["position_delete_record_count"]
360+
existing["position_delete_file_count"] += partition_row["position_delete_file_count"]
361+
existing["equality_delete_record_count"] += partition_row["equality_delete_record_count"]
362+
existing["equality_delete_file_count"] += partition_row["equality_delete_file_count"]
363+
364+
if partition_row["last_updated_at"] and (
365+
not existing["last_updated_at"] or partition_row["last_updated_at"] > existing["last_updated_at"]
366+
):
367+
existing["last_updated_at"] = partition_row["last_updated_at"]
368+
existing["last_updated_snapshot_id"] = partition_row["last_updated_snapshot_id"]
344369

345370
return pa.Table.from_pylist(
346371
partitions_map.values(),

0 commit comments

Comments
 (0)