114 changes: 79 additions & 35 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.types._

import org.apache.comet.{CometConf, CometNativeException, DataTypeSupport}
import org.apache.comet.CometConf._
import org.apache.comet.CometSparkSessionExtensions.{hasExplainInfo, isCometLoaded, withInfo, withInfos}
import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isSpark40Plus, withInfo, withInfos}
import org.apache.comet.DataTypeSupport.isComplexType
import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection}
import org.apache.comet.objectstore.NativeConfig
@@ -145,21 +145,9 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
return withInfo(scanExec, s"Unsupported file format ${r.fileFormat}")
}
val hadoopConf = r.sparkSession.sessionState.newHadoopConfWithOptions(r.options)

var scanImpl = COMET_NATIVE_SCAN_IMPL.get()

val hadoopConf = scanExec.relation.sparkSession.sessionState
.newHadoopConfWithOptions(scanExec.relation.options)

// if scan is auto then pick the best available scan
if (scanImpl == SCAN_AUTO) {
scanImpl = selectScan(scanExec, r.partitionSchema, hadoopConf)
}

if (scanImpl == SCAN_NATIVE_DATAFUSION && !CometNativeScan.isSupported(scanExec)) {
return scanExec
}

// TODO is this restriction valid for all native scan types?
val possibleDefaultValues = getExistenceDefaultValues(scanExec.requiredSchema)
if (possibleDefaultValues.exists(d => {
d != null && (d.isInstanceOf[ArrayBasedMapData] || d
@@ -173,29 +161,73 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
"Full native scan disabled because nested types for default values are not supported")
}

if (encryptionEnabled(hadoopConf) && scanImpl != CometConf.SCAN_NATIVE_COMET) {
if (!isEncryptionConfigSupported(hadoopConf)) {
withInfo(scanExec, s"$scanImpl does not support encryption")
}
}

// check that schema is supported
checkSchema(scanExec, scanImpl, r)

if (hasExplainInfo(scanExec)) {
// could not accelerate, and plan is already tagged with fallback reasons
scanExec
} else {
// this is confusing, but we always insert a CometScanExec here, which may replaced
// with a CometNativeExec when CometExecRule runs, depending on the scanImpl value.
CometScanExec(scanExec, session, scanImpl)
COMET_NATIVE_SCAN_IMPL.get() match {
case SCAN_AUTO =>
nativeDataFusionScan(session, scanExec, r, hadoopConf)
.orElse(nativeIcebergCompatScan(session, scanExec, r, hadoopConf))
.orElse(nativeCometScan(session, scanExec, r, hadoopConf))
.getOrElse(scanExec)
case SCAN_NATIVE_DATAFUSION =>
nativeDataFusionScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
case SCAN_NATIVE_ICEBERG_COMPAT =>
nativeIcebergCompatScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
case SCAN_NATIVE_COMET =>
nativeCometScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
}

case _ =>
withInfo(scanExec, s"Unsupported relation ${scanExec.relation}")
}
}

private def nativeDataFusionScan(
session: SparkSession,
scanExec: FileSourceScanExec,
r: HadoopFsRelation,
hadoopConf: Configuration): Option[SparkPlan] = {
if (isSpark40Plus) {
// there are still issues with Spark 4 support
return None
}
if (!CometNativeScan.isSupported(scanExec)) {
return None
}
if (encryptionEnabled(hadoopConf) && !isEncryptionConfigSupported(hadoopConf)) {
withInfo(scanExec, s"$SCAN_NATIVE_DATAFUSION does not support encryption")
return None
}
if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) {
return None
}
Some(CometScanExec(scanExec, session, SCAN_NATIVE_DATAFUSION))
}

private def nativeIcebergCompatScan(
session: SparkSession,
scanExec: FileSourceScanExec,
r: HadoopFsRelation,
hadoopConf: Configuration): Option[SparkPlan] = {
if (encryptionEnabled(hadoopConf) && !isEncryptionConfigSupported(hadoopConf)) {
withInfo(scanExec, s"$SCAN_NATIVE_ICEBERG_COMPAT does not support encryption")
return None
}
if (!isSchemaSupported(scanExec, SCAN_NATIVE_ICEBERG_COMPAT, r)) {
return None
}
Some(CometScanExec(scanExec, session, SCAN_NATIVE_ICEBERG_COMPAT))
}

private def nativeCometScan(
session: SparkSession,
scanExec: FileSourceScanExec,
r: HadoopFsRelation,
hadoopConf: Configuration): Option[SparkPlan] = {
if (!isSchemaSupported(scanExec, SCAN_NATIVE_COMET, r)) {
return None
}
Some(CometScanExec(scanExec, session, SCAN_NATIVE_COMET))
}

private def transformV2Scan(scanExec: BatchScanExec): SparkPlan = {

scanExec.scan match {
@@ -612,20 +644,32 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
private def isDynamicPruningFilter(e: Expression): Boolean =
e.exists(_.isInstanceOf[PlanExpression[_]])

def checkSchema(scanExec: FileSourceScanExec, scanImpl: String, r: HadoopFsRelation): Unit = {
private def isSchemaSupported(
scanExec: FileSourceScanExec,
scanImpl: String,
r: HadoopFsRelation): Boolean = {
val fallbackReasons = new ListBuffer[String]()
val typeChecker = CometScanTypeChecker(scanImpl)
val schemaSupported =
typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
if (!schemaSupported) {
withInfo(scanExec, s"Unsupported schema ${scanExec.requiredSchema} for $scanImpl")
withInfo(
scanExec,
s"Unsupported schema ${scanExec.requiredSchema} " +
s"for $scanImpl: ${fallbackReasons.mkString(", ")}")
return false
}
val partitionSchemaSupported =
typeChecker.isSchemaSupported(r.partitionSchema, fallbackReasons)
if (!partitionSchemaSupported) {
fallbackReasons += s"Unsupported partitioning schema ${r.partitionSchema} for $scanImpl"
withInfo(
scanExec,
s"Unsupported partitioning schema ${scanExec.requiredSchema} " +
s"for $scanImpl: ${fallbackReasons
.mkString(", ")}")
return false
}
withInfos(scanExec, fallbackReasons.toSet)
true
}
}
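
For context on the SCAN_AUTO branch introduced above: each per-implementation helper returns an Option[SparkPlan], and orElse chains them so the first implementation whose preconditions pass wins, falling back to the original Spark scan otherwise. Below is a minimal standalone sketch of that pattern; the Plan, tryDataFusion, tryIcebergCompat, tryComet, and selectAuto names and the boolean preconditions are illustrative assumptions, not code from this PR.

// Minimal standalone sketch (not part of this PR) of the Option.orElse fallback
// chain used in the SCAN_AUTO branch. All names and preconditions are illustrative.
object ScanAutoFallbackSketch {

  final case class Plan(impl: String)

  // Each candidate builder returns Some(plan) only when its preconditions hold,
  // loosely mirroring nativeDataFusionScan / nativeIcebergCompatScan / nativeCometScan.
  def tryDataFusion(schemaOk: Boolean, encrypted: Boolean): Option[Plan] =
    if (schemaOk && !encrypted) Some(Plan("native_datafusion")) else None

  def tryIcebergCompat(schemaOk: Boolean, encrypted: Boolean): Option[Plan] =
    if (schemaOk && !encrypted) Some(Plan("native_iceberg_compat")) else None

  def tryComet(schemaOk: Boolean): Option[Plan] =
    if (schemaOk) Some(Plan("native_comet")) else None

  // The first candidate whose preconditions pass wins; otherwise the original
  // (non-Comet) plan is kept unchanged.
  def selectAuto(schemaOk: Boolean, encrypted: Boolean, original: Plan): Plan =
    tryDataFusion(schemaOk, encrypted)
      .orElse(tryIcebergCompat(schemaOk, encrypted))
      .orElse(tryComet(schemaOk))
      .getOrElse(original)

  def main(args: Array[String]): Unit = {
    println(selectAuto(schemaOk = true, encrypted = false, original = Plan("spark")))  // Plan(native_datafusion)
    println(selectAuto(schemaOk = true, encrypted = true, original = Plan("spark")))   // Plan(native_comet)
    println(selectAuto(schemaOk = false, encrypted = false, original = Plan("spark"))) // Plan(spark)
  }
}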

@@ -18,7 +18,7 @@
: : : +- CometBroadcastExchange (6)
: : : +- CometProject (5)
: : : +- CometFilter (4)
: : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim (3)
: : : +- CometNativeScan parquet spark_catalog.default.date_dim (3)
: : +- CometBroadcastExchange (25)
: : +- CometFilter (24)
: : +- CometHashAggregate (23)
@@ -35,11 +35,11 @@
: +- CometBroadcastExchange (31)
: +- CometProject (30)
: +- CometFilter (29)
: +- CometScan [native_iceberg_compat] parquet spark_catalog.default.store (28)
: +- CometNativeScan parquet spark_catalog.default.store (28)
+- CometBroadcastExchange (37)
+- CometProject (36)
+- CometFilter (35)
+- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer (34)
+- CometNativeScan parquet spark_catalog.default.customer (34)


(1) CometScan [native_iceberg_compat] parquet spark_catalog.default.store_returns
@@ -54,7 +54,7 @@ ReadSchema: struct<sr_customer_sk:int,sr_store_sk:int,sr_return_amt:decimal(7,2)
Input [4]: [sr_customer_sk#1, sr_store_sk#2, sr_return_amt#3, sr_returned_date_sk#4]
Condition : (isnotnull(sr_store_sk#2) AND isnotnull(sr_customer_sk#1))

(3) CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
(3) CometNativeScan parquet spark_catalog.default.date_dim
Output [2]: [d_date_sk#6, d_year#7]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]
@@ -169,7 +169,7 @@ Arguments: [ctr_store_sk#10], [ctr_store_sk#19], Inner, (cast(ctr_total_return#1
Input [5]: [ctr_customer_sk#9, ctr_store_sk#10, ctr_total_return#11, (avg(ctr_total_return) * 1.2)#23, ctr_store_sk#19]
Arguments: [ctr_customer_sk#9, ctr_store_sk#10], [ctr_customer_sk#9, ctr_store_sk#10]

(28) CometScan [native_iceberg_compat] parquet spark_catalog.default.store
(28) CometNativeScan parquet spark_catalog.default.store
Output [2]: [s_store_sk#24, s_state#25]
Batched: true
Location [not included in comparison]/{warehouse_dir}/store]
@@ -197,7 +197,7 @@ Arguments: [ctr_store_sk#10], [s_store_sk#24], Inner, BuildRight
Input [3]: [ctr_customer_sk#9, ctr_store_sk#10, s_store_sk#24]
Arguments: [ctr_customer_sk#9], [ctr_customer_sk#9]

(34) CometScan [native_iceberg_compat] parquet spark_catalog.default.customer
(34) CometNativeScan parquet spark_catalog.default.customer
Output [2]: [c_customer_sk#26, c_customer_id#27]
Batched: true
Location [not included in comparison]/{warehouse_dir}/customer]
@@ -239,10 +239,10 @@ BroadcastExchange (46)
+- * CometColumnarToRow (45)
+- CometProject (44)
+- CometFilter (43)
+- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim (42)
+- CometNativeScan parquet spark_catalog.default.date_dim (42)


(42) CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
(42) CometNativeScan parquet spark_catalog.default.date_dim
Output [2]: [d_date_sk#6, d_year#7]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]
@@ -19,11 +19,11 @@ CometColumnarToRow
: : : : +- CometColumnarToRow
: : : : +- CometProject
: : : : +- CometFilter
: : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
: : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : +- CometBroadcastExchange
: : : +- CometProject
: : : +- CometFilter
: : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
: : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : +- CometBroadcastExchange
: : +- CometFilter
: : +- CometHashAggregate
@@ -40,14 +40,14 @@ CometColumnarToRow
: : +- CometBroadcastExchange
: : +- CometProject
: : +- CometFilter
: : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
: : +- CometNativeScan parquet spark_catalog.default.date_dim
: +- CometBroadcastExchange
: +- CometProject
: +- CometFilter
: +- CometScan [native_iceberg_compat] parquet spark_catalog.default.store
: +- CometNativeScan parquet spark_catalog.default.store
+- CometBroadcastExchange
+- CometProject
+- CometFilter
+- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer
+- CometNativeScan parquet spark_catalog.default.customer

Comet accelerated 46 out of 49 eligible operators (93%). Final plan contains 2 transitions between Spark and Comet.
@@ -23,11 +23,11 @@ WholeStageCodegen (1)
InputAdapter
CometProject [d_date_sk]
CometFilter [d_date_sk,d_year]
CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim [d_date_sk,d_year]
CometNativeScan parquet spark_catalog.default.date_dim [d_date_sk,d_year]
CometBroadcastExchange [d_date_sk] #3
CometProject [d_date_sk]
CometFilter [d_date_sk,d_year]
CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim [d_date_sk,d_year]
CometNativeScan parquet spark_catalog.default.date_dim [d_date_sk,d_year]
CometBroadcastExchange [(avg(ctr_total_return) * 1.2),ctr_store_sk] #4
CometFilter [(avg(ctr_total_return) * 1.2),ctr_store_sk]
CometHashAggregate [sum,count] [(avg(ctr_total_return) * 1.2),ctr_store_sk,avg(ctr_total_return)]
@@ -45,8 +45,8 @@ WholeStageCodegen (1)
CometBroadcastExchange [s_store_sk] #7
CometProject [s_store_sk]
CometFilter [s_store_sk,s_state]
CometScan [native_iceberg_compat] parquet spark_catalog.default.store [s_store_sk,s_state]
CometNativeScan parquet spark_catalog.default.store [s_store_sk,s_state]
CometBroadcastExchange [c_customer_sk,c_customer_id] #8
CometProject [c_customer_id] [c_customer_sk,c_customer_id]
CometFilter [c_customer_sk,c_customer_id]
CometScan [native_iceberg_compat] parquet spark_catalog.default.customer [c_customer_sk,c_customer_id]
CometNativeScan parquet spark_catalog.default.customer [c_customer_sk,c_customer_id]
@@ -15,15 +15,15 @@ TakeOrderedAndProject (47)
: : : :- * CometColumnarToRow (12)
: : : : +- CometBroadcastHashJoin (11)
: : : : :- CometFilter (2)
: : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer (1)
: : : : : +- CometNativeScan parquet spark_catalog.default.customer (1)
: : : : +- CometBroadcastExchange (10)
: : : : +- CometProject (9)
: : : : +- CometBroadcastHashJoin (8)
: : : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.store_sales (3)
: : : : +- CometBroadcastExchange (7)
: : : : +- CometProject (6)
: : : : +- CometFilter (5)
: : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim (4)
: : : : +- CometNativeScan parquet spark_catalog.default.date_dim (4)
: : : +- BroadcastExchange (18)
: : : +- * CometColumnarToRow (17)
: : : +- CometProject (16)
@@ -40,15 +40,15 @@ TakeOrderedAndProject (47)
: +- * CometColumnarToRow (32)
: +- CometProject (31)
: +- CometFilter (30)
: +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_address (29)
: +- CometNativeScan parquet spark_catalog.default.customer_address (29)
+- BroadcastExchange (40)
+- * CometColumnarToRow (39)
+- CometProject (38)
+- CometFilter (37)
+- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_demographics (36)
+- CometNativeScan parquet spark_catalog.default.customer_demographics (36)


(1) CometScan [native_iceberg_compat] parquet spark_catalog.default.customer
(1) CometNativeScan parquet spark_catalog.default.customer
Output [3]: [c_customer_sk#3, c_current_cdemo_sk#4, c_current_addr_sk#5]
Batched: true
Location [not included in comparison]/{warehouse_dir}/customer]
@@ -66,7 +66,7 @@ Location: InMemoryFileIndex []
PartitionFilters: [isnotnull(ss_sold_date_sk#7), dynamicpruningexpression(ss_sold_date_sk#7 IN dynamicpruning#8)]
ReadSchema: struct<ss_customer_sk:int>

(4) CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
(4) CometNativeScan parquet spark_catalog.default.date_dim
Output [3]: [d_date_sk#9, d_year#10, d_moy#11]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]
@@ -178,7 +178,7 @@ Condition : (exists#2 OR exists#1)
Output [2]: [c_current_cdemo_sk#4, c_current_addr_sk#5]
Input [5]: [c_customer_sk#3, c_current_cdemo_sk#4, c_current_addr_sk#5, exists#2, exists#1]

(29) CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_address
(29) CometNativeScan parquet spark_catalog.default.customer_address
Output [2]: [ca_address_sk#20, ca_county#21]
Batched: true
Location [not included in comparison]/{warehouse_dir}/customer_address]
@@ -210,7 +210,7 @@ Join condition: None
Output [1]: [c_current_cdemo_sk#4]
Input [3]: [c_current_cdemo_sk#4, c_current_addr_sk#5, ca_address_sk#20]

(36) CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_demographics
(36) CometNativeScan parquet spark_catalog.default.customer_demographics
Output [9]: [cd_demo_sk#22, cd_gender#23, cd_marital_status#24, cd_education_status#25, cd_purchase_estimate#26, cd_credit_rating#27, cd_dep_count#28, cd_dep_employed_count#29, cd_dep_college_count#30]
Batched: true
Location [not included in comparison]/{warehouse_dir}/customer_demographics]
@@ -274,10 +274,10 @@ BroadcastExchange (52)
+- * CometColumnarToRow (51)
+- CometProject (50)
+- CometFilter (49)
+- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim (48)
+- CometNativeScan parquet spark_catalog.default.date_dim (48)


(48) CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
(48) CometNativeScan parquet spark_catalog.default.date_dim
Output [3]: [d_date_sk#9, d_year#10, d_moy#11]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]