From 653f23efc2e25949555485354e93f2468aefc3e4 Mon Sep 17 00:00:00 2001
From: xiaohongbo
Date: Wed, 11 Feb 2026 12:12:08 +0800
Subject: [PATCH 1/2] fix filter not working on data evolution tables

---
 paimon-python/pypaimon/common/predicate.py    |   9 +
 .../read/reader/filter_record_batch_reader.py | 143 ++++++++
 .../pypaimon/read/scanner/file_scanner.py     |   4 +
 paimon-python/pypaimon/read/split_read.py     |  22 +-
 paimon-python/pypaimon/read/table_read.py     |   4 +-
 .../pypaimon/tests/data_evolution_test.py     | 330 ++++++++++++++++++
 6 files changed, 509 insertions(+), 3 deletions(-)
 create mode 100644 paimon-python/pypaimon/read/reader/filter_record_batch_reader.py

diff --git a/paimon-python/pypaimon/common/predicate.py b/paimon-python/pypaimon/common/predicate.py
index e6f69934e991..647e3a00ffdf 100644
--- a/paimon-python/pypaimon/common/predicate.py
+++ b/paimon-python/pypaimon/common/predicate.py
@@ -66,6 +66,15 @@ def test(self, record: InternalRow) -> bool:
             return tester.test_by_value(field_value, self.literals)
         raise ValueError(f"Unsupported predicate method: {self.method}")
 
+    def has_null_check(self) -> bool:
+        if self.method in ('isNull', 'isNotNull'):
+            return False
+        if self.method not in ('and', 'or') and self.literals and any(lit is None for lit in self.literals):
+            return True
+        if self.method in ('and', 'or') and self.literals:
+            return any(p.has_null_check() for p in self.literals)
+        return False
+
     def test_by_simple_stats(self, stat: SimpleStats, row_count: int) -> bool:
         """Test predicate against BinaryRow stats with denseIndexMapping like Java implementation."""
         if self.method == 'and':
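A note on the semantics: has_null_check() flags comparison predicates that carry a
NULL literal; the reader introduced below routes those to its row-by-row path so
that Paimon's test() semantics, not Arrow's three-valued NULL logic, decide the
outcome. A minimal sketch reusing the keyword construction the new tests use;
building the 'and' form directly (rather than via PredicateBuilder) is an
assumption here:

    from pypaimon.common.predicate import Predicate

    eq_null = Predicate(method="equal", index=1, field="c", literals=[None])
    assert eq_null.has_null_check()      # comparison against NULL: row-by-row path

    is_null = Predicate(method="isNull", index=1, field="c", literals=None)
    assert not is_null.has_null_check()  # handled natively via pc.is_null

    both = Predicate(method="and", index=None, field=None, literals=[eq_null, is_null])
    assert both.has_null_check()         # 'and'/'or' recurse into child predicates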
diff --git a/paimon-python/pypaimon/read/reader/filter_record_batch_reader.py b/paimon-python/pypaimon/read/reader/filter_record_batch_reader.py
new file mode 100644
index 000000000000..4341c5c9e61d
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/filter_record_batch_reader.py
@@ -0,0 +1,143 @@
+###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###############################################################################
+
+import logging
+from typing import List, Optional, Tuple
+
+import pyarrow as pa
+import pyarrow.compute as pc
+import pyarrow.dataset as ds
+
+from pypaimon.common.predicate import Predicate
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.schema.data_types import DataField
+from pypaimon.table.row.offset_row import OffsetRow
+
+logger = logging.getLogger(__name__)
+
+
+class FilterRecordBatchReader(RecordBatchReader):
+    """
+    Wraps a RecordBatchReader and filters each batch by the predicate.
+    Used for data evolution reads, where the predicate cannot be pushed down
+    to individual file readers. Only applied when all predicate columns are
+    present in the read schema.
+    """
+
+    def __init__(
+        self,
+        reader: RecordBatchReader,
+        predicate: Predicate,
+        field_names: Optional[List[str]] = None,
+        schema_fields: Optional[List[DataField]] = None,
+    ):
+        self.reader = reader
+        self.predicate = predicate
+        self.field_names = field_names
+        self.schema_fields = schema_fields
+
+    def read_arrow_batch(self) -> Optional[pa.RecordBatch]:
+        while True:
+            batch = self.reader.read_arrow_batch()
+            if batch is None:
+                return None
+            if batch.num_rows == 0:
+                return batch
+            filtered = self._filter_batch(batch)
+            if filtered is not None and filtered.num_rows > 0:
+                return filtered
+
+    def _build_col_indices(self, batch: pa.RecordBatch) -> Tuple[List[Optional[int]], int]:
+        names = set(batch.schema.names)
+        if self.schema_fields is not None:
+            fields = self.schema_fields
+        elif self.field_names is not None:
+            fields = self.field_names
+        else:
+            return list(range(batch.num_columns)), batch.num_columns
+        indices = []
+        for f in fields:
+            name = f.name if hasattr(f, 'name') else f
+            indices.append(batch.schema.get_field_index(name) if name in names else None)
+        return indices, len(indices)
+
+    def _filter_batch_simple_null(
+        self, batch: pa.RecordBatch
+    ) -> Optional[pa.RecordBatch]:
+        if self.predicate.method not in ('isNull', 'isNotNull') or not self.predicate.field:
+            return None
+        if self.predicate.field not in batch.schema.names:
+            return None
+        col = batch.column(self.predicate.field)
+        mask = pc.is_null(col) if self.predicate.method == 'isNull' else pc.is_valid(col)
+        return batch.filter(mask)
+
+    def _filter_batch(self, batch: pa.RecordBatch) -> Optional[pa.RecordBatch]:
+        simple_null = self._filter_batch_simple_null(batch)
+        if simple_null is not None:
+            return simple_null
+        if not self.predicate.has_null_check():
+            try:
+                expr = self.predicate.to_arrow()
+                result = ds.InMemoryDataset(pa.Table.from_batches([batch])).scanner(
+                    filter=expr
+                ).to_table()
+                if result.num_rows == 0:
+                    return None
+                batches = result.to_batches()
+                if not batches:
+                    return None
+                if len(batches) == 1:
+                    return batches[0]
+                concat_batches = getattr(pa, "concat_batches", None)
+                if concat_batches is not None:
+                    return concat_batches(batches)
+                # pa.concat_batches is missing in older pyarrow; Table columns
+                # are ChunkedArrays, which RecordBatch.from_arrays rejects, so
+                # combine chunks and emit the single resulting batch instead.
+                return result.combine_chunks().to_batches()[0]
+            except (TypeError, ValueError, pa.ArrowInvalid) as e:
+                logger.debug(
+                    "PyArrow vectorized filtering failed, fallback to row-by-row: %s", e
+                )
+        nrows = batch.num_rows
+        col_indices, ncols = self._build_col_indices(batch)
+        mask = []
+        row_tuple = [None] * ncols
+        offset_row = OffsetRow(row_tuple, 0, ncols)
+        for i in range(nrows):
+            for j in range(ncols):
+                if col_indices[j] is not None:
+                    row_tuple[j] = batch.column(col_indices[j])[i].as_py()
+                else:
+                    row_tuple[j] = None
+            offset_row.replace(tuple(row_tuple))
+            try:
+                mask.append(self.predicate.test(offset_row))
+            except (TypeError, ValueError):
+                mask.append(False)
+        if not any(mask):
+            return None
+        return batch.filter(pa.array(mask))
+
+    def return_batch_pos(self) -> int:
+        pos = getattr(self.reader, 'return_batch_pos', lambda: 0)()
+        return pos if pos is not None else 0
+
+    def close(self) -> None:
+        self.reader.close()
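A minimal sketch of how the new reader composes, with a hypothetical StubReader
standing in for the ConcatBatchReader it wraps in split_read.py below; the
Predicate keyword construction mirrors the tests in this series:

    import pyarrow as pa

    from pypaimon.common.predicate import Predicate
    from pypaimon.read.reader.filter_record_batch_reader import FilterRecordBatchReader

    class StubReader:
        # Hypothetical stand-in: yields one batch, then None, like a RecordBatchReader.
        def __init__(self, batch):
            self._batch = batch

        def read_arrow_batch(self):
            batch, self._batch = self._batch, None
            return batch

        def close(self):
            pass

    batch = pa.RecordBatch.from_pydict({"id": [1, 2, 3], "c": [100, None, 200]})
    pred = Predicate(method="greaterThan", index=1, field="c", literals=[150])
    reader = FilterRecordBatchReader(StubReader(batch), pred, field_names=["id", "c"])
    filtered = reader.read_arrow_batch()
    assert filtered.column("id").to_pylist() == [3]  # only c=200 passes c > 150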
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py
index 8167a743dd27..b41dc8693d1f 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -434,7 +434,11 @@ def _filter_manifest_entry(self, entry: ManifestEntry) -> bool:
         else:
             if not self.predicate:
                 return True
+<<<<<<< HEAD
             if self.predicate_for_stats is None:
+=======
+            if self.data_evolution:
+>>>>>>> 867ca010a ([python] fix data evolution predicate filtering)
                 return True
             if entry.file.value_stats_cols is None and entry.file.write_cols is not None:
                 stats_fields = entry.file.write_cols
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 5b876e9488d2..02e3067e2a2f 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -40,6 +40,7 @@
 from pypaimon.read.reader.field_bunch import BlobBunch, DataBunch, FieldBunch
 from pypaimon.read.reader.filter_record_reader import FilterRecordReader
 from pypaimon.read.reader.format_avro_reader import FormatAvroReader
+from pypaimon.read.reader.filter_record_batch_reader import FilterRecordBatchReader
 from pypaimon.read.reader.row_range_filter_record_reader import RowIdFilterRecordBatchReader
 from pypaimon.read.reader.format_blob_reader import FormatBlobReader
 from pypaimon.read.reader.format_lance_reader import FormatLanceReader
@@ -52,6 +53,7 @@
 from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader
 from pypaimon.read.reader.shard_batch_reader import ShardBatchReader
 from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap
+from pypaimon.read.push_down_utils import _get_all_fields
 from pypaimon.read.split import Split
 from pypaimon.read.sliced_split import SlicedSplit
 from pypaimon.schema.data_types import DataField, PyarrowFieldParser
@@ -449,6 +451,10 @@ def __init__(
         actual_split = split.data_split()
         super().__init__(table, predicate, read_type, actual_split, row_tracking_enabled)
 
+    def _push_down_predicate(self) -> Optional[Predicate]:
+        # Do not push the predicate down to file readers; it is applied after merging.
+        return None
+
     def create_reader(self) -> RecordReader:
         files = self.split.files
         suppliers = []
@@ -467,7 +473,21 @@ def create_reader(self) -> RecordReader:
                 lambda files=need_merge_files: self._create_union_reader(files)
             )
 
-        return ConcatBatchReader(suppliers)
+        merge_reader = ConcatBatchReader(suppliers)
+        if self.predicate is not None:
+            # Only apply the filter when all predicate columns are in read_type (e.g. projected schema).
+            read_names = {f.name for f in self.read_fields}
+            if _get_all_fields(self.predicate).issubset(read_names):
+                field_names = [f.name for f in self.read_fields]
+                # Row-by-row path uses predicate.index (from read_type); layout must match.
+                schema_fields = self.read_fields
+                return FilterRecordBatchReader(
+                    merge_reader,
+                    self.predicate,
+                    field_names=field_names,
+                    schema_fields=schema_fields,
+                )
+        return merge_reader
 
     def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
         """Split files by firstRowId for data evolution."""
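The subset check above relies on _get_all_fields from push_down_utils. A rough
illustrative equivalent, assuming compound predicates keep their children in
literals (as has_null_check in this series also expects):

    from pypaimon.common.predicate import Predicate

    def collect_fields(pred: Predicate) -> set:
        # Leaf predicates reference one field; 'and'/'or' hold child predicates.
        if pred.method in ('and', 'or'):
            fields = set()
            for child in pred.literals:
                fields |= collect_fields(child)
            return fields
        return {pred.field}

The reader is only wrapped when collect_fields(predicate) is a subset of the read
schema's names; otherwise the predicate is dropped and all rows are returned, as
test_with_filter_and_projection below asserts.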
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index e0a442f30582..e996eb97fed1 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -66,7 +66,7 @@ def _try_to_pad_batch_by_schema(batch: pyarrow.RecordBatch, target_schema):
     num_rows = batch.num_rows
 
     for field in target_schema:
-        if field.name in batch.column_names:
+        if field.name in batch.schema.names:
             col = batch.column(field.name)
         else:
             col = pyarrow.nulls(num_rows, type=field.type)
@@ -198,7 +198,7 @@ def _create_split_read(self, split: Split) -> SplitRead:
         elif self.table.options.data_evolution_enabled():
             return DataEvolutionSplitRead(
                 table=self.table,
-                predicate=None,  # Never push predicate to split read
+                predicate=self.predicate,
                 read_type=self.read_type,
                 split=split,
                 row_tracking_enabled=True
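Two independent fixes here: _try_to_pad_batch_by_schema now consults
batch.schema.names (presumably because schema.names works across the pyarrow
versions the project supports, unlike RecordBatch.column_names), and
DataEvolutionSplitRead finally receives the user's predicate instead of None,
which is what made filters no-ops on data evolution tables. A self-contained
sketch of the padding behavior, with names chosen here for illustration:

    import pyarrow

    def pad_batch(batch: pyarrow.RecordBatch, target_schema: pyarrow.Schema) -> pyarrow.RecordBatch:
        # Missing columns are filled with typed nulls, mirroring _try_to_pad_batch_by_schema.
        cols = []
        for field in target_schema:
            if field.name in batch.schema.names:
                cols.append(batch.column(field.name))
            else:
                cols.append(pyarrow.nulls(batch.num_rows, type=field.type))
        return pyarrow.RecordBatch.from_arrays(cols, schema=target_schema)

    b = pyarrow.RecordBatch.from_pydict({"id": [1, 2]})
    padded = pad_batch(b, pyarrow.schema([("id", pyarrow.int64()), ("c", pyarrow.int32())]))
    assert padded.column("c").null_count == 2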
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py
index cfb09a0caf86..b7db4f5c7e8d 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -20,11 +20,59 @@
 import unittest
 from types import SimpleNamespace
 
+import pandas as pd
 import pyarrow as pa
+import pyarrow.dataset as ds
 
 from pypaimon import CatalogFactory, Schema
+from pypaimon.common.predicate import Predicate
 from pypaimon.manifest.manifest_list_manager import ManifestListManager
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.table.row.offset_row import OffsetRow
+
+
+def _filter_batch_arrow(batch, predicate):
+    expr = predicate.to_arrow()
+    table = ds.InMemoryDataset(pa.Table.from_batches([batch])).scanner(filter=expr).to_table()
+    if table.num_rows == 0:
+        return batch.slice(0, 0)
+    batches = table.to_batches()
+    if len(batches) == 1:
+        return batches[0]
+    # combine_chunks avoids handing ChunkedArrays to RecordBatch.from_arrays,
+    # which accepts only plain Arrays.
+    return table.combine_chunks().to_batches()[0]
+
+
+def _filter_batch_row_by_row(batch, predicate, ncols):
+    nrows = batch.num_rows
+    mask = []
+    row_tuple = [None] * ncols
+    offset_row = OffsetRow(row_tuple, 0, ncols)
+    for i in range(nrows):
+        for j in range(ncols):
+            row_tuple[j] = batch.column(j)[i].as_py()
+        offset_row.replace(tuple(row_tuple))
+        try:
+            mask.append(predicate.test(offset_row))
+        except (TypeError, ValueError):
+            mask.append(False)
+    if not any(mask):
+        return batch.slice(0, 0)
+    return batch.filter(pa.array(mask))
+
+
+def _batches_equal(a, b):
+    if a.num_rows != b.num_rows or a.num_columns != b.num_columns:
+        return False
+    for i in range(a.num_columns):
+        col_a, col_b = a.column(i), b.column(i)
+        for j in range(a.num_rows):
+            va_py = col_a[j].as_py() if hasattr(col_a[j], "as_py") else col_a[j]
+            vb_py = col_b[j].as_py() if hasattr(col_b[j], "as_py") else col_b[j]
+            if va_py != vb_py:
+                return False
+    return True
 
 
 class DataEvolutionTest(unittest.TestCase):
@@ -443,6 +491,288 @@ def test_only_some_columns(self):
         }, schema=simple_pa_schema)
         self.assertEqual(actual, expect)
 
+    def _create_filter_test_table(self, table_name: str):
+        pa_schema = pa.schema([
+            ("id", pa.int64()),
+            ("b", pa.int32()),
+            pa.field("c", pa.int32(), nullable=True),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema, options={"row-tracking.enabled": "true", "data-evolution.enabled": "true"},
+        )
+        self.catalog.create_table(table_name, schema, ignore_if_exists=True)
+        table = self.catalog.get_table(table_name)
+        wb = table.new_batch_write_builder()
+        w0, c0 = wb.new_write().with_write_type(["id", "b"]), wb.new_commit()
+        w0.write_arrow(pa.Table.from_pydict(
+            {"id": [1, 2, 3], "b": [10, 20, 30]},
+            schema=pa.schema([("id", pa.int64()), ("b", pa.int32())]),
+        ))
+        c0.commit(w0.prepare_commit())
+        w0.close()
+        c0.close()
+        w1, c1 = wb.new_write().with_write_type(["c"]), wb.new_commit()
+        w1.write_arrow(pa.Table.from_pydict(
+            {"c": [100, None, 200]},
+            schema=pa.schema([pa.field("c", pa.int32(), nullable=True)]),
+        ))
+        cmts1 = w1.prepare_commit()
+        for cmt in cmts1:
+            for nf in cmt.new_files:
+                nf.first_row_id = 0
+        c1.commit(cmts1)
+        w1.close()
+        c1.close()
+        return table
+
+    def test_with_filter(self):
+        table = self._create_filter_test_table("default.test_filter_on_evolved_column")
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+
+        full_df = rb.new_read().to_pandas(splits)
+        self.assertEqual(len(full_df), 3, "Full scan must return 3 rows")
+        full_sorted = full_df.sort_values("id").reset_index(drop=True)
+        self.assertEqual(full_sorted["id"].tolist(), [1, 2, 3])
+        self.assertEqual(full_sorted["b"].tolist(), [10, 20, 30])
+        self.assertEqual(full_sorted["c"].iloc[0], 100)
+        self.assertTrue(pd.isna(full_sorted["c"].iloc[1]), "Row id=2 must have NULL c")
+        self.assertEqual(full_sorted["c"].iloc[2], 200)
+
+        predicate_gt = rb.new_predicate_builder().greater_than("c", 150)
+        rb_gt = table.new_read_builder().with_filter(predicate_gt)
+        result_gt = rb_gt.new_read().to_pandas(rb_gt.new_scan().plan().splits())
+        self.assertEqual(len(result_gt), 1, "Filter c > 150 should return 1 row (c=200)")
+        self.assertEqual(result_gt["id"].iloc[0], 3, "Row with c=200 must have id=3")
+        self.assertEqual(result_gt["b"].iloc[0], 30, "Row with c=200 must have b=30")
+        self.assertEqual(result_gt["c"].iloc[0], 200, "Filtered row must have c=200")
+
+        predicate_lt = rb.new_predicate_builder().less_than("c", 150)
+        rb_lt = table.new_read_builder().with_filter(predicate_lt)
+        result_lt = rb_lt.new_read().to_pandas(rb_lt.new_scan().plan().splits())
+        self.assertEqual(len(result_lt), 1, "Filter c < 150 should return 1 row (c=100)")
+        self.assertEqual(result_lt["id"].iloc[0], 1, "Row with c=100 must have id=1")
+        self.assertEqual(result_lt["c"].iloc[0], 100, "Filtered row must have c=100")
+
+        predicate_id = rb.new_predicate_builder().equal("id", 2)
+        rb_id = table.new_read_builder().with_filter(predicate_id)
+        result_id = rb_id.new_read().to_pandas(rb_id.new_scan().plan().splits())
+        self.assertEqual(len(result_id), 1, "Filter id == 2 should return 1 row")
+        self.assertEqual(result_id["id"].iloc[0], 2, "Filtered row must have id=2")
+        self.assertTrue(pd.isna(result_id["c"].iloc[0]), "Row id=2 must have c=NULL")
+
+        pb = rb.new_predicate_builder()
+        predicate_and = pb.and_predicates([
+            pb.greater_than("c", 50),
+            pb.less_than("c", 150),
+        ])
+        rb_and = table.new_read_builder().with_filter(predicate_and)
+        result_and = rb_and.new_read().to_pandas(rb_and.new_scan().plan().splits())
+        self.assertEqual(
+            len(result_and), 1,
+            "Filter c>50 AND c<150 should return 1 row (c=100)",
+        )
+        self.assertEqual(result_and["id"].iloc[0], 1, "Row with c=100 must have id=1")
+        self.assertEqual(result_and["c"].iloc[0], 100, "Filtered row must have c=100")
+
+        predicate_is_null = rb.new_predicate_builder().is_null("c")
+        rb_null = table.new_read_builder().with_filter(predicate_is_null)
+        result_null = rb_null.new_read().to_pandas(rb_null.new_scan().plan().splits())
+        self.assertEqual(len(result_null), 1, "Filter c IS NULL should return 1 row (id=2)")
+        self.assertEqual(result_null["id"].iloc[0], 2, "NULL row must have id=2")
+        self.assertTrue(pd.isna(result_null["c"].iloc[0]), "Filtered row c must be NULL")
+
+        predicate_not_null = rb.new_predicate_builder().is_not_null("c")
+        rb_not_null = table.new_read_builder().with_filter(predicate_not_null)
+        result_not_null = rb_not_null.new_read().to_pandas(
+            rb_not_null.new_scan().plan().splits())
+        self.assertEqual(
+            len(result_not_null), 2,
+            "Filter c IS NOT NULL should return 2 rows (id=1, id=3)",
+        )
+        result_not_null_sorted = result_not_null.sort_values("id").reset_index(drop=True)
+        self.assertEqual(result_not_null_sorted["id"].tolist(), [1, 3])
+        self.assertEqual(result_not_null_sorted["c"].tolist(), [100, 200])
+
+        predicate_or = pb.or_predicates([
+            pb.greater_than("c", 150),
+            pb.less_than("c", 100),
+        ])
+        rb_or = table.new_read_builder().with_filter(predicate_or)
+        result_or = rb_or.new_read().to_pandas(rb_or.new_scan().plan().splits())
+        self.assertEqual(
+            len(result_or), 1,
+            "Filter c>150 OR c<100 should return 1 row (id=3, c=200)",
+        )
+        self.assertEqual(result_or["id"].iloc[0], 3, "Row with c=200 must have id=3")
+        self.assertEqual(result_or["c"].iloc[0], 200, "Filtered row must have c=200")
+
+    def test_with_filter_and_projection(self):
+        table = self._create_filter_test_table("default.test_filter_and_projection_evolved")
+        rb_full = table.new_read_builder()
+        predicate = rb_full.new_predicate_builder().greater_than("c", 150)
+        rb_filtered = table.new_read_builder().with_projection(["c", "id"]).with_filter(predicate)
+        result = rb_filtered.new_read().to_pandas(rb_filtered.new_scan().plan().splits())
+        self.assertEqual(len(result), 1, "Filter c > 150 with projection [c, id] should return 1 row")
+        self.assertEqual(result["id"].iloc[0], 3)
+        self.assertEqual(result["c"].iloc[0], 200)
+        for _, row in result.iterrows():
+            self.assertGreater(
+                row["c"],
+                150,
+                "Each row must satisfy predicate c > 150 (row-by-row path uses predicate.index; "
+                "if schema_fields != read_type, wrong column is compared).",
+            )
+
+        predicate2 = rb_full.new_predicate_builder().is_null("c")
+        rb2_filtered = table.new_read_builder().with_projection(["id", "c"]).with_filter(predicate2)
+        result2 = rb2_filtered.new_read().to_pandas(rb2_filtered.new_scan().plan().splits())
+        self.assertEqual(len(result2), 1, "Filter c IS NULL with projection [id, c] should return 1 row")
+        self.assertEqual(result2["id"].iloc[0], 2)
+        self.assertTrue(pd.isna(result2["c"].iloc[0]))
+
+        predicate3 = rb_full.new_predicate_builder().greater_than("c", 50)
+        rb3_filtered = table.new_read_builder().with_projection(["c"]).with_filter(predicate3)
+        result3 = rb3_filtered.new_read().to_pandas(rb3_filtered.new_scan().plan().splits())
+        self.assertEqual(len(result3), 2, "Filter c > 50 with projection [c] should return 2 rows (c=100, 200)")
+        self.assertEqual(sorted(result3["c"].tolist()), [100, 200])
+
+        # Build predicate from the same read_type as projection [id, c] so indices match (c at index 1).
+        rb4 = table.new_read_builder().with_projection(["id", "c"])
+        pb4 = rb4.new_predicate_builder()
+        predicate_compound = pb4.and_predicates([
+            pb4.greater_than("c", 150),
+            pb4.is_not_null("c"),
+        ])
+        rb4_filtered = rb4.with_filter(predicate_compound)
+        result4 = rb4_filtered.new_read().to_pandas(rb4_filtered.new_scan().plan().splits())
+        self.assertEqual(len(result4), 1, "Filter c>150 AND c IS NOT NULL with projection [id,c] should return 1 row")
+        self.assertEqual(result4["id"].iloc[0], 3)
+        self.assertEqual(result4["c"].iloc[0], 200)
+
+        predicate_filter_on_non_projected = rb_full.new_predicate_builder().greater_than("c", 150)
+        rb_non_projected = table.new_read_builder().with_projection(["id"]).with_filter(
+            predicate_filter_on_non_projected
+        )
+        result_non_projected = rb_non_projected.new_read().to_pandas(
+            rb_non_projected.new_scan().plan().splits()
+        )
+        self.assertEqual(
+            len(result_non_projected),
+            3,
+            "Filter c > 150 with projection [id]: c not in read_type so filter is dropped, all 3 rows returned.",
+        )
+        self.assertEqual(
+            list(result_non_projected.columns),
+            ["id"],
+            "Projection [id] should return only id column.",
+        )
+        table_read = rb_non_projected.new_read()
+        splits = rb_non_projected.new_scan().plan().splits()
+        expected_output_arity = len(table_read.read_type)
+        try:
+            rows_from_iterator = list(table_read.to_iterator(splits))
+        except ValueError as e:
+            if "Expected Arrow table or array" in str(e):
+                self.skipTest(
+                    "RecordBatchReader path uses polars.from_arrow(RecordBatch) which fails; "
+                    "skip to_iterator projection assertion on this path"
+                )
+            raise
+        self.assertEqual(len(rows_from_iterator), 3, "to_iterator should return same row count as to_pandas")
+        for row in rows_from_iterator:
+            self.assertIsInstance(row, OffsetRow)
+            self.assertEqual(
+                row.arity,
+                expected_output_arity,
+                "to_iterator must yield rows with only read_type columns (arity=%d)."
+                % expected_output_arity,
+            )
+
+    def test_null_predicate_arrow_vs_row_by_row(self):
+        schema = pa.schema([("id", pa.int64()), ("c", pa.int64())])
+        batch = pa.RecordBatch.from_pydict(
+            {"id": [1, 2, 3], "c": [10, None, 20]},
+            schema=schema,
+        )
+        ncols = 2
+
+        # is_null('c'): Arrow and row-by-row must return same rows
+        pred_is_null = Predicate(method="isNull", index=1, field="c", literals=None)
+        self.assertFalse(pred_is_null.has_null_check())
+        arrow_res = _filter_batch_arrow(batch, pred_is_null)
+        row_res = _filter_batch_row_by_row(batch, pred_is_null, ncols)
+        self.assertEqual(arrow_res.num_rows, row_res.num_rows)
+        self.assertTrue(_batches_equal(arrow_res, row_res))
+        self.assertEqual(arrow_res.num_rows, 1)
+        self.assertEqual(arrow_res.column("id")[0].as_py(), 2)
+        self.assertIsNone(arrow_res.column("c")[0].as_py())
+
+        # is_not_null('c'): Arrow and row-by-row must return same rows
+        pred_not_null = Predicate(method="isNotNull", index=1, field="c", literals=None)
+        arrow_res2 = _filter_batch_arrow(batch, pred_not_null)
+        row_res2 = _filter_batch_row_by_row(batch, pred_not_null, ncols)
+        self.assertEqual(arrow_res2.num_rows, row_res2.num_rows)
+        self.assertTrue(_batches_equal(arrow_res2, row_res2))
+        self.assertEqual(arrow_res2.num_rows, 2)
+
+        pred_eq_null = Predicate(method="equal", index=1, field="c", literals=[None])
+        self.assertTrue(pred_eq_null.has_null_check())
+        row_res3 = _filter_batch_row_by_row(batch, pred_eq_null, ncols)
+        self.assertEqual(row_res3.num_rows, 0)  # Paimon: val is None -> False, no row matches
+        arrow_res3 = _filter_batch_arrow(batch, pred_eq_null)
+        self.assertEqual(arrow_res3.num_rows, 0)  # Arrow: NULL==NULL is null, filtered out
+        self.assertEqual(arrow_res3.num_rows, row_res3.num_rows)
+
+    def test_filter_row_by_row_mismatched_schema(self):
+        batch = pa.RecordBatch.from_pydict(
+            {"c": [1, 200, 50], "id": [100, 2, 3]},
+            schema=pa.schema([("c", pa.int64()), ("id", pa.int64())]),
+        )
+        pred = Predicate(method="greaterThan", index=0, field="c", literals=[150])
+
+        ncols = 3
+        nrows = batch.num_rows
+        id_col = batch.column("id")
+        c_col = batch.column("c")
+        row_tuple = [None] * ncols
+        offset_row = OffsetRow(row_tuple, 0, ncols)
+        mask = []
+        for i in range(nrows):
+            row_tuple[0] = id_col[i].as_py()
+            row_tuple[1] = None
+            row_tuple[2] = c_col[i].as_py()
+            offset_row.replace(tuple(row_tuple))
+            try:
+                mask.append(pred.test(offset_row))
+            except (TypeError, ValueError):
+                mask.append(False)
+        rows_passing_wrong_layout = sum(mask)
+        self.assertEqual(
+            rows_passing_wrong_layout,
+            0,
+            "With wrong layout (position 0 = id), predicate c > 150 becomes id > 150 -> 0 rows. "
" + "This reproduces FilterRecordBatchReader bug when schema_fields=table.fields.", + ) + ncols_right = 2 + row_tuple_right = [None] * ncols_right + offset_row_right = OffsetRow(row_tuple_right, 0, ncols_right) + mask_right = [] + for i in range(nrows): + row_tuple_right[0] = c_col[i].as_py() + row_tuple_right[1] = id_col[i].as_py() + offset_row_right.replace(tuple(row_tuple_right)) + try: + mask_right.append(pred.test(offset_row_right)) + except (TypeError, ValueError): + mask_right.append(False) + rows_passing_right_layout = sum(mask_right) + self.assertEqual( + rows_passing_right_layout, + 1, + "With correct layout (position 0 = c), predicate c > 150 -> 1 row (c=200).", + ) + def test_null_values(self): simple_pa_schema = pa.schema([ ('f0', pa.int32()), From 101be4ea36b290274072c5af7fcd68034b51ae86 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 11 Feb 2026 13:36:33 +0800 Subject: [PATCH 2/2] fix merge issue during rebase --- paimon-python/pypaimon/read/scanner/file_scanner.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py index b41dc8693d1f..83d29e5ed863 100755 --- a/paimon-python/pypaimon/read/scanner/file_scanner.py +++ b/paimon-python/pypaimon/read/scanner/file_scanner.py @@ -434,11 +434,7 @@ def _filter_manifest_entry(self, entry: ManifestEntry) -> bool: else: if not self.predicate: return True -<<<<<<< HEAD - if self.predicate_for_stats is None: -======= - if self.data_evolution: ->>>>>>> 867ca010a ([python] fix data evolution predicate filtering) + if self.predicate_for_stats is None or self.data_evolution: return True if entry.file.value_stats_cols is None and entry.file.write_cols is not None: stats_fields = entry.file.write_cols