diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index e3b0e40566..47e44acb69 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -98,6 +98,11 @@ jobs: java_version: "17" maven_opts: "-Pspark-4.0" scan_impl: "native_comet" + + - name: "Spark 4.1, JDK 17" + java_version: "17" + maven_opts: "-Pspark-4.1" + scan_impl: "native_comet" suite: - name: "fuzz" value: | diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml index 3d7aa2e2f9..4d680f0d07 100644 --- a/.github/workflows/spark_sql_test.yml +++ b/.github/workflows/spark_sql_test.yml @@ -50,7 +50,7 @@ jobs: strategy: matrix: os: [ubuntu-24.04] - spark-version: [{short: '3.4', full: '3.4.3', java: 11}, {short: '3.5', full: '3.5.7', java: 11}, {short: '4.0', full: '4.0.1', java: 17}] + spark-version: [{short: '3.4', full: '3.4.3', java: 11}, {short: '3.5', full: '3.5.7', java: 11}, {short: '4.0', full: '4.0.1', java: 17}, {short: '4.1', full: '4.1.0', java: 17}] module: - {name: "catalyst", args1: "catalyst/test", args2: ""} - {name: "sql_core-1", args1: "", args2: sql/testOnly * -- -l org.apache.spark.tags.ExtendedSQLTest -l org.apache.spark.tags.SlowSQLTest} @@ -59,10 +59,12 @@ jobs: - {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"} - {name: "sql_hive-2", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.ExtendedHiveTest"} - {name: "sql_hive-3", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest"} - # Skip sql_hive-1 for Spark 4.0 due to https://github.com/apache/datafusion-comet/issues/2946 + # Skip sql_hive-1 for Spark 4.0+ due to https://github.com/apache/datafusion-comet/issues/2946 exclude: - spark-version: {short: '4.0', full: '4.0.1', java: 17} module: {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"} + - spark-version: {short: '4.1', full: '4.1.0', java: 17} + module: { name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest" } fail-fast: false name: spark-sql-${{ matrix.module.name }}/${{ matrix.os }}/spark-${{ matrix.spark-version.full }}/java-${{ matrix.spark-version.java }} runs-on: ${{ matrix.os }} diff --git a/common/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala b/common/src/main/spark-4.x/org/apache/comet/shims/CometTypeShim.scala similarity index 100% rename from common/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala rename to common/src/main/spark-4.x/org/apache/comet/shims/CometTypeShim.scala diff --git a/common/src/main/spark-4.0/org/apache/comet/shims/ShimBatchReader.scala b/common/src/main/spark-4.x/org/apache/comet/shims/ShimBatchReader.scala similarity index 100% rename from common/src/main/spark-4.0/org/apache/comet/shims/ShimBatchReader.scala rename to common/src/main/spark-4.x/org/apache/comet/shims/ShimBatchReader.scala diff --git a/common/src/main/spark-4.0/org/apache/comet/shims/ShimCometConf.scala b/common/src/main/spark-4.x/org/apache/comet/shims/ShimCometConf.scala similarity index 100% rename from common/src/main/spark-4.0/org/apache/comet/shims/ShimCometConf.scala rename to common/src/main/spark-4.x/org/apache/comet/shims/ShimCometConf.scala diff --git a/common/src/main/spark-4.0/org/apache/comet/shims/ShimFileFormat.scala b/common/src/main/spark-4.x/org/apache/comet/shims/ShimFileFormat.scala similarity index 100% rename from common/src/main/spark-4.0/org/apache/comet/shims/ShimFileFormat.scala rename to common/src/main/spark-4.x/org/apache/comet/shims/ShimFileFormat.scala diff --git a/common/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimTaskMetrics.scala b/common/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimTaskMetrics.scala similarity index 100% rename from common/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimTaskMetrics.scala rename to common/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimTaskMetrics.scala diff --git a/dev/diffs/4.1.0.diff b/dev/diffs/4.1.0.diff new file mode 100644 index 0000000000..241c864683 --- /dev/null +++ b/dev/diffs/4.1.0.diff @@ -0,0 +1,3740 @@ +diff --git a/pom.xml b/pom.xml +index 1824a28614b..0d0ce6d27ce 100644 +--- a/pom.xml ++++ b/pom.xml +@@ -152,6 +152,8 @@ + 4.0.3 + 2.5.3 + 2.0.8 ++ 4.0 ++ 0.13.0-SNAPSHOT + + + org.apache.datasketches +diff --git a/sql/core/pom.xml b/sql/core/pom.xml +index e3aa8f1f3bb..8817d48b272 100644 +--- a/sql/core/pom.xml ++++ b/sql/core/pom.xml +@@ -97,6 +97,10 @@ + org.apache.spark + spark-tags_${scala.binary.version} + ++ ++ org.apache.datafusion ++ comet-spark-spark${spark.version.short}_${scala.binary.version} ++ + + + 17 + ${java.version} + ${java.version} + + + + + + spark-4.1 + + + 2.13.17 + 2.13 + 4.1.0 + 4.1 + 1.16.0 + 4.13.9 + 2.0.17 + spark-4.x + spark-4.1 17 ${java.version} diff --git a/spark/pom.xml b/spark/pom.xml index 3b832e37a2..d4bdba884b 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -256,6 +256,32 @@ under the License. + + + spark-4.1 + + + org.apache.iceberg + + iceberg-spark-runtime-4.0_${scala.binary.version} + 1.10.0 + test + + + + org.eclipse.jetty + jetty-server + 11.0.24 + test + + + org.eclipse.jetty + jetty-servlet + 11.0.24 + test + + + diff --git a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java index a58ec7851b..2ee068ae9a 100644 --- a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java +++ b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java @@ -41,7 +41,6 @@ import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper; import org.apache.spark.scheduler.MapStatus; -import org.apache.spark.scheduler.MapStatus$; import org.apache.spark.serializer.SerializerInstance; import org.apache.spark.shuffle.ShuffleWriteMetricsReporter; import org.apache.spark.shuffle.ShuffleWriter; @@ -172,7 +171,7 @@ public void write(Iterator> records) throws IOException { .commitAllPartitions(ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE) .getPartitionLengths(); mapStatus = - MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId); + new MapStatusBuilder(blockManager.shuffleServerId(), partitionLengths, mapId).build(); return; } final long openStartTime = System.nanoTime(); @@ -261,7 +260,8 @@ public void write(Iterator> records) throws IOException { // TODO: We probably can move checksum generation here when concatenating partition files partitionLengths = writePartitionedData(mapOutputWriter); - mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId); + mapStatus = + new MapStatusBuilder(blockManager.shuffleServerId(), partitionLengths, mapId).build(); } catch (Exception e) { try { mapOutputWriter.abort(e); diff --git a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometUnsafeShuffleWriter.java b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometUnsafeShuffleWriter.java index a845e743d4..c7248bafe9 100644 --- a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometUnsafeShuffleWriter.java +++ b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometUnsafeShuffleWriter.java @@ -50,7 +50,6 @@ import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper; import org.apache.spark.network.util.LimitedInputStream; import org.apache.spark.scheduler.MapStatus; -import org.apache.spark.scheduler.MapStatus$; import org.apache.spark.serializer.SerializationStream; import org.apache.spark.serializer.SerializerInstance; import org.apache.spark.shuffle.BaseShuffleHandle; @@ -288,7 +287,8 @@ void closeAndWriteOutput() throws IOException { } } } - mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId); + mapStatus = + new MapStatusBuilder(blockManager.shuffleServerId(), partitionLengths, mapId).build(); } @VisibleForTesting diff --git a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala index a05efaebbc..02e31780de 100644 --- a/spark/src/main/scala/org/apache/comet/serde/aggregates.scala +++ b/spark/src/main/scala/org/apache/comet/serde/aggregates.scala @@ -30,7 +30,7 @@ import org.apache.comet.CometConf import org.apache.comet.CometConf.COMET_EXEC_STRICT_FLOATING_POINT import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.serde.QueryPlanSerde.{evalModeToProto, exprToProto, serializeDataType} -import org.apache.comet.shims.CometEvalModeUtil +import org.apache.comet.shims.{CometEvalModeUtil, CometSumShim} object CometMin extends CometAggregateExpressionSerde[Min] { @@ -211,7 +211,7 @@ object CometAverage extends CometAggregateExpressionSerde[Average] { } } -object CometSum extends CometAggregateExpressionSerde[Sum] { +object CometSum extends CometAggregateExpressionSerde[Sum] with CometSumShim { override def convert( aggExpr: AggregateExpression, @@ -225,7 +225,7 @@ object CometSum extends CometAggregateExpressionSerde[Sum] { return None } - val evalMode = sum.evalMode + val evalMode = sparkEvalMode(sum) val childExpr = exprToProto(sum.child, inputs, binding) val dataType = serializeDataType(sum.dataType) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala index 927e309325..6890c98ec0 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala @@ -71,8 +71,14 @@ class CometShuffleManager(conf: SparkConf) extends ShuffleManager with Logging { case 2 => c.newInstance(conf, null).asInstanceOf[IndexShuffleBlockResolver] case 3 => - c.newInstance(conf, null, Collections.emptyMap()) - .asInstanceOf[IndexShuffleBlockResolver] + val parameterTypes = c.getParameters.map(_.getType) + if (parameterTypes(2) == classOf[java.util.Map[Int, OpenHashSet[Long]]]) { + c.newInstance(conf, null, Collections.emptyMap()) + .asInstanceOf[IndexShuffleBlockResolver] + } else { + c.newInstance(conf, null, new ConcurrentHashMap[Int, OpenHashSet[Long]]()) + .asInstanceOf[IndexShuffleBlockResolver] + } } } .head diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/MapStatusBuilder.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/MapStatusBuilder.scala new file mode 100644 index 0000000000..beca92b925 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/MapStatusBuilder.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet.execution.shuffle + +import org.apache.spark.scheduler.MapStatus +import org.apache.spark.storage.BlockManagerId + +class MapStatusBuilder(loc: BlockManagerId, uncompressedSizes: Array[Long], mapTaskId: Long) { + + // This works for both Spark 4.1 and previous versions + def build(): MapStatus = { + MapStatus(loc, uncompressedSizes, mapTaskId) + } +} diff --git a/spark/src/main/spark-3.x/org/apache/comet/shims/CometSumShim.scala b/spark/src/main/spark-3.x/org/apache/comet/shims/CometSumShim.scala new file mode 100644 index 0000000000..dcf1fc7419 --- /dev/null +++ b/spark/src/main/spark-3.x/org/apache/comet/shims/CometSumShim.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.shims + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.Sum + +/** + * `CometSumShim` acts as a shim for parsing expression Sum from different Spark versions. + */ +trait CometSumShim extends CometExprShim { + protected def sparkEvalMode(s: Sum): EvalMode.Value = s.evalMode +} diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometSumShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometSumShim.scala new file mode 100644 index 0000000000..dcf1fc7419 --- /dev/null +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometSumShim.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.shims + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.Sum + +/** + * `CometSumShim` acts as a shim for parsing expression Sum from different Spark versions. + */ +trait CometSumShim extends CometExprShim { + protected def sparkEvalMode(s: Sum): EvalMode.Value = s.evalMode +} diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimSQLConf.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimSQLConf.scala index bdb2739460..7ee0538dc3 100644 --- a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimSQLConf.scala +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/ShimSQLConf.scala @@ -20,8 +20,15 @@ package org.apache.comet.shims import org.apache.spark.sql.internal.LegacyBehaviorPolicy +import org.apache.spark.sql.internal.SQLConf trait ShimSQLConf { protected val LEGACY = LegacyBehaviorPolicy.LEGACY protected val CORRECTED = LegacyBehaviorPolicy.CORRECTED + + def getBinaryOutputStyle: Option[SQLConf.BinaryOutputStyle.Value] = { + SQLConf.get + .getConf(SQLConf.BINARY_OUTPUT_STYLE) + .map(SQLConf.BinaryOutputStyle.withName) + } } diff --git a/spark/src/main/spark-4.1/org/apache/comet/shims/CometSumShim.scala b/spark/src/main/spark-4.1/org/apache/comet/shims/CometSumShim.scala new file mode 100644 index 0000000000..3b4a5abc07 --- /dev/null +++ b/spark/src/main/spark-4.1/org/apache/comet/shims/CometSumShim.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.shims + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.Sum + +/** + * `CometSumShim` acts as a shim for parsing expression Sum from different Spark versions. + */ +trait CometSumShim extends CometExprShim { + protected def sparkEvalMode(s: Sum): EvalMode.Value = s.evalContext.evalMode +} diff --git a/spark/src/main/spark-4.1/org/apache/comet/shims/ShimSQLConf.scala b/spark/src/main/spark-4.1/org/apache/comet/shims/ShimSQLConf.scala new file mode 100644 index 0000000000..e91b0f7db0 --- /dev/null +++ b/spark/src/main/spark-4.1/org/apache/comet/shims/ShimSQLConf.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.shims + +import org.apache.spark.sql.internal.LegacyBehaviorPolicy +import org.apache.spark.sql.internal.SQLConf + +trait ShimSQLConf { + protected val LEGACY = LegacyBehaviorPolicy.LEGACY + protected val CORRECTED = LegacyBehaviorPolicy.CORRECTED + + def getBinaryOutputStyle(): Option[SQLConf.BinaryOutputStyle.Value] = { + SQLConf.get + .getConf(SQLConf.BINARY_OUTPUT_STYLE) + } +} diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.x/org/apache/comet/shims/CometExprShim.scala similarity index 96% rename from spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala rename to spark/src/main/spark-4.x/org/apache/comet/shims/CometExprShim.scala index fc3db183b3..d7dd4de358 100644 --- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.x/org/apache/comet/shims/CometExprShim.scala @@ -34,14 +34,12 @@ import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal /** * `CometExprShim` acts as a shim for parsing expressions from different Spark versions. */ -trait CometExprShim extends CommonStringExprs { +trait CometExprShim extends CommonStringExprs with ShimSQLConf { protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalModeUtil.fromSparkEvalMode(c.evalMode) protected def binaryOutputStyle: BinaryOutputStyle = { - SQLConf.get - .getConf(SQLConf.BINARY_OUTPUT_STYLE) - .map(SQLConf.BinaryOutputStyle.withName) match { + getBinaryOutputStyle match { case Some(SQLConf.BinaryOutputStyle.UTF8) => BinaryOutputStyle.UTF8 case Some(SQLConf.BinaryOutputStyle.BASIC) => BinaryOutputStyle.BASIC case Some(SQLConf.BinaryOutputStyle.BASE64) => BinaryOutputStyle.BASE64 diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBroadcastExchangeExec.scala b/spark/src/main/spark-4.x/org/apache/comet/shims/ShimCometBroadcastExchangeExec.scala similarity index 100% rename from spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometBroadcastExchangeExec.scala rename to spark/src/main/spark-4.x/org/apache/comet/shims/ShimCometBroadcastExchangeExec.scala diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala b/spark/src/main/spark-4.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala similarity index 100% rename from spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala rename to spark/src/main/spark-4.x/org/apache/comet/shims/ShimCometShuffleExchangeExec.scala diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala b/spark/src/main/spark-4.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala similarity index 100% rename from spark/src/main/spark-4.0/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala rename to spark/src/main/spark-4.x/org/apache/comet/shims/ShimCometSparkSessionExtensions.scala diff --git a/spark/src/main/spark-4.0/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala b/spark/src/main/spark-4.x/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala similarity index 100% rename from spark/src/main/spark-4.0/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala rename to spark/src/main/spark-4.x/org/apache/spark/comet/shims/ShimCometDriverPlugin.scala diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala b/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala similarity index 100% rename from spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala rename to spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimCometScanExec.scala diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometShuffleWriteProcessor.scala b/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimCometShuffleWriteProcessor.scala similarity index 100% rename from spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometShuffleWriteProcessor.scala rename to spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimCometShuffleWriteProcessor.scala diff --git a/spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimStreamSourceAwareSparkPlan.scala b/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimStreamSourceAwareSparkPlan.scala similarity index 100% rename from spark/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimStreamSourceAwareSparkPlan.scala rename to spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimStreamSourceAwareSparkPlan.scala diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQueryTestSuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQueryTestSuite.scala index 56f5bcfdee..ec1e7d3931 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQueryTestSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQueryTestSuite.scala @@ -25,7 +25,7 @@ import java.nio.file.{Files, Paths} import scala.jdk.CollectionConverters._ import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.sql.catalyst.util.{fileToString, resourceToString, stringToFile} +import org.apache.spark.sql.catalyst.util.{resourceToString, stringToFile} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.TestSparkSession @@ -118,7 +118,7 @@ class CometTPCDSQueryTestSuite extends QueryTest with TPCDSBase with CometSQLQue // Read back the golden file. val (expectedSchema, expectedOutput) = { - val goldenOutput = fileToString(goldenFile) + val goldenOutput = Files.readString(goldenFile.toPath) val segments = goldenOutput.split("-- !query.*\n") // query has 3 segments, plus the header diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala index e158af6335..608ecfe2c4 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCHQuerySuite.scala @@ -27,7 +27,7 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.util.{fileToString, resourceToString, stringToFile} +import org.apache.spark.sql.catalyst.util.{resourceToString, stringToFile} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.TestSparkSession @@ -162,7 +162,7 @@ class CometTPCHQuerySuite extends QueryTest with TPCBase with ShimCometTPCHQuery // Read back the golden file. val (expectedSchema, expectedOutput) = { - val goldenOutput = fileToString(goldenFile) + val goldenOutput = Files.readString(goldenFile.toPath) val segments = goldenOutput.split("-- !query.*\n") // query has 3 segments, plus the header diff --git a/spark/src/test/spark-4.0/org/apache/comet/exec/CometShuffle4_0Suite.scala b/spark/src/test/spark-4.x/org/apache/comet/exec/CometShuffle4_xSuite.scala similarity index 98% rename from spark/src/test/spark-4.0/org/apache/comet/exec/CometShuffle4_0Suite.scala rename to spark/src/test/spark-4.x/org/apache/comet/exec/CometShuffle4_xSuite.scala index 3d0ec5006d..c012ca4c1f 100644 --- a/spark/src/test/spark-4.0/org/apache/comet/exec/CometShuffle4_0Suite.scala +++ b/spark/src/test/spark-4.x/org/apache/comet/exec/CometShuffle4_xSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{FloatType, LongType, StringType, TimestampType} -class CometShuffle4_0Suite extends CometColumnarShuffleSuite { +class CometShuffle4_xSuite extends CometColumnarShuffleSuite { override protected val asyncShuffleEnable: Boolean = false protected val adaptiveExecutionEnabled: Boolean = true diff --git a/spark/src/test/spark-4.0/org/apache/comet/iceberg/RESTCatalogHelper.scala b/spark/src/test/spark-4.x/org/apache/comet/iceberg/RESTCatalogHelper.scala similarity index 100% rename from spark/src/test/spark-4.0/org/apache/comet/iceberg/RESTCatalogHelper.scala rename to spark/src/test/spark-4.x/org/apache/comet/iceberg/RESTCatalogHelper.scala diff --git a/spark/src/test/spark-4.0/org/apache/comet/shims/ShimCometTPCHQuerySuite.scala b/spark/src/test/spark-4.x/org/apache/comet/shims/ShimCometTPCHQuerySuite.scala similarity index 100% rename from spark/src/test/spark-4.0/org/apache/comet/shims/ShimCometTPCHQuerySuite.scala rename to spark/src/test/spark-4.x/org/apache/comet/shims/ShimCometTPCHQuerySuite.scala diff --git a/spark/src/test/spark-4.0/org/apache/iceberg/rest/RESTCatalogServlet.java b/spark/src/test/spark-4.x/org/apache/iceberg/rest/RESTCatalogServlet.java similarity index 100% rename from spark/src/test/spark-4.0/org/apache/iceberg/rest/RESTCatalogServlet.java rename to spark/src/test/spark-4.x/org/apache/iceberg/rest/RESTCatalogServlet.java diff --git a/spark/src/test/spark-4.0/org/apache/spark/comet/shims/ShimTestUtils.scala b/spark/src/test/spark-4.x/org/apache/spark/comet/shims/ShimTestUtils.scala similarity index 100% rename from spark/src/test/spark-4.0/org/apache/spark/comet/shims/ShimTestUtils.scala rename to spark/src/test/spark-4.x/org/apache/spark/comet/shims/ShimTestUtils.scala diff --git a/spark/src/test/spark-4.0/org/apache/spark/sql/CometToPrettyStringSuite.scala b/spark/src/test/spark-4.x/org/apache/spark/sql/CometToPrettyStringSuite.scala similarity index 100% rename from spark/src/test/spark-4.0/org/apache/spark/sql/CometToPrettyStringSuite.scala rename to spark/src/test/spark-4.x/org/apache/spark/sql/CometToPrettyStringSuite.scala diff --git a/spark/src/test/spark-4.0/org/apache/spark/sql/ShimCometTestBase.scala b/spark/src/test/spark-4.x/org/apache/spark/sql/ShimCometTestBase.scala similarity index 100% rename from spark/src/test/spark-4.0/org/apache/spark/sql/ShimCometTestBase.scala rename to spark/src/test/spark-4.x/org/apache/spark/sql/ShimCometTestBase.scala diff --git a/spark/src/test/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometTPCDSQuerySuite.scala b/spark/src/test/spark-4.x/org/apache/spark/sql/comet/shims/ShimCometTPCDSQuerySuite.scala similarity index 100% rename from spark/src/test/spark-4.0/org/apache/spark/sql/comet/shims/ShimCometTPCDSQuerySuite.scala rename to spark/src/test/spark-4.x/org/apache/spark/sql/comet/shims/ShimCometTPCDSQuerySuite.scala