From ae0b3a23f655acdafe10df3e84479c9b33567334 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Wed, 11 Feb 2026 00:57:06 +0530 Subject: [PATCH 1/3] test: add SPARK-38811 INSERT INTO ALTER TABLE ADD COLUMNS tests --- .../parquet/CometParquetWriterSuite.scala | 115 ++++++++++++++++++ 1 file changed, 115 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala index b691039f19..9ab42a4109 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala @@ -377,6 +377,121 @@ class CometParquetWriterSuite extends CometTestBase { } } + private def withNativeWriteConf(f: => Unit): Unit = { + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") { + f + } + } + + // SPARK-38811 INSERT INTO on columns added with ALTER TABLE ADD COLUMNS: Positive tests + // Mirrors the Spark InsertSuite test to validate Comet native writer compatibility. 
+ + test("SPARK-38811: simple default value with concat expression") { + withNativeWriteConf { + withTable("t") { + sql("create table t(i boolean) using parquet") + sql("alter table t add column s string default concat('abc', 'def')") + sql("insert into t values(true, default)") + checkAnswer(spark.table("t"), Row(true, "abcdef")) + } + } + } + + test("SPARK-38811: multiple trailing default values") { + withNativeWriteConf { + withTable("t") { + sql("create table t(i int) using parquet") + sql("alter table t add column s bigint default 42") + sql("alter table t add column x bigint default 43") + sql("insert into t(i) values(1)") + checkAnswer(spark.table("t"), Row(1, 42, 43)) + } + } + } + + test("SPARK-38811: multiple trailing defaults via add columns") { + withNativeWriteConf { + withTable("t") { + sql("create table t(i int) using parquet") + sql("alter table t add columns s bigint default 42, x bigint default 43") + sql("insert into t(i) values(1)") + checkAnswer(spark.table("t"), Row(1, 42, 43)) + } + } + } + + test("SPARK-38811: default with nullable column (no default)") { + withNativeWriteConf { + withTable("t") { + sql("create table t(i int) using parquet") + sql("alter table t add column s bigint default 42") + sql("alter table t add column x bigint") + sql("insert into t(i) values(1)") + checkAnswer(spark.table("t"), Row(1, 42, null)) + } + } + } + + test("SPARK-38811: expression default (41 + 1)") { + withNativeWriteConf { + withTable("t") { + sql("create table t(i boolean) using parquet") + sql("alter table t add column s bigint default 41 + 1") + sql("insert into t(i) values(default)") + checkAnswer(spark.table("t"), Row(null, 42)) + } + } + } + + test("SPARK-38811: explicit defaults in multiple positions") { + withNativeWriteConf { + withTable("t") { + sql("create table t(i boolean default false) using parquet") + sql("alter table t add column s bigint default 42") + sql("insert into t values(false, default), (default, 42)") + 
checkAnswer(spark.table("t"), Seq(Row(false, 42), Row(false, 42))) + } + } + } + + test("SPARK-38811: default with alias over VALUES") { + withNativeWriteConf { + withTable("t") { + sql("create table t(i boolean) using parquet") + sql("alter table t add column s bigint default 42") + sql("insert into t select * from values (false, default) as tab(col, other)") + checkAnswer(spark.table("t"), Row(false, 42)) + } + } + } + + test("SPARK-38811: default value in wrong order evaluates to NULL") { + withNativeWriteConf { + withTable("t") { + sql("create table t(i boolean) using parquet") + sql("alter table t add column s bigint default 42") + sql("insert into t values (default, 43)") + checkAnswer(spark.table("t"), Row(null, 43)) + } + } + } + + // INSERT INTO ... SELECT with native write config fails due to pre-existing + // catalog refresh issue tracked separately. Skipping these variants. + ignore("SPARK-38811: default via SELECT statement") { + withNativeWriteConf { + withTable("t") { + sql("create table t(i boolean) using parquet") + sql("alter table t add column s bigint default 42") + sql("insert into t select false, default") + checkAnswer(spark.table("t"), Row(false, 42)) + } + } + } + private def createTestData(inputDir: File): String = { val inputPath = new File(inputDir, "input.parquet").getAbsolutePath val schema = FuzzDataGenerator.generateSchema( From bd5e713f1538bacb27cd32bffeb167758ceeb190 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Wed, 11 Feb 2026 00:57:14 +0530 Subject: [PATCH 2/3] fix: use outputColumnNames for native writer column mapping --- .../comet/serde/operator/CometDataWritingCommand.scala | 4 ++-- .../apache/spark/sql/comet/CometNativeWriteExec.scala | 9 ++++++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala index 31575138f8..a03a150568 100644 --- 
a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala @@ -132,7 +132,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec .newBuilder() .setOutputPath(outputPath) .setCompression(codec) - .addAllColumnNames(cmd.query.output.map(_.name).asJava) + .addAllColumnNames(cmd.outputColumnNames.asJava) // Note: work_dir, job_id, and task_attempt_id will be set at execution time // in CometNativeWriteExec, as they depend on the Spark task context @@ -201,7 +201,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec throw new SparkException(s"Could not instantiate FileCommitProtocol: ${e.getMessage}") } - CometNativeWriteExec(nativeOp, childPlan, outputPath, committer, jobId) + CometNativeWriteExec(nativeOp, childPlan, outputPath, committer, jobId, cmd.catalogTable) } private def parseCompressionCodec(cmd: InsertIntoHadoopFsRelationCommand) = { diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala index 39e7ac6eef..0cb3741f45 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala @@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.vectorized.ColumnarBatch @@ -63,7 +64,8 @@ case class CometNativeWriteExec( child: SparkPlan, outputPath: String, committer: Option[FileCommitProtocol] = None, - 
jobTrackerID: String = Utils.createTempDir().getName) + jobTrackerID: String = Utils.createTempDir().getName, + catalogTable: Option[CatalogTable] = None) extends CometNativeExec with UnaryExecNode { @@ -135,6 +137,11 @@ case class CometNativeWriteExec( } } + // Refresh the catalog table cache so subsequent reads see the new data + catalogTable.foreach { ct => + session.catalog.refreshTable(ct.identifier.quotedString) + } + // Return empty RDD as write operations don't return data sparkContext.emptyRDD[InternalRow] } From 788bcab77ad843fec2d97e10f4c832b7ec08c589 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Sun, 15 Feb 2026 11:31:53 +0530 Subject: [PATCH 3/3] test: add CometNativeWriteExec plan assertion to SPARK-38811 tests --- .../parquet/CometParquetWriterSuite.scala | 66 ++++++++++++++++--- 1 file changed, 56 insertions(+), 10 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala index c2cb38da91..9e4c96854f 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala @@ -386,6 +386,51 @@ class CometParquetWriterSuite extends CometTestBase { } } + private def assertCometNativeWrite(insertSql: String): Unit = { + val plan = captureSqlWritePlan(insertSql) + val hasNativeWrite = plan.exists { + case _: CometNativeWriteExec => true + case d: DataWritingCommandExec => + d.child.exists(_.isInstanceOf[CometNativeWriteExec]) + case _ => false + } + assert( + hasNativeWrite, + s"Expected CometNativeWriteExec in plan, but not found:\n${plan.treeString}") + } + + private def captureSqlWritePlan(sqlText: String): SparkPlan = { + var capturedPlan: Option[QueryExecution] = None + + val listener = new org.apache.spark.sql.util.QueryExecutionListener { + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { 
+ if (funcName == "command") { + capturedPlan = Some(qe) + } + } + override def onFailure( + funcName: String, + qe: QueryExecution, + exception: Exception): Unit = {} + } + + spark.listenerManager.register(listener) + try { + sql(sqlText) + val maxWaitTimeMs = 5000 + val checkIntervalMs = 50 + var iterations = 0 + while (capturedPlan.isEmpty && iterations < maxWaitTimeMs / checkIntervalMs) { + Thread.sleep(checkIntervalMs) + iterations += 1 + } + assert(capturedPlan.isDefined, s"Failed to capture plan for: $sqlText") + stripAQEPlan(capturedPlan.get.executedPlan) + } finally { + spark.listenerManager.unregister(listener) + } + } + // SPARK-38811 INSERT INTO on columns added with ALTER TABLE ADD COLUMNS: Positive tests // Mirrors the Spark InsertSuite test to validate Comet native writer compatibility. @@ -394,7 +439,7 @@ class CometParquetWriterSuite extends CometTestBase { withTable("t") { sql("create table t(i boolean) using parquet") sql("alter table t add column s string default concat('abc', 'def')") - sql("insert into t values(true, default)") + assertCometNativeWrite("insert into t values(true, default)") checkAnswer(spark.table("t"), Row(true, "abcdef")) } } @@ -406,7 +451,7 @@ class CometParquetWriterSuite extends CometTestBase { sql("create table t(i int) using parquet") sql("alter table t add column s bigint default 42") sql("alter table t add column x bigint default 43") - sql("insert into t(i) values(1)") + assertCometNativeWrite("insert into t(i) values(1)") checkAnswer(spark.table("t"), Row(1, 42, 43)) } } @@ -417,7 +462,7 @@ class CometParquetWriterSuite extends CometTestBase { withTable("t") { sql("create table t(i int) using parquet") sql("alter table t add columns s bigint default 42, x bigint default 43") - sql("insert into t(i) values(1)") + assertCometNativeWrite("insert into t(i) values(1)") checkAnswer(spark.table("t"), Row(1, 42, 43)) } } @@ -429,7 +474,7 @@ class CometParquetWriterSuite extends CometTestBase { sql("create table t(i int) 
using parquet") sql("alter table t add column s bigint default 42") sql("alter table t add column x bigint") - sql("insert into t(i) values(1)") + assertCometNativeWrite("insert into t(i) values(1)") checkAnswer(spark.table("t"), Row(1, 42, null)) } } @@ -440,7 +485,7 @@ class CometParquetWriterSuite extends CometTestBase { withTable("t") { sql("create table t(i boolean) using parquet") sql("alter table t add column s bigint default 41 + 1") - sql("insert into t(i) values(default)") + assertCometNativeWrite("insert into t(i) values(default)") checkAnswer(spark.table("t"), Row(null, 42)) } } @@ -451,7 +496,7 @@ class CometParquetWriterSuite extends CometTestBase { withTable("t") { sql("create table t(i boolean default false) using parquet") sql("alter table t add column s bigint default 42") - sql("insert into t values(false, default), (default, 42)") + assertCometNativeWrite("insert into t values(false, default), (default, 42)") checkAnswer(spark.table("t"), Seq(Row(false, 42), Row(false, 42))) } } @@ -462,7 +507,8 @@ class CometParquetWriterSuite extends CometTestBase { withTable("t") { sql("create table t(i boolean) using parquet") sql("alter table t add column s bigint default 42") - sql("insert into t select * from values (false, default) as tab(col, other)") + assertCometNativeWrite( + "insert into t select * from values (false, default) as tab(col, other)") checkAnswer(spark.table("t"), Row(false, 42)) } } @@ -473,14 +519,14 @@ class CometParquetWriterSuite extends CometTestBase { withTable("t") { sql("create table t(i boolean) using parquet") sql("alter table t add column s bigint default 42") - sql("insert into t values (default, 43)") + assertCometNativeWrite("insert into t values (default, 43)") checkAnswer(spark.table("t"), Row(null, 43)) } } } - // INSERT INTO ... SELECT with native write config fails due to pre-existing - // catalog refresh issue tracked separately. Skipping these variants. + // INSERT INTO ... 
SELECT with native writer fails, + // open issue: https://github.com/apache/datafusion-comet/issues/3521 ignore("SPARK-38811: default via SELECT statement") { withNativeWriteConf { withTable("t") {