diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 01e385b0ae..2d82da2e06 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.types._
 
 import org.apache.comet.{CometConf, CometNativeException, DataTypeSupport}
 import org.apache.comet.CometConf._
-import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, isCometLoaded, withInfo, withInfos}
+import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, withInfo, withInfos}
 import org.apache.comet.DataTypeSupport.isComplexType
 import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection}
 import org.apache.comet.objectstore.NativeConfig
@@ -145,21 +145,9 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
 
         if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
           return withInfo(scanExec, s"Unsupported file format ${r.fileFormat}")
         }
+        val hadoopConf = r.sparkSession.sessionState.newHadoopConfWithOptions(r.options)
-        var scanImpl = COMET_NATIVE_SCAN_IMPL.get()
-
-        val hadoopConf = scanExec.relation.sparkSession.sessionState
-          .newHadoopConfWithOptions(scanExec.relation.options)
-
-        // if scan is auto then pick the best available scan
-        if (scanImpl == SCAN_AUTO) {
-          scanImpl = selectScan(scanExec, r.partitionSchema, hadoopConf)
-        }
-
-        if (scanImpl == SCAN_NATIVE_DATAFUSION && !CometNativeScan.isSupported(scanExec)) {
-          return scanExec
-        }
-
+        // TODO is this restriction valid for all native scan types?
         val possibleDefaultValues = getExistenceDefaultValues(scanExec.requiredSchema)
         if (possibleDefaultValues.exists(d => {
           d != null && (d.isInstanceOf[ArrayBasedMapData] || d
@@ -171,24 +159,21 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
           withInfo(
             scanExec,
             "Full native scan disabled because nested types for default values are not supported")
+          return scanExec
         }
 
-        if (encryptionEnabled(hadoopConf) && scanImpl != CometConf.SCAN_NATIVE_COMET) {
-          if (!isEncryptionConfigSupported(hadoopConf)) {
-            withInfo(scanExec, s"$scanImpl does not support encryption")
-          }
-        }
-
-        // check that schema is supported
-        checkSchema(scanExec, scanImpl, r)
-
-        if (hasExplainInfo(scanExec)) {
-          // could not accelerate, and plan is already tagged with fallback reasons
-          scanExec
-        } else {
-          // this is confusing, but we always insert a CometScanExec here, which may replaced
-          // with a CometNativeExec when CometExecRule runs, depending on the scanImpl value.
-          CometScanExec(scanExec, session, scanImpl)
+        COMET_NATIVE_SCAN_IMPL.get() match {
+          case SCAN_AUTO =>
+            // TODO add support for native_datafusion in the future
+            nativeIcebergCompatScan(session, scanExec, r, hadoopConf)
+              .orElse(nativeCometScan(session, scanExec, r, hadoopConf))
+              .getOrElse(scanExec)
+          case SCAN_NATIVE_DATAFUSION =>
+            nativeDataFusionScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
+          case SCAN_NATIVE_ICEBERG_COMPAT =>
+            nativeIcebergCompatScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
+          case SCAN_NATIVE_COMET =>
+            nativeCometScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
         }
 
       case _ =>
@@ -196,6 +181,50 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
     }
   }
 
+  private def nativeDataFusionScan(
+      session: SparkSession,
+      scanExec: FileSourceScanExec,
+      r: HadoopFsRelation,
+      hadoopConf: Configuration): Option[SparkPlan] = {
+    if (!CometNativeScan.isSupported(scanExec)) {
+      return None
+    }
+    if (encryptionEnabled(hadoopConf) && !isEncryptionConfigSupported(hadoopConf)) {
+      withInfo(scanExec, s"$SCAN_NATIVE_DATAFUSION does not support encryption")
+      return None
+    }
+    if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) {
+      return None
+    }
+    Some(CometScanExec(scanExec, session, SCAN_NATIVE_DATAFUSION))
+  }
+
+  private def nativeIcebergCompatScan(
+      session: SparkSession,
+      scanExec: FileSourceScanExec,
+      r: HadoopFsRelation,
+      hadoopConf: Configuration): Option[SparkPlan] = {
+    if (encryptionEnabled(hadoopConf) && !isEncryptionConfigSupported(hadoopConf)) {
+      withInfo(scanExec, s"$SCAN_NATIVE_ICEBERG_COMPAT does not support encryption")
+      return None
+    }
+    if (!isSchemaSupported(scanExec, SCAN_NATIVE_ICEBERG_COMPAT, r)) {
+      return None
+    }
+    Some(CometScanExec(scanExec, session, SCAN_NATIVE_ICEBERG_COMPAT))
+  }
+
+  private def nativeCometScan(
+      session: SparkSession,
+      scanExec: FileSourceScanExec,
+      r: HadoopFsRelation,
+      hadoopConf: Configuration): Option[SparkPlan] = {
+    if (!isSchemaSupported(scanExec, SCAN_NATIVE_COMET, r)) {
+      return None
+    }
+    Some(CometScanExec(scanExec, session, SCAN_NATIVE_COMET))
+  }
+
   private def transformV2Scan(scanExec: BatchScanExec): SparkPlan = {
 
     scanExec.scan match {
@@ -612,20 +641,32 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
   private def isDynamicPruningFilter(e: Expression): Boolean =
     e.exists(_.isInstanceOf[PlanExpression[_]])
 
-  def checkSchema(scanExec: FileSourceScanExec, scanImpl: String, r: HadoopFsRelation): Unit = {
+  private def isSchemaSupported(
+      scanExec: FileSourceScanExec,
+      scanImpl: String,
+      r: HadoopFsRelation): Boolean = {
     val fallbackReasons = new ListBuffer[String]()
     val typeChecker = CometScanTypeChecker(scanImpl)
     val schemaSupported =
       typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
     if (!schemaSupported) {
-      withInfo(scanExec, s"Unsupported schema ${scanExec.requiredSchema} for $scanImpl")
+      withInfo(
+        scanExec,
+        s"Unsupported schema ${scanExec.requiredSchema} " +
+          s"for $scanImpl: ${fallbackReasons.mkString(", ")}")
+      return false
     }
 
     val partitionSchemaSupported =
       typeChecker.isSchemaSupported(r.partitionSchema, fallbackReasons)
     if (!partitionSchemaSupported) {
-      fallbackReasons += s"Unsupported partitioning schema ${r.partitionSchema} for $scanImpl"
+      withInfo(
+        scanExec,
+        s"Unsupported partitioning schema ${r.partitionSchema} " +
+          s"for $scanImpl: ${fallbackReasons
+          .mkString(", ")}")
+      return false
    }
-    withInfos(scanExec, fallbackReasons.toSet)
+    true
   }
 }
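
Note on the pattern this change introduces: the mutable `scanImpl` flow is replaced by per-implementation builders (`nativeDataFusionScan`, `nativeIcebergCompatScan`, `nativeCometScan`) that each return `Option[SparkPlan]`, so the `SCAN_AUTO` case can try `native_iceberg_compat` first and fall back to `native_comet` via `orElse`, keeping the original Spark plan when no candidate applies. Below is a minimal, self-contained Scala sketch of that selection pattern only; every name in it (`Plan`, `SparkScan`, `CometScan`, `tryIcebergCompat`, `tryComet`, `selectScan`) is hypothetical and not part of the Comet codebase.

// Minimal sketch of the Option-chaining fallback selection used in the
// SCAN_AUTO branch above. All names here are hypothetical illustrations.
object ScanSelectionSketch {
  sealed trait Plan
  case object SparkScan extends Plan
  final case class CometScan(impl: String) extends Plan

  // Each builder validates its own preconditions and declines by returning
  // None, mirroring the early `return None` checks in the real builders.
  def tryIcebergCompat(schemaOk: Boolean, encryptionOk: Boolean): Option[Plan] =
    if (schemaOk && encryptionOk) Some(CometScan("native_iceberg_compat")) else None

  def tryComet(schemaOk: Boolean): Option[Plan] =
    if (schemaOk) Some(CometScan("native_comet")) else None

  // Mirrors the SCAN_AUTO arm: the first applicable candidate wins, and
  // getOrElse keeps the original Spark plan when every candidate declines.
  def selectScan(original: Plan, schemaOk: Boolean, encryptionOk: Boolean): Plan =
    tryIcebergCompat(schemaOk, encryptionOk)
      .orElse(tryComet(schemaOk))
      .getOrElse(original)

  def main(args: Array[String]): Unit = {
    // Encryption unsupported: falls through to the native_comet candidate.
    println(selectScan(SparkScan, schemaOk = true, encryptionOk = false))
    // Nothing applies: the original Spark plan is returned unchanged.
    println(selectScan(SparkScan, schemaOk = false, encryptionOk = false))
  }
}

Returning Option from each builder keeps the fallback reporting (`withInfo`) local to the builder that declined, which is why the match arms in the diff can simply end with `.getOrElse(scanExec)`.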