 )
 from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec, partition_record_value
 from pyiceberg.schema import (
-    Accessor,
     PartnerAccessor,
     PreOrderSchemaVisitor,
     Schema,
@@ -1402,41 +1401,23 @@ def _field_id(self, field: pa.Field) -> int:

 def _get_column_projection_values(
     file: DataFile, projected_schema: Schema, partition_spec: Optional[PartitionSpec], file_project_field_ids: Set[int]
-) -> Tuple[bool, Dict[str, Any]]:
+) -> Dict[int, Any]:
     """Apply Column Projection rules to File Schema."""
     project_schema_diff = projected_schema.field_ids.difference(file_project_field_ids)
-    should_project_columns = len(project_schema_diff) > 0
-    projected_missing_fields: Dict[str, Any] = {}
+    if len(project_schema_diff) == 0 or partition_spec is None:
+        return EMPTY_DICT

-    if not should_project_columns:
-        return False, {}
-
-    partition_schema: StructType
-    accessors: Dict[int, Accessor]
-
-    if partition_spec is not None:
-        partition_schema = partition_spec.partition_type(projected_schema)
-        accessors = build_position_accessors(partition_schema)
-    else:
-        return False, {}
+    partition_schema = partition_spec.partition_type(projected_schema)
+    accessors = build_position_accessors(partition_schema)

+    projected_missing_fields = {}
     for field_id in project_schema_diff:
         for partition_field in partition_spec.fields_by_source_id(field_id):
             if isinstance(partition_field.transform, IdentityTransform):
-                accessor = accessors.get(partition_field.field_id)
+                if partition_value := accessors[partition_field.field_id].get(file.partition):
+                    projected_missing_fields[field_id] = partition_value

-                if accessor is None:
-                    continue
-
-                # The partition field may not exist in the partition record of the data file.
-                # This can happen when new partition fields are introduced after the file was written.
-                try:
-                    if partition_value := accessor.get(file.partition):
-                        projected_missing_fields[partition_field.name] = partition_value
-                except IndexError:
-                    continue
-
-    return True, projected_missing_fields
+    return projected_missing_fields


 def _task_to_record_batches(
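Note on the refactor above: the helper now returns a single mapping from schema field id to the value recovered from the file's partition record (or `EMPTY_DICT`), replacing the old `(bool, Dict[str, Any])` tuple keyed by partition-field name. A minimal sketch of the new contract, assuming a hypothetical table with an identity partition on a `region` column (field id 2) and a data file written before that column existed; the values are made up, the call matches the new signature:

```python
# Sketch only. Field id 2 ("region") is in the projected schema but not in
# the file, and the spec carries an identity transform on it, so its value
# is resolved from the file's partition record.
projected_missing_fields = _get_column_projection_values(
    task.file,              # DataFile whose partition record holds region="eu"
    projected_schema,       # table schema containing field id 2
    partition_spec,         # spec with IdentityTransform on source id 2
    file_schema.field_ids,  # field ids physically present in the data file
)
assert projected_missing_fields == {2: "eu"}  # field id -> constant value
```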
@@ -1460,18 +1441,19 @@ def _task_to_record_batches(
     # the table format version.
     file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True)

+    # Apply column projection rules: https://iceberg.apache.org/spec/#column-projection
+    projected_missing_fields = _get_column_projection_values(
+        task.file, projected_schema, partition_spec, file_schema.field_ids
+    )
+
     pyarrow_filter = None
     if bound_row_filter is not AlwaysTrue():
-        translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive)
+        translated_row_filter = translate_column_names(
+            bound_row_filter, file_schema, case_sensitive=case_sensitive, projected_field_values=projected_missing_fields
+        )
         bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
         pyarrow_filter = expression_to_pyarrow(bound_file_filter)

-    # Apply column projection rules
-    # https://iceberg.apache.org/spec/#column-projection
-    should_project_columns, projected_missing_fields = _get_column_projection_values(
-        task.file, projected_schema, partition_spec, file_schema.field_ids
-    )
-
     file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)

     fragment_scanner = ds.Scanner.from_fragment(
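The projection values are now computed *before* the row filter is translated and threaded into `translate_column_names` through the new `projected_field_values` keyword (the keyword itself is introduced elsewhere in this change). The point of the reordering: a predicate on a column that exists only as partition metadata can be resolved against the projected constant instead of referencing a column the file does not contain. A hedged illustration, with a made-up filter:

```python
# Hypothetical: the file lacks a physical "region" column, but the scan
# filter references it. With projected_field_values={2: "eu"} available,
# the translated filter can be decided from the constant:
row_filter = EqualTo("region", "eu")  # agrees with the projected constant
# -> effectively always true for this file's rows
row_filter = EqualTo("region", "us")  # contradicts the projected constant
# -> the file's rows can be filtered out entirely
```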
@@ -1510,23 +1492,14 @@ def _task_to_record_batches(

         current_batch = table.combine_chunks().to_batches()[0]

-        result_batch = _to_requested_schema(
+        yield _to_requested_schema(
             projected_schema,
             file_project_schema,
             current_batch,
             downcast_ns_timestamp_to_us=True,
+            projected_missing_fields=projected_missing_fields,
         )

-        # Inject projected column values if available
-        if should_project_columns:
-            for name, value in projected_missing_fields.items():
-                index = result_batch.schema.get_field_index(name)
-                if index != -1:
-                    arr = pa.repeat(value.value(), result_batch.num_rows)
-                    result_batch = result_batch.set_column(index, name, arr)
-
-        yield result_batch
-

 def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[str, List[ChunkedArray]]:
     deletes_per_file: Dict[str, List[ChunkedArray]] = {}
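With the projection pushed into `ArrowProjectionVisitor`, the post-hoc `set_column` loop (which matched columns by name) goes away and the batch is yielded directly; missing fields are filled in by field id during schema projection, which also survives column renames. What the visitor materializes internally is a typed constant column, roughly:

```python
import pyarrow as pa

# A constant column for a 3-row batch; typing the scalar pins the Arrow
# type instead of letting pa.repeat infer one from the raw Python value.
arr = pa.repeat(pa.scalar("eu", type=pa.string()), 3)
assert arr.to_pylist() == ["eu", "eu", "eu"]
```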
@@ -1694,7 +1667,7 @@ def _record_batches_from_scan_tasks_and_deletes(
             deletes_per_file.get(task.file.file_path),
             self._case_sensitive,
             self._table_metadata.name_mapping(),
-            self._table_metadata.spec(),
+            self._table_metadata.specs().get(task.file.spec_id),
         )
         for batch in batches:
             if self._limit is not None:
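`_task_to_record_batches` now receives the partition spec the file was actually written under, looked up via `task.file.spec_id`, rather than `spec()`, which returns only the table's current default. After partition evolution the two can differ, and projecting with the wrong spec would read the wrong (or no) partition values. Roughly (the spec ids below are illustrative):

```python
# Illustrative: a table evolved from unpartitioned (spec 0) to partitioned
# by region (spec 1). Files written earlier keep spec_id=0 in their metadata.
specs = self._table_metadata.specs()        # {0: spec_v0, 1: spec_v1}
file_spec = specs.get(task.file.spec_id)    # spec the file was written with
current = self._table_metadata.spec()       # always the latest spec (spec_v1)
```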
@@ -1712,12 +1685,15 @@ def _to_requested_schema(
     batch: pa.RecordBatch,
     downcast_ns_timestamp_to_us: bool = False,
     include_field_ids: bool = False,
+    projected_missing_fields: Dict[int, Any] = EMPTY_DICT,
 ) -> pa.RecordBatch:
     # We could reuse some of these visitors
     struct_array = visit_with_partner(
         requested_schema,
         batch,
-        ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids),
+        ArrowProjectionVisitor(
+            file_schema, downcast_ns_timestamp_to_us, include_field_ids, projected_missing_fields=projected_missing_fields
+        ),
         ArrowAccessor(file_schema),
     )
     return pa.RecordBatch.from_struct_array(struct_array)
@@ -1728,18 +1704,21 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra
     _include_field_ids: bool
     _downcast_ns_timestamp_to_us: bool
     _use_large_types: Optional[bool]
+    _projected_missing_fields: Dict[int, Any]

     def __init__(
         self,
         file_schema: Schema,
         downcast_ns_timestamp_to_us: bool = False,
         include_field_ids: bool = False,
         use_large_types: Optional[bool] = None,
+        projected_missing_fields: Dict[int, Any] = EMPTY_DICT,
     ) -> None:
         self._file_schema = file_schema
         self._include_field_ids = include_field_ids
         self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
         self._use_large_types = use_large_types
+        self._projected_missing_fields = projected_missing_fields

         if use_large_types is not None:
             deprecation_message(
@@ -1819,10 +1798,12 @@ def struct(
             elif field.optional or field.initial_default is not None:
                 # When an optional field is added, or when a required field with a non-null initial default is added
                 arrow_type = schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)
-                if field.initial_default is None:
+                if projected_value := self._projected_missing_fields.get(field.field_id):
+                    field_arrays.append(pa.repeat(pa.scalar(projected_value, type=arrow_type), len(struct_array)))
+                elif field.initial_default is None:
                     field_arrays.append(pa.nulls(len(struct_array), type=arrow_type))
                 else:
-                    field_arrays.append(pa.repeat(field.initial_default, len(struct_array)))
+                    field_arrays.append(pa.repeat(pa.scalar(field.initial_default, type=arrow_type), len(struct_array)))
                 fields.append(self._construct_field(field, arrow_type))
             else:
                 raise ResolveError(f"Field is required, and could not be found in the file: {field}")
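The `initial_default` branch also gains a `pa.scalar(..., type=arrow_type)` wrapper, so the repeated default is created with the field's declared Arrow type rather than whatever type PyArrow infers from the raw Python value. A quick check of the difference:

```python
import pyarrow as pa

assert pa.repeat(1, 2).type == pa.int64()  # bare Python int: inferred type
assert pa.repeat(pa.scalar(1, type=pa.int32()), 2).type == pa.int32()  # pinned
```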