From dbc41980879124e8ba8bfd984f82acf11d1ea493 Mon Sep 17 00:00:00 2001 From: "xiyu.zk" Date: Wed, 11 Feb 2026 17:31:42 +0800 Subject: [PATCH] [spark] Support AlwaysTrue and AlwaysFalse predicates in SparkV2FilterConverter --- .../paimon/predicate/FalseFunction.java | 71 +++++++ .../apache/paimon/predicate/LeafFunction.java | 2 + .../paimon/predicate/LeafPredicate.java | 6 + .../predicate/PartitionPredicateVisitor.java | 4 +- .../apache/paimon/predicate/Transform.java | 3 +- .../apache/paimon/predicate/TrueFunction.java | 71 +++++++ .../paimon/predicate/TrueTransform.java | 84 ++++++++ .../paimon/predicate/LeafPredicateTest.java | 96 ++++++++++ .../predicate/PredicateJsonSerdeTest.java | 18 ++ .../paimon/spark/SparkV2FilterConverter.scala | 17 +- .../sql/SparkV2FilterConverterTestBase.scala | 179 +++++++++++++++++- 11 files changed, 544 insertions(+), 7 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/predicate/FalseFunction.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/predicate/TrueFunction.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/predicate/TrueTransform.java diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/FalseFunction.java b/paimon-common/src/main/java/org/apache/paimon/predicate/FalseFunction.java new file mode 100644 index 000000000000..36b93009610f --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/FalseFunction.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.predicate; + +import org.apache.paimon.types.DataType; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; + +import java.util.List; +import java.util.Optional; + +/** A {@link LeafFunction} that always returns {@code false}. Used for AlwaysFalse predicates. */ +public class FalseFunction extends LeafFunction { + + private static final long serialVersionUID = 1L; + + public static final String NAME = "FALSE"; + + public static final FalseFunction INSTANCE = new FalseFunction(); + + @JsonCreator + private FalseFunction() {} + + @Override + public boolean test(DataType type, Object field, List literals) { + return false; + } + + @Override + public boolean test( + DataType type, + long rowCount, + Object min, + Object max, + Long nullCount, + List literals) { + return false; + } + + @Override + public Optional negate() { + return Optional.of(TrueFunction.INSTANCE); + } + + @Override + public T visit(FunctionVisitor visitor, FieldRef fieldRef, List literals) { + throw new UnsupportedOperationException( + "FalseFunction does not support field-based visitation."); + } + + @Override + public String toJson() { + return NAME; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java index 5d8edc1d675a..0f45e8e920d2 100644 --- a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java @@ -68,6 +68,8 @@ private static Map createRegistry() { registry.put(NotIn.NAME, NotIn.INSTANCE); registry.put(Between.NAME, Between.INSTANCE); registry.put(NotBetween.NAME, NotBetween.INSTANCE); + registry.put(TrueFunction.NAME, TrueFunction.INSTANCE); + registry.put(FalseFunction.NAME, FalseFunction.INSTANCE); return Collections.unmodifiableMap(registry); } } diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java index 5e5b3863116a..e10b5deb5e0d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicate.java @@ -143,6 +143,9 @@ public boolean test( long rowCount, InternalRow minValues, InternalRow maxValues, InternalArray nullCounts) { Optional fieldRefOptional = fieldRefOptional(); if (!fieldRefOptional.isPresent()) { + if (transform instanceof TrueTransform) { + return function.test(transform.outputType(), 0, null, null, null, literals); + } return true; } FieldRef fieldRef = fieldRefOptional.get(); @@ -165,6 +168,9 @@ public boolean test( @Override public Optional negate() { + if (transform instanceof TrueTransform) { + return function.negate().map(neg -> new LeafPredicate(transform, neg, literals)); + } Optional fieldRefOptional = fieldRefOptional(); if (!fieldRefOptional.isPresent()) { return Optional.empty(); diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionPredicateVisitor.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionPredicateVisitor.java index 4db27bd1a119..94f92102e0b1 100644 --- a/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionPredicateVisitor.java +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionPredicateVisitor.java @@ -32,14 +32,16 @@ public PartitionPredicateVisitor(List partitionKeys) { @Override public Boolean visit(LeafPredicate predicate) { Transform transform = predicate.transform(); + boolean hasFieldRef = false; for (Object input : transform.inputs()) { if (input instanceof FieldRef) { + hasFieldRef = true; if (!partitionKeys.contains(((FieldRef) input).name())) { return false; } } } - return true; + return hasFieldRef; } @Override diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java b/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java index 199e1df17055..6d7b9ba6990a 100644 --- a/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java @@ -38,7 +38,8 @@ @JsonSubTypes.Type(value = ConcatTransform.class, name = ConcatTransform.NAME), @JsonSubTypes.Type(value = ConcatWsTransform.class, name = ConcatWsTransform.NAME), @JsonSubTypes.Type(value = UpperTransform.class, name = UpperTransform.NAME), - @JsonSubTypes.Type(value = LowerTransform.class, name = LowerTransform.NAME) + @JsonSubTypes.Type(value = LowerTransform.class, name = LowerTransform.NAME), + @JsonSubTypes.Type(value = TrueTransform.class, name = TrueTransform.NAME) }) public interface Transform extends Serializable { diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/TrueFunction.java b/paimon-common/src/main/java/org/apache/paimon/predicate/TrueFunction.java new file mode 100644 index 000000000000..6ca9d057f078 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/TrueFunction.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.predicate; + +import org.apache.paimon.types.DataType; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; + +import java.util.List; +import java.util.Optional; + +/** A {@link LeafFunction} that always returns {@code true}. Used for AlwaysTrue predicates. */ +public class TrueFunction extends LeafFunction { + + private static final long serialVersionUID = 1L; + + public static final String NAME = "TRUE"; + + public static final TrueFunction INSTANCE = new TrueFunction(); + + @JsonCreator + private TrueFunction() {} + + @Override + public boolean test(DataType type, Object field, List literals) { + return true; + } + + @Override + public boolean test( + DataType type, + long rowCount, + Object min, + Object max, + Long nullCount, + List literals) { + return true; + } + + @Override + public Optional negate() { + return Optional.of(FalseFunction.INSTANCE); + } + + @Override + public T visit(FunctionVisitor visitor, FieldRef fieldRef, List literals) { + throw new UnsupportedOperationException( + "TrueFunction does not support field-based visitation."); + } + + @Override + public String toJson() { + return NAME; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/TrueTransform.java b/paimon-common/src/main/java/org/apache/paimon/predicate/TrueTransform.java new file mode 100644 index 000000000000..2017d2e8d2e5 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/TrueTransform.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.predicate; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; + +import java.util.Collections; +import java.util.List; + +/** A {@link Transform} that always returns {@code true}. Used for constant predicates. */ +public class TrueTransform implements Transform { + + private static final long serialVersionUID = 1L; + + public static final String NAME = "TRUE"; + + public static final TrueTransform INSTANCE = new TrueTransform(); + + @JsonCreator + private TrueTransform() {} + + @Override + public String name() { + return NAME; + } + + @Override + public List inputs() { + return Collections.emptyList(); + } + + @Override + public DataType outputType() { + return DataTypes.BOOLEAN(); + } + + @Override + public Object transform(InternalRow row) { + return true; + } + + @Override + public Transform copyWithNewInputs(List inputs) { + return INSTANCE; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + return o != null && getClass() == o.getClass(); + } + + @Override + public int hashCode() { + return NAME.hashCode(); + } + + @Override + public String toString() { + return NAME; + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/predicate/LeafPredicateTest.java b/paimon-common/src/test/java/org/apache/paimon/predicate/LeafPredicateTest.java index 1368611554cf..0af7cf38067e 100644 --- a/paimon-common/src/test/java/org/apache/paimon/predicate/LeafPredicateTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/predicate/LeafPredicateTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.predicate; import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericArray; import org.apache.paimon.data.GenericRow; import org.apache.paimon.types.DataTypes; import org.apache.paimon.utils.InstantiationUtil; @@ -27,6 +28,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import static org.assertj.core.api.Assertions.assertThat; @@ -78,4 +80,98 @@ private LeafPredicate create() { literals.add(BinaryString.fromString("ha-he")); return LeafPredicate.of(transform, Equal.INSTANCE, literals); } + + @Test + public void testAlwaysTrueRow() { + LeafPredicate predicate = + LeafPredicate.of( + TrueTransform.INSTANCE, TrueFunction.INSTANCE, Collections.emptyList()); + assertThat(predicate.test(GenericRow.of(1))).isTrue(); + assertThat(predicate.test(GenericRow.of((Object) null))).isTrue(); + } + + @Test + public void testAlwaysFalseRow() { + LeafPredicate predicate = + LeafPredicate.of( + TrueTransform.INSTANCE, FalseFunction.INSTANCE, Collections.emptyList()); + assertThat(predicate.test(GenericRow.of(1))).isFalse(); + assertThat(predicate.test(GenericRow.of((Object) null))).isFalse(); + } + + @Test + public void testAlwaysTrueMinMax() { + LeafPredicate predicate = + LeafPredicate.of( + TrueTransform.INSTANCE, TrueFunction.INSTANCE, Collections.emptyList()); + assertThat( + predicate.test( + 10, + GenericRow.of(1), + GenericRow.of(10), + new GenericArray(new long[] {0}))) + .isTrue(); + assertThat(predicate.test(1, null, null, null)).isTrue(); + } + + @Test + public void testAlwaysFalseMinMax() { + LeafPredicate predicate = + LeafPredicate.of( + TrueTransform.INSTANCE, FalseFunction.INSTANCE, Collections.emptyList()); + assertThat( + predicate.test( + 10, + GenericRow.of(1), + GenericRow.of(10), + new GenericArray(new long[] {0}))) + .isFalse(); + assertThat(predicate.test(1, null, null, null)).isFalse(); + } + + @Test + public void testAlwaysTrueNegate() { + LeafPredicate predicate = + LeafPredicate.of( + TrueTransform.INSTANCE, TrueFunction.INSTANCE, Collections.emptyList()); + Predicate negated = predicate.negate().get(); + assertThat(negated).isInstanceOf(LeafPredicate.class); + LeafPredicate negatedLeaf = (LeafPredicate) negated; + assertThat(negatedLeaf.function()).isEqualTo(FalseFunction.INSTANCE); + assertThat(negatedLeaf.transform()).isInstanceOf(TrueTransform.class); + assertThat(negatedLeaf.test(GenericRow.of(1))).isFalse(); + } + + @Test + public void testAlwaysFalseNegate() { + LeafPredicate predicate = + LeafPredicate.of( + TrueTransform.INSTANCE, FalseFunction.INSTANCE, Collections.emptyList()); + Predicate negated = predicate.negate().get(); + assertThat(negated).isInstanceOf(LeafPredicate.class); + LeafPredicate negatedLeaf = (LeafPredicate) negated; + assertThat(negatedLeaf.function()).isEqualTo(TrueFunction.INSTANCE); + assertThat(negatedLeaf.transform()).isInstanceOf(TrueTransform.class); + assertThat(negatedLeaf.test(GenericRow.of(1))).isTrue(); + } + + @Test + public void testAlwaysTrueSerialization() throws IOException, ClassNotFoundException { + LeafPredicate predicate = + LeafPredicate.of( + TrueTransform.INSTANCE, TrueFunction.INSTANCE, Collections.emptyList()); + LeafPredicate clone = InstantiationUtil.clone(predicate); + assertThat(clone).isEqualTo(predicate); + assertThat(clone.hashCode()).isEqualTo(predicate.hashCode()); + } + + @Test + public void testAlwaysFalseSerialization() throws IOException, ClassNotFoundException { + LeafPredicate predicate = + LeafPredicate.of( + TrueTransform.INSTANCE, FalseFunction.INSTANCE, Collections.emptyList()); + LeafPredicate clone = InstantiationUtil.clone(predicate); + assertThat(clone).isEqualTo(predicate); + assertThat(clone.hashCode()).isEqualTo(predicate.hashCode()); + } } diff --git a/paimon-common/src/test/java/org/apache/paimon/predicate/PredicateJsonSerdeTest.java b/paimon-common/src/test/java/org/apache/paimon/predicate/PredicateJsonSerdeTest.java index 475e889c347b..3093b83eff06 100644 --- a/paimon-common/src/test/java/org/apache/paimon/predicate/PredicateJsonSerdeTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/predicate/PredicateJsonSerdeTest.java @@ -156,6 +156,24 @@ private static Stream testData() { .expectJson( "{\"kind\":\"LEAF\",\"transform\":{\"name\":\"FIELD_REF\",\"fieldRef\":{\"index\":2,\"name\":\"f2\",\"type\":\"STRING\"}},\"function\":\"LIKE\",\"literals\":[\"%a%b%\"]}"), + // LeafPredicate - TrueTransform + TrueFunction (AlwaysTrue) + TestSpec.forPredicate( + LeafPredicate.of( + TrueTransform.INSTANCE, + TrueFunction.INSTANCE, + Collections.emptyList())) + .expectJson( + "{\"kind\":\"LEAF\",\"transform\":{\"name\":\"TRUE\"},\"function\":\"TRUE\",\"literals\":[]}"), + + // LeafPredicate - TrueTransform + FalseFunction (AlwaysFalse) + TestSpec.forPredicate( + LeafPredicate.of( + TrueTransform.INSTANCE, + FalseFunction.INSTANCE, + Collections.emptyList())) + .expectJson( + "{\"kind\":\"LEAF\",\"transform\":{\"name\":\"TRUE\"},\"function\":\"FALSE\",\"literals\":[]}"), + // LeafPredicate - In with many values including nulls TestSpec.forPredicate( builder.in( diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala index 36e188bde76a..970d619792cc 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala @@ -18,7 +18,7 @@ package org.apache.paimon.spark -import org.apache.paimon.predicate.{Predicate, PredicateBuilder, Transform} +import org.apache.paimon.predicate.{FalseFunction, LeafPredicate, Predicate, PredicateBuilder, Transform, TrueFunction, TrueTransform} import org.apache.paimon.spark.util.SparkExpressionConverter.{toPaimonLiteral, toPaimonTransform} import org.apache.paimon.types.RowType @@ -164,7 +164,18 @@ case class SparkV2FilterConverter(rowType: RowType) extends Logging { throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.") } - // TODO: AlwaysTrue, AlwaysFalse + case ALWAYS_TRUE => + LeafPredicate.of( + TrueTransform.INSTANCE, + TrueFunction.INSTANCE, + java.util.Collections.emptyList()) + + case ALWAYS_FALSE => + LeafPredicate.of( + TrueTransform.INSTANCE, + FalseFunction.INSTANCE, + java.util.Collections.emptyList()) + case _ => throw new UnsupportedOperationException(s"Convert $sparkPredicate is unsupported.") } } @@ -228,5 +239,7 @@ object SparkV2FilterConverter extends Logging { private val STRING_START_WITH = "STARTS_WITH" private val STRING_END_WITH = "ENDS_WITH" private val STRING_CONTAINS = "CONTAINS" + private val ALWAYS_TRUE = "ALWAYS_TRUE" + private val ALWAYS_FALSE = "ALWAYS_FALSE" } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala index 2d90777fd78c..430f0785ebb3 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala @@ -19,7 +19,7 @@ package org.apache.paimon.spark.sql import org.apache.paimon.data.{BinaryString, Decimal, Timestamp} -import org.apache.paimon.predicate.PredicateBuilder +import org.apache.paimon.predicate.{FalseFunction, LeafPredicate, PredicateBuilder, TrueFunction, TrueTransform} import org.apache.paimon.spark.{PaimonSparkTestBase, SparkV2FilterConverter} import org.apache.paimon.spark.util.shim.TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType import org.apache.paimon.table.source.DataSplit @@ -29,7 +29,7 @@ import org.apache.spark.SparkConf import org.apache.spark.sql.PaimonUtils.translateFilterV2 import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.plans.logical.Filter -import org.apache.spark.sql.connector.expressions.filter.Predicate +import org.apache.spark.sql.connector.expressions.filter.{And, Not, Or, Predicate => SparkPredicate} import java.time.{LocalDate, LocalDateTime} @@ -355,7 +355,165 @@ abstract class SparkV2FilterConverterTestBase extends PaimonSparkTestBase { assert(scanFilesCount(filter) == 4) } - private def v2Filter(str: String, tableName: String = "test_tbl"): Predicate = { + private def paimonAlwaysTrue: org.apache.paimon.predicate.Predicate = + LeafPredicate.of( + TrueTransform.INSTANCE, + TrueFunction.INSTANCE, + java.util.Collections.emptyList()) + + private def paimonAlwaysFalse: org.apache.paimon.predicate.Predicate = + LeafPredicate.of( + TrueTransform.INSTANCE, + FalseFunction.INSTANCE, + java.util.Collections.emptyList()) + + test("V2Filter: AlwaysTrue") { + val sparkAlwaysTrue = + new SparkPredicate( + "ALWAYS_TRUE", + Array.empty[org.apache.spark.sql.connector.expressions.Expression]) + val actual = converter.convert(sparkAlwaysTrue).get + assert(actual.equals(paimonAlwaysTrue)) + } + + test("V2Filter: AlwaysFalse") { + val sparkAlwaysFalse = + new SparkPredicate( + "ALWAYS_FALSE", + Array.empty[org.apache.spark.sql.connector.expressions.Expression]) + val actual = converter.convert(sparkAlwaysFalse).get + assert(actual.equals(paimonAlwaysFalse)) + } + + test("V2Filter: NOT AlwaysTrue") { + val sparkAlwaysTrue = + new SparkPredicate( + "ALWAYS_TRUE", + Array.empty[org.apache.spark.sql.connector.expressions.Expression]) + val sparkNotAlwaysTrue = new Not(sparkAlwaysTrue) + val actual = converter.convert(sparkNotAlwaysTrue).get + assert(actual.equals(paimonAlwaysFalse)) + } + + test("V2Filter: NOT AlwaysFalse") { + val sparkAlwaysFalse = + new SparkPredicate( + "ALWAYS_FALSE", + Array.empty[org.apache.spark.sql.connector.expressions.Expression]) + val sparkNotAlwaysFalse = new Not(sparkAlwaysFalse) + val actual = converter.convert(sparkNotAlwaysFalse).get + assert(actual.equals(paimonAlwaysTrue)) + } + + test("V2Filter: AND with AlwaysTrue") { + val sparkAlwaysTrue = + new SparkPredicate( + "ALWAYS_TRUE", + Array.empty[org.apache.spark.sql.connector.expressions.Expression]) + val intGt2 = v2Filter("int_col > 2") + val sparkAnd = new And(sparkAlwaysTrue, intGt2) + val actual = converter.convert(sparkAnd).get + assert(actual.equals(PredicateBuilder.and(paimonAlwaysTrue, builder.greaterThan(3, 2)))) + } + + test("V2Filter: OR with AlwaysFalse") { + val sparkAlwaysFalse = + new SparkPredicate( + "ALWAYS_FALSE", + Array.empty[org.apache.spark.sql.connector.expressions.Expression]) + val intGt2 = v2Filter("int_col > 2") + val sparkOr = new Or(sparkAlwaysFalse, intGt2) + val actual = converter.convert(sparkOr).get + assert(actual.equals(PredicateBuilder.or(paimonAlwaysFalse, builder.greaterThan(3, 2)))) + } + + test("V2Filter: performance - OR(AlwaysFalse, int_col = 1) enables file skipping") { + // Before: OR(ALWAYS_FALSE, int_col = 1) conversion fails entirely -> no pushdown -> 4 files + // After: OR(ALWAYS_FALSE, int_col = 1) converts correctly -> pushdown -> 1 file + val sparkAlwaysFalse = + new SparkPredicate( + "ALWAYS_FALSE", + Array.empty[org.apache.spark.sql.connector.expressions.Expression]) + val intEq1 = v2Filter("int_col = 1") + val sparkOr = new Or(sparkAlwaysFalse, intEq1) + val paimonPredicate = converter.convert(sparkOr).get + + val filesScanned = scanFilesWithPredicate(paimonPredicate) + // Only 1 file should be scanned (the one containing int_col = 1) + assert(filesScanned == 1, s"Expected 1 file but scanned $filesScanned files") + } + + test("V2Filter: performance - AND(AlwaysTrue, int_col > 2) enables file skipping") { + // Before: AND(ALWAYS_TRUE, int_col > 2) conversion fails entirely -> no pushdown -> 4 files + // After: AND(ALWAYS_TRUE, int_col > 2) converts correctly -> pushdown -> 1 file + val sparkAlwaysTrue = + new SparkPredicate( + "ALWAYS_TRUE", + Array.empty[org.apache.spark.sql.connector.expressions.Expression]) + val intGt2 = v2Filter("int_col > 2") + val sparkAnd = new And(sparkAlwaysTrue, intGt2) + val paimonPredicate = converter.convert(sparkAnd).get + + val filesScanned = scanFilesWithPredicate(paimonPredicate) + // Only 1 file should be scanned (the one containing int_col = 3) + assert(filesScanned == 1, s"Expected 1 file but scanned $filesScanned files") + } + + test("V2Filter: performance - pure AlwaysFalse achieves zero I/O") { + // AlwaysFalse alone should skip all data files -> 0 files scanned + val sparkAlwaysFalse = + new SparkPredicate( + "ALWAYS_FALSE", + Array.empty[org.apache.spark.sql.connector.expressions.Expression]) + val paimonPredicate = converter.convert(sparkAlwaysFalse).get + + val filesScanned = scanFilesWithPredicate(paimonPredicate) + assert(filesScanned == 0, s"Expected 0 files but scanned $filesScanned files") + } + + test("V2Filter: performance - AND(AlwaysFalse, int_col = 1) achieves zero I/O") { + // AND with AlwaysFalse should result in zero files no matter what the other predicate is + val sparkAlwaysFalse = + new SparkPredicate( + "ALWAYS_FALSE", + Array.empty[org.apache.spark.sql.connector.expressions.Expression]) + val intEq1 = v2Filter("int_col = 1") + val sparkAnd = new And(sparkAlwaysFalse, intEq1) + val paimonPredicate = converter.convert(sparkAnd).get + + val filesScanned = scanFilesWithPredicate(paimonPredicate) + assert(filesScanned == 0, s"Expected 0 files but scanned $filesScanned files") + } + + test("V2Filter: performance - NOT(AlwaysTrue) achieves zero I/O") { + // NOT(ALWAYS_TRUE) = ALWAYS_FALSE -> 0 files scanned + val sparkAlwaysTrue = + new SparkPredicate( + "ALWAYS_TRUE", + Array.empty[org.apache.spark.sql.connector.expressions.Expression]) + val sparkNot = new Not(sparkAlwaysTrue) + val paimonPredicate = converter.convert(sparkNot).get + + val filesScanned = scanFilesWithPredicate(paimonPredicate) + assert(filesScanned == 0, s"Expected 0 files but scanned $filesScanned files") + } + + test("V2Filter: performance - OR(AlwaysTrue, int_col = 1) scans all files") { + // OR with AlwaysTrue means everything matches -> all files scanned + val sparkAlwaysTrue = + new SparkPredicate( + "ALWAYS_TRUE", + Array.empty[org.apache.spark.sql.connector.expressions.Expression]) + val intEq1 = v2Filter("int_col = 1") + val sparkOr = new Or(sparkAlwaysTrue, intEq1) + val paimonPredicate = converter.convert(sparkOr).get + + val filesScanned = scanFilesWithPredicate(paimonPredicate) + // All 4 files should be scanned because AlwaysTrue in OR matches everything + assert(filesScanned == 4, s"Expected 4 files but scanned $filesScanned files") + } + + private def v2Filter(str: String, tableName: String = "test_tbl"): SparkPredicate = { val condition = sql(s"SELECT * FROM $tableName WHERE $str").queryExecution.optimizedPlan .collectFirst { case f: Filter => f } .get @@ -369,4 +527,19 @@ abstract class SparkV2FilterConverterTestBase extends PaimonSparkTestBase { .map(_.asInstanceOf[DataSplit].dataFiles().size()) .sum } + + /** Use Paimon ReadBuilder to count how many data files would be scanned for a given predicate. */ + private def scanFilesWithPredicate( + predicate: org.apache.paimon.predicate.Predicate, + tableName: String = "test_tbl"): Int = { + loadTable(tableName) + .newReadBuilder() + .withFilter(predicate) + .newScan() + .plan() + .splits() + .asScala + .map(_.asInstanceOf[DataSplit].dataFiles().size()) + .sum + } }