@@ -2568,7 +2568,16 @@ def write_orc(task: WriteTask) -> DataFile:
         fo = io.new_output(file_path)
         with fo.create(overwrite=True) as fos:
             orc.write_table(arrow_table, fos)
-        # You may want to add statistics extraction here if needed
+
+        # Extract statistics from the written ORC file
+        orc_file = orc.ORCFile(fo.to_input_file().open())
+        statistics = data_file_statistics_from_orc_metadata(
+            orc_metadata=orc_file,
+            stats_columns=compute_statistics_plan(file_schema, table_metadata.properties),
+            orc_column_mapping=orc_column_to_id_mapping(file_schema),
+            arrow_table=arrow_table,
+        )
+
         data_file = DataFile.from_args(
             content=DataFileContent.DATA,
             file_path=file_path,
@@ -2579,7 +2588,7 @@ def write_orc(task: WriteTask) -> DataFile:
             spec_id=table_metadata.default_spec_id,
             equality_ids=None,
             key_metadata=None,
-            # statistics=... (if you implement ORC stats)
+            **statistics.to_serialized_dict(),
         )
         return data_file
 
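
For context, the `**statistics.to_serialized_dict()` expansion above assumes that `DataFileStatistics` flattens into the keyword arguments `DataFile.from_args` accepts, mirroring the existing Parquet write path. A minimal sketch of the assumed shape, with made-up values for a three-row file:

```python
# Sketch only: keys assumed to mirror the Parquet path's
# DataFileStatistics.to_serialized_dict(); the values are illustrative.
serialized = {
    "record_count": 3,
    "column_sizes": {1: 0},       # field ID -> on-disk size (0: not exposed for ORC)
    "value_counts": {1: 3},       # field ID -> total number of values
    "null_value_counts": {1: 0},  # field ID -> null count (0: not exposed for ORC)
    "nan_value_counts": {},       # field ID -> NaN count (float columns only)
    "lower_bounds": {},           # empty: min/max aggregation is skipped for ORC
    "upper_bounds": {},
    "split_offsets": [0],
}
# DataFile.from_args(..., **serialized) receives these as keyword arguments.
```
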
@@ -2877,3 +2886,180 @@ def _get_field_from_arrow_table(arrow_table: pa.Table, field_path: str) -> pa.Ar
     field_array = arrow_table[path_parts[0]]
     # Navigate into the struct using the remaining path parts
     return pc.struct_field(field_array, path_parts[1:])
+
+
+def data_file_statistics_from_orc_metadata(
+    orc_metadata: "orc.ORCFile",
+    stats_columns: Dict[int, StatisticsCollector],
+    orc_column_mapping: Dict[str, int],
+    arrow_table: Optional[pa.Table] = None,
+) -> DataFileStatistics:
+    """
+    Compute and return DataFileStatistics that includes the following.
+
+    - record_count
+    - column_sizes
+    - value_counts
+    - null_value_counts
+    - nan_value_counts
+    - column_aggregates
+    - split_offsets
+
+    Args:
+        orc_metadata (pyarrow.orc.ORCFile): A pyarrow ORC file object.
+        stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. It is required to
+            set the mode for column metrics collection.
+        orc_column_mapping (Dict[str, int]): The mapping of the ORC column name to the field ID.
+        arrow_table (pa.Table, optional): The original arrow table that was written, used for the row count.
+    """
+    column_sizes: Dict[int, int] = {}
+    value_counts: Dict[int, int] = {}
+    split_offsets: List[int] = []
+
+    null_value_counts: Dict[int, int] = {}
+    nan_value_counts: Dict[int, int] = {}
+
+    col_aggs: Dict[int, StatsAggregator] = {}
+
+    invalidate_col: Set[int] = set()
+
+    # Prefer the row count of the in-memory table; otherwise fall back to the
+    # row count recorded in the ORC file footer (exposed by pyarrow as nrows)
+    if arrow_table is not None:
+        record_count = arrow_table.num_rows
+    else:
+        record_count = orc_metadata.nrows
+
+    # ORC files are organized into stripes rather than Parquet's row groups.
+    # pyarrow's ORC reader does not expose per-column statistics, so we derive
+    # file-level statistics from what is available
+    for col_name, field_id in orc_column_mapping.items():
+        stats_col = stats_columns[field_id]
+
+        # ORC doesn't provide per-column on-disk sizes through pyarrow
+        column_sizes[field_id] = 0
+
+        if stats_col.mode == MetricsMode(MetricModeTypes.NONE):
+            continue
+
+        try:
+            # Use the total row count as the per-column value count. This is a
+            # simplification: unlike Parquet, ORC metadata read through pyarrow
+            # does not report per-column value counts
+            value_counts[field_id] = record_count
+
+            # pyarrow does not expose ORC null counts, so record 0 for now
+            null_value_counts[field_id] = 0
+
+            if stats_col.mode == MetricsMode(MetricModeTypes.COUNTS):
+                continue
+
+            if field_id not in col_aggs:
+                col_aggs[field_id] = StatsAggregator(
+                    stats_col.iceberg_type, _primitive_to_physical(stats_col.iceberg_type), stats_col.mode.length
+                )
+
+            # Min/max aggregation is skipped: pyarrow does not expose ORC
+            # column-level min/max statistics
+
+        except Exception as e:
+            invalidate_col.add(field_id)
+            logger.warning(f"Failed to extract ORC statistics for column {col_name}: {e}")
+
+    # ORC has no Parquet-style split offsets; use a single offset at the start
+    # of the file when it contains any rows
+    split_offsets = [0] if record_count > 0 else []
+
+    # Clean up invalid columns
+    for field_id in invalidate_col:
+        col_aggs.pop(field_id, None)
+        null_value_counts.pop(field_id, None)
+
+    return DataFileStatistics(
+        record_count=record_count,
+        column_sizes=column_sizes,
+        value_counts=value_counts,
+        null_value_counts=null_value_counts,
+        nan_value_counts=nan_value_counts,
+        column_aggregates=col_aggs,
+        split_offsets=split_offsets,
+    )
+
+
+class ID2OrcColumn:
+    field_id: int
+    orc_column: str
+
+    def __init__(self, field_id: int, orc_column: str):
+        self.field_id = field_id
+        self.orc_column = orc_column
+
+
+class ID2OrcColumnVisitor(PreOrderSchemaVisitor[List[ID2OrcColumn]]):
+    _field_id: int = 0
+    _path: List[str]
+
+    def __init__(self) -> None:
+        self._path = []
+
+    def schema(self, schema: Schema, struct_result: Callable[[], List[ID2OrcColumn]]) -> List[ID2OrcColumn]:
+        return struct_result()
+
+    def struct(self, struct: StructType, field_results: List[Callable[[], List[ID2OrcColumn]]]) -> List[ID2OrcColumn]:
+        return list(itertools.chain(*[result() for result in field_results]))
+
+    def field(self, field: NestedField, field_result: Callable[[], List[ID2OrcColumn]]) -> List[ID2OrcColumn]:
+        self._field_id = field.field_id
+        self._path.append(field.name)
+        result = field_result()
+        self._path.pop()
+        return result
+
+    def list(self, list_type: ListType, element_result: Callable[[], List[ID2OrcColumn]]) -> List[ID2OrcColumn]:
+        self._field_id = list_type.element_id
+        self._path.append("list.element")
+        result = element_result()
+        self._path.pop()
+        return result
+
+    def map(
+        self,
+        map_type: MapType,
+        key_result: Callable[[], List[ID2OrcColumn]],
+        value_result: Callable[[], List[ID2OrcColumn]],
+    ) -> List[ID2OrcColumn]:
+        self._field_id = map_type.key_id
+        self._path.append("key_value.key")
+        k = key_result()
+        self._path.pop()
+        self._field_id = map_type.value_id
+        self._path.append("key_value.value")
+        v = value_result()
+        self._path.pop()
+        return k + v
+
+    def primitive(self, primitive: PrimitiveType) -> List[ID2OrcColumn]:
+        return [ID2OrcColumn(field_id=self._field_id, orc_column=".".join(self._path))]
+
+
+def orc_column_to_id_mapping(
+    schema: Schema,
+) -> Dict[str, int]:
+    """
+    Create a mapping from ORC column names to Iceberg field IDs.
+
+    Args:
+        schema: The Iceberg schema
+
+    Returns:
+        A dictionary mapping ORC column names to field IDs
+    """
+    result: Dict[str, int] = {}
+    for pair in pre_order_visit(schema, ID2OrcColumnVisitor()):
+        result[pair.orc_column] = pair.field_id
+    return result
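
To make the visitor's path naming concrete, here is a minimal sketch of `orc_column_to_id_mapping` applied to a small made-up schema (standard `pyiceberg.schema` and `pyiceberg.types` imports; the schema itself is hypothetical):

```python
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, ListType, NestedField, StringType, StructType

schema = Schema(
    NestedField(field_id=1, name="id", field_type=IntegerType(), required=True),
    NestedField(
        field_id=2,
        name="address",
        field_type=StructType(
            NestedField(field_id=4, name="city", field_type=StringType(), required=False),
        ),
        required=False,
    ),
    NestedField(
        field_id=3,
        name="tags",
        field_type=ListType(element_id=5, element_type=StringType(), element_required=False),
        required=False,
    ),
)

# Only leaf (primitive) columns appear, keyed by the dotted paths the visitor builds:
# {"id": 1, "address.city": 4, "tags.list.element": 5}
print(orc_column_to_id_mapping(schema))
```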
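
Since pyarrow's ORC reader does not surface per-column null counts, one possible refinement (not part of this change) would be to derive them from the in-memory Arrow table before it is written. A sketch covering top-level columns only:

```python
import pyarrow as pa

def null_counts_from_arrow(arrow_table: pa.Table) -> dict:
    # ChunkedArray.null_count is tracked by Arrow itself, so no ORC
    # file metadata is needed
    return {name: arrow_table.column(name).null_count for name in arrow_table.column_names}

tbl = pa.table({"id": [1, None, 3], "name": ["a", "b", None]})
assert null_counts_from_arrow(tbl) == {"id": 1, "name": 1}
```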