From 56f4d0149b59a18e60046631a5960410e0c3786f Mon Sep 17 00:00:00 2001
From: comphead
Date: Tue, 23 Dec 2025 15:58:45 -0800
Subject: [PATCH 1/3] feat: Avoid duplicated write nodes for AQE execution

---
 .../apache/comet/rules/CometExecRule.scala    |  9 +++++
 .../parquet/CometParquetWriterSuite.scala     | 35 ++++++++++++++++---
 2 files changed, 40 insertions(+), 4 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index ed48e36f07..bb4ce879d7 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AQEShuffleReadExec, BroadcastQueryStageExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec}
 import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec}
+import org.apache.spark.sql.execution.datasources.WriteFilesExec
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -197,6 +198,14 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
       case op if shouldApplySparkToColumnar(conf, op) =>
         convertToComet(op, CometSparkToColumnarExec).getOrElse(op)
 
+      // AQE reoptimization looks for a `DataWritingCommandExec` or `WriteFilesExec`
+      // node; if it finds neither, it reinserts the write nodes. Since Comet remaps
+      // those nodes to their Comet counterparts, the write nodes would appear twice.
+      // Strip the write command that AQE inserted on top of an existing native write.
+      case DataWritingCommandExec(_, w: WriteFilesExec)
+          if w.child.isInstanceOf[CometNativeWriteExec] =>
+        w.child
+
       case op: DataWritingCommandExec =>
         convertToComet(op, CometDataWritingCommand).getOrElse(op)
 
diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
index 2ea697fd4d..a8781188d8 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
@@ -54,7 +54,8 @@ class CometParquetWriterSuite extends CometTestBase {
 
   private def writeWithCometNativeWriteExec(
       inputPath: String,
-      outputPath: String): Option[QueryExecution] = {
+      outputPath: String,
+      numPartitions: Option[Int] = None): Option[QueryExecution] = {
     val df = spark.read.parquet(inputPath)
 
     // Use a listener to capture the execution plan during write
@@ -77,8 +78,8 @@ class CometParquetWriterSuite extends CometTestBase {
     spark.listenerManager.register(listener)
 
     try {
-      // Perform native write
-      df.write.parquet(outputPath)
+      // Perform the native write, repartitioning first when numPartitions is set
+      numPartitions.fold(df)(n => df.repartition(n)).write.parquet(outputPath)
 
       // Wait for listener to be called with timeout
       val maxWaitTimeMs = 15000
@@ -97,7 +98,7 @@ class CometParquetWriterSuite extends CometTestBase {
         s"Listener was not called within ${maxWaitTimeMs}ms - no execution plan captured")
 
       capturedPlan.foreach { qe =>
-        val executedPlan = qe.executedPlan
+        val executedPlan = stripAQEPlan(qe.executedPlan)
         val hasNativeWrite = executedPlan.exists {
           case _: CometNativeWriteExec => true
           case d: DataWritingCommandExec =>
@@ -197,4 +198,30 @@ class CometParquetWriterSuite extends CometTestBase {
       }
     }
   }
+
+  test("basic parquet write with repartition") {
+    withTempPath { dir =>
+      val outputPath = new File(dir, "output.parquet").getAbsolutePath
+
+      // Create test data and write it to a temp parquet file first
+      withTempPath { inputDir =>
+        val inputPath = createTestData(inputDir)
+        Seq(true, false).foreach(adaptive => {
+          withSQLConf(
+            CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+            "spark.sql.adaptive.enabled" -> adaptive.toString,
+            SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax",
+            CometConf.getOperatorAllowIncompatConfigKey(
+              classOf[DataWritingCommandExec]) -> "true",
+            CometConf.COMET_EXEC_ENABLED.key -> "true") {
+
+            val plan = writeWithCometNativeWriteExec(inputPath, outputPath, Some(10))
+            println(plan)
+
+            verifyWrittenFile(outputPath)
+          }
+        })
+      }
+    }
+  }
 }

From 940dd9d9bc664c2bc9464f72b27d0c2cf1ab397e Mon Sep 17 00:00:00 2001
From: comphead
Date: Tue, 23 Dec 2025 16:22:32 -0800
Subject: [PATCH 2/3] feat: Avoid duplicated write nodes for AQE execution

---
 .../parquet/CometParquetWriterSuite.scala     | 29 ++++++++++++++++---------
 1 file changed, 16 insertions(+), 13 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
index a8781188d8..fc1a73f7bb 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
@@ -99,19 +99,21 @@ class CometParquetWriterSuite extends CometTestBase {
 
       capturedPlan.foreach { qe =>
         val executedPlan = stripAQEPlan(qe.executedPlan)
-        val hasNativeWrite = executedPlan.exists {
-          case _: CometNativeWriteExec => true
-          case d: DataWritingCommandExec =>
-            d.child.exists {
-              case _: CometNativeWriteExec => true
-              case _ => false
-            }
-          case _ => false
-        }
+
+        // Count CometNativeWriteExec instances in the plan. A single pattern
+        // match is enough: SparkPlan.foreach already visits every descendant
+        // (including the child of a DataWritingCommandExec), so an extra
+        // nested check would double-count nested write nodes.
+        var nativeWriteCount = 0
+        executedPlan.foreach {
+          case _: CometNativeWriteExec => nativeWriteCount += 1
+          case _ =>
+        }
 
         assert(
-          hasNativeWrite,
-          s"Expected CometNativeWriteExec in the plan, but got:\n${executedPlan.treeString}")
+          nativeWriteCount == 1,
+          s"Expected exactly one CometNativeWriteExec in the plan, " +
+            s"but found $nativeWriteCount:\n${executedPlan.treeString}")
       }
     } finally {
       spark.listenerManager.unregister(listener)
@@ -201,12 +206,13 @@ class CometParquetWriterSuite extends CometTestBase {
 
   test("basic parquet write with repartition") {
     withTempPath { dir =>
-      val outputPath = new File(dir, "output.parquet").getAbsolutePath
-
       // Create test data and write it to a temp parquet file first
       withTempPath { inputDir =>
         val inputPath = createTestData(inputDir)
         Seq(true, false).foreach(adaptive => {
+          // Create a new output path for each AQE setting so the runs don't collide
+          val outputPath = new File(dir, s"output_aqe_$adaptive.parquet").getAbsolutePath
+
           withSQLConf(

From 1f0170ba709e9950b7b258a60b416dc669158d14 Mon Sep 17 00:00:00 2001
From: comphead
Date: Tue, 23 Dec 2025 16:26:15 -0800
Subject: [PATCH 3/3] feat: Avoid duplicated write nodes for AQE execution

---
 .../org/apache/comet/parquet/CometParquetWriterSuite.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
index fc1a73f7bb..3ae7f949ab 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
@@ -221,9 +221,7 @@ class CometParquetWriterSuite extends CometTestBase {
               classOf[DataWritingCommandExec]) -> "true",
             CometConf.COMET_EXEC_ENABLED.key -> "true") {
 
-          val plan = writeWithCometNativeWriteExec(inputPath, outputPath, Some(10))
-          println(plan)
-
+          writeWithCometNativeWriteExec(inputPath, outputPath, Some(10))
           verifyWrittenFile(outputPath)
         }
       })
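
Note: the guard added in PATCH 1/3 is easiest to see on a toy plan. Below is a
minimal, self-contained Scala sketch that runs as-is in the Scala REPL; the
case classes are stand-ins for the real Spark/Comet operators (an
illustration-only assumption, not Spark's API), and dedupeWrites mirrors the
shape of the new case in CometExecRule.

    // Toy plan nodes standing in for Spark's physical operators.
    sealed trait Plan
    case object Scan extends Plan
    case class CometNativeWrite(child: Plan) extends Plan
    case class WriteFiles(child: Plan) extends Plan
    case class DataWritingCommand(child: Plan) extends Plan

    // Mirrors the new rule case: if AQE re-inserted a write command on top of
    // a plan that already ends in a Comet native write, drop the duplicate
    // wrappers and keep the existing native write node.
    def dedupeWrites(plan: Plan): Plan = plan match {
      case DataWritingCommand(WriteFiles(w: CometNativeWrite)) => w
      case other => other
    }

    // The shape AQE reoptimization can produce, and what the rule keeps:
    val reinserted = DataWritingCommand(WriteFiles(CometNativeWrite(Scan)))
    assert(dedupeWrites(reinserted) == CometNativeWrite(Scan))

Without the guard, the write node would be wrapped in a second write command
and the data would be written twice, which is exactly what the repartition
test's "exactly one CometNativeWriteExec" assertion checks for.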