111 changes: 76 additions & 35 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.types._

import org.apache.comet.{CometConf, CometNativeException, DataTypeSupport}
import org.apache.comet.CometConf._
import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, isCometLoaded, withInfo, withInfos}
import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, withInfo, withInfos}
import org.apache.comet.DataTypeSupport.isComplexType
import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection}
import org.apache.comet.objectstore.NativeConfig
@@ -145,21 +145,9 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
return withInfo(scanExec, s"Unsupported file format ${r.fileFormat}")
}
val hadoopConf = r.sparkSession.sessionState.newHadoopConfWithOptions(r.options)

var scanImpl = COMET_NATIVE_SCAN_IMPL.get()

val hadoopConf = scanExec.relation.sparkSession.sessionState
.newHadoopConfWithOptions(scanExec.relation.options)

// if scan is auto then pick the best available scan
if (scanImpl == SCAN_AUTO) {
scanImpl = selectScan(scanExec, r.partitionSchema, hadoopConf)
}

if (scanImpl == SCAN_NATIVE_DATAFUSION && !CometNativeScan.isSupported(scanExec)) {
return scanExec
}

// TODO is this restriction valid for all native scan types?
val possibleDefaultValues = getExistenceDefaultValues(scanExec.requiredSchema)
if (possibleDefaultValues.exists(d => {
d != null && (d.isInstanceOf[ArrayBasedMapData] || d
@@ -171,31 +159,72 @@
withInfo(
scanExec,
"Full native scan disabled because nested types for default values are not supported")
return scanExec
}

if (encryptionEnabled(hadoopConf) && scanImpl != CometConf.SCAN_NATIVE_COMET) {
if (!isEncryptionConfigSupported(hadoopConf)) {
withInfo(scanExec, s"$scanImpl does not support encryption")
}
}

// check that schema is supported
checkSchema(scanExec, scanImpl, r)

if (hasExplainInfo(scanExec)) {
// could not accelerate, and plan is already tagged with fallback reasons
scanExec
} else {
// this is confusing, but we always insert a CometScanExec here, which may be replaced
// with a CometNativeExec when CometExecRule runs, depending on the scanImpl value.
CometScanExec(scanExec, session, scanImpl)
COMET_NATIVE_SCAN_IMPL.get() match {
case SCAN_AUTO =>
// TODO add support for native_datafusion in the future
nativeIcebergCompatScan(session, scanExec, r, hadoopConf)
.orElse(nativeCometScan(session, scanExec, r, hadoopConf))
.getOrElse(scanExec)
case SCAN_NATIVE_DATAFUSION =>
nativeDataFusionScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
case SCAN_NATIVE_ICEBERG_COMPAT =>
nativeIcebergCompatScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
case SCAN_NATIVE_COMET =>
nativeCometScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
}

case _ =>
withInfo(scanExec, s"Unsupported relation ${scanExec.relation}")
}
}

private def nativeDataFusionScan(
session: SparkSession,
scanExec: FileSourceScanExec,
r: HadoopFsRelation,
hadoopConf: Configuration): Option[SparkPlan] = {
if (!CometNativeScan.isSupported(scanExec)) {
return None
}
if (encryptionEnabled(hadoopConf) && !isEncryptionConfigSupported(hadoopConf)) {
withInfo(scanExec, s"$SCAN_NATIVE_DATAFUSION does not support encryption")
return None
}
if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) {
return None
}
Some(CometScanExec(scanExec, session, SCAN_NATIVE_DATAFUSION))
}

private def nativeIcebergCompatScan(
session: SparkSession,
scanExec: FileSourceScanExec,
r: HadoopFsRelation,
hadoopConf: Configuration): Option[SparkPlan] = {
if (encryptionEnabled(hadoopConf) && !isEncryptionConfigSupported(hadoopConf)) {
withInfo(scanExec, s"$SCAN_NATIVE_ICEBERG_COMPAT does not support encryption")
return None
}
if (!isSchemaSupported(scanExec, SCAN_NATIVE_ICEBERG_COMPAT, r)) {
return None
}
Some(CometScanExec(scanExec, session, SCAN_NATIVE_ICEBERG_COMPAT))
}

private def nativeCometScan(
session: SparkSession,
scanExec: FileSourceScanExec,
r: HadoopFsRelation,
hadoopConf: Configuration): Option[SparkPlan] = {
if (!isSchemaSupported(scanExec, SCAN_NATIVE_COMET, r)) {
return None
}
Some(CometScanExec(scanExec, session, SCAN_NATIVE_COMET))
}

private def transformV2Scan(scanExec: BatchScanExec): SparkPlan = {

scanExec.scan match {
@@ -612,20 +641,32 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
private def isDynamicPruningFilter(e: Expression): Boolean =
e.exists(_.isInstanceOf[PlanExpression[_]])

def checkSchema(scanExec: FileSourceScanExec, scanImpl: String, r: HadoopFsRelation): Unit = {
private def isSchemaSupported(
scanExec: FileSourceScanExec,
scanImpl: String,
r: HadoopFsRelation): Boolean = {
val fallbackReasons = new ListBuffer[String]()
val typeChecker = CometScanTypeChecker(scanImpl)
val schemaSupported =
typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
if (!schemaSupported) {
withInfo(scanExec, s"Unsupported schema ${scanExec.requiredSchema} for $scanImpl")
withInfo(
scanExec,
s"Unsupported schema ${scanExec.requiredSchema} " +
s"for $scanImpl: ${fallbackReasons.mkString(", ")}")
return false
}
val partitionSchemaSupported =
typeChecker.isSchemaSupported(r.partitionSchema, fallbackReasons)
if (!partitionSchemaSupported) {
fallbackReasons += s"Unsupported partitioning schema ${r.partitionSchema} for $scanImpl"
withInfo(
scanExec,
s"Unsupported partitioning schema ${scanExec.requiredSchema} " +
s"for $scanImpl: ${fallbackReasons
.mkString(", ")}")
return false
}
withInfos(scanExec, fallbackReasons.toSet)
true
}
}
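
The SCAN_AUTO branch above picks the first scan implementation whose checks pass by chaining the Option results of the per-implementation helpers. A minimal, self-contained sketch of that selection pattern, using hypothetical stand-ins (Plan, tryIcebergCompat, tryNativeComet) rather than the real Comet types:

// Minimal sketch of the SCAN_AUTO fallback chain (hypothetical
// stand-ins for the real Comet types; not the actual API).
object ScanAutoSketch {
  final case class Plan(impl: String)

  // Each candidate returns Some(plan) if its checks pass, else None.
  def tryIcebergCompat(schemaOk: Boolean): Option[Plan] =
    if (schemaOk) Some(Plan("native_iceberg_compat")) else None

  def tryNativeComet(schemaOk: Boolean): Option[Plan] =
    if (schemaOk) Some(Plan("native_comet")) else None

  def main(args: Array[String]): Unit = {
    val original = Plan("spark_scan")
    // Try candidates in priority order; keep the original plan if none qualify.
    val chosen = tryIcebergCompat(schemaOk = false)
      .orElse(tryNativeComet(schemaOk = true))
      .getOrElse(original)
    println(chosen) // Plan(native_comet)
  }
}

Since orElse takes its argument by name, later candidates are only evaluated when the earlier ones decline.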

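
isSchemaSupported follows a collect-reasons-then-tag shape: each type check can append a human-readable fallback reason to a shared buffer, the first failing check tags the plan via withInfo and returns false, and any remaining reasons are attached at the end via withInfos. A rough sketch of that shape, with a toy string-based check standing in for CometScanTypeChecker:

import scala.collection.mutable.ListBuffer

// Sketch of the reason-collecting validation pattern (toy check;
// CometScanTypeChecker is replaced by a simple whitelist here).
object SchemaCheckSketch {
  def isSchemaSupported(fieldTypes: Seq[String], reasons: ListBuffer[String]): Boolean = {
    val unsupported = fieldTypes.filterNot(Set("int", "long", "string").contains)
    if (unsupported.nonEmpty) {
      // Record why we fell back, then signal the caller to keep the Spark plan.
      reasons += s"Unsupported types: ${unsupported.mkString(", ")}"
      false
    } else {
      true
    }
  }

  def main(args: Array[String]): Unit = {
    val reasons = new ListBuffer[String]()
    val ok = isSchemaSupported(Seq("int", "map<string,int>"), reasons)
    println(s"supported=$ok, reasons=${reasons.mkString("; ")}")
  }
}

Threading a mutable buffer through the checks is what lets withInfos attach every collected reason to the plan for EXPLAIN output, instead of reporting only a bare pass/fail.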
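
Both nativeDataFusionScan and nativeIcebergCompatScan gate on encryptionEnabled(hadoopConf) before checking whether the encryption configuration is supported. A hypothetical sketch of such a guard, assuming hadoop-common on the classpath and that the check looks at the standard Parquet encryption properties (the exact keys Comet inspects are an assumption here):

import org.apache.hadoop.conf.Configuration

// Hypothetical encryptionEnabled-style guard: treat the scan as
// encrypted when standard Parquet encryption properties are set.
object EncryptionGuardSketch {
  def encryptionEnabled(conf: Configuration): Boolean =
    conf.get("parquet.crypto.factory.class") != null ||
      conf.get("parquet.encryption.kms.client.class") != null

  def main(args: Array[String]): Unit = {
    val conf = new Configuration(false)
    conf.set(
      "parquet.crypto.factory.class",
      "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")
    println(encryptionEnabled(conf)) // true
  }
}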