diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java index a833ac863da3a..0d9b74a7f2dc1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java @@ -18,6 +18,7 @@ package org.apache.hudi.io; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieRecord; @@ -44,7 +45,6 @@ import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; -import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; @@ -275,20 +275,7 @@ private CDCTransformer getTransformer() { } private GenericRecord removeCommitMetadata(GenericRecord record) { - return record == null ? null : getRecordWithoutMetadata(record); - } - - private GenericRecord getRecordWithoutMetadata(GenericRecord record) { - Schema avroSchema = dataSchema.getAvroSchema(); - if (record.getSchema().getFields().size() == avroSchema.getFields().size()) { - return record; - } else { - GenericData.Record rec = new GenericData.Record(avroSchema); - for (Schema.Field field : avroSchema.getFields()) { - rec.put(field.pos(), record.get(field.name())); - } - return rec; - } + return record == null ? 
null : HoodieAvroUtils.projectRecordToNewSchemaShallow(record, dataSchema.getAvroSchema()); } // ------------------------------------------------------------------------- diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 0dce463eef0c5..f2951fa05a023 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -1370,6 +1370,46 @@ public static BigDecimal convertBytesToBigDecimal(byte[] value, int precision, i scale, new MathContext(precision, RoundingMode.HALF_UP)); } + /** + * Projects a record to a new schema by performing a shallow copy of fields. + * Best used for adding or removing top-level metadata fields. + *
<p>
+ * This is a high-performance alternative to deep rewriting. It only iterates through + * the top-level fields of the target schema and pulls values from the source record + * by field name. + *
<p>
+ *
<p>
+ * This is significantly faster than {@link #rewriteRecordWithNewSchema} for: + * 1. Wide records (many top-level fields): Reduces CPU overhead/recursion. + * 2. Deeply nested records: Uses reference-copying for nested structures instead of rebuilding them. + *
<p>
+ * Warning: This method does not recursively rewrite/transform nested records, arrays, + * or maps. It assumes that the underlying values for each field are already + * compatible with the target schema. + * + * @param record The source GenericRecord to project. + * @param targetSchema The schema to project the record into. + * @return A new GenericRecord matching targetSchema, or the original record if + * the schemas are identical in field count. + */ + + public static GenericRecord projectRecordToNewSchemaShallow(IndexedRecord record, Schema targetSchema) { + if (record.getSchema().getFields().size() == targetSchema.getFields().size()) { + return (GenericRecord) record; + } else { + GenericData.Record rec = new GenericData.Record(targetSchema); + for (Schema.Field field : targetSchema.getFields()) { + Field sourceField = record.getSchema().getField(field.name()); + if (sourceField == null) { + rec.put(field.pos(), null); + } else { + rec.put(field.pos(), record.get(sourceField.pos())); + } + } + return rec; + } + } + /** * Avro does not support type promotion from numbers to string. This function returns true if * it will be necessary to rewrite the record to support this promotion. 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java index 2b3822851febb..5337cb86f13ed 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java @@ -152,7 +152,7 @@ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) { public HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props) { try { Option avroRecordOpt = getData().getInsertValue(recordSchema, props); - GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecordOpt.get(), targetSchema); + GenericRecord newAvroRecord = HoodieAvroUtils.projectRecordToNewSchemaShallow(avroRecordOpt.get(), targetSchema); updateMetadataValuesInternal(newAvroRecord, metadataValues); return new HoodieAvroIndexedRecord(getKey(), newAvroRecord, getOperation(), this.currentLocation, this.newLocation); } catch (IOException e) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AvroProjection.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AvroProjection.scala deleted file mode 100644 index 06b465a2e18cc..0000000000000 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AvroProjection.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.hudi - -import org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema - -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord - -abstract class AvroProjection extends (GenericRecord => GenericRecord) - -object AvroProjection { - - /** - * Creates projection into provided [[Schema]] allowing to convert [[GenericRecord]] into - * new schema - */ - def create(schema: Schema): AvroProjection = { - val projection = (record: GenericRecord) => rewriteRecordWithNewSchema(record, schema) - (record: GenericRecord) => if (record.getSchema == schema) { - record - } else { - projection(record) - } - } - -}