diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java index a833ac863da3a..0d9b74a7f2dc1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java @@ -18,6 +18,7 @@ package org.apache.hudi.io; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieRecord; @@ -44,7 +45,6 @@ import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; -import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; @@ -275,20 +275,7 @@ private CDCTransformer getTransformer() { } private GenericRecord removeCommitMetadata(GenericRecord record) { - return record == null ? null : getRecordWithoutMetadata(record); - } - - private GenericRecord getRecordWithoutMetadata(GenericRecord record) { - Schema avroSchema = dataSchema.getAvroSchema(); - if (record.getSchema().getFields().size() == avroSchema.getFields().size()) { - return record; - } else { - GenericData.Record rec = new GenericData.Record(avroSchema); - for (Schema.Field field : avroSchema.getFields()) { - rec.put(field.pos(), record.get(field.name())); - } - return rec; - } + return record == null ? null : HoodieAvroUtils.projectRecordToNewSchemaShallow(record, dataSchema.getAvroSchema()); } // ------------------------------------------------------------------------- diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 0dce463eef0c5..f2951fa05a023 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -1370,6 +1370,46 @@ public static BigDecimal convertBytesToBigDecimal(byte[] value, int precision, i scale, new MathContext(precision, RoundingMode.HALF_UP)); } + /** + * Projects a record to a new schema by performing a shallow copy of fields. + * Best used for adding or removing top-level metadata fields. + *
+ * This is a high-performance alternative to deep rewriting. It only iterates through + * the top-level fields of the target schema and pulls values from the source record + * by field name. + *
+ *
+ * This is significantly faster than {@link #rewriteRecordWithNewSchema} for: + * 1. Wide records (many top-level fields): Reduces CPU overhead/recursion. + * 2. Deeply nested records: Uses reference-copying for nested structures instead of rebuilding them. + *
+ * Warning: This method does not recursively rewrite/transform nested records, arrays,
+ * or maps. It assumes that the underlying values for each field are already
+ * compatible with the target schema.
+ *
+ * @param record The source GenericRecord to project.
+ * @param targetSchema The schema to project the record into.
+ * @return A new GenericRecord matching targetSchema, or the original record if
+ * the schemas are identical in field count.
+ */
+
+ public static GenericRecord projectRecordToNewSchemaShallow(IndexedRecord record, Schema targetSchema) {
+ if (record.getSchema().getFields().size() == targetSchema.getFields().size()) {
+ return (GenericRecord) record;
+ } else {
+ GenericData.Record rec = new GenericData.Record(targetSchema);
+ for (Schema.Field field : targetSchema.getFields()) {
+ Field sourceField = record.getSchema().getField(field.name());
+ if (sourceField == null) {
+ rec.put(field.pos(), null);
+ } else {
+ rec.put(field.pos(), record.get(sourceField.pos()));
+ }
+ }
+ return rec;
+ }
+ }
+
/**
* Avro does not support type promotion from numbers to string. This function returns true if
* it will be necessary to rewrite the record to support this promotion.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index 2b3822851febb..5337cb86f13ed 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -152,7 +152,7 @@ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) {
public HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props) {
try {
Option