From 899fe4fda97785647ca0c9384e04810d05e3f9a0 Mon Sep 17 00:00:00 2001
From: huaxingao
Date: Wed, 7 May 2025 19:23:33 -0700
Subject: [PATCH 1/3] Support Schema Evolution in Iceberg

---
 .../comet/parquet/AbstractColumnReader.java   | 16 +++++++---
 .../org/apache/comet/parquet/BatchReader.java |  3 +-
 .../apache/comet/parquet/ColumnReader.java    |  5 +--
 .../comet/parquet/LazyColumnReader.java       | 12 +++++--
 .../comet/parquet/MetadataColumnReader.java   |  2 +-
 .../org/apache/comet/parquet/TypeUtil.java    |  6 ++--
 .../java/org/apache/comet/parquet/Utils.java  | 32 +++++++++----------
 .../scala/org/apache/comet/CometConf.scala    | 10 ------
 .../apache/comet/rules/CometScanRule.scala    |  2 --
 .../comet/parquet/ParquetReadSuite.scala      | 28 ----------------
 10 files changed, 46 insertions(+), 70 deletions(-)

diff --git a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
index ef97abf74c..728f5c75f4 100644
--- a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
@@ -27,7 +27,6 @@
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.TimestampNTZType$;
 
-import org.apache.comet.CometConf;
 import org.apache.comet.vector.CometVector;
 
 /** Base class for Comet Parquet column reader implementations. */
@@ -63,6 +62,13 @@ public abstract class AbstractColumnReader implements AutoCloseable {
   /** A pointer to the native implementation of ColumnReader. */
   protected long nativeHandle;
 
+  /**
+   * Whether to enable schema evolution in Comet. For instance, promoting an integer column to a
+   * long column, a float column to a double column, etc. This is automatically enabled when
+   * reading from Iceberg tables.
+   */
+  protected boolean supportsSchemaEvolution;
+
   public AbstractColumnReader(
       DataType type,
       Type fieldType,
@@ -80,9 +86,11 @@ public AbstractColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
       boolean useDecimal128,
-      boolean useLegacyDateTimestamp) {
+      boolean useLegacyDateTimestamp,
+      boolean supportsSchemaEvolution) {
     this(type, null, descriptor, useDecimal128, useLegacyDateTimestamp);
-    TypeUtil.checkParquetType(descriptor, type);
+    this.supportsSchemaEvolution = supportsSchemaEvolution;
+    TypeUtil.checkParquetType(descriptor, type, supportsSchemaEvolution);
   }
 
   public ColumnDescriptor getDescriptor() {
@@ -120,7 +128,7 @@ public void close() {
 
   protected void initNative() {
     LOG.debug("initializing the native column reader");
-    DataType readType = (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get() ? type : null;
+    DataType readType = supportsSchemaEvolution ? type : null;
     boolean useLegacyDateTimestampOrNTZ =
         useLegacyDateTimestamp || type == TimestampNTZType$.MODULE$;
     nativeHandle =
diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
index 663406d0a9..2a5c283d67 100644
--- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
@@ -583,7 +583,8 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable {
               capacity,
               useDecimal128,
               useLazyMaterialization,
-              useLegacyDateTimestamp);
+              useLegacyDateTimestamp,
+              false);
       reader.setPageReader(rowGroupReader.getPageReader(columns.get(i)));
       columnReaders[i] = reader;
     }
diff --git a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
index 9502aa265d..25b2ee937c 100644
--- a/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/ColumnReader.java
@@ -99,8 +99,9 @@ public ColumnReader(
       CometSchemaImporter importer,
       int batchSize,
       boolean useDecimal128,
-      boolean useLegacyDateTimestamp) {
-    super(type, descriptor, useDecimal128, useLegacyDateTimestamp);
+      boolean useLegacyDateTimestamp,
+      boolean supportsSchemaEvolution) {
+    super(type, descriptor, useDecimal128, useLegacyDateTimestamp, supportsSchemaEvolution);
     assert batchSize > 0 : "Batch size must be positive, found " + batchSize;
     this.batchSize = batchSize;
     this.importer = importer;
diff --git a/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java b/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java
index b22278ea78..9d53a921f1 100644
--- a/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java
@@ -49,8 +49,16 @@ public LazyColumnReader(
       CometSchemaImporter importer,
       int batchSize,
       boolean useDecimal128,
-      boolean useLegacyDateTimestamp) {
-    super(sparkReadType, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
+      boolean useLegacyDateTimestamp,
+      boolean supportsSchemaEvolution) {
+    super(
+        sparkReadType,
+        descriptor,
+        importer,
+        batchSize,
+        useDecimal128,
+        useLegacyDateTimestamp,
+        supportsSchemaEvolution);
     this.batchSize = 0; // the batch size is set later in `readBatch`
     this.vector = new CometLazyVector(sparkReadType, this, useDecimal128);
   }
diff --git a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
index 2820c42f89..e9be78128d 100644
--- a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
@@ -45,7 +45,7 @@ public class MetadataColumnReader extends AbstractColumnReader {
   public MetadataColumnReader(
       DataType type, ColumnDescriptor descriptor, boolean useDecimal128, boolean isConstant) {
     // TODO: should we handle legacy dates & timestamps for metadata columns?
-    super(type, descriptor, useDecimal128, false);
+    super(type, descriptor, useDecimal128, false, false);
     this.isConstant = isConstant;
   }
 
diff --git a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
index c31497e472..a15e89e1c9 100644
--- a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
+++ b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
@@ -31,8 +31,6 @@
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.*;
 
-import org.apache.comet.CometConf;
-
 public class TypeUtil {
 
   /** Converts the input Spark 'field' into a Parquet column descriptor. */
@@ -116,11 +114,11 @@ public static ColumnDescriptor convertToParquet(StructField field) {
    * @param descriptor descriptor for a Parquet primitive column
    * @param sparkType Spark read type
    */
-  public static void checkParquetType(ColumnDescriptor descriptor, DataType sparkType) {
+  public static void checkParquetType(
+      ColumnDescriptor descriptor, DataType sparkType, boolean allowTypePromotion) {
     PrimitiveType.PrimitiveTypeName typeName = descriptor.getPrimitiveType().getPrimitiveTypeName();
     LogicalTypeAnnotation logicalTypeAnnotation =
         descriptor.getPrimitiveType().getLogicalTypeAnnotation();
-    boolean allowTypePromotion = (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get();
 
     if (sparkType instanceof NullType) {
       return;
diff --git a/common/src/main/java/org/apache/comet/parquet/Utils.java b/common/src/main/java/org/apache/comet/parquet/Utils.java
index 2f9c507366..f3f64254fa 100644
--- a/common/src/main/java/org/apache/comet/parquet/Utils.java
+++ b/common/src/main/java/org/apache/comet/parquet/Utils.java
@@ -28,19 +28,6 @@
 
 public class Utils {
 
-  /** This method is called from Apache Iceberg. */
-  public static ColumnReader getColumnReader(
-      DataType type,
-      ColumnDescriptor descriptor,
-      CometSchemaImporter importer,
-      int batchSize,
-      boolean useDecimal128,
-      boolean useLazyMaterialization) {
-    // TODO: support `useLegacyDateTimestamp` for Iceberg
-    return getColumnReader(
-        type, descriptor, importer, batchSize, useDecimal128, useLazyMaterialization, true);
-  }
-
   public static ColumnReader getColumnReader(
       DataType type,
       ColumnDescriptor descriptor,
@@ -48,13 +35,26 @@ public static ColumnReader getColumnReader(
       int batchSize,
       boolean useDecimal128,
       boolean useLazyMaterialization,
-      boolean useLegacyDateTimestamp) {
+      boolean useLegacyDateTimestamp,
+      boolean supportsSchemaEvolution) {
     if (useLazyMaterialization && supportLazyMaterialization(type)) {
       return new LazyColumnReader(
-          type, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
+          type,
+          descriptor,
+          importer,
+          batchSize,
+          useDecimal128,
+          useLegacyDateTimestamp,
+          supportsSchemaEvolution);
     } else {
       return new ColumnReader(
-          type, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
+          type,
+          descriptor,
+          importer,
+          batchSize,
+          useDecimal128,
+          useLegacyDateTimestamp,
+          supportsSchemaEvolution);
     }
   }
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 65f99a899f..eddc9b78e4 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -539,16 +539,6 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(true)
 
-  val COMET_SCHEMA_EVOLUTION_ENABLED: ConfigEntry[Boolean] = conf(
-    "spark.comet.schemaEvolution.enabled")
-    .internal()
-    .doc(
-      "Whether to enable schema evolution in Comet. For instance, promoting a integer " +
-        "column to a long column, a float column to a double column, etc. This is automatically" +
-        "enabled when reading from Iceberg tables.")
-    .booleanConf
-    .createWithDefault(COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT)
-
   val COMET_SPARK_TO_ARROW_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.sparkToColumnar.enabled")
       .internal()
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index aab0cbc17a..31e57fe32e 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -180,8 +180,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
         }
 
         if (s.isCometEnabled && schemaSupported) {
-          // When reading from Iceberg, we automatically enable type promotion
-          SQLConf.get.setConfString(COMET_SCHEMA_EVOLUTION_ENABLED.key, "true")
           CometBatchScanExec(
             scanExec.clone().asInstanceOf[BatchScanExec],
             runtimeFilters = scanExec.runtimeFilters)
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index f485039aeb..2ad08fe0e4 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -1217,34 +1217,6 @@ abstract class ParquetReadSuite extends CometTestBase {
     }
   }
 
-  test("schema evolution") {
-    Seq(true, false).foreach { enableSchemaEvolution =>
-      Seq(true, false).foreach { useDictionary =>
-        {
-          withSQLConf(
-            CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> enableSchemaEvolution.toString) {
-            val data = (0 until 100).map(i => {
-              val v = if (useDictionary) i % 5 else i
-              (v, v.toFloat)
-            })
-            val readSchema =
-              StructType(
-                Seq(StructField("_1", LongType, false), StructField("_2", DoubleType, false)))
-
-            withParquetDataFrame(data, schema = Some(readSchema)) { df =>
-              // TODO: validate with Spark 3.x and 'usingDataFusionParquetExec=true'
-              if (enableSchemaEvolution || usingDataFusionParquetExec(conf)) {
-                checkAnswer(df, data.map(Row.fromTuple))
-              } else {
-                assertThrows[SparkException](df.collect())
-              }
-            }
-          }
-        }
-      }
-    }
-  }
-
   test("scan metrics") {
     // https://github.com/apache/datafusion-comet/issues/1441
     assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_ICEBERG_COMPAT)

From 6fc8b85c9ff8a2d78a4095aec4197c1fd81bf407 Mon Sep 17 00:00:00 2001
From: huaxingao
Date: Thu, 8 May 2025 16:34:52 -0700
Subject: [PATCH 2/3] Rebase and address comments

---
 spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 31e57fe32e..17bdac2964 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-import org.apache.spark.sql.internal.SQLConf
 
 import org.apache.comet.CometConf
 import org.apache.comet.CometConf._

From 74a10584061a88347da570241c5c4b31e4ce8a58 Mon Sep 17 00:00:00 2001
From: huaxingao
Date: Mon, 12 May 2025 17:08:57 -0700
Subject: [PATCH 3/3] Put back CometConf.COMET_SCHEMA_EVOLUTION_ENABLED

---
 .../comet/parquet/AbstractColumnReader.java |  6 +++-
 .../org/apache/comet/parquet/TypeUtil.java  |  5 ++++
 .../scala/org/apache/comet/CometConf.scala  | 10 +++++++
 .../comet/parquet/ParquetReadSuite.scala    | 28 +++++++++++++++++++
 4 files changed, 48 insertions(+), 1 deletion(-)

diff --git a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
index 728f5c75f4..8ce9f96c34 100644
--- a/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/AbstractColumnReader.java
@@ -27,6 +27,7 @@
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.TimestampNTZType$;
 
+import org.apache.comet.CometConf;
 import org.apache.comet.vector.CometVector;
 
 /** Base class for Comet Parquet column reader implementations. */
@@ -128,7 +129,10 @@ public void close() {
 
   protected void initNative() {
     LOG.debug("initializing the native column reader");
-    DataType readType = supportsSchemaEvolution ? type : null;
+    DataType readType =
+        ((boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get() || supportsSchemaEvolution)
+            ? type
+            : null;
     boolean useLegacyDateTimestampOrNTZ =
         useLegacyDateTimestamp || type == TimestampNTZType$.MODULE$;
     nativeHandle =
diff --git a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
index a15e89e1c9..2a6dc28669 100644
--- a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
+++ b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
@@ -31,6 +31,8 @@
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.*;
 
+import org.apache.comet.CometConf;
+
 public class TypeUtil {
 
   /** Converts the input Spark 'field' into a Parquet column descriptor. */
@@ -119,6 +121,9 @@ public static void checkParquetType(
     PrimitiveType.PrimitiveTypeName typeName = descriptor.getPrimitiveType().getPrimitiveTypeName();
     LogicalTypeAnnotation logicalTypeAnnotation =
         descriptor.getPrimitiveType().getLogicalTypeAnnotation();
+    if (!allowTypePromotion) {
+      allowTypePromotion = (boolean) CometConf.COMET_SCHEMA_EVOLUTION_ENABLED().get();
+    }
 
     if (sparkType instanceof NullType) {
       return;
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index eddc9b78e4..65f99a899f 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -539,6 +539,16 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(true)
 
+  val COMET_SCHEMA_EVOLUTION_ENABLED: ConfigEntry[Boolean] = conf(
+    "spark.comet.schemaEvolution.enabled")
+    .internal()
+    .doc(
+      "Whether to enable schema evolution in Comet. For instance, promoting an integer " +
+        "column to a long column, a float column to a double column, etc. This is automatically " +
+        "enabled when reading from Iceberg tables.")
+    .booleanConf
+    .createWithDefault(COMET_SCHEMA_EVOLUTION_ENABLED_DEFAULT)
+
   val COMET_SPARK_TO_ARROW_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.sparkToColumnar.enabled")
       .internal()
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 2ad08fe0e4..f485039aeb 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -1217,6 +1217,34 @@ abstract class ParquetReadSuite extends CometTestBase {
     }
   }
 
+  test("schema evolution") {
+    Seq(true, false).foreach { enableSchemaEvolution =>
+      Seq(true, false).foreach { useDictionary =>
+        {
+          withSQLConf(
+            CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> enableSchemaEvolution.toString) {
+            val data = (0 until 100).map(i => {
+              val v = if (useDictionary) i % 5 else i
+              (v, v.toFloat)
+            })
+            val readSchema =
+              StructType(
+                Seq(StructField("_1", LongType, false), StructField("_2", DoubleType, false)))
+
+            withParquetDataFrame(data, schema = Some(readSchema)) { df =>
+              // TODO: validate with Spark 3.x and 'usingDataFusionParquetExec=true'
+              if (enableSchemaEvolution || usingDataFusionParquetExec(conf)) {
+                checkAnswer(df, data.map(Row.fromTuple))
+              } else {
+                assertThrows[SparkException](df.collect())
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
   test("scan metrics") {
     // https://github.com/apache/datafusion-comet/issues/1441
     assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
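
Usage sketch for reviewers: after this series, external callers such as Apache Iceberg, which previously used the now-removed six-argument Utils.getColumnReader overload, must pass useLegacyDateTimestamp and supportsSchemaEvolution explicitly. A minimal sketch follows, assuming a hypothetical caller class and assuming org.apache.arrow.c as the package of CometSchemaImporter; only the getColumnReader parameter list is taken from the diffs above.

import org.apache.arrow.c.CometSchemaImporter; // assumed package; not shown in the diffs
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.spark.sql.types.DataType;

import org.apache.comet.parquet.ColumnReader;
import org.apache.comet.parquet.Utils;

// Hypothetical caller, for illustration only. It mirrors what the removed
// convenience overload did (that overload forwarded useLegacyDateTimestamp = true,
// per its TODO comment), while opting into per-column type promotion for Iceberg.
public class IcebergColumnReaderSketch {

  public static ColumnReader buildReader(
      DataType sparkReadType, // possibly promoted read type, e.g. LongType for an int32 column
      ColumnDescriptor descriptor, // Parquet column to read
      CometSchemaImporter importer,
      int batchSize) {
    return Utils.getColumnReader(
        sparkReadType,
        descriptor,
        importer,
        batchSize,
        /* useDecimal128 */ false,
        /* useLazyMaterialization */ true,
        /* useLegacyDateTimestamp */ true, // the removed overload hard-coded true
        /* supportsSchemaEvolution */ true); // lets TypeUtil.checkParquetType allow promotion
  }
}

With supportsSchemaEvolution = true, AbstractColumnReader.initNative() forwards the Spark read type to the native reader and TypeUtil.checkParquetType() permits widening promotions such as int to long and float to double; patch 3 additionally restores spark.comet.schemaEvolution.enabled as a global fallback that is consulted when the per-reader flag is false.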