From 45c55063e19cc37764bcfb6de2d658e79a16ca96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Wed, 11 Feb 2026 16:55:01 +0800 Subject: [PATCH 1/7] [spark] supports converting some SparkPredicate to Paimon between LeafPredicate --- .../paimon/spark/SparkV2FilterConverter.scala | 47 ++++++++++++++++++- .../spark/sql/RowIdPushDownTestBase.scala | 4 ++ .../sql/SparkV2FilterConverterTestBase.scala | 35 +++++++++++++- 3 files changed, 83 insertions(+), 3 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala index 36e188bde76a..d0d4deabcece 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala @@ -18,7 +18,7 @@ package org.apache.paimon.spark -import org.apache.paimon.predicate.{Predicate, PredicateBuilder, Transform} +import org.apache.paimon.predicate.{GreaterOrEqual, LeafPredicate, LessOrEqual, Predicate, PredicateBuilder, Transform} import org.apache.paimon.spark.util.SparkExpressionConverter.{toPaimonLiteral, toPaimonTransform} import org.apache.paimon.types.RowType @@ -26,6 +26,8 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.connector.expressions.{Expression, Literal} import org.apache.spark.sql.connector.expressions.filter.{And, Not, Or, Predicate => SparkPredicate} +import java.util.Objects + import scala.collection.JavaConverters._ /** Conversion from [[SparkPredicate]] to [[Predicate]]. 
*/ @@ -125,7 +127,12 @@ case class SparkV2FilterConverter(rowType: RowType) extends Logging { case AND => val and = sparkPredicate.asInstanceOf[And] - PredicateBuilder.and(convert(and.left), convert(and.right())) + val leftPredicate = convert(and.left) + val rightPredicate = convert(and.right()) + convertToBetweenFunction(leftPredicate, rightPredicate) match { + case Some(predicate) => predicate + case _ => PredicateBuilder.and(leftPredicate, rightPredicate) + } case OR => val or = sparkPredicate.asInstanceOf[Or] @@ -169,6 +176,42 @@ case class SparkV2FilterConverter(rowType: RowType) extends Logging { } } + private def convertToBetweenFunction( + leftPredicate: Predicate, + rightPredicate: Predicate): Option[Predicate] = { + def toBetweenLeafPredicate( + transform: Transform, + lowerBoundInclusive: Object, + upperBoundInclusive: Object): Predicate = { + builder.between(transform, lowerBoundInclusive, upperBoundInclusive) + } + + (leftPredicate, rightPredicate) match { + case (left: LeafPredicate, right: LeafPredicate) => + // left and right should have the same transform + if (!Objects.equals(left.transform(), right.transform())) { + return None + } + (left.function(), right.function()) match { + case (_: GreaterOrEqual, _: LessOrEqual) => + Some( + toBetweenLeafPredicate( + left.transform(), + left.literals().get(0), + right.literals().get(0))) + case (_: LessOrEqual, _: GreaterOrEqual) => + Some( + toBetweenLeafPredicate( + left.transform(), + right.literals().get(0), + left.literals().get(0))) + case _ => None + } + case _ => + None + } + } + private object UnaryPredicate { def unapply(sparkPredicate: SparkPredicate): Option[Transform] = { sparkPredicate.children() match { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala index 334baa0b2fe0..ebbab2410a3d 100644 --- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala @@ -65,6 +65,10 @@ abstract class RowIdPushDownTestBase extends PaimonSparkTestBase { sql("SELECT * FROM t WHERE _ROW_ID IN (6, 7)"), Seq() ) + checkAnswer( + sql("SELECT * FROM t WHERE _ROW_ID BETWEEN 0 AND 2"), + Seq(Row(0, 0, "0"), Row(1, 1, "1"), Row(2, 2, "2")) + ) // 2.CompoundPredicate checkAnswer( diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala index 2d90777fd78c..ad179bf7531f 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala @@ -19,7 +19,7 @@ package org.apache.paimon.spark.sql import org.apache.paimon.data.{BinaryString, Decimal, Timestamp} -import org.apache.paimon.predicate.PredicateBuilder +import org.apache.paimon.predicate.{Between, LeafPredicate, PredicateBuilder} import org.apache.paimon.spark.{PaimonSparkTestBase, SparkV2FilterConverter} import org.apache.paimon.spark.util.shim.TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType import org.apache.paimon.table.source.DataSplit @@ -295,6 +295,39 @@ abstract class SparkV2FilterConverterTestBase extends PaimonSparkTestBase { assert(scanFilesCount(filter) == 3) } + test("V2Filter: Between") { + // 1. 
test basic between + val filter = "string_col BETWEEN 'a' AND 'r'" + val actual = converter.convert(v2Filter(filter)).get + assert( + actual.equals(builder.between(0, BinaryString.fromString("a"), BinaryString.fromString("r")))) + checkAnswer( + sql(s"SELECT string_col from test_tbl WHERE $filter ORDER BY string_col"), + Seq(Row("hello"), Row("hi"), Row("paimon")) + ) + + // 2. >= and <= on same transform should also be converted to between + val filter1 = "CONCAT(string_col, '_suffix') >= 'a' AND CONCAT(string_col, '_suffix') <= 'r'" + val actual1 = converter.convert(v2Filter(filter1)).get + assert(actual1.isInstanceOf[LeafPredicate]) + val function = actual1.asInstanceOf[LeafPredicate].function + assert(function.isInstanceOf[Between]) + checkAnswer( + sql(s"SELECT string_col from test_tbl WHERE $filter1 ORDER BY string_col"), + Seq(Row("hello"), Row("hi"), Row("paimon")) + ) + + // 3. >= and <= on different transform should not be converted to between + val filter2 = "CONCAT(string_col, '_suffix1') >= 'a' AND CONCAT(string_col, '_suffix2') <= 'r'" + val actual2 = converter.convert(v2Filter(filter2)).get + assert(!actual2.isInstanceOf[LeafPredicate]) + + // 4. 
>= and <= on different columns should not be converted to between + val filter3 = "string_col >= 'a' AND int_col <= 2" + val actual3 = converter.convert(v2Filter(filter3)).get + assert(!actual3.isInstanceOf[LeafPredicate]) + } + test("V2Filter: And") { val filter = "int_col > 1 AND int_col < 3" val actual = converter.convert(v2Filter(filter)).get From 4bbe96d8d34425fd6c3274b52ca66612692aef57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Wed, 11 Feb 2026 17:06:58 +0800 Subject: [PATCH 2/7] minor refactor --- .../paimon/spark/SparkV2FilterConverter.scala | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala index d0d4deabcece..da52e8106fb8 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala @@ -179,11 +179,15 @@ case class SparkV2FilterConverter(rowType: RowType) extends Logging { private def convertToBetweenFunction( leftPredicate: Predicate, rightPredicate: Predicate): Option[Predicate] = { + def toBetweenLeafPredicate( - transform: Transform, - lowerBoundInclusive: Object, - upperBoundInclusive: Object): Predicate = { - builder.between(transform, lowerBoundInclusive, upperBoundInclusive) + gtePredicate: LeafPredicate, + ltePredicate: LeafPredicate): Predicate = { + // gtePredicate and ltePredicate should have the same transform + builder.between( + gtePredicate.transform(), + gtePredicate.literals().get(0), + ltePredicate.literals().get(0)) } (leftPredicate, rightPredicate) match { @@ -194,17 +198,9 @@ case class SparkV2FilterConverter(rowType: RowType) extends Logging { } (left.function(), right.function()) match { case (_: GreaterOrEqual, _: 
LessOrEqual) => - Some( - toBetweenLeafPredicate( - left.transform(), - left.literals().get(0), - right.literals().get(0))) + Some(toBetweenLeafPredicate(left, right)) case (_: LessOrEqual, _: GreaterOrEqual) => - Some( - toBetweenLeafPredicate( - left.transform(), - right.literals().get(0), - left.literals().get(0))) + Some(toBetweenLeafPredicate(right, left)) case _ => None } case _ => From 53183da8c807552f708a91218160c6b319424a28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Wed, 11 Feb 2026 22:49:49 +0800 Subject: [PATCH 3/7] fix test --- .../paimon/spark/PaimonBaseScanBuilder.scala | 45 ++++++++++++- .../paimon/spark/SparkV2FilterConverter.scala | 37 +---------- .../paimon/spark/util/PredicateUtils.scala | 64 +++++++++++++++++++ 3 files changed, 109 insertions(+), 37 deletions(-) create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/PredicateUtils.scala diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala index 47723171e4d6..35ba6c40e7d1 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala @@ -21,7 +21,8 @@ package org.apache.paimon.spark import org.apache.paimon.CoreOptions import org.apache.paimon.partition.PartitionPredicate import org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates -import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate, TopN, VectorSearch} +import org.apache.paimon.predicate.{GreaterOrEqual, LeafPredicate, LessOrEqual, PartitionPredicateVisitor, Predicate, TopN, VectorSearch} +import org.apache.paimon.spark.util.PredicateUtils import org.apache.paimon.table.{SpecialFields, Table} import 
org.apache.paimon.types.RowType @@ -101,7 +102,7 @@ abstract class PaimonBaseScanBuilder this.pushedPartitionFilters = Array(pair.getLeft.get()) } if (pushableDataFilters.nonEmpty) { - this.pushedDataFilters = pushableDataFilters.toArray + this.pushedDataFilters = rewriteDataFiltersToBetween(pushableDataFilters.toArray) } if (postScan.nonEmpty) { this.hasPostScanPredicates = true @@ -109,6 +110,46 @@ abstract class PaimonBaseScanBuilder postScan.toArray } + /** Rewrite pushable data filters to use Between predicate when possible. */ + private def rewriteDataFiltersToBetween( + pushableDataFilters: Array[Predicate]): Array[Predicate] = { + val leafPredicates = + pushableDataFilters.filter(_.isInstanceOf[LeafPredicate]).map(_.asInstanceOf[LeafPredicate]) + val nonLeafPredicates = pushableDataFilters.filterNot(_.isInstanceOf[LeafPredicate]) + + // Group LeafPredicates by transform + val groupedByTransform = leafPredicates.groupBy(_.transform()) + + val rewrittenPredicates = scala.collection.mutable.ArrayBuffer.empty[Predicate] + rewrittenPredicates ++= nonLeafPredicates + + groupedByTransform.values.foreach { + predicates => + // Filter predicates with LessOrEqual or GreaterOrEqual functions + val candidates = predicates.filter( + p => p.function().isInstanceOf[LessOrEqual] || p.function().isInstanceOf[GreaterOrEqual]) + // Add other predicates directly to result + val nonCandidates = predicates.filterNot( + p => p.function().isInstanceOf[LessOrEqual] || p.function().isInstanceOf[GreaterOrEqual]) + rewrittenPredicates ++= nonCandidates + + // Now we only support the simplest case: one pair of LessOrEqual and GreaterOrEqual + if (candidates.length == 2) { + // Try to convert to Between + PredicateUtils.convertToBetweenFunction(candidates(0), candidates(1)) match { + case Some(betweenPredicate) => + rewrittenPredicates += betweenPredicate + case None => + rewrittenPredicates ++= candidates + } + } else { + rewrittenPredicates ++= candidates + } + } + + 
rewrittenPredicates.toArray + } + override def pushedPredicates: Array[SparkPredicate] = { pushedSparkPredicates } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala index da52e8106fb8..53a054c852c8 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala @@ -18,7 +18,8 @@ package org.apache.paimon.spark -import org.apache.paimon.predicate.{GreaterOrEqual, LeafPredicate, LessOrEqual, Predicate, PredicateBuilder, Transform} +import org.apache.paimon.predicate.{Predicate, PredicateBuilder, Transform} +import org.apache.paimon.spark.util.PredicateUtils.convertToBetweenFunction import org.apache.paimon.spark.util.SparkExpressionConverter.{toPaimonLiteral, toPaimonTransform} import org.apache.paimon.types.RowType @@ -26,8 +27,6 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.connector.expressions.{Expression, Literal} import org.apache.spark.sql.connector.expressions.filter.{And, Not, Or, Predicate => SparkPredicate} -import java.util.Objects - import scala.collection.JavaConverters._ /** Conversion from [[SparkPredicate]] to [[Predicate]]. 
*/ @@ -176,38 +175,6 @@ case class SparkV2FilterConverter(rowType: RowType) extends Logging { } } - private def convertToBetweenFunction( - leftPredicate: Predicate, - rightPredicate: Predicate): Option[Predicate] = { - - def toBetweenLeafPredicate( - gtePredicate: LeafPredicate, - ltePredicate: LeafPredicate): Predicate = { - // gtePredicate and ltePredicate should have the same transform - builder.between( - gtePredicate.transform(), - gtePredicate.literals().get(0), - ltePredicate.literals().get(0)) - } - - (leftPredicate, rightPredicate) match { - case (left: LeafPredicate, right: LeafPredicate) => - // left and right should have the same transform - if (!Objects.equals(left.transform(), right.transform())) { - return None - } - (left.function(), right.function()) match { - case (_: GreaterOrEqual, _: LessOrEqual) => - Some(toBetweenLeafPredicate(left, right)) - case (_: LessOrEqual, _: GreaterOrEqual) => - Some(toBetweenLeafPredicate(right, left)) - case _ => None - } - case _ => - None - } - } - private object UnaryPredicate { def unapply(sparkPredicate: SparkPredicate): Option[Transform] = { sparkPredicate.children() match { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/PredicateUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/PredicateUtils.scala new file mode 100644 index 000000000000..24b45065b854 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/PredicateUtils.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.util + +import org.apache.paimon.predicate.{Between, CompareUtils, GreaterOrEqual, LeafPredicate, LessOrEqual, Predicate} + +import java.util +import java.util.Objects + +object PredicateUtils { + + /** Try to convert AND's children predicates to BETWEEN leaf predicate, return None if failed. */ + def convertToBetweenFunction( + leftPredicate: Predicate, + rightPredicate: Predicate): Option[Predicate] = { + + def toBetweenLeafPredicate( + gtePredicate: LeafPredicate, + ltePredicate: LeafPredicate): Option[Predicate] = { + // gtePredicate and ltePredicate should have the same transform + val transform = gtePredicate.transform() + val literalLb = gtePredicate.literals().get(0) + val literalUb = ltePredicate.literals().get(0) + if (CompareUtils.compareLiteral(transform.outputType(), literalLb, literalUb) > 0) { + None + } else { + Some( + new LeafPredicate(transform, Between.INSTANCE, util.Arrays.asList(literalLb, literalUb))) + } + } + + (leftPredicate, rightPredicate) match { + case (left: LeafPredicate, right: LeafPredicate) => + if (!Objects.equals(left.transform(), right.transform())) { + return None + } + (left.function(), right.function()) match { + case (_: GreaterOrEqual, _: LessOrEqual) => + toBetweenLeafPredicate(left, right) + case (_: LessOrEqual, _: GreaterOrEqual) => + toBetweenLeafPredicate(right, left) + case _ => None + } + case _ => + None + } + } +} From 21685766ca691ceed15aa9253744bc41af3cc813 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Thu, 12 Feb 2026 10:53:09 +0800 
Subject: [PATCH 4/7] fix test --- .../spark/sql/RowIdPushDownTestBase.scala | 10 +++-- .../sql/SparkV2FilterConverterTestBase.scala | 42 ++++++++++--------- 2 files changed, 29 insertions(+), 23 deletions(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala index ebbab2410a3d..37f8abb95d53 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala @@ -65,10 +65,12 @@ abstract class RowIdPushDownTestBase extends PaimonSparkTestBase { sql("SELECT * FROM t WHERE _ROW_ID IN (6, 7)"), Seq() ) - checkAnswer( - sql("SELECT * FROM t WHERE _ROW_ID BETWEEN 0 AND 2"), - Seq(Row(0, 0, "0"), Row(1, 1, "1"), Row(2, 2, "2")) - ) + if (gteqSpark3_3) { + checkAnswer( + sql("SELECT * FROM t WHERE _ROW_ID BETWEEN 0 AND 2"), + Seq(Row(0, 0, "0"), Row(1, 1, "1"), Row(2, 2, "2")) + ) + } // 2.CompoundPredicate checkAnswer( diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala index ad179bf7531f..47cd256a6a8b 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala @@ -306,26 +306,30 @@ abstract class SparkV2FilterConverterTestBase extends PaimonSparkTestBase { Seq(Row("hello"), Row("hi"), Row("paimon")) ) - // 2. >= and <= on same transform should also be converted to between - val filter1 = "CONCAT(string_col, '_suffix') >= 'a' AND CONCAT(string_col, '_suffix') <= 'r'" + // 2. 
>= and <= on different columns should not be converted to between + val filter1 = "string_col >= 'a' AND int_col <= 2" val actual1 = converter.convert(v2Filter(filter1)).get - assert(actual1.isInstanceOf[LeafPredicate]) - val function = actual1.asInstanceOf[LeafPredicate].function - assert(function.isInstanceOf[Between]) - checkAnswer( - sql(s"SELECT string_col from test_tbl WHERE $filter1 ORDER BY string_col"), - Seq(Row("hello"), Row("hi"), Row("paimon")) - ) - - // 3. >= and <= on different transform should not be converted to between - val filter2 = "CONCAT(string_col, '_suffix1') >= 'a' AND CONCAT(string_col, '_suffix2') <= 'r'" - val actual2 = converter.convert(v2Filter(filter2)).get - assert(!actual2.isInstanceOf[LeafPredicate]) - - // 4. >= and <= on different columns should not be converted to between - val filter3 = "string_col >= 'a' AND int_col <= 2" - val actual3 = converter.convert(v2Filter(filter3)).get - assert(!actual3.isInstanceOf[LeafPredicate]) + assert(!actual1.isInstanceOf[LeafPredicate]) + + // spark 3.4+ support converting CONCAT with fieldRef to predicates + if (gteqSpark3_4) { + // 3. >= and <= on same transform should also be converted to between + val filter2 = "CONCAT(string_col, '_suffix') >= 'a' AND CONCAT(string_col, '_suffix') <= 'r'" + val actual2 = converter.convert(v2Filter(filter2)).get + assert(actual1.isInstanceOf[LeafPredicate]) + val function = actual2.asInstanceOf[LeafPredicate].function + assert(function.isInstanceOf[Between]) + checkAnswer( + sql(s"SELECT string_col from test_tbl WHERE $filter2 ORDER BY string_col"), + Seq(Row("hello"), Row("hi"), Row("paimon")) + ) + + // 4. 
>= and <= on different transform should not be converted to between + val filter3 = + "CONCAT(string_col, '_suffix1') >= 'a' AND CONCAT(string_col, '_suffix2') <= 'r'" + val actual3 = converter.convert(v2Filter(filter3)).get + assert(!actual3.isInstanceOf[LeafPredicate]) + } } test("V2Filter: And") { From 51722944781a23aa933bd2b2766566ae6681339b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Thu, 12 Feb 2026 11:39:06 +0800 Subject: [PATCH 5/7] fix test --- .../paimon/spark/sql/SparkV2FilterConverterTestBase.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala index 47cd256a6a8b..a8b68ea3d50e 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala @@ -316,7 +316,7 @@ abstract class SparkV2FilterConverterTestBase extends PaimonSparkTestBase { // 3. 
>= and <= on same transform should also be converted to between val filter2 = "CONCAT(string_col, '_suffix') >= 'a' AND CONCAT(string_col, '_suffix') <= 'r'" val actual2 = converter.convert(v2Filter(filter2)).get - assert(actual1.isInstanceOf[LeafPredicate]) + assert(actual2.isInstanceOf[LeafPredicate]) val function = actual2.asInstanceOf[LeafPredicate].function assert(function.isInstanceOf[Between]) checkAnswer( From 992ef7242eb76dc9e4a5e5477f3bbf177ea6ee78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Thu, 12 Feb 2026 17:07:19 +0800 Subject: [PATCH 6/7] fix comment --- .../apache/paimon/utils/PredicateUtils.java | 188 ++++++++++++++++ .../paimon/utils/PredicateUtilsTest.java | 207 ++++++++++++++++++ .../paimon/table/source/ReadBuilderImpl.java | 2 + .../paimon/spark/PaimonBaseScanBuilder.scala | 45 +--- .../paimon/spark/SparkV2FilterConverter.scala | 8 +- .../paimon/spark/util/PredicateUtils.scala | 64 ------ .../sql/SparkV2FilterConverterTestBase.scala | 39 +--- 7 files changed, 401 insertions(+), 152 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/utils/PredicateUtils.java create mode 100644 paimon-common/src/test/java/org/apache/paimon/utils/PredicateUtilsTest.java delete mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/PredicateUtils.scala diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/PredicateUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/PredicateUtils.java new file mode 100644 index 000000000000..115a9a505e18 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/utils/PredicateUtils.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.predicate.Between; +import org.apache.paimon.predicate.CompareUtils; +import org.apache.paimon.predicate.CompoundPredicate; +import org.apache.paimon.predicate.GreaterOrEqual; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.LessOrEqual; +import org.apache.paimon.predicate.Or; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.predicate.Transform; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** Utils for {@link Predicate}. */ +public class PredicateUtils { + + /** + * Try to rewrite possible {@code GREATER_OR_EQUAL} and {@code LESS_OR_EQUAL} predicates to + * {@code BETWEEN} leaf predicate. This method will recursively try to rewrite the children + * predicates of an {@code AND}, for example: {@code OR(a >= 1, AND(b >= 1, b <= 2))} will be + * rewritten to {@code OR(a >= 1, BETWEEN(b, 1, 2))}. 
+ */ + public static Predicate tryRewriteBetweenPredicate(Predicate filter) { + if (filter instanceof LeafPredicate) { + return filter; + } + CompoundPredicate compoundPredicate = (CompoundPredicate) filter; + boolean isOr = compoundPredicate.function() instanceof Or; + + Map> leavesByTransform = new HashMap<>(); + List resultChildren = new ArrayList<>(); + // Flatten the children predicates of an AND + // For example, for AND(b >= 1, AND(a >= 1, b <= 2)), we will get [a >= 1, b >= 1, b <= 2] + // After flattening, all children will be either LeafPredicate or ORPredicate + List effectiveChildren = + isOr ? compoundPredicate.children() : flattenChildren(compoundPredicate.children()); + for (Predicate child : effectiveChildren) { + if (child instanceof LeafPredicate) { + leavesByTransform + .computeIfAbsent( + ((LeafPredicate) child).transform(), k -> new ArrayList<>()) + .add((LeafPredicate) child); + } else { + resultChildren.add(tryRewriteBetweenPredicate(child)); + } + } + + for (Map.Entry> leaves : leavesByTransform.entrySet()) { + if (isOr) { + resultChildren.addAll(leaves.getValue()); + continue; + } + + Transform transform = leaves.getKey(); + + // for children predicates of an AND, we only need to reserve + // the largest GREATER_OR_EQUAL and the smallest LESS_OR_EQUAL + // For example, for AND(a >= 1, a >= 2, a <= 3, a <= 4), we only need to reserve a >= 2 + // and a <= 3 + LeafPredicate gtePredicate = null; + LeafPredicate ltePredicate = null; + for (LeafPredicate leaf : leaves.getValue()) { + if (leaf.function() instanceof GreaterOrEqual) { + if (gtePredicate == null + || CompareUtils.compareLiteral( + transform.outputType(), + leaf.literals().get(0), + gtePredicate.literals().get(0)) + > 0) { + gtePredicate = leaf; + } + } else if (leaf.function() instanceof LessOrEqual) { + if (ltePredicate == null + || CompareUtils.compareLiteral( + transform.outputType(), + leaf.literals().get(0), + ltePredicate.literals().get(0)) + < 0) { + ltePredicate = leaf; + } + 
} else { + resultChildren.add(leaf); + } + } + + boolean converted = false; + if (gtePredicate != null && ltePredicate != null) { + Optional betweenLeaf = convertToBetweenLeaf(gtePredicate, ltePredicate); + if (betweenLeaf.isPresent()) { + converted = true; + resultChildren.add(betweenLeaf.get()); + } + } + if (!converted) { + if (gtePredicate != null) { + resultChildren.add(gtePredicate); + } + if (ltePredicate != null) { + resultChildren.add(ltePredicate); + } + } + } + + return isOr ? PredicateBuilder.or(resultChildren) : PredicateBuilder.and(resultChildren); + } + + private static List flattenChildren(List children) { + List result = new ArrayList<>(); + for (Predicate child : children) { + if (child instanceof LeafPredicate) { + result.add(child); + } else { + CompoundPredicate compoundPredicate = (CompoundPredicate) child; + if (compoundPredicate.function() instanceof Or) { + result.add(child); + } else { + result.addAll(flattenChildren(compoundPredicate.children())); + } + } + } + return result; + } + + /** + * Convert child predicates of an AND to a BETWEEN leaf predicate. Return `Optional.empty()` if + * not possible. 
+ */ + public static Optional convertToBetweenLeaf( + Predicate leftChild, Predicate rightChild) { + if (leftChild instanceof LeafPredicate && rightChild instanceof LeafPredicate) { + LeafPredicate left = (LeafPredicate) leftChild; + LeafPredicate right = (LeafPredicate) rightChild; + if (Objects.equals(left.transform(), right.transform())) { + if (left.function() instanceof GreaterOrEqual + && right.function() instanceof LessOrEqual) { + return createBetweenLeaf(left, right); + } else if (left.function() instanceof LessOrEqual + && right.function() instanceof GreaterOrEqual) { + return createBetweenLeaf(right, left); + } + } + } + + return Optional.empty(); + } + + private static Optional createBetweenLeaf( + LeafPredicate gtePredicate, LeafPredicate ltePredicate) { + // gtePredicate and ltePredicate should have the same transform + Transform transform = gtePredicate.transform(); + Object lbLiteral = gtePredicate.literals().get(0); + Object ubLiteral = ltePredicate.literals().get(0); + + if (CompareUtils.compareLiteral(transform.outputType(), lbLiteral, ubLiteral) > 0) { + return Optional.empty(); + } + + return Optional.of( + new LeafPredicate( + transform, Between.INSTANCE, Arrays.asList(lbLiteral, ubLiteral))); + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/PredicateUtilsTest.java b/paimon-common/src/test/java/org/apache/paimon/utils/PredicateUtilsTest.java new file mode 100644 index 000000000000..ca299500ba49 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/utils/PredicateUtilsTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.predicate.And; +import org.apache.paimon.predicate.Between; +import org.apache.paimon.predicate.CompoundPredicate; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.Or; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link PredicateUtils}. 
*/ +public class PredicateUtilsTest { + + @Test + public void testTryRewriteBetweenPredicateBasic() { + // Test basic case: AND(a>=1, a<=10, a is not null) should be rewritten to BETWEEN + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + Predicate gte = builder.greaterOrEqual(0, 1); + Predicate lte = builder.lessOrEqual(0, 10); + Predicate isNotNull = builder.isNotNull(0); + + Predicate andPredicate = PredicateBuilder.and(gte, isNotNull, lte); + Predicate result = PredicateUtils.tryRewriteBetweenPredicate(andPredicate); + + assertThat(result).isInstanceOf(CompoundPredicate.class); + CompoundPredicate compoundResult = (CompoundPredicate) result; + assertThat(compoundResult.function()).isInstanceOf(And.class); + assertThat(compoundResult.children()).hasSize(2); + + Predicate betweenChild = compoundResult.children().get(1); + assertThat(betweenChild).isInstanceOf(LeafPredicate.class); + LeafPredicate betweenLeaf = (LeafPredicate) betweenChild; + assertThat(betweenLeaf.function()).isInstanceOf(Between.class); + assertThat(betweenLeaf.literals()).containsExactly(1, 10); + + Predicate notNullChild = compoundResult.children().get(0); + assertThat(notNullChild).isInstanceOf(LeafPredicate.class); + assertThat(notNullChild.toString()).contains("IsNotNull"); + } + + @Test + public void testTryRewriteBetweenPredicateRecursive() { + // Test recursive case: OR(b>=1, AND(a>=1, a<=10, a is not null)) should rewrite nested AND + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType(), new IntType())); + + Predicate gteB = builder.greaterOrEqual(1, 1); + Predicate gteA = builder.greaterOrEqual(0, 1); + Predicate lteA = builder.lessOrEqual(0, 10); + Predicate isNotNullA = builder.isNotNull(0); + Predicate andPredicate = PredicateBuilder.and(gteA, isNotNullA, lteA); + Predicate orPredicate = PredicateBuilder.or(gteB, andPredicate); + + Predicate result = PredicateUtils.tryRewriteBetweenPredicate(orPredicate); + + 
assertThat(result).isInstanceOf(CompoundPredicate.class); + CompoundPredicate compoundResult = (CompoundPredicate) result; + assertThat(compoundResult.function()).isInstanceOf(Or.class); + assertThat(compoundResult.children()).hasSize(2); + + Predicate secondChild = compoundResult.children().get(1); + assertThat(secondChild).isInstanceOf(LeafPredicate.class); + assertThat(secondChild.toString()).contains("GreaterOrEqual"); + + Predicate firstChild = compoundResult.children().get(0); + assertThat(firstChild).isInstanceOf(CompoundPredicate.class); + CompoundPredicate innerAnd = (CompoundPredicate) firstChild; + assertThat(innerAnd.function()).isInstanceOf(And.class); + assertThat(innerAnd.children()).hasSize(2); + + Predicate betweenCandidate = innerAnd.children().get(1); + assertThat(betweenCandidate).isInstanceOf(LeafPredicate.class); + LeafPredicate betweenLeaf = (LeafPredicate) betweenCandidate; + assertThat(betweenLeaf.function()).isInstanceOf(Between.class); + assertThat(betweenLeaf.literals()).containsExactly(1, 10); + } + + /** + * Test this complicated scenario. + * + *
{@code
+     *             AND
+     *           /  |  \
+     *         OR  AND a>=1
+     *        /|   || \
+     *       / |  / |  \
+ * a>=1 b<=2 OR AND a>=2
+     *          / |  | \
+     *         /  |  |  \
+     *     a>=1 b<2 b>=1 a<=10
+     *
+     * }
+ */ + @Test + public void testAnExtremeComplicatedPredicate() { + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType(), new IntType())); + Predicate l3p1 = builder.greaterOrEqual(0, 1); + Predicate l3p2 = builder.lessThan(1, 2); + Predicate l3p3 = builder.greaterOrEqual(1, 1); + Predicate l3p4 = builder.lessOrEqual(0, 10); + Predicate l2p1 = builder.greaterOrEqual(0, 1); + Predicate l2p2 = builder.lessOrEqual(1, 2); + Predicate l2p3 = PredicateBuilder.or(l3p1, l3p2); + Predicate l2p4 = PredicateBuilder.and(l3p3, l3p4); + Predicate l2p5 = builder.greaterOrEqual(0, 2); + Predicate l1p1 = PredicateBuilder.or(l2p1, l2p2); + Predicate l1p2 = PredicateBuilder.and(l2p3, l2p4, l2p5); + Predicate l1p3 = builder.greaterOrEqual(0, 1); + Predicate root = PredicateBuilder.and(l1p1, l1p2, l1p3); + + Predicate result = PredicateUtils.tryRewriteBetweenPredicate(root); + assertThat(result).isInstanceOf(CompoundPredicate.class); + + CompoundPredicate compoundResult = (CompoundPredicate) result; + assertThat(compoundResult.function()).isInstanceOf(And.class); + + // directly check the toString + String resultString = compoundResult.toString(); + assertThat(resultString).contains("Between(f0, [2, 10])"); + } + + @Test + public void testTryRewriteBetweenPredicateIntersection() { + // Test intersection case: AND(a>=1, a<=10, a>=2, a<=7) should use intersection (2, 7) + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + + Predicate gte1 = builder.greaterOrEqual(0, 1); + Predicate lte10 = builder.lessOrEqual(0, 10); + Predicate gte2 = builder.greaterOrEqual(0, 2); + Predicate lte7 = builder.lessOrEqual(0, 7); + + Predicate predicate = + PredicateBuilder.and( + PredicateBuilder.and(gte1, lte10), PredicateBuilder.and(gte2, lte7)); + Predicate result = PredicateUtils.tryRewriteBetweenPredicate(predicate); + + assertThat(result).isInstanceOf(LeafPredicate.class); + LeafPredicate betweenLeaf = (LeafPredicate) result; + 
assertThat(betweenLeaf.function()).isInstanceOf(Between.class); + assertThat(betweenLeaf.literals()).containsExactly(2, 7); + } + + @Test + public void testTryRewriteBetweenPredicateDifferentColumns() { + // Test different columns case: AND(a>=1, b<=10) should not be rewritten + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType(), new IntType())); + + Predicate gteA = builder.greaterOrEqual(0, 1); + Predicate lteB = builder.lessOrEqual(1, 10); + Predicate predicate = PredicateBuilder.and(gteA, lteB); + + Predicate result = PredicateUtils.tryRewriteBetweenPredicate(predicate); + + assertThat(result).isInstanceOf(CompoundPredicate.class); + CompoundPredicate compoundResult = (CompoundPredicate) result; + assertThat(compoundResult.function()).isInstanceOf(And.class); + assertThat(compoundResult.children()).hasSize(2); + assertThat(compoundResult.children().stream().map(Predicate::toString)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList("GreaterOrEqual(f0, 1)", "LessOrEqual(f1, 10)")); + } + + @Test + public void testTryRewriteBetweenPredicateInvalidRange() { + // Test invalid range case: AND(a>=10, a<=1) should not be rewritten to BETWEEN + PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType())); + + Predicate gte = builder.greaterOrEqual(0, 10); + Predicate lte = builder.lessOrEqual(0, 1); + Predicate predicate = PredicateBuilder.and(gte, lte); + + Predicate result = PredicateUtils.tryRewriteBetweenPredicate(predicate); + + assertThat(result).isInstanceOf(CompoundPredicate.class); + CompoundPredicate compoundResult = (CompoundPredicate) result; + assertThat(compoundResult.function()).isInstanceOf(And.class); + assertThat(compoundResult.children()).hasSize(2); + assertThat(compoundResult.children().stream().map(Predicate::toString)) + .containsExactlyInAnyOrderElementsOf( + Arrays.asList("GreaterOrEqual(f0, 10)", "LessOrEqual(f0, 1)")); + } +} diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java index f4f529dc4c85..f8cf39d1750d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java @@ -27,6 +27,7 @@ import org.apache.paimon.table.InnerTable; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Filter; +import org.apache.paimon.utils.PredicateUtils; import org.apache.paimon.utils.Range; import javax.annotation.Nullable; @@ -90,6 +91,7 @@ public ReadBuilder withFilter(Predicate filter) { } else { this.filter = PredicateBuilder.and(this.filter, filter); } + this.filter = PredicateUtils.tryRewriteBetweenPredicate(this.filter); return this; } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala index 35ba6c40e7d1..47723171e4d6 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala @@ -21,8 +21,7 @@ package org.apache.paimon.spark import org.apache.paimon.CoreOptions import org.apache.paimon.partition.PartitionPredicate import org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates -import org.apache.paimon.predicate.{GreaterOrEqual, LeafPredicate, LessOrEqual, PartitionPredicateVisitor, Predicate, TopN, VectorSearch} -import org.apache.paimon.spark.util.PredicateUtils +import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate, TopN, VectorSearch} import org.apache.paimon.table.{SpecialFields, Table} import org.apache.paimon.types.RowType @@ -102,7 +101,7 @@ abstract class PaimonBaseScanBuilder 
this.pushedPartitionFilters = Array(pair.getLeft.get()) } if (pushableDataFilters.nonEmpty) { - this.pushedDataFilters = rewriteDataFiltersToBetween(pushableDataFilters.toArray) + this.pushedDataFilters = pushableDataFilters.toArray } if (postScan.nonEmpty) { this.hasPostScanPredicates = true @@ -110,46 +109,6 @@ abstract class PaimonBaseScanBuilder postScan.toArray } - /** Rewrite pushable data filters to use Between predicate when possible. */ - private def rewriteDataFiltersToBetween( - pushableDataFilters: Array[Predicate]): Array[Predicate] = { - val leafPredicates = - pushableDataFilters.filter(_.isInstanceOf[LeafPredicate]).map(_.asInstanceOf[LeafPredicate]) - val nonLeafPredicates = pushableDataFilters.filterNot(_.isInstanceOf[LeafPredicate]) - - // Group LeafPredicates by transform - val groupedByTransform = leafPredicates.groupBy(_.transform()) - - val rewrittenPredicates = scala.collection.mutable.ArrayBuffer.empty[Predicate] - rewrittenPredicates ++= nonLeafPredicates - - groupedByTransform.values.foreach { - predicates => - // Filter predicates with LessOrEqual or GreaterOrEqual functions - val candidates = predicates.filter( - p => p.function().isInstanceOf[LessOrEqual] || p.function().isInstanceOf[GreaterOrEqual]) - // Add other predicates directly to result - val nonCandidates = predicates.filterNot( - p => p.function().isInstanceOf[LessOrEqual] || p.function().isInstanceOf[GreaterOrEqual]) - rewrittenPredicates ++= nonCandidates - - // Now we only support the simplest case: one pair of LessOrEqual and GreaterOrEqual - if (candidates.length == 2) { - // Try to convert to Between - PredicateUtils.convertToBetweenFunction(candidates(0), candidates(1)) match { - case Some(betweenPredicate) => - rewrittenPredicates += betweenPredicate - case None => - rewrittenPredicates ++= candidates - } - } else { - rewrittenPredicates ++= candidates - } - } - - rewrittenPredicates.toArray - } - override def pushedPredicates: Array[SparkPredicate] = { 
pushedSparkPredicates } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala index 53a054c852c8..36e188bde76a 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkV2FilterConverter.scala @@ -19,7 +19,6 @@ package org.apache.paimon.spark import org.apache.paimon.predicate.{Predicate, PredicateBuilder, Transform} -import org.apache.paimon.spark.util.PredicateUtils.convertToBetweenFunction import org.apache.paimon.spark.util.SparkExpressionConverter.{toPaimonLiteral, toPaimonTransform} import org.apache.paimon.types.RowType @@ -126,12 +125,7 @@ case class SparkV2FilterConverter(rowType: RowType) extends Logging { case AND => val and = sparkPredicate.asInstanceOf[And] - val leftPredicate = convert(and.left) - val rightPredicate = convert(and.right()) - convertToBetweenFunction(leftPredicate, rightPredicate) match { - case Some(predicate) => predicate - case _ => PredicateBuilder.and(leftPredicate, rightPredicate) - } + PredicateBuilder.and(convert(and.left), convert(and.right())) case OR => val or = sparkPredicate.asInstanceOf[Or] diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/PredicateUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/PredicateUtils.scala deleted file mode 100644 index 24b45065b854..000000000000 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/PredicateUtils.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.spark.util - -import org.apache.paimon.predicate.{Between, CompareUtils, GreaterOrEqual, LeafPredicate, LessOrEqual, Predicate} - -import java.util -import java.util.Objects - -object PredicateUtils { - - /** Try to convert AND's children predicates to BETWEEN leaf predicate, return None if failed. */ - def convertToBetweenFunction( - leftPredicate: Predicate, - rightPredicate: Predicate): Option[Predicate] = { - - def toBetweenLeafPredicate( - gtePredicate: LeafPredicate, - ltePredicate: LeafPredicate): Option[Predicate] = { - // gtePredicate and ltePredicate should have the same transform - val transform = gtePredicate.transform() - val literalLb = gtePredicate.literals().get(0) - val literalUb = ltePredicate.literals().get(0) - if (CompareUtils.compareLiteral(transform.outputType(), literalLb, literalUb) > 0) { - None - } else { - Some( - new LeafPredicate(transform, Between.INSTANCE, util.Arrays.asList(literalLb, literalUb))) - } - } - - (leftPredicate, rightPredicate) match { - case (left: LeafPredicate, right: LeafPredicate) => - if (!Objects.equals(left.transform(), right.transform())) { - return None - } - (left.function(), right.function()) match { - case (_: GreaterOrEqual, _: LessOrEqual) => - toBetweenLeafPredicate(left, right) - case (_: LessOrEqual, _: GreaterOrEqual) => - toBetweenLeafPredicate(right, left) - case _ => None - } - case _ => - 
None - } - } -} diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala index a8b68ea3d50e..2d90777fd78c 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala @@ -19,7 +19,7 @@ package org.apache.paimon.spark.sql import org.apache.paimon.data.{BinaryString, Decimal, Timestamp} -import org.apache.paimon.predicate.{Between, LeafPredicate, PredicateBuilder} +import org.apache.paimon.predicate.PredicateBuilder import org.apache.paimon.spark.{PaimonSparkTestBase, SparkV2FilterConverter} import org.apache.paimon.spark.util.shim.TypeUtils.treatPaimonTimestampTypeAsSparkTimestampType import org.apache.paimon.table.source.DataSplit @@ -295,43 +295,6 @@ abstract class SparkV2FilterConverterTestBase extends PaimonSparkTestBase { assert(scanFilesCount(filter) == 3) } - test("V2Filter: Between") { - // 1. test basic between - val filter = "string_col BETWEEN 'a' AND 'r'" - val actual = converter.convert(v2Filter(filter)).get - assert( - actual.equals(builder.between(0, BinaryString.fromString("a"), BinaryString.fromString("r")))) - checkAnswer( - sql(s"SELECT string_col from test_tbl WHERE $filter ORDER BY string_col"), - Seq(Row("hello"), Row("hi"), Row("paimon")) - ) - - // 2. >= and <= on different columns should not be converted to between - val filter1 = "string_col >= 'a' AND int_col <= 2" - val actual1 = converter.convert(v2Filter(filter1)).get - assert(!actual1.isInstanceOf[LeafPredicate]) - - // spark 3.4+ support converting CONCAT with fieldRef to predicates - if (gteqSpark3_4) { - // 3. 
>= and <= on same transform should also be converted to between - val filter2 = "CONCAT(string_col, '_suffix') >= 'a' AND CONCAT(string_col, '_suffix') <= 'r'" - val actual2 = converter.convert(v2Filter(filter2)).get - assert(actual2.isInstanceOf[LeafPredicate]) - val function = actual2.asInstanceOf[LeafPredicate].function - assert(function.isInstanceOf[Between]) - checkAnswer( - sql(s"SELECT string_col from test_tbl WHERE $filter2 ORDER BY string_col"), - Seq(Row("hello"), Row("hi"), Row("paimon")) - ) - - // 4. >= and <= on different transform should not be converted to between - val filter3 = - "CONCAT(string_col, '_suffix1') >= 'a' AND CONCAT(string_col, '_suffix2') <= 'r'" - val actual3 = converter.convert(v2Filter(filter3)).get - assert(!actual3.isInstanceOf[LeafPredicate]) - } - } - test("V2Filter: And") { val filter = "int_col > 1 AND int_col < 3" val actual = converter.convert(v2Filter(filter)).get From 01a9d7419278bb6cc406d8e1009781ccef4b3cef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=86=E5=AE=87?= Date: Thu, 12 Feb 2026 17:28:57 +0800 Subject: [PATCH 7/7] fix test --- .../main/java/org/apache/paimon/utils/PredicateUtils.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/PredicateUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/PredicateUtils.java index 115a9a505e18..49964185f4be 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/PredicateUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/PredicateUtils.java @@ -29,6 +29,8 @@ import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.predicate.Transform; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -46,8 +48,8 @@ public class PredicateUtils { * predicates of an {@code AND}, for example: {@code OR(a >= 1, AND(b >= 1, b <= 2))} will be * rewritten to {@code OR(a >= 1, BETWEEN(b, 1, 2))}. 
*/ - public static Predicate tryRewriteBetweenPredicate(Predicate filter) { - if (filter instanceof LeafPredicate) { + public static Predicate tryRewriteBetweenPredicate(@Nullable Predicate filter) { + if (filter == null || filter instanceof LeafPredicate) { return filter; } CompoundPredicate compoundPredicate = (CompoundPredicate) filter;