From a2269ff32e014169187dce92dbbe04e38c0e7ae6 Mon Sep 17 00:00:00 2001
From: Nikolay Volik
Date: Tue, 29 Jul 2025 13:45:38 +0200
Subject: [PATCH] [core] Fixes apache/paimon#5875 Add required Field IDs to support ID-based column pruning.

https://iceberg.apache.org/spec/#avro
---
 .../manifest/IcebergManifestEntry.java        |  21 ++-
 .../iceberg/manifest/IcebergManifestFile.java |  12 +-
 .../manifest/IcebergManifestFileMeta.java     |   4 +-
 .../iceberg/manifest/IcebergManifestList.java |   4 +-
 .../iceberg/IcebergCompatibilityTest.java     | 161 +++++++++++++++++-
 .../format/avro/AvroSchemaConverter.java      | 100 +++++++----
 6 files changed, 264 insertions(+), 38 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java
index 1e8108a9405e..40aea8c498fd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.iceberg.manifest;
 
+import org.apache.paimon.iceberg.metadata.IcebergPartitionField;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -111,16 +112,34 @@ public IcebergDataFileMeta file() {
     }
 
     public static RowType schema(RowType partitionType) {
+
+        RowType icebergPartition = icebergPartitionType(partitionType);
+
         List<DataField> fields = new ArrayList<>();
         fields.add(new DataField(0, "status", DataTypes.INT().notNull()));
         fields.add(new DataField(1, "snapshot_id", DataTypes.BIGINT()));
         fields.add(new DataField(3, "sequence_number", DataTypes.BIGINT()));
         fields.add(new DataField(4, "file_sequence_number", DataTypes.BIGINT()));
         fields.add(
-                new DataField(2, "data_file", IcebergDataFileMeta.schema(partitionType).notNull()));
+                new DataField(
+                        2, "data_file", IcebergDataFileMeta.schema(icebergPartition).notNull()));
         return new RowType(false, fields);
     }
 
+    // Use correct Field IDs to support ID-based column pruning.
+    // https://iceberg.apache.org/spec/#avro
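+    // For example, a two-field partition row (country, day) is re-assigned IDs
+    // starting at IcebergPartitionField.FIRST_FIELD_ID (1000), so country -> 1000
+    // and day -> 1001, as asserted below in testIcebergAvroFieldIds.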
+    public static RowType icebergPartitionType(RowType partitionType) {
+        List<DataField> fields = new ArrayList<>();
+        for (int i = 0; i < partitionType.getFields().size(); i++) {
+            fields.add(
+                    partitionType
+                            .getFields()
+                            .get(i)
+                            .newId(IcebergPartitionField.FIRST_FIELD_ID + i));
+        }
+        return partitionType.copy(fields);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
index 43ce1016a9b5..fe1b9eff2cc6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
@@ -102,8 +102,18 @@ public static IcebergManifestFile create(FileStoreTable table, IcebergPathFactor
         avroOptions.set(
                 "avro.row-name-mapping",
                 "org.apache.paimon.avro.generated.record:manifest_entry,"
+                        + "iceberg:true,"
                         + "manifest_entry_data_file:r2,"
-                        + "r2_partition:r102");
+                        + "r2_partition:r102,"
+                        + "kv_name_r2_null_value_counts:k121_v122,"
+                        + "k_id_k121_v122:121,"
+                        + "v_id_k121_v122:122,"
+                        + "kv_name_r2_lower_bounds:k126_v127,"
+                        + "k_id_k126_v127:126,"
+                        + "v_id_k126_v127:127,"
+                        + "kv_name_r2_upper_bounds:k129_v130,"
+                        + "k_id_k129_v130:129,"
+                        + "v_id_k129_v130:130");
         FileFormat manifestFileAvro = FileFormat.fromIdentifier("avro", avroOptions);
         return new IcebergManifestFile(
                 table.fileIO(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java
index c5fcb6005fcb..da3e0c24029e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java
@@ -186,7 +186,7 @@ private static RowType schemaForIcebergNew() {
         fields.add(new DataField(514, "deleted_rows_count", DataTypes.BIGINT().notNull()));
         fields.add(
                 new DataField(
-                        508, "partitions", DataTypes.ARRAY(IcebergPartitionSummary.schema())));
+                        507, "partitions", DataTypes.ARRAY(IcebergPartitionSummary.schema())));
         return new RowType(false, fields);
     }
 
@@ -209,7 +209,7 @@ private static RowType schemaForIceberg1_4() {
         fields.add(new DataField(514, "deleted_rows_count", DataTypes.BIGINT().notNull()));
         fields.add(
                 new DataField(
-                        508, "partitions", DataTypes.ARRAY(IcebergPartitionSummary.schema())));
+                        507, "partitions", DataTypes.ARRAY(IcebergPartitionSummary.schema())));
         return new RowType(false, fields);
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
index ef78969a24d0..856e00ee95c8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
@@ -63,7 +63,9 @@ public static IcebergManifestList create(FileStoreTable table, IcebergPathFactor
         avroOptions.set(
                 "avro.row-name-mapping",
                 "org.apache.paimon.avro.generated.record:manifest_file,"
-                        + "manifest_file_partitions:r508");
+                        + "iceberg:true,"
+                        + "manifest_file_partitions:r508,"
+                        + "array_id_r508:508");
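+        // "iceberg:true" above switches AvroSchemaConverter into Iceberg mode so
+        // that every generated Avro field carries its "field-id" property, and
+        // "array_id_r508:508" supplies the element-id of the "partitions" list
+        // (field-id 507, field_summary element 508 in the Iceberg spec).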
         FileFormat fileFormat = FileFormat.fromIdentifier("avro", avroOptions);
         RowType manifestType =
                 IcebergManifestFileMeta.schema(
diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index 5cb45cbc6f51..c58ae1eefc51 100644
--- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -55,6 +55,8 @@
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.SeekableFileInput;
 import org.apache.avro.generic.GenericData;
@@ -84,6 +86,7 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
@@ -1114,6 +1117,114 @@ public void testPartitionedPrimaryKeyTable() throws Exception {
                 Record::toString);
     }
 
+    @Test
+    public void testIcebergAvroFieldIds() throws Exception {
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(), DataTypes.VARCHAR(20), DataTypes.VARCHAR(20)
+                        },
+                        new String[] {"k", "country", "day"});
+        FileStoreTable table =
+                createPaimonTable(
+                        rowType,
+                        Arrays.asList("country", "day"),
+                        Collections.singletonList("k"),
+                        -1);
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+
+        write.write(
+                GenericRow.of(
+                        1, BinaryString.fromString("Switzerland"), BinaryString.fromString("June")),
+                1);
+        write.write(
+                GenericRow.of(
+                        2, BinaryString.fromString("Australia"), BinaryString.fromString("July")),
+                1);
+        write.write(
+                GenericRow.of(
+                        3, BinaryString.fromString("Brazil"), BinaryString.fromString("October")),
+                1);
+        write.write(
+                GenericRow.of(
+                        4,
+                        BinaryString.fromString("Grand Duchy of Luxembourg"),
+                        BinaryString.fromString("November")),
+                1);
+        commit.commit(1, write.prepareCommit(false, 1));
+        assertThat(getIcebergResult())
+                .containsExactlyInAnyOrder(
+                        "Record(1, Switzerland, June)",
+                        "Record(2, Australia, July)",
+                        "Record(3, Brazil, October)",
+                        "Record(4, Grand Duchy of Luxembourg, November)");
+
+        org.apache.iceberg.Table icebergTable = getIcebergTable();
+        String manifestListLocation = icebergTable.currentSnapshot().manifestListLocation();
+
+        Map<String, Integer> manifestListFieldIdsMap =
+                parseAvroSchemaFieldIds(manifestListLocation);
+        assertThat(manifestListFieldIdsMap)
+                .hasSize(19)
+                .containsEntry("manifest_file:r508:contains_null", 509)
+                .containsEntry("manifest_file:r508:contains_nan", 518)
+                .containsEntry("manifest_file:added_snapshot_id", 503)
+                .containsEntry("manifest_file:added_files_count", 504)
+                .containsEntry("manifest_file:deleted_rows_count", 514)
+                .containsEntry("manifest_file:added_rows_count", 512)
+                .containsEntry("manifest_file:manifest_length", 501)
+                .containsEntry("manifest_file:partition_spec_id", 502)
+                .containsEntry("manifest_file:deleted_files_count", 506)
+                .containsEntry("manifest_file:partitions", 507)
+                .containsEntry("manifest_file:existing_files_count", 505)
+                .containsEntry("manifest_file:r508:upper_bound", 511)
+                .containsEntry("manifest_file:sequence_number", 515)
+                .containsEntry("manifest_file:min_sequence_number", 516)
+                .containsEntry("manifest_file:r508:lower_bound", 510)
+                .containsEntry("manifest_file:manifest_path", 500)
+                .containsEntry("manifest_file:content", 517)
+                .containsEntry("manifest_file:existing_rows_count", 513)
+                .containsEntry("r508", 508);
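+        // Field IDs 500-518 above are fixed by the Iceberg spec for manifest
+        // lists; 507 is the "partitions" list and 508 its field_summary element.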
+
+        String manifestPath =
+                icebergTable.currentSnapshot().allManifests(icebergTable.io()).get(0).path();
+        Map<String, Integer> manifestFieldIdsMap = parseAvroSchemaFieldIds(manifestPath);
+        assertThat(manifestFieldIdsMap)
+                .hasSize(28)
+                .containsEntry("manifest_entry:status", 0)
+                .containsEntry("manifest_entry:snapshot_id", 1)
+                .containsEntry("manifest_entry:data_file", 2)
+                .containsEntry("manifest_entry:sequence_number", 3)
+                .containsEntry("manifest_entry:file_sequence_number", 4)
+                .containsEntry("manifest_entry:r2:file_path", 100)
+                .containsEntry("manifest_entry:r2:file_format", 101)
+                .containsEntry("manifest_entry:r2:partition", 102)
+                .containsEntry("manifest_entry:r2:record_count", 103)
+                .containsEntry("manifest_entry:r2:file_size_in_bytes", 104)
+                .containsEntry("manifest_entry:r2:null_value_counts", 110)
+                .containsEntry("manifest_entry:r2:k121_v122:value", 122)
+                .containsEntry("manifest_entry:r2:k121_v122:key", 121)
+                .containsEntry("manifest_entry:r2:lower_bounds", 125)
+                .containsEntry("manifest_entry:r2:k126_v127:key", 126)
+                .containsEntry("manifest_entry:r2:k126_v127:value", 127)
+                .containsEntry("manifest_entry:r2:upper_bounds", 128)
+                .containsEntry("manifest_entry:r2:k129_v130:key", 129)
+                .containsEntry("manifest_entry:r2:k129_v130:value", 130)
+                .containsEntry("manifest_entry:r2:content", 134)
+                .containsEntry("manifest_entry:r2:referenced_data_file", 143)
+                .containsEntry("manifest_entry:r2:content_offset", 144)
+                .containsEntry("manifest_entry:r2:content_size_in_bytes", 145)
+                .containsEntry("manifest_entry:r2:r102:country", 1000)
+                .containsEntry("manifest_entry:r2:r102:day", 1001);
+
+        write.close();
+        commit.close();
+    }
+
     private void runCompatibilityTest(
             RowType rowType,
             List<String> partitionKeys,
@@ -1158,6 +1269,7 @@ private void runCompatibilityTest(
     }
 
     private static class TestRecord {
+
         private final BinaryRow partition;
         private final GenericRow record;
 
@@ -1214,13 +1326,17 @@ private List<String> getIcebergResult() throws Exception {
                 icebergTable -> IcebergGenerics.read(icebergTable).build(),
                 Record::toString);
     }
 
+    private org.apache.iceberg.Table getIcebergTable() {
+        HadoopCatalog icebergCatalog = new HadoopCatalog(new Configuration(), tempDir.toString());
+        TableIdentifier icebergIdentifier = TableIdentifier.of("mydb.db", "t");
+        return icebergCatalog.loadTable(icebergIdentifier);
+    }
+
     private List<String> getIcebergResult(
             Function<org.apache.iceberg.Table, CloseableIterable<Record>> query,
             Function<Record, String> icebergRecordToString) throws Exception {
-        HadoopCatalog icebergCatalog = new HadoopCatalog(new Configuration(), tempDir.toString());
-        TableIdentifier icebergIdentifier = TableIdentifier.of("mydb.db", "t");
-        org.apache.iceberg.Table icebergTable = icebergCatalog.loadTable(icebergIdentifier);
+        org.apache.iceberg.Table icebergTable = getIcebergTable();
         CloseableIterable<Record> result = query.apply(icebergTable);
         List<String> actual = new ArrayList<>();
         for (Record record : result) {
@@ -1229,4 +1345,43 @@ private List<String> getIcebergResult(
         result.close();
         return actual;
     }
+
+    private Map<String, Integer> parseAvroSchemaFieldIds(String avroPath) throws Exception {
+        Map<String, Integer> fieldIdMap = new HashMap<>();
+        try (DataFileReader<GenericData.Record> dataFileReader =
+                new DataFileReader<>(
+                        new SeekableFileInput(new File(avroPath)), new GenericDatumReader<>())) {
+            org.apache.avro.Schema schema = dataFileReader.getSchema();
+            parseAvroFields(schema, fieldIdMap, schema.getName());
+        }
+        return fieldIdMap;
+    }
+
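+    // Walks the Avro schema recursively: records every field's "field-id"
+    // property, unwraps nullable unions, and for arrays records the
+    // "element-id" property before descending into record element types.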
+    private void parseAvroFields(
+            org.apache.avro.Schema schema, Map<String, Integer> fieldIdMap, String rootName) {
+        for (Field field : schema.getFields()) {
+            Object fieldId = field.getObjectProp("field-id");
+            fieldIdMap.put(rootName + ":" + field.name(), (Integer) fieldId);
+
+            org.apache.avro.Schema fieldSchema = field.schema();
+            if (fieldSchema.getType() == org.apache.avro.Schema.Type.UNION) {
+                fieldSchema =
+                        fieldSchema.getTypes().stream()
+                                .filter(s -> s.getType() != Type.NULL)
+                                .findFirst()
+                                .get();
+            }
+            if (Objects.requireNonNull(fieldSchema.getType()) == Type.RECORD) {
+                parseAvroFields(fieldSchema, fieldIdMap, rootName + ":" + fieldSchema.getName());
+            } else if (fieldSchema.getType() == Type.ARRAY) {
+                org.apache.avro.Schema elementType = fieldSchema.getElementType();
+                Object elementId = fieldSchema.getObjectProp("element-id");
+                fieldIdMap.put(elementType.getName(), (Integer) elementId);
+                if (elementType.getType() == Type.RECORD) {
+                    parseAvroFields(
+                            elementType, fieldIdMap, rootName + ":" + elementType.getName());
+                }
+            }
+        }
+    }
 }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java
index 87c08396f3da..5497751fcb93 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.format.avro;
 
 import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.DecimalType;
@@ -33,13 +34,17 @@
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
+import org.apache.avro.SchemaBuilder.ArrayBuilder;
+import org.apache.avro.SchemaBuilder.FieldBuilder;
 
-import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /** Converts an Avro schema into Paimon's type information. */
 public class AvroSchemaConverter {
 
+    private static final String ICEBERG = "iceberg";
+
     private AvroSchemaConverter() {
         // private
     }
@@ -163,20 +168,25 @@ public static Schema convertToSchema(
                 return nullable ? nullableSchema(decimal) : decimal;
             case ROW:
                 RowType rowType = (RowType) dataType;
-                List<String> fieldNames = rowType.getFieldNames();
                 // we have to make sure the record name is different in a Schema
                 SchemaBuilder.FieldAssembler<Schema> builder =
                         SchemaBuilder.builder().record(rowName).fields();
-                for (int i = 0; i < rowType.getFieldCount(); i++) {
-                    String fieldName = fieldNames.get(i);
-                    DataType fieldType = rowType.getTypeAt(i);
+                for (DataField dataField : rowType.getFields()) {
+                    String fieldName = dataField.name();
+                    DataType fieldType = dataField.type();
+
+                    FieldBuilder<Schema> name = builder.name(fieldName);
+
+                    // Add required Field IDs to support ID-based column pruning.
+                    // https://iceberg.apache.org/spec/#avro
+                    if (rowNameMapping.containsKey(ICEBERG)) {
+                        name.prop("field-id", dataField.id());
+                    }
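+                    // For example, DataField(0, "status", INT NOT NULL) now yields
+                    // the Avro field {"name": "status", "type": "int", "field-id": 0}.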
+
                     SchemaBuilder.GenericDefault<Schema> fieldBuilder =
-                            builder.name(fieldName)
-                                    .type(
-                                            convertToSchema(
-                                                    fieldType,
-                                                    rowName + "_" + fieldName,
-                                                    rowNameMapping));
+                            name.type(
+                                    convertToSchema(
+                                            fieldType, rowName + "_" + fieldName, rowNameMapping));
 
                     if (fieldType.isNullable()) {
                         builder = fieldBuilder.withDefault(null);
@@ -195,19 +205,43 @@
                 // Avro only natively support map with string key.
                 // To represent a map with non-string key, we use an array containing several
                 // rows. The first field of a row is the key, and the second field is the value.
-                SchemaBuilder.GenericDefault kvBuilder =
-                        SchemaBuilder.builder()
-                                .record(rowName)
-                                .fields()
-                                .name("key")
-                                .type(
-                                        convertToSchema(
-                                                keyType, rowName + "_key", rowNameMapping))
+
+                // Add required Field IDs to support ID-based column pruning.
+                // https://iceberg.apache.org/spec/#avro
+                if (rowNameMapping.containsKey(ICEBERG)) {
+                    rowName =
+                            Optional.ofNullable(rowNameMapping.get("kv_name_" + rowName))
+                                    .orElse(rowName);
+                }
+
+                FieldBuilder<Schema> key =
+                        SchemaBuilder.builder().record(rowName).fields().name("key");
+
+                // Add required Field IDs to support ID-based column pruning.
+                // https://iceberg.apache.org/spec/#avro
+                if (rowNameMapping.containsKey(ICEBERG)) {
+                    Optional.ofNullable(rowNameMapping.get("k_id_" + rowName))
+                            .map(Integer::valueOf)
+                            .ifPresent(fieldId -> key.prop("field-id", fieldId));
+                }
+
+                FieldBuilder<Schema> value =
+                        key.type(convertToSchema(keyType, rowName + "_key", rowNameMapping))
                                 .noDefault()
-                                .name("value")
-                                .type(
-                                        convertToSchema(
-                                                valueType, rowName + "_value", rowNameMapping));
+                                .name("value");
+
+                // Add required Field IDs to support ID-based column pruning.
+                // https://iceberg.apache.org/spec/#avro
+                if (rowNameMapping.containsKey(ICEBERG)) {
+                    Optional.ofNullable(rowNameMapping.get("v_id_" + rowName))
+                            .map(Integer::valueOf)
+                            .ifPresent(fieldId -> value.prop("field-id", fieldId));
+                }
+
+                SchemaBuilder.GenericDefault<Schema> kvBuilder =
+                        value.type(
+                                convertToSchema(valueType, rowName + "_value", rowNameMapping));
+
                 SchemaBuilder.FieldAssembler<Schema> assembler =
                         valueType.isNullable()
                                 ? kvBuilder.withDefault(null)
@@ -226,14 +260,20 @@
                 return nullable ? nullableSchema(map) : map;
             case ARRAY:
                 ArrayType arrayType = (ArrayType) dataType;
+                DataType elementType = arrayType.getElementType();
+
+                ArrayBuilder<Schema> arrayBuilder = SchemaBuilder.builder().array();
+
+                // Add required Field IDs to support ID-based column pruning.
+                // https://iceberg.apache.org/spec/#avro
+                if (rowNameMapping.containsKey(ICEBERG)) {
+                    Optional.ofNullable(rowNameMapping.get("array_id_" + rowName))
+                            .map(Integer::valueOf)
+                            .ifPresent(elementId -> arrayBuilder.prop("element-id", elementId));
+                }
+
                 Schema array =
-                        SchemaBuilder.builder()
-                                .array()
-                                .items(
-                                        convertToSchema(
-                                                arrayType.getElementType(),
-                                                rowName,
-                                                rowNameMapping));
+                        arrayBuilder.items(convertToSchema(elementType, rowName, rowNameMapping));
                 return nullable ? nullableSchema(array) : array;
             default:
                 throw new UnsupportedOperationException(