diff --git a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
index ef97abf74c..8ce9f96c34 100644
--- a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
@@ -63,6 +63,13 @@ public abstract class AbstractColumnReader implements AutoCloseable {
   /** A pointer to the native implementation of ColumnReader. */
   protected long nativeHandle;
 
+  /**
+   * Whether to enable schema evolution in Comet. For instance, promoting an integer column to a
+   * long column, a float column to a double column, etc. This is automatically enabled when
+   * reading from Iceberg tables.
+   */
+  protected boolean supportsSchemaEvolution;
+
   public AbstractColumnReader(
       DataType type,
       Type fieldType,
@@ -80,9 +87,11 @@ public AbstractColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
       boolean useDecimal128,
-      boolean useLegacyDateTimestamp) {
+      boolean useLegacyDateTimestamp,
+      boolean supportsSchemaEvolution) {
     this(type, null, descriptor, useDecimal128, useLegacyDateTimestamp);
-    TypeUtil.checkParquetType(descriptor, type);
+    this.supportsSchemaEvolution = supportsSchemaEvolution;
+    TypeUtil.checkParquetType(descriptor, type, supportsSchemaEvolution);
   }
 
   public ColumnDescriptor getDescriptor() {
@@ -120,7 +129,10 @@ public void close() {
 
   protected void initNative() {
     LOG.debug("initializing the native column reader");
-    DataType readType = (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get() ? type : null;
+    DataType readType =
+        ((boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get() || supportsSchemaEvolution)
+            ? type
+            : null;
     boolean useLegacyDateTimestampOrNTZ =
         useLegacyDateTimestamp || type == TimestampNTZType$.MODULE$;
     nativeHandle =
diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
index 663406d0a9..2a5c283d67 100644
--- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
@@ -583,7 +583,8 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable {
               capacity,
               useDecimal128,
               useLazyMaterialization,
-              useLegacyDateTimestamp);
+              useLegacyDateTimestamp,
+              false);
       reader.setPageReader(rowGroupReader.getPageReader(columns.get(i)));
       columnReaders[i] = reader;
     }
diff --git a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
index 9502aa265d..25b2ee937c 100644
--- a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
@@ -99,8 +99,9 @@ public ColumnReader(
       CometSchemaImporter importer,
       int batchSize,
       boolean useDecimal128,
-      boolean useLegacyDateTimestamp) {
-    super(type, descriptor, useDecimal128, useLegacyDateTimestamp);
+      boolean useLegacyDateTimestamp,
+      boolean supportsSchemaEvolution) {
+    super(type, descriptor, useDecimal128, useLegacyDateTimestamp, supportsSchemaEvolution);
     assert batchSize > 0 : "Batch size must be positive, found " + batchSize;
     this.batchSize = batchSize;
     this.importer = importer;
diff --git a/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java b/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java
index b22278ea78..9d53a921f1 100644
--- a/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java
@@ -49,8 +49,16 @@ public LazyColumnReader(
       CometSchemaImporter importer,
       int batchSize,
       boolean useDecimal128,
-      boolean useLegacyDateTimestamp) {
-    super(sparkReadType, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
+      boolean useLegacyDateTimestamp,
+      boolean supportsSchemaEvolution) {
+    super(
+        sparkReadType,
+        descriptor,
+        importer,
+        batchSize,
+        useDecimal128,
+        useLegacyDateTimestamp,
+        supportsSchemaEvolution);
     this.batchSize = 0; // the batch size is set later in `readBatch`
     this.vector = new CometLazyVector(sparkReadType, this, useDecimal128);
   }
diff --git a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
index 2820c42f89..e9be78128d 100644
--- a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
@@ -45,7 +45,7 @@ public class MetadataColumnReader extends AbstractColumnReader {
   public MetadataColumnReader(
       DataType type, ColumnDescriptor descriptor, boolean useDecimal128, boolean isConstant) {
     // TODO: should we handle legacy dates & timestamps for metadata columns?
-    super(type, descriptor, useDecimal128, false);
+    super(type, descriptor, useDecimal128, false, false);
     this.isConstant = isConstant;
   }
 
diff --git a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
index c31497e472..2a6dc28669 100644
--- a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
+++ b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
@@ -116,11 +116,14 @@ public static ColumnDescriptor convertToParquet(StructField field) {
    * @param descriptor descriptor for a Parquet primitive column
    * @param sparkType Spark read type
    */
-  public static void checkParquetType(ColumnDescriptor descriptor, DataType sparkType) {
+  public static void checkParquetType(
+      ColumnDescriptor descriptor, DataType sparkType, boolean allowTypePromotion) {
     PrimitiveType.PrimitiveTypeName typeName = descriptor.getPrimitiveType().getPrimitiveTypeName();
     LogicalTypeAnnotation logicalTypeAnnotation =
         descriptor.getPrimitiveType().getLogicalTypeAnnotation();
-    boolean allowTypePromotion = (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get();
+    if (!allowTypePromotion) {
+      allowTypePromotion = (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get();
+    }
 
     if (sparkType instanceof NullType) {
       return;
diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java b/common/src/main/java/org/apache/comet/parquet/Utils.java
index 2f9c507366..f3f64254fa 100644
--- a/common/src/main/java/org/apache/comet/parquet/Utils.java
+++ b/common/src/main/java/org/apache/comet/parquet/Utils.java
@@ -28,19 +28,6 @@
 public class Utils {
 
-  /** This method is called from Apache Iceberg. */
-  public static ColumnReader getColumnReader(
-      DataType type,
-      ColumnDescriptor descriptor,
-      CometSchemaImporter importer,
-      int batchSize,
-      boolean useDecimal128,
-      boolean useLazyMaterialization) {
-    // TODO: support `useLegacyDateTimestamp` for Iceberg
-    return getColumnReader(
-        type, descriptor, importer, batchSize, useDecimal128, useLazyMaterialization, true);
-  }
-
   public static ColumnReader getColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
@@ -48,13 +35,26 @@ public static ColumnReader getColumnReader(
       int batchSize,
       boolean useDecimal128,
       boolean useLazyMaterialization,
-      boolean useLegacyDateTimestamp) {
+      boolean useLegacyDateTimestamp,
+      boolean supportsSchemaEvolution) {
     if (useLazyMaterialization && supportLazyMaterialization(type)) {
       return new LazyColumnReader(
-          type, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
+          type,
+          descriptor,
+          importer,
+          batchSize,
+          useDecimal128,
+          useLegacyDateTimestamp,
+          supportsSchemaEvolution);
     } else {
       return new ColumnReader(
-          type, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
+          type,
+          descriptor,
+          importer,
+          batchSize,
+          useDecimal128,
+          useLegacyDateTimestamp,
+          supportsSchemaEvolution);
     }
   }
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index aab0cbc17a..17bdac2964 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-import org.apache.spark.sql.internal.SQLConf
 
 import org.apache.comet.CometConf
 import org.apache.comet.CometConf._
@@ -180,8 +179,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
       }
 
       if (s.isCometEnabled && schemaSupported) {
-        // When reading from Iceberg, we automatically enable type promotion
-        SQLConf.get.setConfString(COMET_SCHEMA_EVOLUTION_ENABLED.key, "true")
         CometBatchScanExec(
           scanExec.clone().asInstanceOf[BatchScanExec],
           runtimeFilters = scanExec.runtimeFilters)
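Caller-side note: with the Iceberg-specific `Utils.getColumnReader` convenience overload removed and `CometScanRule` no longer mutating the session-wide `COMET_SCHEMA_EVOLUTION_ENABLED` conf, an external caller such as Iceberg's Comet integration is expected to request type promotion explicitly through the new `supportsSchemaEvolution` parameter. The sketch below shows one way such a call could look; the factory class, method name, and the `CometSchemaImporter` import location are illustrative assumptions and not part of this patch, while the `Utils.getColumnReader` signature is taken from the diff above.

// Illustrative sketch only (not part of this diff): an Iceberg-side caller
// constructing a Comet ColumnReader with per-reader type promotion enabled.
import org.apache.arrow.c.CometSchemaImporter; // assumed package for CometSchemaImporter
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.spark.sql.types.DataType;

import org.apache.comet.parquet.ColumnReader;
import org.apache.comet.parquet.Utils;

public class IcebergColumnReaderFactory { // hypothetical class name

  // Builds a reader that promotes the file's physical type to the requested Spark
  // read type (e.g. an integer column read as long) for this reader only, without
  // touching the session-wide COMET_SCHEMA_EVOLUTION_ENABLED config.
  public static ColumnReader create(
      DataType sparkReadType,
      ColumnDescriptor descriptor,
      CometSchemaImporter importer,
      int batchSize,
      boolean useDecimal128,
      boolean useLazyMaterialization) {
    return Utils.getColumnReader(
        sparkReadType,
        descriptor,
        importer,
        batchSize,
        useDecimal128,
        useLazyMaterialization,
        /* useLegacyDateTimestamp= */ true, // the removed convenience overload hard-coded true here
        /* supportsSchemaEvolution= */ true); // enables type promotion for this reader
  }
}

Passing the flag per reader (with `TypeUtil.checkParquetType` falling back to the global conf only when the flag is false) keeps the planner rule from mutating global SQLConf state and leaves non-Iceberg scans unaffected.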