From 569d1b14ffabfa5c8d887d69d60e3203c1279603 Mon Sep 17 00:00:00 2001
From: Fokko
Date: Wed, 12 Feb 2025 15:42:51 +0100
Subject: [PATCH 1/4] Apply residuals when reading a table

---
 pyiceberg/io/pyarrow.py     | 45 ++++++++++++++++++++-----------------
 pyiceberg/table/__init__.py | 12 ++++++++--
 tests/io/test_pyarrow.py    |  3 ++-
 3 files changed, 37 insertions(+), 23 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 33c81c33bf..72c64fa6cc 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1342,9 +1342,8 @@ def _get_column_projection_values(
 def _task_to_record_batches(
     fs: FileSystem,
     task: FileScanTask,
-    bound_row_filter: BooleanExpression,
+    schema: Schema,
     projected_schema: Schema,
-    projected_field_ids: Set[int],
     positional_deletes: Optional[List[ChunkedArray]],
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
@@ -1363,8 +1362,8 @@
         file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True)
 
     pyarrow_filter = None
-    if bound_row_filter is not AlwaysTrue():
-        translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive)
+    if task.residual is not AlwaysTrue():
+        translated_row_filter = translate_column_names(task.residual, file_schema, case_sensitive=case_sensitive)
         bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
         pyarrow_filter = expression_to_pyarrow(bound_file_filter)
 
@@ -1374,7 +1373,13 @@
         task.file, projected_schema, partition_spec, file_schema.field_ids
     )
 
-    file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)
+    file_project_schema = prune_columns(
+        file_schema,
+        {
+            id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
+        }.union(extract_field_ids(task.residual)),
+        select_full_types=False,
+    )
 
     fragment_scanner = ds.Scanner.from_fragment(
         fragment=fragment,
@@ -1474,7 +1479,7 @@ class ArrowScan:
     _table_metadata: TableMetadata
     _io: FileIO
     _projected_schema: Schema
-    _bound_row_filter: BooleanExpression
+    _bound_row_filter: Optional[BooleanExpression]
     _case_sensitive: bool
     _limit: Optional[int]
     """Scan the Iceberg Table and create an Arrow construct.
@@ -1493,14 +1498,18 @@ def __init__(
         table_metadata: TableMetadata,
         io: FileIO,
         projected_schema: Schema,
-        row_filter: BooleanExpression,
+        row_filter: Optional[BooleanExpression] = None,
         case_sensitive: bool = True,
         limit: Optional[int] = None,
     ) -> None:
         self._table_metadata = table_metadata
         self._io = io
         self._projected_schema = projected_schema
-        self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
+        # TBD: Should we deprecate the `row_filter` argument?
+        if row_filter is not None:
+            self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
+        else:
+            self._bound_row_filter = None
         self._case_sensitive = case_sensitive
         self._limit = limit
 
@@ -1512,15 +1521,6 @@ def _use_large_types(self) -> bool:
         """
         return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
 
-    @property
-    def _projected_field_ids(self) -> Set[int]:
-        """Set of field IDs that should be projected from the data files."""
-        return {
-            id
-            for id in self._projected_schema.field_ids
-            if not isinstance(self._projected_schema.find_type(id), (MapType, ListType))
-        }.union(extract_field_ids(self._bound_row_filter))
-
     def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
         """Scan the Iceberg table and return a pa.Table.
 
@@ -1541,7 +1541,10 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
         deletes_per_file = _read_all_delete_files(self._io, tasks)
         executor = ExecutorFactory.get_or_create()
 
-        def _table_from_scan_task(task: FileScanTask) -> pa.Table:
+        if self._bound_row_filter is not None:
+            tasks = [task.set_residual(expr=self._bound_row_filter) for task in tasks]
+
+        def _table_from_scan_task(task: FileScanTask) -> Optional[pa.Table]:
             batches = list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
             if len(batches) > 0:
                 return pa.Table.from_batches(batches)
@@ -1601,6 +1604,9 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
             ResolveError: When a required field cannot be found in the file
             ValueError: When a field type in the file cannot be projected to the schema type
         """
+        if self._bound_row_filter is not None:
+            tasks = [task.set_residual(expr=self._bound_row_filter) for task in tasks]
+
         deletes_per_file = _read_all_delete_files(self._io, tasks)
         return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)
 
@@ -1614,9 +1620,8 @@ def _record_batches_from_scan_tasks_and_deletes(
             batches = _task_to_record_batches(
                 _fs_from_file_path(self._io, task.file.file_path),
                 task,
-                self._bound_row_filter,
+                self._table_metadata.schema(),
                 self._projected_schema,
-                self._projected_field_ids,
                 deletes_per_file.get(task.file.file_path),
                 self._case_sensitive,
                 self._table_metadata.name_mapping(),
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 8186d6573c..8645709120 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -1394,6 +1394,10 @@ def __init__(
         self.length = length or data_file.file_size_in_bytes
         self.residual = residual
 
+    def set_residual(self, expr: BooleanExpression) -> "FileScanTask":
+        self.residual = expr
+        return self
+
 
 def _open_manifest(
     io: FileIO,
@@ -1584,8 +1588,12 @@ def plan_files(self) -> Iterable[FileScanTask]:
                     data_entry,
                     positional_delete_entries,
                 ),
-                residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for(
-                    data_entry.data_file.partition
+                residual=bind(
+                    self.table_metadata.schema(),
+                    residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for(
+                        data_entry.data_file.partition
+                    ),
+                    case_sensitive=self.case_sensitive,
                 ),
             )
             for data_entry in data_entries
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index e2be7872a9..cb3ac81b38 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -984,7 +984,8 @@ def project(
                 partition={},
                 record_count=3,
                 file_size_in_bytes=3,
-            )
+            ),
+            residual=expr or AlwaysTrue(),
         )
         for file in files
     ]
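
Note on [PATCH 1/4]: the reader no longer receives one scan-wide `bound_row_filter`. Instead, `plan_files()` computes the residual per data file — the part of the row filter that partition pruning cannot already guarantee — binds it against the table schema, and stores it on the `FileScanTask`. `_task_to_record_batches` then derives both the pyarrow filter and the set of projected field IDs from `task.residual` alone. A minimal sketch of the call pattern this enables (the catalog and table names are hypothetical; the rest is pyiceberg's public API as of this patch):

    from pyiceberg.catalog import load_catalog
    from pyiceberg.expressions import EqualTo
    from pyiceberg.io.pyarrow import ArrowScan

    # Hypothetical catalog and table names; any Iceberg table works.
    table = load_catalog("default").load_table("db.events")
    scan = table.scan(row_filter=EqualTo("event_type", "click"))

    # plan_files() attaches a bound residual to every task, so the row
    # filter no longer has to be passed to the reader separately.
    arrow_table = ArrowScan(
        table_metadata=table.metadata,
        io=table.io,
        projected_schema=scan.projection(),
    ).to_table(scan.plan_files())
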
From fafdb686cbeb0615c3895fccf31554a30821daa9 Mon Sep 17 00:00:00 2001
From: Fokko
Date: Fri, 14 Feb 2025 12:07:24 +0100
Subject: [PATCH 2/4] Remove schema

---
 pyiceberg/io/pyarrow.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 72c64fa6cc..b77c0000c4 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1342,7 +1342,6 @@ def _get_column_projection_values(
 def _task_to_record_batches(
     fs: FileSystem,
     task: FileScanTask,
-    schema: Schema,
     projected_schema: Schema,
     positional_deletes: Optional[List[ChunkedArray]],
     case_sensitive: bool,
@@ -1620,7 +1619,6 @@ def _record_batches_from_scan_tasks_and_deletes(
             batches = _task_to_record_batches(
                 _fs_from_file_path(self._io, task.file.file_path),
                 task,
-                self._table_metadata.schema(),
                 self._projected_schema,
                 deletes_per_file.get(task.file.file_path),
                 self._case_sensitive,
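
Note on [PATCH 2/4] and [PATCH 3/4]: patch 2 drops the `schema` parameter that patch 1 had threaded through `_task_to_record_batches`, since nothing in the function body ever used it. Patch 3 below then resolves the earlier TBD comment by formally deprecating `row_filter` on `ArrowScan`. A sketch of how a test might pin down the new warning, assuming `deprecation_message` emits a `DeprecationWarning` via `warnings.warn` (how pyiceberg's helper behaves at the time of writing; the fixtures are hypothetical):

    import pytest

    from pyiceberg.expressions import EqualTo
    from pyiceberg.io.pyarrow import ArrowScan

    # table_metadata, io, and projected_schema are assumed to come from
    # existing test fixtures.
    def test_row_filter_is_deprecated(table_metadata, io, projected_schema):
        with pytest.warns(DeprecationWarning, match="row_filter"):
            ArrowScan(table_metadata, io, projected_schema, row_filter=EqualTo("x", 1))
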
From ab5d27765f00769e141437613ccfcfa581ea93cf Mon Sep 17 00:00:00 2001
From: Fokko
Date: Tue, 25 Mar 2025 16:13:53 +0100
Subject: [PATCH 3/4] Mark as deprecated

---
 pyiceberg/io/pyarrow.py     | 11 ++++++++---
 pyiceberg/table/__init__.py |  2 +-
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index f796736aa4..ff05f16b17 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -171,6 +171,7 @@
 from pyiceberg.utils.concurrent import ExecutorFactory
 from pyiceberg.utils.config import Config
 from pyiceberg.utils.datetime import millis_to_datetime
+from pyiceberg.utils.deprecated import deprecation_message
 from pyiceberg.utils.properties import get_first_property_value, property_as_bool, property_as_int
 from pyiceberg.utils.singleton import Singleton
 from pyiceberg.utils.truncate import truncate_upper_bound_binary_string, truncate_upper_bound_text_string
@@ -1515,8 +1516,12 @@ def __init__(
         self._table_metadata = table_metadata
         self._io = io
         self._projected_schema = projected_schema
-        # TBD: Should we deprecate the `row_filter` argument?
         if row_filter is not None:
+            deprecation_message(
+                deprecated_in="0.9.0",
+                removed_in="0.10.0",
+                help_message="row_filter is marked as deprecated, and will be removed in 0.10.0. Please make sure to set the residual on the ScanTasks.",
+            )
             self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
         else:
             self._bound_row_filter = None
@@ -1552,7 +1557,7 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
         executor = ExecutorFactory.get_or_create()
 
         if self._bound_row_filter is not None:
-            tasks = [task.set_residual(expr=self._bound_row_filter) for task in tasks]
+            tasks = [task._set_residual(expr=self._bound_row_filter) for task in tasks]
 
         def _table_from_scan_task(task: FileScanTask) -> Optional[pa.Table]:
             batches = list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
@@ -1615,7 +1620,7 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
             ValueError: When a field type in the file cannot be projected to the schema type
         """
         if self._bound_row_filter is not None:
-            tasks = [task.set_residual(expr=self._bound_row_filter) for task in tasks]
+            tasks = [task._set_residual(expr=self._bound_row_filter) for task in tasks]
 
         deletes_per_file = _read_all_delete_files(self._io, tasks)
         return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 6f99bd94b4..b002e3368f 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -1544,7 +1544,7 @@ def __init__(
         self.length = length or data_file.file_size_in_bytes
         self.residual = residual
 
-    def set_residual(self, expr: BooleanExpression) -> "FileScanTask":
+    def _set_residual(self, expr: BooleanExpression) -> "FileScanTask":
         self.residual = expr
         return self

From afdfef2fe5542b6d02b09c96193d3df8354e14b5 Mon Sep 17 00:00:00 2001
From: Fokko
Date: Wed, 23 Apr 2025 07:49:09 +0200
Subject: [PATCH 4/4] WIP

---
 pyiceberg/table/__init__.py | 3 +--
 tests/io/test_pyarrow.py    | 1 -
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 3d5c04d09d..5053510202 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -1796,7 +1796,7 @@ def to_arrow(self) -> pa.Table:
         from pyiceberg.io.pyarrow import ArrowScan
 
         return ArrowScan(
-            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
+            self.table_metadata, self.io, self.projection(), case_sensitive=self.case_sensitive, limit=self.limit
         ).to_table(self.plan_files())
 
     def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
@@ -1889,7 +1889,6 @@ def count(self) -> int:
             table_metadata=self.table_metadata,
             io=self.io,
             projected_schema=self.projection(),
-            row_filter=self.row_filter,
             case_sensitive=self.case_sensitive,
         )
         tbl = arrow_scan.to_table([task])
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index f5b72b7797..7b115c6a9f 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -972,7 +972,6 @@ def project(
         ),
         io=PyArrowFileIO(),
         projected_schema=schema,
-        row_filter=expr or AlwaysTrue(),
         case_sensitive=True,
     ).to_table(
         tasks=[
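
Note on [PATCH 4/4]: with the residual attached during planning, `DataScan.to_arrow()` and `count()` stop forwarding `self.row_filter` into `ArrowScan`, and the test fixture likewise relies on the residual carried by each task. `_set_residual` returns the task itself so the deprecation shim can rebind residuals inline in a list comprehension. A small sketch of that contract, assuming `data_file` comes from an existing manifest entry or test fixture:

    from pyiceberg.expressions import AlwaysTrue, EqualTo
    from pyiceberg.table import FileScanTask

    # data_file is assumed to be a DataFile from a fixture or manifest entry.
    task = FileScanTask(data_file, residual=AlwaysTrue())

    # _set_residual mutates the task and returns it, which is what allows
    # `[task._set_residual(expr=bound) for task in tasks]` in ArrowScan.
    same_task = task._set_residual(expr=EqualTo("event_type", "click"))
    assert same_task is task
    assert task.residual == EqualTo("event_type", "click")
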