@@ -18,6 +18,7 @@
 
 package org.apache.paimon.iceberg.manifest;
 
+import org.apache.paimon.iceberg.metadata.IcebergPartitionField;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -111,16 +112,34 @@ public IcebergDataFileMeta file() {
     }
 
     public static RowType schema(RowType partitionType) {
+
+        RowType icebergPartition = icebergPartitionType(partitionType);
+
         List<DataField> fields = new ArrayList<>();
         fields.add(new DataField(0, "status", DataTypes.INT().notNull()));
         fields.add(new DataField(1, "snapshot_id", DataTypes.BIGINT()));
         fields.add(new DataField(3, "sequence_number", DataTypes.BIGINT()));
         fields.add(new DataField(4, "file_sequence_number", DataTypes.BIGINT()));
         fields.add(
-                new DataField(2, "data_file", IcebergDataFileMeta.schema(partitionType).notNull()));
+                new DataField(
+                        2, "data_file", IcebergDataFileMeta.schema(icebergPartition).notNull()));
         return new RowType(false, fields);
     }
 
+    // Use correct Field IDs to support ID-based column pruning.
+    // https://iceberg.apache.org/spec/#avro
+    public static RowType icebergPartitionType(RowType partitionType) {
+        List<DataField> fields = new ArrayList<>();
+        for (int i = 0; i < partitionType.getFields().size(); i++) {
+            fields.add(
+                    partitionType
+                            .getFields()
+                            .get(i)
+                            .newId(IcebergPartitionField.FIRST_FIELD_ID + i));
+        }
+        return partitionType.copy(fields);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
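Note: per the Iceberg spec, partition field IDs start at 1000, which is what IcebergPartitionField.FIRST_FIELD_ID holds. A minimal sketch of what the new icebergPartitionType method produces, reusing the partition columns from the test added further down (the id() accessor on DataField is assumed here for illustration):

    // (country, day) are remapped to Iceberg partition field IDs 1000 and 1001.
    RowType partitionType =
            RowType.of(
                    new DataType[] {DataTypes.VARCHAR(20), DataTypes.VARCHAR(20)},
                    new String[] {"country", "day"});
    RowType icebergPartition = icebergPartitionType(partitionType);
    // icebergPartition.getFields().get(0).id() == 1000  ("country")
    // icebergPartition.getFields().get(1).id() == 1001  ("day")

The new test asserts exactly these IDs ("manifest_entry:r2:r102:country" is 1000, "manifest_entry:r2:r102:day" is 1001).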
@@ -102,8 +102,18 @@ public static IcebergManifestFile create(FileStoreTable table, IcebergPathFactor
         avroOptions.set(
                 "avro.row-name-mapping",
                 "org.apache.paimon.avro.generated.record:manifest_entry,"
+                        + "iceberg:true,"
                         + "manifest_entry_data_file:r2,"
-                        + "r2_partition:r102");
+                        + "r2_partition:r102,"
+                        + "kv_name_r2_null_value_counts:k121_v122,"
+                        + "k_id_k121_v122:121,"
+                        + "v_id_k121_v122:122,"
+                        + "kv_name_r2_lower_bounds:k126_v127,"
+                        + "k_id_k126_v127:126,"
+                        + "v_id_k126_v127:127,"
+                        + "kv_name_r2_upper_bounds:k129_v130,"
+                        + "k_id_k129_v130:129,"
+                        + "v_id_k129_v130:130");
         FileFormat manifestFileAvro = FileFormat.fromIdentifier("avro", avroOptions);
         return new IcebergManifestFile(
                 table.fileIO(),
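Note: reading the new option keys, the convention appears to be that `kv_name_<record>_<field>:<name>` names the key/value record the Avro writer synthesizes for a map-typed field, while `k_id_<name>` / `v_id_<name>` assign Iceberg field IDs to its key and value columns. The IDs chosen (121/122 for null_value_counts, 126/127 for lower_bounds, 129/130 for upper_bounds) match the data_file struct in the Iceberg spec, and `iceberg:true` presumably switches the writer into this ID-emitting mode.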
@@ -186,7 +186,7 @@ private static RowType schemaForIcebergNew() {
         fields.add(new DataField(514, "deleted_rows_count", DataTypes.BIGINT().notNull()));
         fields.add(
                 new DataField(
-                        508, "partitions", DataTypes.ARRAY(IcebergPartitionSummary.schema())));
+                        507, "partitions", DataTypes.ARRAY(IcebergPartitionSummary.schema())));
         return new RowType(false, fields);
     }
 
@@ -209,7 +209,7 @@ private static RowType schemaForIceberg1_4() {
         fields.add(new DataField(514, "deleted_rows_count", DataTypes.BIGINT().notNull()));
         fields.add(
                 new DataField(
-                        508, "partitions", DataTypes.ARRAY(IcebergPartitionSummary.schema())));
+                        507, "partitions", DataTypes.ARRAY(IcebergPartitionSummary.schema())));
         return new RowType(false, fields);
     }
 
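Note: this is the core fix in this file. In the Iceberg spec's manifest-list schema, the partitions summary field has ID 507, while 508 belongs to its list element (the field_summary record, named r508 here), so using 508 on the struct field itself was wrong. The new test pins both down ("manifest_file:partitions" maps to 507, "r508" to 508).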
@@ -63,7 +63,9 @@ public static IcebergManifestList create(FileStoreTable table, IcebergPathFactor
         avroOptions.set(
                 "avro.row-name-mapping",
                 "org.apache.paimon.avro.generated.record:manifest_file,"
-                        + "manifest_file_partitions:r508");
+                        + "iceberg:true,"
+                        + "manifest_file_partitions:r508,"
+                        + "array_id_r508:508");
         FileFormat fileFormat = FileFormat.fromIdentifier("avro", avroOptions);
         RowType manifestType =
                 IcebergManifestFileMeta.schema(
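Note: `array_id_r508:508` pairs with the 508-to-507 fix above; it appears to tell the Avro writer to stamp an `element-id` of 508 on the partitions array schema itself, which is exactly where the test's parseAvroFields helper reads it back from via getObjectProp("element-id").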
@@ -55,6 +55,8 @@
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.SeekableFileInput;
 import org.apache.avro.generic.GenericData;
@@ -84,6 +86,7 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
@@ -1114,6 +1117,114 @@ public void testPartitionedPrimaryKeyTable() throws Exception {
                 Record::toString);
     }
 
+    @Test
+    public void testIcebergAvroFieldIds() throws Exception {
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(), DataTypes.VARCHAR(20), DataTypes.VARCHAR(20)
+                        },
+                        new String[] {"k", "country", "day"});
+        FileStoreTable table =
+                createPaimonTable(
+                        rowType,
+                        Arrays.asList("country", "day"),
+                        Collections.singletonList("k"),
+                        -1);
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+
+        write.write(
+                GenericRow.of(
+                        1, BinaryString.fromString("Switzerland"), BinaryString.fromString("June")),
+                1);
+        write.write(
+                GenericRow.of(
+                        2, BinaryString.fromString("Australia"), BinaryString.fromString("July")),
+                1);
+        write.write(
+                GenericRow.of(
+                        3, BinaryString.fromString("Brazil"), BinaryString.fromString("October")),
+                1);
+        write.write(
+                GenericRow.of(
+                        4,
+                        BinaryString.fromString("Grand Duchy of Luxembourg"),
+                        BinaryString.fromString("November")),
+                1);
+        commit.commit(1, write.prepareCommit(false, 1));
+        assertThat(getIcebergResult())
+                .containsExactlyInAnyOrder(
+                        "Record(1, Switzerland, June)",
+                        "Record(2, Australia, July)",
+                        "Record(3, Brazil, October)",
+                        "Record(4, Grand Duchy of Luxembourg, November)");
+
+        org.apache.iceberg.Table icebergTable = getIcebergTable();
+        String manifestListLocation = icebergTable.currentSnapshot().manifestListLocation();
+
+        Map<String, Integer> manifestListFieldIdsMap =
+                parseAvroSchemaFieldIds(manifestListLocation);
+        assertThat(manifestListFieldIdsMap)
+                .hasSize(19)
+                .containsEntry("manifest_file:r508:contains_null", 509)
+                .containsEntry("manifest_file:r508:contains_nan", 518)
+                .containsEntry("manifest_file:added_snapshot_id", 503)
+                .containsEntry("manifest_file:added_files_count", 504)
+                .containsEntry("manifest_file:deleted_rows_count", 514)
+                .containsEntry("manifest_file:added_rows_count", 512)
+                .containsEntry("manifest_file:manifest_length", 501)
+                .containsEntry("manifest_file:partition_spec_id", 502)
+                .containsEntry("manifest_file:deleted_files_count", 506)
+                .containsEntry("manifest_file:partitions", 507)
+                .containsEntry("manifest_file:existing_files_count", 505)
+                .containsEntry("manifest_file:r508:upper_bound", 511)
+                .containsEntry("manifest_file:sequence_number", 515)
+                .containsEntry("manifest_file:min_sequence_number", 516)
+                .containsEntry("manifest_file:r508:lower_bound", 510)
+                .containsEntry("manifest_file:manifest_path", 500)
+                .containsEntry("manifest_file:content", 517)
+                .containsEntry("manifest_file:existing_rows_count", 513)
+                .containsEntry("r508", 508);
+
+        String manifestPath =
+                icebergTable.currentSnapshot().allManifests(icebergTable.io()).get(0).path();
+        Map<String, Integer> manifestFieldIdsMap = parseAvroSchemaFieldIds(manifestPath);
+        assertThat(manifestFieldIdsMap)
+                .hasSize(28)
+                .containsEntry("manifest_entry:status", 0)
+                .containsEntry("manifest_entry:snapshot_id", 1)
+                .containsEntry("manifest_entry:data_file", 2)
+                .containsEntry("manifest_entry:sequence_number", 3)
+                .containsEntry("manifest_entry:file_sequence_number", 4)
+                .containsEntry("manifest_entry:r2:file_path", 100)
+                .containsEntry("manifest_entry:r2:file_format", 101)
+                .containsEntry("manifest_entry:r2:partition", 102)
+                .containsEntry("manifest_entry:r2:record_count", 103)
+                .containsEntry("manifest_entry:r2:file_size_in_bytes", 104)
+                .containsEntry("manifest_entry:r2:null_value_counts", 110)
+                .containsEntry("manifest_entry:r2:k121_v122:value", 122)
+                .containsEntry("manifest_entry:r2:k121_v122:key", 121)
+                .containsEntry("manifest_entry:r2:lower_bounds", 125)
+                .containsEntry("manifest_entry:r2:k126_v127:key", 126)
+                .containsEntry("manifest_entry:r2:k126_v127:value", 127)
+                .containsEntry("manifest_entry:r2:upper_bounds", 128)
+                .containsEntry("manifest_entry:r2:k129_v130:key", 129)
+                .containsEntry("manifest_entry:r2:k129_v130:value", 130)
+                .containsEntry("manifest_entry:r2:content", 134)
+                .containsEntry("manifest_entry:r2:referenced_data_file", 143)
+                .containsEntry("manifest_entry:r2:content_offset", 144)
+                .containsEntry("manifest_entry:r2:content_size_in_bytes", 145)
+                .containsEntry("manifest_entry:r2:r102:country", 1000)
+                .containsEntry("manifest_entry:r2:r102:day", 1001);
+
+        write.close();
+        commit.close();
+    }
+
     private void runCompatibilityTest(
             RowType rowType,
             List<String> partitionKeys,
@@ -1158,6 +1269,7 @@ private void runCompatibilityTest(
     }
 
     private static class TestRecord {
+
         private final BinaryRow partition;
         private final GenericRow record;
 
@@ -1214,13 +1326,17 @@ private List<String> getIcebergResult() throws Exception {
                 icebergTable -> IcebergGenerics.read(icebergTable).build(), Record::toString);
     }
 
+    private org.apache.iceberg.Table getIcebergTable() {
+        HadoopCatalog icebergCatalog = new HadoopCatalog(new Configuration(), tempDir.toString());
+        TableIdentifier icebergIdentifier = TableIdentifier.of("mydb.db", "t");
+        return icebergCatalog.loadTable(icebergIdentifier);
+    }
+
     private List<String> getIcebergResult(
             Function<org.apache.iceberg.Table, CloseableIterable<Record>> query,
             Function<Record, String> icebergRecordToString)
             throws Exception {
-        HadoopCatalog icebergCatalog = new HadoopCatalog(new Configuration(), tempDir.toString());
-        TableIdentifier icebergIdentifier = TableIdentifier.of("mydb.db", "t");
-        org.apache.iceberg.Table icebergTable = icebergCatalog.loadTable(icebergIdentifier);
+        org.apache.iceberg.Table icebergTable = getIcebergTable();
         CloseableIterable<Record> result = query.apply(icebergTable);
         List<String> actual = new ArrayList<>();
         for (Record record : result) {
@@ -1229,4 +1345,43 @@ private List<String> getIcebergResult(
         result.close();
         return actual;
     }
+
+    private Map<String, Integer> parseAvroSchemaFieldIds(String avroPath) throws Exception {
+        Map<String, Integer> fieldIdMap = new HashMap<>();
+        try (DataFileReader<GenericRecord> dataFileReader =
+                new DataFileReader<>(
+                        new SeekableFileInput(new File(avroPath)), new GenericDatumReader<>())) {
+            org.apache.avro.Schema schema = dataFileReader.getSchema();
+            parseAvroFields(schema, fieldIdMap, schema.getName());
+        }
+        return fieldIdMap;
+    }
+
+    private void parseAvroFields(
+            org.apache.avro.Schema schema, Map<String, Integer> fieldIdMap, String rootName) {
+        for (Field field : schema.getFields()) {
+            Object fieldId = field.getObjectProp("field-id");
+            fieldIdMap.put(rootName + ":" + field.name(), (Integer) fieldId);
+
+            org.apache.avro.Schema fieldSchema = field.schema();
+            if (fieldSchema.getType() == org.apache.avro.Schema.Type.UNION) {
+                fieldSchema =
+                        fieldSchema.getTypes().stream()
+                                .filter(s -> s.getType() != Type.NULL)
+                                .findFirst()
+                                .get();
+            }
+            if (Objects.requireNonNull(fieldSchema.getType()) == Type.RECORD) {
+                parseAvroFields(fieldSchema, fieldIdMap, rootName + ":" + fieldSchema.getName());
+            } else if (fieldSchema.getType() == Type.ARRAY) {
+                org.apache.avro.Schema elementType = fieldSchema.getElementType();
+                Object elementId = fieldSchema.getObjectProp("element-id");
+                fieldIdMap.put(elementType.getName(), (Integer) elementId);
+                if (elementType.getType() == Type.RECORD) {
+                    parseAvroFields(
+                            elementType, fieldIdMap, rootName + ":" + elementType.getName());
+                }
+            }
+        }
+    }
 }
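Note on the helper: parseAvroFields flattens the written Avro schema into `<path>:<field>` keys mapped to each field's `field-id` property. Nullable fields are unwrapped to the non-null branch of their union before recursing, and for array fields the element record's name is mapped to the array's `element-id` property, which is how the bare "r508" entry with value 508 in the manifest-list assertions is produced.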