@@ -1342,9 +1342,8 @@ def _get_column_projection_values(
 def _task_to_record_batches(
     fs: FileSystem,
     task: FileScanTask,
-    bound_row_filter: BooleanExpression,
+    schema: Schema,
     projected_schema: Schema,
-    projected_field_ids: Set[int],
     positional_deletes: Optional[List[ChunkedArray]],
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
@@ -1363,8 +1362,8 @@ def _task_to_record_batches(
     file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True)
 
     pyarrow_filter = None
-    if bound_row_filter is not AlwaysTrue():
-        translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive)
+    if task.residual is not AlwaysTrue():
+        translated_row_filter = translate_column_names(task.residual, file_schema, case_sensitive=case_sensitive)
         bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
         pyarrow_filter = expression_to_pyarrow(bound_file_filter)
@@ -1374,7 +1373,13 @@ def _task_to_record_batches(
         task.file, projected_schema, partition_spec, file_schema.field_ids
     )
 
-    file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)
+    file_project_schema = prune_columns(
+        file_schema,
+        {
+            id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
+        }.union(extract_field_ids(task.residual)),
+        select_full_types=False,
+    )
 
     fragment_scanner = ds.Scanner.from_fragment(
         fragment=fragment,
@@ -1474,7 +1479,7 @@ class ArrowScan:
     _table_metadata: TableMetadata
     _io: FileIO
     _projected_schema: Schema
-    _bound_row_filter: BooleanExpression
+    _bound_row_filter: Optional[BooleanExpression]
     _case_sensitive: bool
     _limit: Optional[int]
     """Scan the Iceberg Table and create an Arrow construct.
@@ -1493,14 +1498,18 @@ def __init__(
         table_metadata: TableMetadata,
         io: FileIO,
         projected_schema: Schema,
-        row_filter: BooleanExpression,
+        row_filter: Optional[BooleanExpression] = None,
         case_sensitive: bool = True,
         limit: Optional[int] = None,
     ) -> None:
         self._table_metadata = table_metadata
         self._io = io
         self._projected_schema = projected_schema
-        self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
+        # TBD: Should we deprecate the `row_filter` argument?
+        if row_filter is not None:
+            self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
+        else:
+            self._bound_row_filter = None
         self._case_sensitive = case_sensitive
         self._limit = limit
 
@@ -1512,15 +1521,6 @@ def _use_large_types(self) -> bool:
         """
         return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
 
-    @property
-    def _projected_field_ids(self) -> Set[int]:
-        """Set of field IDs that should be projected from the data files."""
-        return {
-            id
-            for id in self._projected_schema.field_ids
-            if not isinstance(self._projected_schema.find_type(id), (MapType, ListType))
-        }.union(extract_field_ids(self._bound_row_filter))
-
     def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
         """Scan the Iceberg table and return a pa.Table.
 
@@ -1541,7 +1541,10 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
         deletes_per_file = _read_all_delete_files(self._io, tasks)
         executor = ExecutorFactory.get_or_create()
 
-        def _table_from_scan_task(task: FileScanTask) -> pa.Table:
+        if self._bound_row_filter is not None:
+            tasks = [task.set_residual(expr=self._bound_row_filter) for task in tasks]
+
+        def _table_from_scan_task(task: FileScanTask) -> Optional[pa.Table]:
             batches = list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
             if len(batches) > 0:
                 return pa.Table.from_batches(batches)
@@ -1601,6 +1604,9 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
             ResolveError: When a required field cannot be found in the file
             ValueError: When a field type in the file cannot be projected to the schema type
         """
+        if self._bound_row_filter is not None:
+            tasks = [task.set_residual(expr=self._bound_row_filter) for task in tasks]
+
         deletes_per_file = _read_all_delete_files(self._io, tasks)
         return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)
 
@@ -1614,9 +1620,8 @@ def _record_batches_from_scan_tasks_and_deletes(
             batches = _task_to_record_batches(
                 _fs_from_file_path(self._io, task.file.file_path),
                 task,
-                self._bound_row_filter,
+                self._table_metadata.schema(),
                 self._projected_schema,
-                self._projected_field_ids,
                 deletes_per_file.get(task.file.file_path),
                 self._case_sensitive,
                 self._table_metadata.name_mapping(),
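
For context, a minimal usage sketch (not part of the diff) of the changed entry point: with this patch the bound row filter is stamped onto each FileScanTask as a residual via task.set_residual(...), and _task_to_record_batches reads it back from task.residual instead of taking the filter as a parameter. The catalog name, table identifier, and filter below are illustrative assumptions.

# Hypothetical usage sketch; `default`, `db.events`, and the EqualTo
# filter are assumptions for illustration, not part of this change.
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import EqualTo
from pyiceberg.io.pyarrow import ArrowScan

tbl = load_catalog("default").load_table("db.events")
tasks = list(tbl.scan().plan_files())

# `row_filter` is now Optional: when given, ArrowScan binds it and sets
# it as the residual on every task; when None, the tasks' own residuals
# (if any) drive the per-file filtering in _task_to_record_batches.
scan = ArrowScan(
    table_metadata=tbl.metadata,
    io=tbl.io,
    projected_schema=tbl.schema(),
    row_filter=EqualTo("status", "active"),
)
arrow_table = scan.to_table(tasks)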