 from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple
 
 from pyiceberg.conversions import from_bytes
-from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
+from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
 from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.table.snapshots import Snapshot, ancestors_of
 from pyiceberg.types import PrimitiveType
@@ -523,7 +523,73 @@ def history(self) -> "pa.Table":
 
         return pa.Table.from_pylist(history, schema=history_schema)
 
-    def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
+    def _get_files_from_manifest(
+        self, manifest_list: ManifestFile, data_file_filter: Optional[Set[DataFileContent]] = None
+    ) -> "pa.Table":
+        import pyarrow as pa
+
+        files: list[dict[str, Any]] = []
+        schema = self.tbl.metadata.schema()
+        io = self.tbl.io
+
+        for manifest_entry in manifest_list.fetch_manifest_entry(io):
+            data_file = manifest_entry.data_file
+            if data_file_filter and data_file.content not in data_file_filter:
+                continue
+            column_sizes = data_file.column_sizes or {}
+            value_counts = data_file.value_counts or {}
+            null_value_counts = data_file.null_value_counts or {}
+            nan_value_counts = data_file.nan_value_counts or {}
+            lower_bounds = data_file.lower_bounds or {}
+            upper_bounds = data_file.upper_bounds or {}
+            readable_metrics = {
+                schema.find_column_name(field.field_id): {
+                    "column_size": column_sizes.get(field.field_id),
+                    "value_count": value_counts.get(field.field_id),
+                    "null_value_count": null_value_counts.get(field.field_id),
+                    "nan_value_count": nan_value_counts.get(field.field_id),
+                    "lower_bound": from_bytes(field.field_type, lower_bound)
+                    if (lower_bound := lower_bounds.get(field.field_id))
+                    else None,
+                    "upper_bound": from_bytes(field.field_type, upper_bound)
+                    if (upper_bound := upper_bounds.get(field.field_id))
+                    else None,
+                }
+                for field in self.tbl.metadata.schema().fields
+            }
+            partition = data_file.partition
+            partition_record_dict = {
+                field.name: partition[pos]
+                for pos, field in enumerate(self.tbl.metadata.specs()[manifest_list.partition_spec_id].fields)
+            }
+            files.append(
+                {
+                    "content": data_file.content,
+                    "file_path": data_file.file_path,
+                    "file_format": data_file.file_format,
+                    "spec_id": data_file.spec_id,
+                    "partition": partition_record_dict,
+                    "record_count": data_file.record_count,
+                    "file_size_in_bytes": data_file.file_size_in_bytes,
+                    "column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None,
+                    "value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None,
+                    "null_value_counts": dict(data_file.null_value_counts) if data_file.null_value_counts is not None else None,
+                    "nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None,
+                    "lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None,
+                    "upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None,
+                    "key_metadata": data_file.key_metadata,
+                    "split_offsets": data_file.split_offsets,
+                    "equality_ids": data_file.equality_ids,
+                    "sort_order_id": data_file.sort_order_id,
+                    "readable_metrics": readable_metrics,
+                }
+            )
+        return pa.Table.from_pylist(
+            files,
+            schema=self._get_files_schema(),
+        )
+
+    def _get_files_schema(self) -> "pa.Schema":
         import pyarrow as pa
 
         from pyiceberg.io.pyarrow import schema_to_pyarrow
@@ -544,6 +610,9 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
                 ]
             )
 
+        partition_record = self.tbl.metadata.specs_struct()
+        pa_record_struct = schema_to_pyarrow(partition_record)
+
         for field in self.tbl.metadata.schema().fields:
             readable_metrics_struct.append(
                 pa.field(schema.find_column_name(field.field_id), _readable_metrics_struct(field.field_type), nullable=False)
@@ -555,6 +624,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
                 pa.field("file_path", pa.string(), nullable=False),
                 pa.field("file_format", pa.dictionary(pa.int32(), pa.string()), nullable=False),
                 pa.field("spec_id", pa.int32(), nullable=False),
+                pa.field("partition", pa_record_struct, nullable=False),
                 pa.field("record_count", pa.int64(), nullable=False),
                 pa.field("file_size_in_bytes", pa.int64(), nullable=False),
                 pa.field("column_sizes", pa.map_(pa.int32(), pa.int64()), nullable=True),
@@ -570,71 +640,21 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
                 pa.field("readable_metrics", pa.struct(readable_metrics_struct), nullable=True),
             ]
         )
+        return files_schema
 
-        files: list[dict[str, Any]] = []
+    def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
+        import pyarrow as pa
 
         if not snapshot_id and not self.tbl.metadata.current_snapshot():
-            return pa.Table.from_pylist(
-                files,
-                schema=files_schema,
-            )
-        snapshot = self._get_snapshot(snapshot_id)
+            return self._get_files_schema().empty_table()
 
+        snapshot = self._get_snapshot(snapshot_id)
         io = self.tbl.io
+        files_table: list[pa.Table] = []
         for manifest_list in snapshot.manifests(io):
-            for manifest_entry in manifest_list.fetch_manifest_entry(io):
-                data_file = manifest_entry.data_file
-                if data_file_filter and data_file.content not in data_file_filter:
-                    continue
-                column_sizes = data_file.column_sizes or {}
-                value_counts = data_file.value_counts or {}
-                null_value_counts = data_file.null_value_counts or {}
-                nan_value_counts = data_file.nan_value_counts or {}
-                lower_bounds = data_file.lower_bounds or {}
-                upper_bounds = data_file.upper_bounds or {}
-                readable_metrics = {
-                    schema.find_column_name(field.field_id): {
-                        "column_size": column_sizes.get(field.field_id),
-                        "value_count": value_counts.get(field.field_id),
-                        "null_value_count": null_value_counts.get(field.field_id),
-                        "nan_value_count": nan_value_counts.get(field.field_id),
-                        "lower_bound": from_bytes(field.field_type, lower_bound)
-                        if (lower_bound := lower_bounds.get(field.field_id))
-                        else None,
-                        "upper_bound": from_bytes(field.field_type, upper_bound)
-                        if (upper_bound := upper_bounds.get(field.field_id))
-                        else None,
-                    }
-                    for field in self.tbl.metadata.schema().fields
-                }
-                files.append(
-                    {
-                        "content": data_file.content,
-                        "file_path": data_file.file_path,
-                        "file_format": data_file.file_format,
-                        "spec_id": data_file.spec_id,
-                        "record_count": data_file.record_count,
-                        "file_size_in_bytes": data_file.file_size_in_bytes,
-                        "column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None,
-                        "value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None,
-                        "null_value_counts": dict(data_file.null_value_counts)
-                        if data_file.null_value_counts is not None
-                        else None,
-                        "nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None,
-                        "lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None,
-                        "upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None,
-                        "key_metadata": data_file.key_metadata,
-                        "split_offsets": data_file.split_offsets,
-                        "equality_ids": data_file.equality_ids,
-                        "sort_order_id": data_file.sort_order_id,
-                        "readable_metrics": readable_metrics,
-                    }
-                )
+            files_table.append(self._get_files_from_manifest(manifest_list, data_file_filter))
 
-        return pa.Table.from_pylist(
-            files,
-            schema=files_schema,
-        )
+        return pa.concat_tables(files_table)
 
     def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
         return self._files(snapshot_id)
@@ -657,3 +677,30 @@ def all_manifests(self) -> "pa.Table":
             lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
         )
         return pa.concat_tables(manifests_by_snapshots)
+
+    def _all_files(self, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
+        import pyarrow as pa
+
+        snapshots = self.tbl.snapshots()
+        if not snapshots:
+            return pa.Table.from_pylist([], schema=self._get_files_schema())
+
+        executor = ExecutorFactory.get_or_create()
+        manifest_lists = executor.map(lambda snapshot: snapshot.manifests(self.tbl.io), snapshots)
+
+        unique_manifests = {(manifest.manifest_path, manifest) for manifest_list in manifest_lists for manifest in manifest_list}
+
+        file_lists = executor.map(
+            lambda args: self._get_files_from_manifest(*args), [(manifest, data_file_filter) for _, manifest in unique_manifests]
+        )
+
+        return pa.concat_tables(file_lists)
+
+    def all_files(self) -> "pa.Table":
+        return self._all_files()
+
+    def all_data_files(self) -> "pa.Table":
+        return self._all_files({DataFileContent.DATA})
+
+    def all_delete_files(self) -> "pa.Table":
+        return self._all_files({DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})
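
A minimal usage sketch of the new inspect entry points added above; the catalog and table names are placeholders for illustration only, not part of this change:

```python
from pyiceberg.catalog import load_catalog

# Hypothetical catalog and table identifiers.
catalog = load_catalog("default")
tbl = catalog.load_table("examples.taxi_dataset")

# files() still reports a single snapshot (current by default); the new all_*
# variants walk the manifests of every snapshot in the table metadata,
# de-duplicating manifests before reading their entries.
current_files = tbl.inspect.files()
all_files = tbl.inspect.all_files()
all_data_files = tbl.inspect.all_data_files()
all_delete_files = tbl.inspect.all_delete_files()

# Each call returns a pyarrow.Table; the schema now also includes a "partition"
# struct column with the partition values each file was written under.
print(all_files.schema)
```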