[python] Fix filter on data evolution table not working issue #7211
@@ -0,0 +1,143 @@
###############################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################

import logging
from typing import List, Optional, Tuple

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds

from pypaimon.common.predicate import Predicate
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.schema.data_types import DataField
from pypaimon.table.row.offset_row import OffsetRow

logger = logging.getLogger(__name__)

class FilterRecordBatchReader(RecordBatchReader):
    """
    Wraps a RecordBatchReader and filters each batch by predicate.
    Used for data evolution read where predicate cannot be pushed down to
    individual file readers. Only used when predicate columns are in read schema.
    """

    def __init__(
        self,
        reader: RecordBatchReader,
        predicate: Predicate,
        field_names: Optional[List[str]] = None,
        schema_fields: Optional[List[DataField]] = None,
    ):
        self.reader = reader
        self.predicate = predicate
        self.field_names = field_names
        self.schema_fields = schema_fields

    def read_arrow_batch(self) -> Optional[pa.RecordBatch]:
        while True:
            batch = self.reader.read_arrow_batch()
            if batch is None:
                return None
            if batch.num_rows == 0:
                return batch
            filtered = self._filter_batch(batch)
            if filtered is not None and filtered.num_rows > 0:
                return filtered
            continue

    def _build_col_indices(self, batch: pa.RecordBatch) -> Tuple[List[Optional[int]], int]:
        names = set(batch.schema.names)
        if self.schema_fields is not None:
            fields = self.schema_fields
        elif self.field_names is not None:
            fields = self.field_names
        else:
            return list(range(batch.num_columns)), batch.num_columns
        indices = []
        for f in fields:
            name = f.name if hasattr(f, 'name') else f
            indices.append(batch.schema.get_field_index(name) if name in names else None)
        return indices, len(indices)

    def _filter_batch_simple_null(
        self, batch: pa.RecordBatch
    ) -> Optional[pa.RecordBatch]:
        if self.predicate.method not in ('isNull', 'isNotNull') or not self.predicate.field:
            return None
        if self.predicate.field not in batch.schema.names:
            return None
        col = batch.column(self.predicate.field)
        mask = pc.is_null(col) if self.predicate.method == 'isNull' else pc.is_valid(col)
        return batch.filter(mask)

    def _filter_batch(self, batch: pa.RecordBatch) -> Optional[pa.RecordBatch]:
        simple_null = self._filter_batch_simple_null(batch)
        if simple_null is not None:
            return simple_null
        if not self.predicate.has_null_check():
            try:
                expr = self.predicate.to_arrow()
                result = ds.InMemoryDataset(pa.Table.from_batches([batch])).scanner(
                    filter=expr
                ).to_table()
                if result.num_rows == 0:
                    return None
                batches = result.to_batches()
                if not batches:
                    return None
                if len(batches) == 1:
                    return batches[0]
                concat_batches = getattr(pa, "concat_batches", None)
                if concat_batches is not None:
                    return concat_batches(batches)
                return pa.RecordBatch.from_arrays(
                    [result.column(i) for i in range(result.num_columns)],
                    schema=result.schema,
                )
            except (TypeError, ValueError, pa.ArrowInvalid) as e:
                logger.debug(
Contributor: What exception here?
| "PyArrow vectorized filtering failed, fallback to row-by-row: %s", e | ||
| ) | ||
| nrows = batch.num_rows | ||
| col_indices, ncols = self._build_col_indices(batch) | ||
| mask = [] | ||
| row_tuple = [None] * ncols | ||
| offset_row = OffsetRow(row_tuple, 0, ncols) | ||
| for i in range(nrows): | ||
| for j in range(ncols): | ||
| if col_indices[j] is not None: | ||
| row_tuple[j] = batch.column(col_indices[j])[i].as_py() | ||
| else: | ||
| row_tuple[j] = None | ||
| offset_row.replace(tuple(row_tuple)) | ||
| try: | ||
| mask.append(self.predicate.test(offset_row)) | ||
| except (TypeError, ValueError): | ||
| mask.append(False) | ||
| if not any(mask): | ||
| return None | ||
| return batch.filter(pa.array(mask)) | ||
|
|
||
| def return_batch_pos(self) -> int: | ||
| pos = getattr(self.reader, 'return_batch_pos', lambda: 0)() | ||
| return pos if pos is not None else 0 | ||
|
|
||
| def close(self) -> None: | ||
| self.reader.close() | ||
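A rough usage sketch (not part of the PR) of how the wrapper is meant to be driven. `inner_reader`, `predicate`, `read_fields`, and `process` are illustrative placeholders for objects supplied by the surrounding read path (e.g. the merged ConcatBatchReader and the table scan predicate):

```python
# Illustrative only: inner_reader is any RecordBatchReader, predicate and
# read_fields come from the table scan / projected read schema.
filtered_reader = FilterRecordBatchReader(
    inner_reader,
    predicate,
    field_names=[f.name for f in read_fields],
    schema_fields=read_fields,
)
try:
    while True:
        batch = filtered_reader.read_arrow_batch()
        if batch is None:  # upstream reader exhausted
            break
        process(batch)  # every surviving row satisfies the predicate
finally:
    filtered_reader.close()
```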
@@ -434,7 +434,7 @@ def _filter_manifest_entry(self, entry: ManifestEntry) -> bool:
        else:
            if not self.predicate:
                return True
-           if self.predicate_for_stats is None:
+           if self.predicate_for_stats is None or self.data_evolution:
Contributor: Create a separate
                return True
            if entry.file.value_stats_cols is None and entry.file.write_cols is not None:
                stats_fields = entry.file.write_cols
@@ -40,6 +40,7 @@
from pypaimon.read.reader.field_bunch import BlobBunch, DataBunch, FieldBunch
from pypaimon.read.reader.filter_record_reader import FilterRecordReader
from pypaimon.read.reader.format_avro_reader import FormatAvroReader
+from pypaimon.read.reader.filter_record_batch_reader import FilterRecordBatchReader
from pypaimon.read.reader.row_range_filter_record_reader import RowIdFilterRecordBatchReader
from pypaimon.read.reader.format_blob_reader import FormatBlobReader
from pypaimon.read.reader.format_lance_reader import FormatLanceReader

@@ -52,6 +53,7 @@
from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader
from pypaimon.read.reader.shard_batch_reader import ShardBatchReader
from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap
+from pypaimon.read.push_down_utils import _get_all_fields
from pypaimon.read.split import Split
from pypaimon.read.sliced_split import SlicedSplit
from pypaimon.schema.data_types import DataField, PyarrowFieldParser
@@ -449,6 +451,10 @@ def __init__(
        actual_split = split.data_split()
        super().__init__(table, predicate, read_type, actual_split, row_tracking_enabled)

+    def _push_down_predicate(self) -> Optional[Predicate]:
+        # Do not push predicate to file readers; filtering is applied after batches are merged (see FilterRecordBatchReader).
Contributor: Detailed comments, why not push predicate.
+        return None

    def create_reader(self) -> RecordReader:
        files = self.split.files
        suppliers = []

@@ -467,7 +473,21 @@ def create_reader(self) -> RecordReader:
                lambda files=need_merge_files: self._create_union_reader(files)
            )

-        return ConcatBatchReader(suppliers)
+        merge_reader = ConcatBatchReader(suppliers)
+        if self.predicate is not None:
+            # Only apply filter when all predicate columns are in read_type (e.g. projected schema).
Contributor: What we are returning here is a complete row, right? So this check should be applicable to all table modes? That shouldn't be added here, it should be verified in
+            read_names = {f.name for f in self.read_fields}
+            if _get_all_fields(self.predicate).issubset(read_names):
+                field_names = [f.name for f in self.read_fields]
+                # Row-by-row path uses predicate.index (from read_type); layout must match.
+                schema_fields = self.read_fields
+                return FilterRecordBatchReader(
+                    merge_reader,
+                    self.predicate,
+                    field_names=field_names,
+                    schema_fields=schema_fields,
+                )
+        return merge_reader

    def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
        """Split files by firstRowId for data evolution."""
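For context on the guard above: it assumes `_get_all_fields` returns the set of column names referenced anywhere in the (possibly compound) predicate. Below is a standalone sketch of that idea, not the actual `pypaimon.read.push_down_utils` implementation; the `method`/`field`/`literals` attribute layout for compound predicates is an assumption:

```python
from typing import Set


def collect_predicate_fields(pred) -> Set[str]:
    """Gather every column name a predicate tree touches (hypothetical helper)."""
    # Assumption: compound predicates use method 'and'/'or' and keep their
    # children in `literals`; leaf predicates expose the column via `field`.
    if pred.method in ('and', 'or'):
        names: Set[str] = set()
        for child in (pred.literals or []):
            names |= collect_predicate_fields(child)
        return names
    return {pred.field} if pred.field else set()


# The create_reader guard then reduces to a subset check against the
# projected read schema:
#   read_names = {f.name for f in self.read_fields}
#   collect_predicate_fields(self.predicate).issubset(read_names)
```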
We can just use Predicate.to_arrow; it is the same as for other table modes.
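To illustrate that suggestion, here is a self-contained sketch of vectorized filtering with a pyarrow dataset expression, the same mechanism `_filter_batch` already uses with `predicate.to_arrow()`; the column names and the expression itself are made up:

```python
import pyarrow as pa
import pyarrow.dataset as ds

# Stand-in for predicate.to_arrow(): a pyarrow dataset expression.
expr = (ds.field("dt") == "2024-01-01") & (ds.field("v") > 10)

batch = pa.RecordBatch.from_pydict({
    "dt": ["2024-01-01", "2024-01-01", "2024-01-02"],
    "v": [20, 5, 30],
})

# Wrap the batch in an in-memory dataset and let Arrow evaluate the
# expression in a vectorized way, as _filter_batch does above.
filtered = (
    ds.InMemoryDataset(pa.Table.from_batches([batch]))
    .scanner(filter=expr)
    .to_table()
)
print(filtered.to_pydict())  # {'dt': ['2024-01-01'], 'v': [20]}
```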