19 changes: 17 additions & 2 deletions spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -483,15 +483,15 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
           val serde = handler.asInstanceOf[CometOperatorSerde[SparkPlan]]
           if (isOperatorEnabled(serde, op)) {
             // For operators that require native children (like writes), check if all data-producing
-            // children are CometNativeExec. This prevents runtime failures when the native operator
+            // children produce Arrow data. This prevents runtime failures when the native operator
             // expects Arrow arrays but receives non-Arrow data (e.g., OnHeapColumnVector).
             if (serde.requiresNativeChildren && op.children.nonEmpty) {
               // Get the actual data-producing children (unwrap WriteFilesExec if present)
               val dataProducingChildren = op.children.flatMap {
                 case writeFiles: WriteFilesExec => Seq(writeFiles.child)
                 case other => Seq(other)
               }
-              if (!dataProducingChildren.forall(_.isInstanceOf[CometNativeExec])) {
+              if (!dataProducingChildren.forall(producesArrowData)) {
                 withInfo(op, "Cannot perform native operation because input is not in Arrow format")
                 return None
               }
@@ -600,4 +600,19 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
     }
   }
 
+  /**
+   * Checks whether a plan produces Arrow-formatted data by unwrapping wrapper operators. This
+   * handles ReusedExchangeExec (used in multi-insert), QueryStageExec (AQE), and accepts any
+   * CometExec, which covers CometNativeExec as well as sink operators such as CometUnionExec
+   * and CometCoalesceExec.
+   */
+  private def producesArrowData(plan: SparkPlan): Boolean = {
+    plan match {
+      case _: CometExec => true
+      case r: ReusedExchangeExec => producesArrowData(r.child)
+      case s: ShuffleQueryStageExec => producesArrowData(s.plan)
+      case b: BroadcastQueryStageExec => producesArrowData(b.plan)
+      case _ => false
+    }
+  }
+
 }
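The new producesArrowData check matters because a bare isInstanceOf[CometNativeExec] test rejects plans whose Arrow-producing operator is hidden behind a wrapper: under AQE a Comet shuffle sits inside a ShuffleQueryStageExec, and in SPARK-48817-style multi-insert plans the second write sees the first write's exchange only through a ReusedExchangeExec. A minimal self-contained sketch of the same unwrapping recursion, using hypothetical stand-in case classes rather than the real Spark/Comet operators:

object ArrowCheckSketch extends App {
  // Hypothetical stand-ins for the real operator classes.
  sealed trait Plan
  case class NativeOp(name: String) extends Plan      // stands in for CometExec
  case class ReusedExchange(child: Plan) extends Plan // stands in for ReusedExchangeExec
  case class QueryStage(plan: Plan) extends Plan      // stands in for Shuffle/BroadcastQueryStageExec
  case class PlainOp(name: String) extends Plan       // any non-Arrow Spark operator

  def producesArrow(plan: Plan): Boolean = plan match {
    case _: NativeOp       => true
    case ReusedExchange(c) => producesArrow(c) // multi-insert reuses an earlier exchange
    case QueryStage(p)     => producesArrow(p) // AQE wraps materialized stages
    case _                 => false
  }

  // The plan shape from the multi-insert bug: an Arrow-producing operator reached
  // only through reuse/AQE wrappers still passes the check.
  assert(producesArrow(ReusedExchange(QueryStage(NativeOp("shuffle")))))
  assert(!producesArrow(PlainOp("ProjectExec")))
}

Note that the real method only unwraps the wrappers it knows about; anything else is conservatively treated as non-Arrow, which keeps the operator on the Spark side rather than risking a runtime failure on non-Arrow column vectors.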
@@ -140,6 +140,39 @@ class CometParquetWriterSuite extends CometTestBase {
     }
   }
 
+  // Test for issue #3430: SPARK-48817 multi-insert with native writer in Spark 4.x
+  test("parquet write with multi-insert pattern") {
+    withTempPath { dir =>
+      val output1 = new File(dir, "output1.parquet").getAbsolutePath
+      val output2 = new File(dir, "output2.parquet").getAbsolutePath
+
+      withSQLConf(
+        CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+        CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true") {
+
+        // Create source data with a repartition (mirroring the SPARK-48817 test pattern)
+        val sourceData = spark.range(1, 10).toDF("id").repartition(3)
+
+        // Write to the first output
+        val plan1 = captureWritePlan(path => sourceData.write.parquet(path), output1)
+
+        // Write to the second output (simulating the multi-insert reuse pattern)
+        val plan2 = captureWritePlan(path => sourceData.write.parquet(path), output2)
+
+        // Verify that both writes completed correctly
+        val result1 = spark.read.parquet(output1)
+        val result2 = spark.read.parquet(output2)
+        assert(result1.count() == 9)
+        assert(result2.count() == 9)
+
+        // Verify that the native write was used for both
+        assertHasCometNativeWriteExec(plan1)
+        assertHasCometNativeWriteExec(plan2)
+      }
+    }
+  }
+
   test("parquet write with map type") {
     withTempPath { dir =>
       val outputPath = new File(dir, "output.parquet").getAbsolutePath
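For readers outside the suite: captureWritePlan and assertHasCometNativeWriteExec are pre-existing helpers in CometParquetWriterSuite whose bodies are not part of this diff. As an illustration only, a captureWritePlan-style helper could be built on Spark's QueryExecutionListener API, roughly like the sketch below; this is an assumption about its shape, not the suite's actual implementation, and it presumes a SparkSession named spark in scope, as there is inside the suite.

// Hypothetical sketch of a captureWritePlan-style helper; the real helper in
// CometParquetWriterSuite is not shown in this diff and may differ.
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
import org.apache.spark.sql.util.QueryExecutionListener

def captureWritePlan(write: String => Unit, path: String): SparkPlan = {
  @volatile var captured: Option[SparkPlan] = None
  val listener = new QueryExecutionListener {
    override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
      captured = Some(qe.executedPlan)
    override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
  }
  spark.listenerManager.register(listener)
  try {
    write(path)
    // QueryExecutionListener callbacks are delivered asynchronously from the
    // action that triggered them, so poll briefly for the captured plan.
    val deadline = System.currentTimeMillis() + 10000
    while (captured.isEmpty && System.currentTimeMillis() < deadline) Thread.sleep(50)
  } finally {
    spark.listenerManager.unregister(listener)
  }
  captured.getOrElse(sys.error(s"no query execution captured for write to $path"))
}

Capturing the executed plan this way, rather than inspecting df.queryExecution before the action, matters because AQE may rewrite the plan (inserting query stages and reused exchanges) only at execution time, which is exactly the shape this PR's check needs to handle.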