 )
 from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec, partition_record_value
 from pyiceberg.schema import (
-    Accessor,
     PartnerAccessor,
     PreOrderSchemaVisitor,
     Schema,
@@ -1402,41 +1401,23 @@ def _field_id(self, field: pa.Field) -> int:
 
 def _get_column_projection_values(
     file: DataFile, projected_schema: Schema, partition_spec: Optional[PartitionSpec], file_project_field_ids: Set[int]
-) -> Tuple[bool, Dict[str, Any]]:
+) -> Dict[int, Any]:
     """Apply Column Projection rules to File Schema."""
     project_schema_diff = projected_schema.field_ids.difference(file_project_field_ids)
-    should_project_columns = len(project_schema_diff) > 0
-    projected_missing_fields: Dict[str, Any] = {}
+    if len(project_schema_diff) == 0 or partition_spec is None:
+        return EMPTY_DICT
 
-    if not should_project_columns:
-        return False, {}
-
-    partition_schema: StructType
-    accessors: Dict[int, Accessor]
-
-    if partition_spec is not None:
-        partition_schema = partition_spec.partition_type(projected_schema)
-        accessors = build_position_accessors(partition_schema)
-    else:
-        return False, {}
+    partition_schema = partition_spec.partition_type(projected_schema)
+    accessors = build_position_accessors(partition_schema)
 
+    projected_missing_fields = {}
     for field_id in project_schema_diff:
         for partition_field in partition_spec.fields_by_source_id(field_id):
             if isinstance(partition_field.transform, IdentityTransform):
-                accessor = accessors.get(partition_field.field_id)
-
-                if accessor is None:
-                    continue
+                if partition_value := accessors[partition_field.field_id].get(file.partition):
+                    projected_missing_fields[field_id] = partition_value
 
-                # The partition field may not exist in the partition record of the data file.
-                # This can happen when new partition fields are introduced after the file was written.
-                try:
-                    if partition_value := accessor.get(file.partition):
-                        projected_missing_fields[partition_field.name] = partition_value
-                except IndexError:
-                    continue
-
-    return True, projected_missing_fields
+    return projected_missing_fields
 
 
 def _task_to_record_batches(
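Note: a minimal sketch of the new contract, using a hypothetical table and values (not part of this change). The helper now maps missing source field IDs to identity-partition values, replacing the old (should_project, name -> value) pair:

    from pyiceberg.partitioning import PartitionField, PartitionSpec
    from pyiceberg.schema import Schema
    from pyiceberg.transforms import IdentityTransform
    from pyiceberg.types import LongType, NestedField, StringType

    # Hypothetical table: "id" (field 1) plus an identity partition on "dt" (field 2).
    schema = Schema(
        NestedField(1, "id", LongType(), required=True),
        NestedField(2, "dt", StringType(), required=False),
    )
    spec = PartitionSpec(
        PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="dt")
    )

    # A data file written without a physical "dt" column carries the value in its
    # partition record instead, so for file.partition == Record("2024-01-01"):
    #   _get_column_projection_values(file, schema, spec, file_project_field_ids={1})
    #   -> {2: "2024-01-01"}
    # A file whose partition record predates the field contributes no entry.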
@@ -1460,9 +1441,8 @@ def _task_to_record_batches(
     # the table format version.
     file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True)
 
-    # Apply column projection rules
-    # https://iceberg.apache.org/spec/#column-projection
-    should_project_columns, projected_missing_fields = _get_column_projection_values(
+    # Apply column projection rules: https://iceberg.apache.org/spec/#column-projection
+    projected_missing_fields = _get_column_projection_values(
         task.file, projected_schema, partition_spec, file_schema.field_ids
     )
 
@@ -1517,16 +1497,9 @@ def _task_to_record_batches(
                 file_project_schema,
                 current_batch,
                 downcast_ns_timestamp_to_us=True,
+                projected_missing_fields=projected_missing_fields,
             )
 
-            # Inject projected column values if available
-            if should_project_columns:
-                for name, value in projected_missing_fields.items():
-                    index = result_batch.schema.get_field_index(name)
-                    if index != -1:
-                        arr = pa.repeat(value, result_batch.num_rows)
-                        result_batch = result_batch.set_column(index, name, arr)
-
             yield result_batch
 
 
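Note: the deleted block ran after projection and was keyed by column name, so it could only patch top-level columns. In isolation it amounted to the following runnable sketch (hypothetical values); the new code performs the equivalent fill inside ArrowProjectionVisitor.struct() during projection, keyed by field ID (see the final hunks):

    import pyarrow as pa

    # Overwrite a null "dt" column on a finished batch with the repeated partition value.
    batch = pa.RecordBatch.from_pydict({"id": [1, 2], "dt": pa.nulls(2, type=pa.string())})
    index = batch.schema.get_field_index("dt")
    arr = pa.repeat(pa.scalar("2024-01-01", type=pa.string()), batch.num_rows)
    batch = batch.set_column(index, "dt", arr)
    assert batch.column("dt").to_pylist() == ["2024-01-01", "2024-01-01"]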
@@ -1696,7 +1669,7 @@ def _record_batches_from_scan_tasks_and_deletes(
                 deletes_per_file.get(task.file.file_path),
                 self._case_sensitive,
                 self._table_metadata.name_mapping(),
-                self._table_metadata.spec(),
+                self._table_metadata.specs().get(task.file.spec_id),
             )
             for batch in batches:
                 if self._limit is not None:
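Note: looking the spec up per file matters once a table's partitioning has evolved. A hypothetical illustration:

    # spec 0 = identity(dt); spec 1 = identity(dt) + bucket(16, id) after evolution.
    #   self._table_metadata.spec()                          -> spec 1 (current), for every file
    #   self._table_metadata.specs()                         -> {0: <spec 0>, 1: <spec 1>}
    #   self._table_metadata.specs().get(task.file.spec_id)  -> the spec the file was written
    # with, i.e. the one whose fields match the layout of file.partition.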
@@ -1714,12 +1687,15 @@ def _to_requested_schema(
     batch: pa.RecordBatch,
     downcast_ns_timestamp_to_us: bool = False,
     include_field_ids: bool = False,
+    projected_missing_fields: Dict[int, Any] = EMPTY_DICT,
 ) -> pa.RecordBatch:
     # We could reuse some of these visitors
     struct_array = visit_with_partner(
         requested_schema,
         batch,
-        ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids),
+        ArrowProjectionVisitor(
+            file_schema, downcast_ns_timestamp_to_us, include_field_ids, projected_missing_fields=projected_missing_fields
+        ),
         ArrowAccessor(file_schema),
     )
     return pa.RecordBatch.from_struct_array(struct_array)
@@ -1730,18 +1706,21 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Array]]):
     _include_field_ids: bool
     _downcast_ns_timestamp_to_us: bool
     _use_large_types: Optional[bool]
+    _projected_missing_fields: Dict[int, Any]
 
     def __init__(
         self,
         file_schema: Schema,
         downcast_ns_timestamp_to_us: bool = False,
         include_field_ids: bool = False,
         use_large_types: Optional[bool] = None,
+        projected_missing_fields: Dict[int, Any] = EMPTY_DICT,
     ) -> None:
         self._file_schema = file_schema
         self._include_field_ids = include_field_ids
         self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
         self._use_large_types = use_large_types
+        self._projected_missing_fields = projected_missing_fields
 
         if use_large_types is not None:
             deprecation_message(
@@ -1821,7 +1800,9 @@ def struct(
             elif field.optional or field.initial_default is not None:
                 # When an optional field is added, or when a required field with a non-null initial default is added
                 arrow_type = schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)
-                if field.initial_default is None:
+                if projected_value := self._projected_missing_fields.get(field.field_id):
+                    field_arrays.append(pa.repeat(pa.scalar(projected_value, type=arrow_type), len(struct_array)))
+                elif field.initial_default is None:
                     field_arrays.append(pa.nulls(len(struct_array), type=arrow_type))
                 else:
                     field_arrays.append(pa.repeat(field.initial_default, len(struct_array)))
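Note: a standalone illustration of the fill used in the new branch above (runnable; values are hypothetical). A missing identity-partitioned column is materialized as a constant array matching the length of the current struct batch; since struct() is visited recursively, the same path would also fill a nested field whose ID appears in the mapping:

    import pyarrow as pa

    struct_array = pa.StructArray.from_arrays([pa.array([1, 2, 3])], names=["id"])
    arrow_type = pa.string()
    projected_value = "2024-01-01"  # would come from _projected_missing_fields
    filled = pa.repeat(pa.scalar(projected_value, type=arrow_type), len(struct_array))
    assert filled.to_pylist() == ["2024-01-01"] * 3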