From 5bae0ad799f1d742949d9b5a789e2f83418998b7 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 23 Dec 2025 10:09:16 -0700
Subject: [PATCH 1/3] refactor

---
 .../apache/comet/rules/CometScanRule.scala | 114 ++++++++++++------
 1 file changed, 79 insertions(+), 35 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 01e385b0ae..d5161a4a71 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.types._
 
 import org.apache.comet.{CometConf, CometNativeException, DataTypeSupport}
 import org.apache.comet.CometConf._
-import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, isCometLoaded, withInfo, withInfos}
+import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isSpark40Plus, withInfo, withInfos}
 import org.apache.comet.DataTypeSupport.isComplexType
 import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection}
 import org.apache.comet.objectstore.NativeConfig
@@ -145,21 +145,9 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
       if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
         return withInfo(scanExec, s"Unsupported file format ${r.fileFormat}")
       }
+      val hadoopConf = r.sparkSession.sessionState.newHadoopConfWithOptions(r.options)
 
-      var scanImpl = COMET_NATIVE_SCAN_IMPL.get()
-
-      val hadoopConf = scanExec.relation.sparkSession.sessionState
-        .newHadoopConfWithOptions(scanExec.relation.options)
-
-      // if scan is auto then pick the best available scan
-      if (scanImpl == SCAN_AUTO) {
-        scanImpl = selectScan(scanExec, r.partitionSchema, hadoopConf)
-      }
-
-      if (scanImpl == SCAN_NATIVE_DATAFUSION && !CometNativeScan.isSupported(scanExec)) {
-        return scanExec
-      }
-
+      // TODO is this restriction valid for all native scan types?
       val possibleDefaultValues = getExistenceDefaultValues(scanExec.requiredSchema)
       if (possibleDefaultValues.exists(d => {
           d != null && (d.isInstanceOf[ArrayBasedMapData] || d
@@ -173,22 +161,18 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
           "Full native scan disabled because nested types for default values are not supported")
       }
 
-      if (encryptionEnabled(hadoopConf) && scanImpl != CometConf.SCAN_NATIVE_COMET) {
-        if (!isEncryptionConfigSupported(hadoopConf)) {
-          withInfo(scanExec, s"$scanImpl does not support encryption")
-        }
-      }
-
-      // check that schema is supported
-      checkSchema(scanExec, scanImpl, r)
-
-      if (hasExplainInfo(scanExec)) {
-        // could not accelerate, and plan is already tagged with fallback reasons
-        scanExec
-      } else {
-        // this is confusing, but we always insert a CometScanExec here, which may replaced
-        // with a CometNativeExec when CometExecRule runs, depending on the scanImpl value.
-        CometScanExec(scanExec, session, scanImpl)
+      COMET_NATIVE_SCAN_IMPL.get() match {
+        case SCAN_AUTO =>
+          // TODO add support for native_datafusion in the future
+          nativeIcebergCompatScan(session, scanExec, r, hadoopConf)
+            .orElse(nativeCometScan(session, scanExec, r, hadoopConf))
+            .getOrElse(scanExec)
+        case SCAN_NATIVE_DATAFUSION =>
+          nativeDataFusionScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
+        case SCAN_NATIVE_ICEBERG_COMPAT =>
+          nativeIcebergCompatScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
+        case SCAN_NATIVE_COMET =>
+          nativeCometScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
       }
 
     case _ =>
@@ -196,6 +180,54 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
     }
   }
 
+  private def nativeDataFusionScan(
+      session: SparkSession,
+      scanExec: FileSourceScanExec,
+      r: HadoopFsRelation,
+      hadoopConf: Configuration): Option[SparkPlan] = {
+    if (isSpark40Plus) {
+      // there are still issues with Spark 4 support
+      return None
+    }
+    if (!CometNativeScan.isSupported(scanExec)) {
+      return None
+    }
+    if (encryptionEnabled(hadoopConf) && !isEncryptionConfigSupported(hadoopConf)) {
+      withInfo(scanExec, s"$SCAN_NATIVE_DATAFUSION does not support encryption")
+      return None
+    }
+    if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) {
+      return None
+    }
+    Some(CometScanExec(scanExec, session, SCAN_NATIVE_DATAFUSION))
+  }
+
+  private def nativeIcebergCompatScan(
+      session: SparkSession,
+      scanExec: FileSourceScanExec,
+      r: HadoopFsRelation,
+      hadoopConf: Configuration): Option[SparkPlan] = {
+    if (encryptionEnabled(hadoopConf) && !isEncryptionConfigSupported(hadoopConf)) {
+      withInfo(scanExec, s"$SCAN_NATIVE_ICEBERG_COMPAT does not support encryption")
+      return None
+    }
+    if (!isSchemaSupported(scanExec, SCAN_NATIVE_ICEBERG_COMPAT, r)) {
+      return None
+    }
+    Some(CometScanExec(scanExec, session, SCAN_NATIVE_ICEBERG_COMPAT))
+  }
+
+  private def nativeCometScan(
+      session: SparkSession,
+      scanExec: FileSourceScanExec,
+      r: HadoopFsRelation,
+      hadoopConf: Configuration): Option[SparkPlan] = {
+    if (!isSchemaSupported(scanExec, SCAN_NATIVE_COMET, r)) {
+      return None
+    }
+    Some(CometScanExec(scanExec, session, SCAN_NATIVE_COMET))
+  }
+
   private def transformV2Scan(scanExec: BatchScanExec): SparkPlan = {
 
     scanExec.scan match {
@@ -612,20 +644,32 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
   private def isDynamicPruningFilter(e: Expression): Boolean =
     e.exists(_.isInstanceOf[PlanExpression[_]])
 
-  def checkSchema(scanExec: FileSourceScanExec, scanImpl: String, r: HadoopFsRelation): Unit = {
+  private def isSchemaSupported(
+      scanExec: FileSourceScanExec,
+      scanImpl: String,
+      r: HadoopFsRelation): Boolean = {
     val fallbackReasons = new ListBuffer[String]()
     val typeChecker = CometScanTypeChecker(scanImpl)
     val schemaSupported =
       typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
     if (!schemaSupported) {
-      withInfo(scanExec, s"Unsupported schema ${scanExec.requiredSchema} for $scanImpl")
+      withInfo(
+        scanExec,
+        s"Unsupported schema ${scanExec.requiredSchema} " +
+          s"for $scanImpl: ${fallbackReasons.mkString(", ")}")
+      return false
     }
 
     val partitionSchemaSupported =
       typeChecker.isSchemaSupported(r.partitionSchema, fallbackReasons)
     if (!partitionSchemaSupported) {
-      fallbackReasons += s"Unsupported partitioning schema ${r.partitionSchema} for $scanImpl"
+      withInfo(
+        scanExec,
+        s"Unsupported partitioning schema ${r.partitionSchema} " +
+          s"for $scanImpl: ${fallbackReasons
+            .mkString(", ")}")
+      return false
     }
-    withInfos(scanExec, fallbackReasons.toSet)
+    true
   }
 }

From e855756683a9fa7571ad27933d64e7c82283548f Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 23 Dec 2025 11:09:52 -0700
Subject: [PATCH 2/3] fix regression

---
 .../main/scala/org/apache/comet/rules/CometScanRule.scala | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index d5161a4a71..6e11416454 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.types._
 
 import org.apache.comet.{CometConf, CometNativeException, DataTypeSupport}
 import org.apache.comet.CometConf._
-import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isSpark40Plus, withInfo, withInfos}
+import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, withInfo, withInfos}
 import org.apache.comet.DataTypeSupport.isComplexType
 import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection}
 import org.apache.comet.objectstore.NativeConfig
@@ -185,10 +185,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
       scanExec: FileSourceScanExec,
       r: HadoopFsRelation,
       hadoopConf: Configuration): Option[SparkPlan] = {
-    if (isSpark40Plus) {
-      // there are still issues with Spark 4 support
-      return None
-    }
     if (!CometNativeScan.isSupported(scanExec)) {
       return None
     }

From bce716a2b67728ec5d98bc152f2e3d81938ea61c Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 23 Dec 2025 14:24:37 -0700
Subject: [PATCH 3/3] fix

---
 spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 6e11416454..2d82da2e06 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -159,6 +159,7 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
         withInfo(
           scanExec,
           "Full native scan disabled because nested types for default values are not supported")
+        return scanExec
       }
 
       COMET_NATIVE_SCAN_IMPL.get() match {
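
Reviewer note (illustrative, not part of the patch series): the SCAN_AUTO branch added in
[PATCH 1/3] selects a scan implementation by chaining Option-returning candidates and keeps
the unmodified Spark plan when every candidate declines. A minimal, self-contained Scala
sketch of that selection pattern follows; all names in it (Plan, SparkScan, CometScan,
tryIcebergCompat, tryNativeComet) are hypothetical stand-ins, not Comet APIs.

// Sketch of the Option/orElse fallback chain used by the SCAN_AUTO case.
// Hypothetical types and helpers; only the control flow mirrors the patch.
object ScanSelectionSketch {

  sealed trait Plan
  case object SparkScan extends Plan // stand-in for the unmodified Spark scan
  final case class CometScan(impl: String) extends Plan // stand-in for CometScanExec

  // Each candidate returns Some(plan) when its checks pass and None to decline,
  // mirroring the nativeIcebergCompatScan / nativeCometScan helpers in the patch.
  def tryIcebergCompat(checksPass: Boolean): Option[Plan] =
    if (checksPass) Some(CometScan("native_iceberg_compat")) else None

  def tryNativeComet(checksPass: Boolean): Option[Plan] =
    if (checksPass) Some(CometScan("native_comet")) else None

  // Candidates are tried in preference order. Because orElse and getOrElse take
  // their arguments by name, a later candidate's checks run only when every
  // earlier candidate has declined, so fallback reasons are only recorded for
  // implementations that were actually consulted.
  def select(icebergCompatOk: Boolean, nativeCometOk: Boolean): Plan =
    tryIcebergCompat(icebergCompatOk)
      .orElse(tryNativeComet(nativeCometOk))
      .getOrElse(SparkScan)

  def main(args: Array[String]): Unit = {
    println(select(icebergCompatOk = true, nativeCometOk = true)) // CometScan(native_iceberg_compat)
    println(select(icebergCompatOk = false, nativeCometOk = true)) // CometScan(native_comet)
    println(select(icebergCompatOk = false, nativeCometOk = false)) // SparkScan
  }
}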