9 changes: 9 additions & 0 deletions paimon-python/pypaimon/common/predicate.py
@@ -66,6 +66,15 @@ def test(self, record: InternalRow) -> bool:
            return tester.test_by_value(field_value, self.literals)
        raise ValueError(f"Unsupported predicate method: {self.method}")

    def has_null_check(self) -> bool:
        if self.method in ('isNull', 'isNotNull'):
            return False
        if self.method not in ('and', 'or') and self.literals and any(lit is None for lit in self.literals):
            return True
        if self.method in ('and', 'or') and self.literals:
            return any(p.has_null_check() for p in self.literals)
        return False

    def test_by_simple_stats(self, stat: SimpleStats, row_count: int) -> bool:
        """Test predicate against BinaryRow stats with denseIndexMapping like Java implementation."""
        if self.method == 'and':
143 changes: 143 additions & 0 deletions paimon-python/pypaimon/read/reader/filter_record_batch_reader.py
@@ -0,0 +1,143 @@
###############################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################

import logging
from typing import List, Optional, Tuple

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds

from pypaimon.common.predicate import Predicate
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.schema.data_types import DataField
from pypaimon.table.row.offset_row import OffsetRow

logger = logging.getLogger(__name__)


class FilterRecordBatchReader(RecordBatchReader):
"""
Wraps a RecordBatchReader and filters each batch by predicate.
Used for data evolution read where predicate cannot be pushed down to
individual file readers. Only used when predicate columns are in read schema.
"""

def __init__(
self,
reader: RecordBatchReader,
predicate: Predicate,
field_names: Optional[List[str]] = None,
schema_fields: Optional[List[DataField]] = None,
):
self.reader = reader
self.predicate = predicate
self.field_names = field_names
self.schema_fields = schema_fields

def read_arrow_batch(self) -> Optional[pa.RecordBatch]:
while True:
batch = self.reader.read_arrow_batch()
if batch is None:
return None
if batch.num_rows == 0:
return batch
filtered = self._filter_batch(batch)
if filtered is not None and filtered.num_rows > 0:
return filtered
continue

def _build_col_indices(self, batch: pa.RecordBatch) -> Tuple[List[Optional[int]], int]:
names = set(batch.schema.names)
if self.schema_fields is not None:
fields = self.schema_fields
elif self.field_names is not None:
fields = self.field_names
else:
return list(range(batch.num_columns)), batch.num_columns
indices = []
for f in fields:
name = f.name if hasattr(f, 'name') else f
indices.append(batch.schema.get_field_index(name) if name in names else None)
return indices, len(indices)

def _filter_batch_simple_null(
self, batch: pa.RecordBatch
) -> Optional[pa.RecordBatch]:
if self.predicate.method not in ('isNull', 'isNotNull') or not self.predicate.field:
return None
if self.predicate.field not in batch.schema.names:
return None
col = batch.column(self.predicate.field)
mask = pc.is_null(col) if self.predicate.method == 'isNull' else pc.is_valid(col)
return batch.filter(mask)

def _filter_batch(self, batch: pa.RecordBatch) -> Optional[pa.RecordBatch]:
simple_null = self._filter_batch_simple_null(batch)
if simple_null is not None:
return simple_null
if not self.predicate.has_null_check():
Reviewer comment (Contributor): We can just use Predicate.to_arrow; it is the same as in other table modes.

            try:
                expr = self.predicate.to_arrow()
                result = ds.InMemoryDataset(pa.Table.from_batches([batch])).scanner(
                    filter=expr
                ).to_table()
                if result.num_rows == 0:
                    return None
                batches = result.to_batches()
                if not batches:
                    return None
                if len(batches) == 1:
                    return batches[0]
                concat_batches = getattr(pa, "concat_batches", None)
                if concat_batches is not None:
                    return concat_batches(batches)
                return pa.RecordBatch.from_arrays(
                    [result.column(i) for i in range(result.num_columns)],
                    schema=result.schema,
                )
            except (TypeError, ValueError, pa.ArrowInvalid) as e:
                logger.debug(
                    "PyArrow vectorized filtering failed, fallback to row-by-row: %s", e
                )

Reviewer comment (Contributor): What exception is expected here?

        nrows = batch.num_rows
        col_indices, ncols = self._build_col_indices(batch)
        mask = []
        row_tuple = [None] * ncols
        offset_row = OffsetRow(row_tuple, 0, ncols)
        for i in range(nrows):
            for j in range(ncols):
                if col_indices[j] is not None:
                    row_tuple[j] = batch.column(col_indices[j])[i].as_py()
                else:
                    row_tuple[j] = None
            offset_row.replace(tuple(row_tuple))
            try:
                mask.append(self.predicate.test(offset_row))
            except (TypeError, ValueError):
                mask.append(False)
        if not any(mask):
            return None
        return batch.filter(pa.array(mask))

    def return_batch_pos(self) -> int:
        pos = getattr(self.reader, 'return_batch_pos', lambda: 0)()
        return pos if pos is not None else 0

    def close(self) -> None:
        self.reader.close()
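
For comparison, here is a minimal sketch of what the reviewer's Predicate.to_arrow suggestion could look like: relying on the same vectorized expression path as the other table modes and dropping the null-check special case. It assumes to_arrow() returns a pyarrow dataset filter expression, as the try block above already does; this is an illustration, not the final implementation.

```python
# Sketch only: a collapsed _filter_batch that always goes through to_arrow().
def _filter_batch(self, batch: pa.RecordBatch) -> Optional[pa.RecordBatch]:
    expr = self.predicate.to_arrow()
    result = ds.InMemoryDataset(pa.Table.from_batches([batch])).scanner(filter=expr).to_table()
    if result.num_rows == 0:
        return None
    # combine_chunks() leaves one chunk per column, so a single batch comes back.
    return result.combine_chunks().to_batches()[0]
```

Whether the isNull/isNotNull special case and the row-by-row fallback can actually be removed depends on how to_arrow() handles null semantics, which is the open question in this thread.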
2 changes: 1 addition & 1 deletion paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -434,7 +434,7 @@ def _filter_manifest_entry(self, entry: ManifestEntry) -> bool:
        else:
            if not self.predicate:
                return True
            if self.predicate_for_stats is None:
            if self.predicate_for_stats is None or self.data_evolution:

Reviewer comment (Contributor): Create a separate if, and add a comment to it explaining why no filtering is done here.

                return True
            if entry.file.value_stats_cols is None and entry.file.write_cols is not None:
                stats_fields = entry.file.write_cols
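
A possible shape for the separate if requested above; the explanatory comment is an assumption about the data-evolution read path, written only as a sketch for the author to confirm or correct.

```python
if self.predicate_for_stats is None:
    return True
if self.data_evolution:
    # Assumed rationale: a data evolution split may stitch together files written
    # with different column subsets, so per-file stats may not cover the predicate
    # columns; skip stats-based pruning here and filter rows after reading instead.
    return True
```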
22 changes: 21 additions & 1 deletion paimon-python/pypaimon/read/split_read.py
@@ -40,6 +40,7 @@
from pypaimon.read.reader.field_bunch import BlobBunch, DataBunch, FieldBunch
from pypaimon.read.reader.filter_record_reader import FilterRecordReader
from pypaimon.read.reader.format_avro_reader import FormatAvroReader
from pypaimon.read.reader.filter_record_batch_reader import FilterRecordBatchReader
from pypaimon.read.reader.row_range_filter_record_reader import RowIdFilterRecordBatchReader
from pypaimon.read.reader.format_blob_reader import FormatBlobReader
from pypaimon.read.reader.format_lance_reader import FormatLanceReader
@@ -52,6 +53,7 @@
from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader
from pypaimon.read.reader.shard_batch_reader import ShardBatchReader
from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap
from pypaimon.read.push_down_utils import _get_all_fields
from pypaimon.read.split import Split
from pypaimon.read.sliced_split import SlicedSplit
from pypaimon.schema.data_types import DataField, PyarrowFieldParser
@@ -449,6 +451,10 @@ def __init__(
        actual_split = split.data_split()
        super().__init__(table, predicate, read_type, actual_split, row_tracking_enabled)

    def _push_down_predicate(self) -> Optional[Predicate]:
        # Do not push predicate to file readers;

Reviewer comment (Contributor): Add detailed comments here explaining why the predicate is not pushed down.

        return None

    def create_reader(self) -> RecordReader:
        files = self.split.files
        suppliers = []
@@ -467,7 +473,21 @@ def create_reader(self) -> RecordReader:
                lambda files=need_merge_files: self._create_union_reader(files)
            )

        return ConcatBatchReader(suppliers)
        merge_reader = ConcatBatchReader(suppliers)
        if self.predicate is not None:
            # Only apply filter when all predicate columns are in read_type (e.g. projected schema).

Reviewer comment (JingsongLi, Contributor, Feb 11, 2026): What we are returning here is a complete row, right? So this check should be applicable to all table modes; it shouldn't be added here, it should be verified in SplitRead.init.

            read_names = {f.name for f in self.read_fields}
            if _get_all_fields(self.predicate).issubset(read_names):
                field_names = [f.name for f in self.read_fields]
                # Row-by-row path uses predicate.index (from read_type); layout must match.
                schema_fields = self.read_fields
                return FilterRecordBatchReader(
                    merge_reader,
                    self.predicate,
                    field_names=field_names,
                    schema_fields=schema_fields,
                )
        return merge_reader

    def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
        """Split files by firstRowId for data evolution."""
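
Following JingsongLi's comment in create_reader above, one way to move the check into the base class is to compute it once in SplitRead.__init__ and let every mode's create_reader reuse it. The sketch below is hypothetical: attribute names follow this diff (read_fields, predicate, _get_all_fields), but the real SplitRead constructor signature is assumed.

```python
# Hypothetical addition to SplitRead.__init__ (signature assumed, not taken from the PR).
def __init__(self, table, predicate, read_type, split, row_tracking_enabled):
    ...
    self.predicate = predicate
    # True when every predicate column is part of the read schema, so an in-memory
    # FilterRecordBatchReader can be applied on top of the mode-specific reader.
    self.filter_in_memory = (
        predicate is not None
        and _get_all_fields(predicate).issubset({f.name for f in self.read_fields})
    )
```

create_reader() implementations could then wrap their reader in FilterRecordBatchReader whenever filter_in_memory is set, instead of re-deriving the column check inside DataEvolutionSplitRead.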
4 changes: 2 additions & 2 deletions paimon-python/pypaimon/read/table_read.py
@@ -66,7 +66,7 @@ def _try_to_pad_batch_by_schema(batch: pyarrow.RecordBatch, target_schema):
    num_rows = batch.num_rows

    for field in target_schema:
        if field.name in batch.column_names:
        if field.name in batch.schema.names:
            col = batch.column(field.name)
        else:
            col = pyarrow.nulls(num_rows, type=field.type)
@@ -198,7 +198,7 @@ def _create_split_read(self, split: Split) -> SplitRead:
        elif self.table.options.data_evolution_enabled():
            return DataEvolutionSplitRead(
                table=self.table,
                predicate=None, # Never push predicate to split read
                predicate=self.predicate,
                read_type=self.read_type,
                split=split,
                row_tracking_enabled=True
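
The padding helper touched above fills columns missing from a batch with typed nulls, keying the existence check off batch.schema.names; a small standalone pyarrow illustration (not project code):

```python
# Standalone illustration of padding a RecordBatch to a target schema:
# existing columns are kept, missing ones become typed null columns.
import pyarrow as pa

batch = pa.RecordBatch.from_pydict({"a": [1, 2, 3]})
target = pa.schema([("a", pa.int64()), ("b", pa.string())])
cols = [
    batch.column(f.name) if f.name in batch.schema.names else pa.nulls(batch.num_rows, type=f.type)
    for f in target
]
padded = pa.RecordBatch.from_arrays(cols, schema=target)
print(padded.schema)  # a: int64, b: string (the b column is all null)
```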