From 0f77a3f9865593cd536f8a119dd56431c6cd60ff Mon Sep 17 00:00:00 2001 From: Sergey Troshkov Date: Tue, 23 Dec 2025 17:13:51 +0700 Subject: [PATCH 1/3] perf: use shallow projection where applicable --- .../org/apache/hudi/io/HoodieCDCLogger.java | 17 +------ .../org/apache/hudi/avro/HoodieAvroUtils.java | 39 ++++++++++++++++ .../hudi/common/model/HoodieAvroRecord.java | 2 +- .../org/apache/hudi/AvroProjection.scala | 44 ------------------- 4 files changed, 42 insertions(+), 60 deletions(-) delete mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AvroProjection.scala diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java index a833ac863da3a..0d9b74a7f2dc1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java @@ -18,6 +18,7 @@ package org.apache.hudi.io; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieRecord; @@ -44,7 +45,6 @@ import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; -import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; @@ -275,20 +275,7 @@ private CDCTransformer getTransformer() { } private GenericRecord removeCommitMetadata(GenericRecord record) { - return record == null ? 
null : getRecordWithoutMetadata(record); - } - - private GenericRecord getRecordWithoutMetadata(GenericRecord record) { - Schema avroSchema = dataSchema.getAvroSchema(); - if (record.getSchema().getFields().size() == avroSchema.getFields().size()) { - return record; - } else { - GenericData.Record rec = new GenericData.Record(avroSchema); - for (Schema.Field field : avroSchema.getFields()) { - rec.put(field.pos(), record.get(field.name())); - } - return rec; - } + return record == null ? null : HoodieAvroUtils.projectRecordToNewSchemaShallow(record, dataSchema.getAvroSchema()); } // ------------------------------------------------------------------------- diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 0dce463eef0c5..d39292c5166c8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -1370,6 +1370,45 @@ public static BigDecimal convertBytesToBigDecimal(byte[] value, int precision, i scale, new MathContext(precision, RoundingMode.HALF_UP)); } + /** + * Projects a record to a new schema by performing a shallow copy of fields. + * Best used for adding or removing top-level metadata fields. + *

<p> + * This is a high-performance alternative to deep rewriting. It only iterates through + * the top-level fields of the target schema and pulls values from the source record + * by field name. + * </p>
+ * <p>
+ * This is significantly faster than {@link #rewriteRecordWithNewSchema} for: + * 1. Wide records (many top-level fields): Reduces CPU overhead/recursion. + * 2. Deeply nested records: Uses reference-copying for nested structures instead of rebuilding them. + * </p>

+ * Warning: This method does not recursively rewrite/transform nested records, arrays, + * or maps. It assumes that the underlying values for each field are already + * compatible with the target schema. + * + * @param record The source GenericRecord to project. + * @param targetSchema The schema to project the record into. + * @return A new GenericRecord matching targetSchema, or the original record if + * the schemas are identical in field count. + */ + + public static GenericRecord projectRecordToNewSchemaShallow(GenericRecord record, Schema targetSchema) { + if (record.getSchema().getFields().size() == targetSchema.getFields().size()) { + return record; + } else { + GenericData.Record rec = new GenericData.Record(targetSchema); + for (Schema.Field field : targetSchema.getFields()) { + if (record.hasField(field.name())) { + rec.put(field.pos(), record.get(field.name())); + } else { + rec.put(field.pos(), null); + } + } + return rec; + } + } + /** * Avro does not support type promotion from numbers to string. This function returns true if * it will be necessary to rewrite the record to support this promotion. 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java index 2b3822851febb..a8f08854833bf 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java @@ -152,7 +152,7 @@ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) { public HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props) { try { Option avroRecordOpt = getData().getInsertValue(recordSchema, props); - GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecordOpt.get(), targetSchema); + GenericRecord newAvroRecord = HoodieAvroUtils.projectRecordToNewSchemaShallow((GenericRecord) avroRecordOpt.get(), targetSchema); updateMetadataValuesInternal(newAvroRecord, metadataValues); return new HoodieAvroIndexedRecord(getKey(), newAvroRecord, getOperation(), this.currentLocation, this.newLocation); } catch (IOException e) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AvroProjection.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AvroProjection.scala deleted file mode 100644 index 06b465a2e18cc..0000000000000 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AvroProjection.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.hudi - -import org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema - -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord - -abstract class AvroProjection extends (GenericRecord => GenericRecord) - -object AvroProjection { - - /** - * Creates projection into provided [[Schema]] allowing to convert [[GenericRecord]] into - * new schema - */ - def create(schema: Schema): AvroProjection = { - val projection = (record: GenericRecord) => rewriteRecordWithNewSchema(record, schema) - (record: GenericRecord) => if (record.getSchema == schema) { - record - } else { - projection(record) - } - } - -} From d4d5c6afb4e1f7da043b1433441af801c76439b4 Mon Sep 17 00:00:00 2001 From: Sergey Troshkov Date: Thu, 25 Dec 2025 11:52:31 +0700 Subject: [PATCH 2/3] fix: fix based on review --- .../main/java/org/apache/hudi/avro/HoodieAvroUtils.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index d39292c5166c8..a8a30ad5faee4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -1399,10 +1399,11 @@ public static GenericRecord projectRecordToNewSchemaShallow(GenericRecord record } else { GenericData.Record rec = new GenericData.Record(targetSchema); for (Schema.Field field : targetSchema.getFields()) { - if (record.hasField(field.name())) { - 
rec.put(field.pos(), record.get(field.name())); - } else { + Field sourceField = record.getSchema().getField(field.name()); + if (sourceField == null) { rec.put(field.pos(), null); + } else { + rec.put(field.pos(), record.get(sourceField.pos())); } } return rec; From 836860e9769cd08f81b815bfc596ba7242e2600f Mon Sep 17 00:00:00 2001 From: Sergey Troshkov Date: Mon, 29 Dec 2025 14:40:00 +0700 Subject: [PATCH 3/3] fix: fix based on review v2 --- .../src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java | 4 ++-- .../java/org/apache/hudi/common/model/HoodieAvroRecord.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index a8a30ad5faee4..f2951fa05a023 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -1393,9 +1393,9 @@ public static BigDecimal convertBytesToBigDecimal(byte[] value, int precision, i * the schemas are identical in field count. 
*/ - public static GenericRecord projectRecordToNewSchemaShallow(GenericRecord record, Schema targetSchema) { + public static GenericRecord projectRecordToNewSchemaShallow(IndexedRecord record, Schema targetSchema) { if (record.getSchema().getFields().size() == targetSchema.getFields().size()) { - return record; + return (GenericRecord) record; } else { GenericData.Record rec = new GenericData.Record(targetSchema); for (Schema.Field field : targetSchema.getFields()) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java index a8f08854833bf..5337cb86f13ed 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java @@ -152,7 +152,7 @@ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) { public HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props) { try { Option avroRecordOpt = getData().getInsertValue(recordSchema, props); - GenericRecord newAvroRecord = HoodieAvroUtils.projectRecordToNewSchemaShallow((GenericRecord) avroRecordOpt.get(), targetSchema); + GenericRecord newAvroRecord = HoodieAvroUtils.projectRecordToNewSchemaShallow(avroRecordOpt.get(), targetSchema); updateMetadataValuesInternal(newAvroRecord, metadataValues); return new HoodieAvroIndexedRecord(getKey(), newAvroRecord, getOperation(), this.currentLocation, this.newLocation); } catch (IOException e) {