From e0de6a4ee362c131aa3cb6093177cb6d1732666a Mon Sep 17 00:00:00 2001
From: shekharrajak
Date: Mon, 16 Feb 2026 14:39:45 +0530
Subject: [PATCH] Fix multi-insert with native writer in Spark 4.x (#3430)

---
 .../apache/comet/rules/CometExecRule.scala   | 19 +++++++++--
 .../parquet/CometParquetWriterSuite.scala    | 33 +++++++++++++++++++
 2 files changed, 50 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index 76e741e3bf..7f2d832791 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -483,7 +483,7 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
         val serde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]]
         if (isOperatorEnabled(serde, op)) {
           // For operators that require native children (like writes), check if all data-producing
-          // children are CometNativeExec. This prevents runtime failures when the native operator
+          // children produce Arrow data. This prevents runtime failures when the native operator
           // expects Arrow arrays but receives non-Arrow data (e.g., OnHeapColumnVector).
           if (serde.requiresNativeChildren && op.children.nonEmpty) {
             // Get the actual data-producing children (unwrap WriteFilesExec if present)
@@ -491,7 +491,7 @@
               case writeFiles: WriteFilesExec => Seq(writeFiles.child)
               case other => Seq(other)
             }
-            if (!dataProducingChildren.forall(_.isInstanceOf[CometNativeExec])) {
+            if (!dataProducingChildren.forall(producesArrowData)) {
               withInfo(op, "Cannot perform native operation because input is not in Arrow format")
               return None
             }
@@ -600,4 +600,19 @@
     }
   }
 
+  /**
+   * Checks if a plan produces Arrow-formatted data by unwrapping wrapper operators. This handles
+   * ReusedExchangeExec (used in multi-insert), QueryStageExec (AQE), and checks for CometExec
+   * (includes CometNativeExec and sink operators like CometUnionExec, CometCoalesceExec, etc.).
+   */
+  private def producesArrowData(plan: SparkPlan): Boolean = {
+    plan match {
+      case _: CometExec => true
+      case r: ReusedExchangeExec => producesArrowData(r.child)
+      case s: ShuffleQueryStageExec => producesArrowData(s.plan)
+      case b: BroadcastQueryStageExec => producesArrowData(b.plan)
+      case _ => false
+    }
+  }
+
 }
diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
index e4c405c003..469c908bab 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala
@@ -140,6 +140,39 @@ class CometParquetWriterSuite extends CometTestBase {
     }
   }
 
+  // Test for issue #3430: SPARK-48817 multi-insert with native writer in Spark 4.x
+  test("parquet write with multi-insert pattern") {
+    withTempPath { dir =>
+      val output1 = new File(dir, "output1.parquet").getAbsolutePath
+      val output2 = new File(dir, "output2.parquet").getAbsolutePath
+
+      withSQLConf(
+        CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+        CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true") {
+
+        // Create source data with repartition (simulating SPARK-48817 test pattern)
+        val sourceData = spark.range(1, 10).toDF("id").repartition(3)
+
+        // Write to first output
+        val plan1 = captureWritePlan(path => sourceData.write.parquet(path), output1)
+
+        // Write to second output (simulating multi-insert reuse pattern)
+        val plan2 = captureWritePlan(path => sourceData.write.parquet(path), output2)
+
+        // Verify both writes completed correctly
+        val result1 = spark.read.parquet(output1)
+        val result2 = spark.read.parquet(output2)
+        assert(result1.count() == 9)
+        assert(result2.count() == 9)
+
+        // Verify native write was used for both
+        assertHasCometNativeWriteExec(plan1)
+        assertHasCometNativeWriteExec(plan2)
+      }
+    }
+  }
+
   test("parquet write with map type") {
     withTempPath { dir =>
       val outputPath = new File(dir, "output.parquet").getAbsolutePath
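--
Context for the change: the plan shape this patch targets comes from Hive-style
multi-insert, where Spark 4.x (per SPARK-48817) plans the inserts together and
the shared exchange feeding the second write can surface as a ReusedExchangeExec.
The old isInstanceOf[CometNativeExec] check rejected that wrapper even though the
underlying exchange produces Arrow data. A minimal sketch of the pattern, not part
of the patch; the table names t1/t2 and the session setup are assumptions:

    // Shared source; the repartition exchange below can be reused across the
    // two insert branches when the plan is compiled as one statement.
    spark.range(1, 10).toDF("id").repartition(3).createOrReplaceTempView("src")
    spark.sql("CREATE TABLE t1 (id BIGINT) USING parquet")
    spark.sql("CREATE TABLE t2 (id BIGINT) USING parquet")

    // Hive-style multi-insert: one statement, two sinks. With exchange reuse,
    // the second branch's input may be a ReusedExchangeExec, which the new
    // producesArrowData check unwraps instead of rejecting.
    spark.sql(
      """FROM src
        |INSERT OVERWRITE TABLE t1 SELECT id
        |INSERT OVERWRITE TABLE t2 SELECT id
        |""".stripMargin)

The suite above approximates this with two writes of the same repartitioned
DataFrame because that reproduces the same reused-exchange plan shape without
requiring a catalog.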