Skip to content

Commit 3696901

Browse files
author
Hanzhi Wang
committed
Code refactor based on reviews
1 parent 5a63517 commit 3696901

File tree

1 file changed

+53
-56
lines changed

1 file changed

+53
-56
lines changed

pyiceberg/table/inspect.py

Lines changed: 53 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple
2121

2222
from pyiceberg.conversions import from_bytes
23-
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
23+
from pyiceberg.manifest import DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
2424
from pyiceberg.partitioning import PartitionSpec
2525
from pyiceberg.table.snapshots import Snapshot, ancestors_of
2626
from pyiceberg.types import PrimitiveType
@@ -288,63 +288,9 @@ def partitions(self, snapshot_id: Optional[int] = None) -> "pa.Table":
288288

289289
table_schema = pa.unify_schemas([partitions_schema, table_schema])
290290

291-
def update_partitions_map(
292-
partitions_map: Dict[Tuple[str, Any], Any],
293-
file: DataFile,
294-
partition_record_dict: Dict[str, Any],
295-
snapshot: Optional[Snapshot],
296-
) -> None:
297-
partition_record_key = _convert_to_hashable_type(partition_record_dict)
298-
if partition_record_key not in partitions_map:
299-
partitions_map[partition_record_key] = {
300-
"partition": partition_record_dict,
301-
"spec_id": file.spec_id,
302-
"record_count": 0,
303-
"file_count": 0,
304-
"total_data_file_size_in_bytes": 0,
305-
"position_delete_record_count": 0,
306-
"position_delete_file_count": 0,
307-
"equality_delete_record_count": 0,
308-
"equality_delete_file_count": 0,
309-
"last_updated_at": snapshot.timestamp_ms if snapshot else None,
310-
"last_updated_snapshot_id": snapshot.snapshot_id if snapshot else None,
311-
}
312-
313-
partition_row = partitions_map[partition_record_key]
314-
315-
if snapshot is not None:
316-
if partition_row["last_updated_at"] is None or partition_row["last_updated_snapshot_id"] < snapshot.timestamp_ms:
317-
partition_row["last_updated_at"] = snapshot.timestamp_ms
318-
partition_row["last_updated_snapshot_id"] = snapshot.snapshot_id
319-
320-
if file.content == DataFileContent.DATA:
321-
partition_row["record_count"] += file.record_count
322-
partition_row["file_count"] += 1
323-
partition_row["total_data_file_size_in_bytes"] += file.file_size_in_bytes
324-
elif file.content == DataFileContent.POSITION_DELETES:
325-
partition_row["position_delete_record_count"] += file.record_count
326-
partition_row["position_delete_file_count"] += 1
327-
elif file.content == DataFileContent.EQUALITY_DELETES:
328-
partition_row["equality_delete_record_count"] += file.record_count
329-
partition_row["equality_delete_file_count"] += 1
330-
else:
331-
raise ValueError(f"Unknown DataFileContent ({file.content})")
332-
333-
def process_manifest(manifest: ManifestFile) -> Dict[Tuple[str, Any], Any]:
334-
local_partitions_map: Dict[Tuple[str, Any], Any] = {}
335-
for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
336-
partition = entry.data_file.partition
337-
partition_record_dict = {
338-
field.name: partition[pos]
339-
for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
340-
}
341-
entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
342-
update_partitions_map(local_partitions_map, entry.data_file, partition_record_dict, entry_snapshot)
343-
return local_partitions_map
344-
345291
snapshot = self._get_snapshot(snapshot_id)
346292
executor = ExecutorFactory.get_or_create()
347-
local_partitions_maps = list(executor.map(process_manifest, snapshot.manifests(self.tbl.io)))
293+
local_partitions_maps = list(executor.map(self._process_manifest, snapshot.manifests(self.tbl.io)))
348294

349295
partitions_map: Dict[Tuple[str, Any], Any] = {}
350296
for local_map in local_partitions_maps:
@@ -372,6 +318,57 @@ def process_manifest(manifest: ManifestFile) -> Dict[Tuple[str, Any], Any]:
372318
schema=table_schema,
373319
)
374320

321+
def _process_manifest(self, manifest: ManifestFile) -> Dict[Tuple[str, Any], Any]:
322+
partitions_map: Dict[Tuple[str, Any], Any] = {}
323+
for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
324+
partition = entry.data_file.partition
325+
partition_record_dict = {
326+
field.name: partition[pos]
327+
for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
328+
}
329+
entry_snapshot = self.tbl.snapshot_by_id(entry.snapshot_id) if entry.snapshot_id is not None else None
330+
331+
partition_record_key = _convert_to_hashable_type(partition_record_dict)
332+
if partition_record_key not in partitions_map:
333+
partitions_map[partition_record_key] = {
334+
"partition": partition_record_dict,
335+
"spec_id": entry.data_file.spec_id,
336+
"record_count": 0,
337+
"file_count": 0,
338+
"total_data_file_size_in_bytes": 0,
339+
"position_delete_record_count": 0,
340+
"position_delete_file_count": 0,
341+
"equality_delete_record_count": 0,
342+
"equality_delete_file_count": 0,
343+
"last_updated_at": entry_snapshot.timestamp_ms if entry_snapshot else None,
344+
"last_updated_snapshot_id": entry_snapshot.snapshot_id if entry_snapshot else None,
345+
}
346+
347+
partition_row = partitions_map[partition_record_key]
348+
349+
if entry_snapshot is not None:
350+
if (
351+
partition_row["last_updated_at"] is None
352+
or partition_row["last_updated_snapshot_id"] < entry_snapshot.timestamp_ms
353+
):
354+
partition_row["last_updated_at"] = entry_snapshot.timestamp_ms
355+
partition_row["last_updated_snapshot_id"] = entry_snapshot.snapshot_id
356+
357+
if entry.data_file.content == DataFileContent.DATA:
358+
partition_row["record_count"] += entry.data_file.record_count
359+
partition_row["file_count"] += 1
360+
partition_row["total_data_file_size_in_bytes"] += entry.data_file.file_size_in_bytes
361+
elif entry.data_file.content == DataFileContent.POSITION_DELETES:
362+
partition_row["position_delete_record_count"] += entry.data_file.record_count
363+
partition_row["position_delete_file_count"] += 1
364+
elif entry.data_file.content == DataFileContent.EQUALITY_DELETES:
365+
partition_row["equality_delete_record_count"] += entry.data_file.record_count
366+
partition_row["equality_delete_file_count"] += 1
367+
else:
368+
raise ValueError(f"Unknown DataFileContent ({entry.data_file.content})")
369+
370+
return partitions_map
371+
375372
def _get_manifests_schema(self) -> "pa.Schema":
376373
import pyarrow as pa
377374

0 commit comments

Comments
 (0)