diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/PredicateUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/PredicateUtils.java new file mode 100644 index 000000000000..49964185f4be --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/utils/PredicateUtils.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.predicate.Between; +import org.apache.paimon.predicate.CompareUtils; +import org.apache.paimon.predicate.CompoundPredicate; +import org.apache.paimon.predicate.GreaterOrEqual; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.LessOrEqual; +import org.apache.paimon.predicate.Or; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.predicate.Transform; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** Utils for {@link Predicate}. 
*/
+public class PredicateUtils {
+
+    /**
+     * Tries to fold {@code GREATER_OR_EQUAL} / {@code LESS_OR_EQUAL} leaves on the same transform
+     * into a single {@code BETWEEN} leaf.
+     *
+     * <p>{@code null} and leaf predicates are returned unchanged. For an {@code AND}, nested
+     * {@code AND} children are flattened first, so {@code AND(b >= 1, AND(a >= 1, b <= 2))} is
+     * treated like {@code AND(b >= 1, a >= 1, b <= 2)}. For an {@code OR}, the direct leaves are
+     * kept as they are and only compound children are rewritten recursively, for example {@code
+     * OR(a >= 1, AND(b >= 1, b <= 2))} becomes {@code OR(a >= 1, BETWEEN(b, 1, 2))}.
+     *
+     * <p>The order of the children is not preserved: rewritten compound children are emitted
+     * first, followed by the leaves grouped per transform in the iteration order of a {@link
+     * HashMap}.
+     *
+     * @param filter the predicate to rewrite, may be {@code null}
+     * @return the rewritten predicate, or {@code filter} itself when it is {@code null} or a leaf
+     */
+    public static Predicate tryRewriteBetweenPredicate(@Nullable Predicate filter) {
+        if (filter == null || filter instanceof LeafPredicate) {
+            return filter;
+        }
+        CompoundPredicate compoundPredicate = (CompoundPredicate) filter;
+        boolean isOr = compoundPredicate.function() instanceof Or;
+
+        Map<Transform, List<LeafPredicate>> leavesByTransform = new HashMap<>();
+        List<Predicate> resultChildren = new ArrayList<>();
+        // Only the children of an AND are flattened. For example AND(b >= 1, AND(a >= 1, b <= 2))
+        // yields [b >= 1, a >= 1, b <= 2]; afterwards every child is either a leaf or an OR.
+        List<Predicate> effectiveChildren =
+                isOr
+                        ? compoundPredicate.children()
+                        : flattenChildren(compoundPredicate.children());
+        for (Predicate child : effectiveChildren) {
+            if (child instanceof LeafPredicate) {
+                LeafPredicate leaf = (LeafPredicate) child;
+                leavesByTransform
+                        .computeIfAbsent(leaf.transform(), k -> new ArrayList<>())
+                        .add(leaf);
+            } else {
+                resultChildren.add(tryRewriteBetweenPredicate(child));
+            }
+        }
+
+        for (Map.Entry<Transform, List<LeafPredicate>> entry : leavesByTransform.entrySet()) {
+            if (isOr) {
+                // Bounds joined by OR do not describe a range, so the leaves stay untouched.
+                resultChildren.addAll(entry.getValue());
+            } else {
+                mergeBounds(entry.getKey(), entry.getValue(), resultChildren);
+            }
+        }
+
+        return isOr ? PredicateBuilder.or(resultChildren) : PredicateBuilder.and(resultChildren);
+    }
+
+    /**
+     * Appends the leaves of one transform, all of them children of the same AND, to {@code out}.
+     *
+     * <p>Only the largest {@code GREATER_OR_EQUAL} and the smallest {@code LESS_OR_EQUAL} are
+     * kept, e.g. for {@code AND(a >= 1, a >= 2, a <= 3, a <= 4)} only {@code a >= 2} and {@code a
+     * <= 3} survive. The surviving pair is replaced by a {@code BETWEEN} leaf when {@link
+     * #convertToBetweenLeaf(Predicate, Predicate)} accepts it; all other leaves are passed through.
+     */
+    private static void mergeBounds(
+            Transform transform, List<LeafPredicate> leaves, List<Predicate> out) {
+        LeafPredicate gtePredicate = null;
+        LeafPredicate ltePredicate = null;
+        for (LeafPredicate leaf : leaves) {
+            boolean isGte = leaf.function() instanceof GreaterOrEqual;
+            boolean isLte = leaf.function() instanceof LessOrEqual;
+            // literals() is only read for >= / <= leaves; other functions may carry no literal.
+            if ((!isGte && !isLte) || leaf.literals().get(0) == null) {
+                // A bound with a null literal is never compared or dropped as a "looser" bound.
+                // NOTE(review): CompareUtils.compareLiteral is defined elsewhere and its null
+                // handling is not visible here; keeping the leaf as an AND child is always safe.
+                out.add(leaf);
+            } else if (isGte) {
+                if (gtePredicate == null
+                        || CompareUtils.compareLiteral(
+                                        transform.outputType(),
+                                        leaf.literals().get(0),
+                                        gtePredicate.literals().get(0))
+                                > 0) {
+                    gtePredicate = leaf;
+                }
+            } else {
+                if (ltePredicate == null
+                        || CompareUtils.compareLiteral(
+                                        transform.outputType(),
+                                        leaf.literals().get(0),
+                                        ltePredicate.literals().get(0))
+                                < 0) {
+                    ltePredicate = leaf;
+                }
+            }
+        }
+
+        if (gtePredicate != null && ltePredicate != null) {
+            Optional<Predicate> betweenLeaf = convertToBetweenLeaf(gtePredicate, ltePredicate);
+            if (betweenLeaf.isPresent()) {
+                out.add(betweenLeaf.get());
+                return;
+            }
+        }
+        // No BETWEEN could be built: keep whichever bounds exist.
+        if (gtePredicate != null) {
+            out.add(gtePredicate);
+        }
+        if (ltePredicate != null) {
+            out.add(ltePredicate);
+        }
+    }
+
+    /**
+     * Flattens nested non-OR compound predicates into one list. Leaves and {@code OR} predicates
+     * are kept as single elements; every other compound child is replaced by its flattened
+     * children.
+     */
+    private static List<Predicate> flattenChildren(List<Predicate> children) {
+        List<Predicate> result = new ArrayList<>();
+        for (Predicate child : children) {
+            if (child instanceof LeafPredicate) {
+                result.add(child);
+            } else {
+                CompoundPredicate compoundPredicate = (CompoundPredicate) child;
+                if (compoundPredicate.function() instanceof Or) {
+                    result.add(child);
+                } else {
+                    result.addAll(flattenChildren(compoundPredicate.children()));
+                }
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Converts two child predicates of an AND to a BETWEEN leaf predicate.
+     *
+     * <p>The conversion needs two leaves with equal transforms, one {@code GREATER_OR_EQUAL} and
+     * one {@code LESS_OR_EQUAL}, given in either order.
+     *
+     * @return the BETWEEN leaf, or {@code Optional.empty()} if the conversion is not possible
+     */
+    public static Optional<Predicate> convertToBetweenLeaf(
+            Predicate leftChild, Predicate rightChild) {
+        if (leftChild instanceof LeafPredicate && rightChild instanceof LeafPredicate) {
+            LeafPredicate left = (LeafPredicate) leftChild;
+            LeafPredicate right = (LeafPredicate) rightChild;
+            if (Objects.equals(left.transform(), right.transform())) {
+                if (left.function() instanceof GreaterOrEqual
+                        && right.function() instanceof LessOrEqual) {
+                    return createBetweenLeaf(left, right);
+                } else if (left.function() instanceof LessOrEqual
+                        && right.function() instanceof GreaterOrEqual) {
+                    return createBetweenLeaf(right, left);
+                }
+            }
+        }
+
+        return Optional.empty();
+    }
+
+    /**
+     * Builds {@code BETWEEN(lower, upper)} from a lower-bound and an upper-bound leaf that share
+     * the same transform. Returns {@code Optional.empty()} when a bound literal is {@code null} or
+     * when the lower bound is greater than the upper bound.
+     */
+    private static Optional<Predicate> createBetweenLeaf(
+            LeafPredicate gtePredicate, LeafPredicate ltePredicate) {
+        // gtePredicate and ltePredicate should have the same transform
+        Transform transform = gtePredicate.transform();
+        Object lbLiteral = gtePredicate.literals().get(0);
+        Object ubLiteral = ltePredicate.literals().get(0);
+
+        // A null bound cannot be ordered against the other one, leave such a pair alone.
+        if (lbLiteral == null || ubLiteral == null) {
+            return Optional.empty();
+        }
+        if (CompareUtils.compareLiteral(transform.outputType(), lbLiteral, ubLiteral) > 0) {
+            return Optional.empty();
+        }
+
+        return Optional.of(
+                new LeafPredicate(
+                        transform, Between.INSTANCE, Arrays.asList(lbLiteral, ubLiteral)));
+    }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/PredicateUtilsTest.java
b/paimon-common/src/test/java/org/apache/paimon/utils/PredicateUtilsTest.java new file mode 100644 index 000000000000..ca299500ba49 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/utils/PredicateUtilsTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.predicate.And; +import org.apache.paimon.predicate.Between; +import org.apache.paimon.predicate.CompoundPredicate; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.Or; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link PredicateUtils}. 
*/
+public class PredicateUtilsTest {
+
+    @Test
+    public void testTryRewriteBetweenPredicateBasic() {
+        // Test basic case: AND(a>=1, a is not null, a<=10) should be rewritten to
+        // AND(a is not null, BETWEEN(a, 1, 10))
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
+        Predicate gte = builder.greaterOrEqual(0, 1);
+        Predicate lte = builder.lessOrEqual(0, 10);
+        Predicate isNotNull = builder.isNotNull(0);
+
+        Predicate andPredicate = PredicateBuilder.and(gte, isNotNull, lte);
+        Predicate result = PredicateUtils.tryRewriteBetweenPredicate(andPredicate);
+
+        assertThat(result).isInstanceOf(CompoundPredicate.class);
+        CompoundPredicate compoundResult = (CompoundPredicate) result;
+        assertThat(compoundResult.function()).isInstanceOf(And.class);
+        assertThat(compoundResult.children()).hasSize(2);
+
+        // The merged BETWEEN leaf is appended after the leaves that are not range bounds.
+        Predicate betweenChild = compoundResult.children().get(1);
+        assertThat(betweenChild).isInstanceOf(LeafPredicate.class);
+        LeafPredicate betweenLeaf = (LeafPredicate) betweenChild;
+        assertThat(betweenLeaf.function()).isInstanceOf(Between.class);
+        assertThat(betweenLeaf.literals()).containsExactly(1, 10);
+
+        Predicate notNullChild = compoundResult.children().get(0);
+        assertThat(notNullChild).isInstanceOf(LeafPredicate.class);
+        assertThat(notNullChild.toString()).contains("IsNotNull");
+    }
+
+    @Test
+    public void testTryRewriteBetweenPredicateRecursive() {
+        // Test recursive case: OR(b>=1, AND(a>=1, a is not null, a<=10)) should rewrite nested AND
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType(), new IntType()));
+
+        Predicate gteB = builder.greaterOrEqual(1, 1);
+        Predicate gteA = builder.greaterOrEqual(0, 1);
+        Predicate lteA = builder.lessOrEqual(0, 10);
+        Predicate isNotNullA = builder.isNotNull(0);
+        Predicate andPredicate = PredicateBuilder.and(gteA, isNotNullA, lteA);
+        Predicate orPredicate = PredicateBuilder.or(gteB, andPredicate);
+
+        Predicate result = PredicateUtils.tryRewriteBetweenPredicate(orPredicate);
+
+        assertThat(result).isInstanceOf(CompoundPredicate.class);
+        CompoundPredicate compoundResult = (CompoundPredicate) result;
+        assertThat(compoundResult.function()).isInstanceOf(Or.class);
+        assertThat(compoundResult.children()).hasSize(2);
+
+        // Rewritten compound children are emitted before the direct leaves, so the order of the
+        // OR children is swapped compared to the input.
+        Predicate secondChild = compoundResult.children().get(1);
+        assertThat(secondChild).isInstanceOf(LeafPredicate.class);
+        assertThat(secondChild.toString()).contains("GreaterOrEqual");
+
+        Predicate firstChild = compoundResult.children().get(0);
+        assertThat(firstChild).isInstanceOf(CompoundPredicate.class);
+        CompoundPredicate innerAnd = (CompoundPredicate) firstChild;
+        assertThat(innerAnd.function()).isInstanceOf(And.class);
+        assertThat(innerAnd.children()).hasSize(2);
+
+        Predicate betweenCandidate = innerAnd.children().get(1);
+        assertThat(betweenCandidate).isInstanceOf(LeafPredicate.class);
+        LeafPredicate betweenLeaf = (LeafPredicate) betweenCandidate;
+        assertThat(betweenLeaf.function()).isInstanceOf(Between.class);
+        assertThat(betweenLeaf.literals()).containsExactly(1, 10);
+    }
+
+    /**
+     * Test this complicated scenario ({@code a} is field 0, {@code b} is field 1).
+     *
+     * <pre>{@code
+     *             AND
+     *           /  |  \
+     *         OR  AND a>=1
+     *        /|   || \
+     *       / |  / |  \
+     * a>=1 b<=2 OR AND a>=2
+     *          / |  | \
+     *         /  |  |  \
+     *     a>=1 b<2 b>=1 a<=10
+     *
+     * }</pre>
+     */
+    @Test
+    public void testAnExtremeComplicatedPredicate() {
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType(), new IntType()));
+        Predicate l3p1 = builder.greaterOrEqual(0, 1);
+        Predicate l3p2 = builder.lessThan(1, 2);
+        Predicate l3p3 = builder.greaterOrEqual(1, 1);
+        Predicate l3p4 = builder.lessOrEqual(0, 10);
+        Predicate l2p1 = builder.greaterOrEqual(0, 1);
+        Predicate l2p2 = builder.lessOrEqual(1, 2);
+        Predicate l2p3 = PredicateBuilder.or(l3p1, l3p2);
+        Predicate l2p4 = PredicateBuilder.and(l3p3, l3p4);
+        Predicate l2p5 = builder.greaterOrEqual(0, 2);
+        Predicate l1p1 = PredicateBuilder.or(l2p1, l2p2);
+        Predicate l1p2 = PredicateBuilder.and(l2p3, l2p4, l2p5);
+        Predicate l1p3 = builder.greaterOrEqual(0, 1);
+        Predicate root = PredicateBuilder.and(l1p1, l1p2, l1p3);
+
+        Predicate result = PredicateUtils.tryRewriteBetweenPredicate(root);
+        assertThat(result).isInstanceOf(CompoundPredicate.class);
+
+        CompoundPredicate compoundResult = (CompoundPredicate) result;
+        assertThat(compoundResult.function()).isInstanceOf(And.class);
+
+        // After flattening the nested ANDs, the bounds on f0 are a>=2, a>=1 and a<=10, so the
+        // tightest range is [2, 10]. Directly check the toString.
+        String resultString = compoundResult.toString();
+        assertThat(resultString).contains("Between(f0, [2, 10])");
+    }
+
+    @Test
+    public void testTryRewriteBetweenPredicateIntersection() {
+        // Test intersection case: AND(AND(a>=1, a<=10), AND(a>=2, a<=7)) should use the
+        // intersection (2, 7)
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
+
+        Predicate gte1 = builder.greaterOrEqual(0, 1);
+        Predicate lte10 = builder.lessOrEqual(0, 10);
+        Predicate gte2 = builder.greaterOrEqual(0, 2);
+        Predicate lte7 = builder.lessOrEqual(0, 7);
+
+        Predicate predicate =
+                PredicateBuilder.and(
+                        PredicateBuilder.and(gte1, lte10), PredicateBuilder.and(gte2, lte7));
+        Predicate result = PredicateUtils.tryRewriteBetweenPredicate(predicate);
+
+        assertThat(result).isInstanceOf(LeafPredicate.class);
+        LeafPredicate betweenLeaf = (LeafPredicate) result;
+        assertThat(betweenLeaf.function()).isInstanceOf(Between.class);
+        assertThat(betweenLeaf.literals()).containsExactly(2, 7);
+    }
+
+    @Test
+    public void testTryRewriteBetweenPredicateDifferentColumns() {
+        // Test different columns case: AND(a>=1, b<=10) should not be rewritten
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType(), new IntType()));
+
+        Predicate gteA = builder.greaterOrEqual(0, 1);
+        Predicate lteB = builder.lessOrEqual(1, 10);
+        Predicate predicate = PredicateBuilder.and(gteA, lteB);
+
+        Predicate result = PredicateUtils.tryRewriteBetweenPredicate(predicate);
+
+        assertThat(result).isInstanceOf(CompoundPredicate.class);
+        CompoundPredicate compoundResult = (CompoundPredicate) result;
+        assertThat(compoundResult.function()).isInstanceOf(And.class);
+        assertThat(compoundResult.children()).hasSize(2);
+        // Child order is not asserted: leaves of different columns may come back in any order.
+        assertThat(compoundResult.children().stream().map(Predicate::toString))
+                .containsExactlyInAnyOrderElementsOf(
+                        Arrays.asList("GreaterOrEqual(f0, 1)", "LessOrEqual(f1, 10)"));
+    }
+
+    @Test
+    public void testTryRewriteBetweenPredicateInvalidRange() {
+        // Test invalid range case: AND(a>=10, a<=1) should not be rewritten to BETWEEN
+        PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));
+
+        Predicate gte = builder.greaterOrEqual(0, 10);
+        Predicate lte = builder.lessOrEqual(0, 1);
+        Predicate predicate = PredicateBuilder.and(gte, lte);
+
+        Predicate result = PredicateUtils.tryRewriteBetweenPredicate(predicate);
+
+        assertThat(result).isInstanceOf(CompoundPredicate.class);
+        CompoundPredicate compoundResult = (CompoundPredicate) result;
+        assertThat(compoundResult.function()).isInstanceOf(And.class);
+        assertThat(compoundResult.children()).hasSize(2);
+        assertThat(compoundResult.children().stream().map(Predicate::toString))
+                .containsExactlyInAnyOrderElementsOf(
+                        Arrays.asList("GreaterOrEqual(f0, 10)", "LessOrEqual(f0, 1)"));
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java index f4f529dc4c85..f8cf39d1750d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java @@ -27,6 +27,7 @@ import org.apache.paimon.table.InnerTable; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Filter; +import org.apache.paimon.utils.PredicateUtils; import org.apache.paimon.utils.Range; import javax.annotation.Nullable; @@ -90,6 +91,7 @@ public ReadBuilder withFilter(Predicate filter) { } else { this.filter = PredicateBuilder.and(this.filter, filter); } + this.filter = PredicateUtils.tryRewriteBetweenPredicate(this.filter); return this; } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala index 334baa0b2fe0..37f8abb95d53 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala @@ -65,6 +65,12 @@ abstract class RowIdPushDownTestBase extends PaimonSparkTestBase { sql("SELECT * FROM t WHERE _ROW_ID IN (6, 7)"), Seq() ) + if (gteqSpark3_3) { + checkAnswer( + sql("SELECT * FROM t WHERE _ROW_ID BETWEEN 0 AND 2"), + Seq(Row(0, 0, "0"), Row(1, 1, "1"), Row(2, 2, "2")) + ) + } // 2.CompoundPredicate checkAnswer(