@@ -23,6 +23,7 @@
import org.apache.fluss.types.BooleanType;
import org.apache.fluss.types.BytesType;
import org.apache.fluss.types.CharType;
import org.apache.fluss.types.DataField;
import org.apache.fluss.types.DataTypeVisitor;
import org.apache.fluss.types.DateType;
import org.apache.fluss.types.DecimalType;
@@ -41,6 +42,9 @@
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import java.util.ArrayList;
import java.util.List;

/** Convert from Fluss's data type to Iceberg's data type. */
public class FlussDataTypeToIcebergDataType implements DataTypeVisitor<Type> {

@@ -168,6 +172,28 @@ public Type visit(MapType mapType) {

@Override
public Type visit(RowType rowType) {
throw new UnsupportedOperationException("Unsupported row type");
List<Types.NestedField> fields = new ArrayList<>();

// Assign a fresh Iceberg field id to each nested field and preserve its nullability.
for (DataField field : rowType.getFields()) {
Type fieldType = field.getType().accept(this);

if (field.getType().isNullable()) {
fields.add(
Types.NestedField.optional(
getNextId(),
field.getName(),
fieldType,
field.getDescription().orElse(null)));
} else {
fields.add(
Types.NestedField.required(
getNextId(),
field.getName(),
fieldType,
field.getDescription().orElse(null)));
}
}

return Types.StructType.of(fields);
}
}
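Taken together, `visit(RowType)` now maps a nested Fluss ROW onto an Iceberg struct: every nested field gets a fresh field id from `getNextId()`, and Fluss nullability is translated into Iceberg's optional/required. A minimal sketch of the resulting mapping, assuming `DataTypes.ROW(...)` yields a Fluss `RowType` (as the tests below suggest) and using the `INSTANCE` singleton referenced elsewhere in this PR:

```java
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.types.RowType;
import org.apache.iceberg.types.Types;

// Sketch only, not part of the change. Both Fluss fields are nullable by
// default, so both Iceberg struct fields come out as `optional`.
RowType rowType =
        (RowType)
                DataTypes.ROW(
                        DataTypes.FIELD("f_nested_int", DataTypes.INT()),
                        DataTypes.FIELD("f_nested_string", DataTypes.STRING()));
Types.StructType struct =
        (Types.StructType) rowType.accept(FlussDataTypeToIcebergDataType.INSTANCE);
// struct<f_nested_int: optional int, f_nested_string: optional string>
```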
@@ -17,6 +17,7 @@

package org.apache.fluss.lake.iceberg.source;

import org.apache.fluss.lake.iceberg.FlussDataTypeToIcebergDataType;
import org.apache.fluss.row.InternalArray;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.types.ArrayType;
@@ -179,6 +180,15 @@ private FlussRowToIcebergFieldConverter createTypeConverter(DataType flussType,
? null
: new FlussArrayAsIcebergList(array, arrayType.getElementType());
};
} else if (flussType instanceof RowType) {
RowType rowType = (RowType) flussType;
Types.StructType nestedStructType =
(Types.StructType) rowType.accept(FlussDataTypeToIcebergDataType.INSTANCE);

return row -> {
    // Return null for a null nested row, mirroring the null handling in the array branch above.
    if (row.isNullAt(pos)) {
        return null;
    }
    InternalRow nestedRow = row.getRow(pos, rowType.getFieldCount());
    return new FlussRowAsIcebergRecord(nestedStructType, rowType, nestedRow);
};
} else {
throw new UnsupportedOperationException(
"Unsupported data type conversion for Fluss type: "
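A design note on this branch: the schema translation runs once, outside the returned lambda, so the per-row cost is just wrapping the nested `InternalRow`; nothing is deep-copied. A hedged sketch of what the lambda produces for one non-null nested value, assuming `FlussRowAsIcebergRecord` implements Iceberg's `Record` (as its name and `Types.StructType` constructor argument suggest):

```java
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.types.RowType;
import org.apache.iceberg.types.Types;

// Illustrative nested value matching ROW<a INT, b STRING>.
RowType nestedType =
        (RowType)
                DataTypes.ROW(
                        DataTypes.FIELD("a", DataTypes.INT()),
                        DataTypes.FIELD("b", DataTypes.STRING()));
Types.StructType nestedStructType =
        (Types.StructType) nestedType.accept(FlussDataTypeToIcebergDataType.INSTANCE);

GenericRow nested = new GenericRow(2);
nested.setField(0, 1);
nested.setField(1, BinaryString.fromString("nested_row1"));

// What the converter returns for a non-null nested field: a view, not a copy.
Object icebergValue = new FlussRowAsIcebergRecord(nestedStructType, nestedType, nested);
```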
@@ -44,6 +44,10 @@ public class IcebergRecordAsFlussRow implements InternalRow {

public IcebergRecordAsFlussRow() {}

public IcebergRecordAsFlussRow(Record icebergRecord) {
this.icebergRecord = icebergRecord;
}

public IcebergRecordAsFlussRow replaceIcebergRecord(Record icebergRecord) {
this.icebergRecord = icebergRecord;
return this;
@@ -169,7 +173,18 @@ public InternalMap getMap(int pos) {

@Override
public InternalRow getRow(int pos, int numFields) {
- // TODO: Support Row type conversion from Iceberg to Fluss
- throw new UnsupportedOperationException();
Object value = icebergRecord.get(pos);
if (value == null) {
return null;
}
if (value instanceof Record) {
return new IcebergRecordAsFlussRow((Record) value);
} else {
throw new IllegalArgumentException(
"Expected Iceberg Record for nested row at position "
+ pos
+ " but found: "
+ value.getClass().getName());
}
}
}
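With `getRow` implemented, a nested Iceberg `Record` now surfaces through Fluss's `InternalRow` API without copying. A self-contained sketch; field names and ids are illustrative, and `getInt` is the standard `InternalRow` accessor, untouched by this PR:

```java
import org.apache.fluss.row.InternalRow;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;

Types.StructType nested =
        Types.StructType.of(
                Types.NestedField.required(1, "a", Types.IntegerType.get()),
                Types.NestedField.optional(2, "b", Types.StringType.get()));
Types.StructType outer =
        Types.StructType.of(Types.NestedField.optional(3, "f_row", nested));

Record nestedRecord = GenericRecord.create(nested);
nestedRecord.setField("a", 10);
nestedRecord.setField("b", "nested_string");

Record outerRecord = GenericRecord.create(outer);
outerRecord.setField("f_row", nestedRecord);

InternalRow flussRow = new IcebergRecordAsFlussRow(outerRecord);
InternalRow nestedRow = flussRow.getRow(0, 2); // wraps the nested Record
int a = nestedRow.getInt(0); // 10
```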
@@ -21,6 +21,7 @@
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.types.DataField;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.types.RowType;
@@ -38,6 +39,7 @@

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.List;

import static org.apache.fluss.metadata.ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR;
@@ -128,7 +130,16 @@ private static DataType convertIcebergTypeToFlussType(Type icebergType) {
} else if (icebergType instanceof Types.ListType) {
Types.ListType listType = (Types.ListType) icebergType;
return DataTypes.ARRAY(convertIcebergTypeToFlussType(listType.elementType()));
} else if (icebergType.isStructType()) {
Types.StructType structType = icebergType.asStructType();
List<DataField> fields = new ArrayList<>();
for (Types.NestedField nestedField : structType.fields()) {
DataType fieldType = convertIcebergTypeToFlussType(nestedField.type());
fields.add(new DataField(nestedField.name(), fieldType));
}
return DataTypes.ROW(fields.toArray(new DataField[0]));
}

throw new UnsupportedOperationException(
"Unsupported data type conversion for Iceberg type: "
+ icebergType.getClass().getName());
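In the reverse direction, an Iceberg struct becomes a Fluss ROW with matching field names, recursing through `convertIcebergTypeToFlussType` for arbitrarily deep nesting. A sketch of the mapping this branch performs (the method is private, so this is illustrative only; note that the Iceberg field doc and the required/optional flag are not carried onto the resulting `DataField`):

```java
// Illustrative input: struct<a: required int, b: optional string>
Types.StructType struct =
        Types.StructType.of(
                Types.NestedField.required(1, "a", Types.IntegerType.get()),
                Types.NestedField.optional(2, "b", Types.StringType.get()));
// Per the branch above, the result is roughly:
//     DataTypes.ROW(
//             DataTypes.FIELD("a", DataTypes.INT()),
//             DataTypes.FIELD("b", DataTypes.STRING()))
// with names preserved and element types converted recursively.
```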
@@ -126,12 +126,12 @@ void testReadLogTableFullType(boolean isPartitioned) throws Exception {
// check filter push down
assertThat(plan)
.contains("TableSourceScan(")
.contains("LogicalFilter(condition=[=($15, _UTF-16LE'" + partition + "'")
.contains("LogicalFilter(condition=[=($16, _UTF-16LE'" + partition + "'")
.contains("filter=[=(p, _UTF-16LE'" + partition + "'");

List<Row> expectedFiltered =
writtenRows.stream()
- .filter(r -> partition.equals(r.getField(15)))
.filter(r -> partition.equals(r.getField(16)))
.collect(Collectors.toList());

List<Row> actualFiltered =
@@ -295,7 +295,12 @@ protected long createFullTypeLogTable(
.column("f_timestamp_ltz2", DataTypes.TIMESTAMP_LTZ(6))
.column("f_timestamp_ntz1", DataTypes.TIMESTAMP(3))
.column("f_timestamp_ntz2", DataTypes.TIMESTAMP(6))
.column("f_binary", DataTypes.BINARY(4));
.column("f_binary", DataTypes.BINARY(4))
.column(
"f_row",
DataTypes.ROW(
DataTypes.FIELD("f_nested_int", DataTypes.INT()),
DataTypes.FIELD("f_nested_string", DataTypes.STRING())));

TableDescriptor.Builder tableBuilder =
TableDescriptor.builder()
@@ -342,7 +347,8 @@ private List<Row> writeFullTypeRows(
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
TimestampNtz.fromMillis(1698235273501L),
TimestampNtz.fromMillis(1698235273501L, 8000),
- new byte[] {5, 6, 7, 8}));
new byte[] {5, 6, 7, 8},
row(10, "nested_string")));

flinkRows.add(
Row.of(
@@ -364,7 +370,8 @@
Instant.ofEpochMilli(1698235273501L),
ZoneId.of("UTC"))
.plusNanos(8000),
- new byte[] {5, 6, 7, 8}));
new byte[] {5, 6, 7, 8},
Row.of(10, "nested_string")));
} else {
rows.add(
row(
@@ -383,6 +390,7 @@
TimestampNtz.fromMillis(1698235273501L),
TimestampNtz.fromMillis(1698235273501L, 8000),
new byte[] {5, 6, 7, 8},
row(10, "nested_string"),
partition));

flinkRows.add(
@@ -406,6 +414,7 @@
ZoneId.of("UTC"))
.plusNanos(8000),
new byte[] {5, 6, 7, 8},
Row.of(10, "nested_string"),
partition));
}
}
@@ -24,8 +24,10 @@
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.Decimal;
import org.apache.fluss.row.GenericArray;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
@@ -100,7 +102,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
TimestampNtz.fromMillis(1698235273183L, 6000),
new byte[] {1, 2, 3, 4},
new float[] {1.1f, 1.2f, 1.3f},
- partition));
partition,
Row.of(1, "nested_row1")));
expectedRows.add(
Row.of(
true,
@@ -119,7 +122,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
TimestampNtz.fromMillis(1698235273201L, 6000),
new byte[] {1, 2, 3, 4},
new float[] {1.1f, 1.2f, 1.3f},
- partition));
partition,
Row.of(2, "nested_row2")));
}
} else {
expectedRows =
@@ -141,7 +145,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
TimestampNtz.fromMillis(1698235273183L, 6000),
new byte[] {1, 2, 3, 4},
new float[] {1.1f, 1.2f, 1.3f},
- null),
null,
Row.of(1, "nested_row1")),
Row.of(
true,
(byte) 10,
@@ -159,7 +164,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
TimestampNtz.fromMillis(1698235273201L, 6000),
new byte[] {1, 2, 3, 4},
new float[] {1.1f, 1.2f, 1.3f},
- null));
null,
Row.of(2, "nested_row2")));
}

String query = "select * from " + tableName;
@@ -202,7 +208,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
TimestampNtz.fromMillis(1698235273201L, 6000),
new byte[] {1, 2, 3, 4},
new float[] {1.1f, 1.2f, 1.3f},
- partition));
partition,
Row.of(2, "nested_row2")));
expectedRows2.add(
Row.ofKind(
RowKind.UPDATE_AFTER,
@@ -222,7 +229,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
TimestampNtz.fromMillis(1698235273501L, 8000),
new byte[] {5, 6, 7, 8},
new float[] {2.1f, 2.2f, 2.3f},
- partition));
partition,
Row.of(3, "nested_update")));
}
} else {
expectedRows2.add(
Expand All @@ -244,7 +252,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
TimestampNtz.fromMillis(1698235273201L, 6000),
new byte[] {1, 2, 3, 4},
new float[] {1.1f, 1.2f, 1.3f},
- null));
null,
Row.of(2, "nested_row2")));
expectedRows2.add(
Row.ofKind(
RowKind.UPDATE_AFTER,
Expand All @@ -264,7 +273,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
TimestampNtz.fromMillis(1698235273501L, 8000),
new byte[] {5, 6, 7, 8},
new float[] {2.1f, 2.2f, 2.3f},
- null));
null,
Row.of(3, "nested_update")));
}

if (isPartitioned) {
@@ -349,6 +359,10 @@ void testReadIcebergLakeTable(boolean isPartitioned) throws Exception {
}

private void writeFullTypeRow(TablePath tablePath, String partition) throws Exception {
GenericRow nestedRow = new GenericRow(2);
nestedRow.setField(0, 3);
nestedRow.setField(1, BinaryString.fromString("nested_update"));

List<InternalRow> rows =
Collections.singletonList(
row(
Expand All @@ -368,7 +382,8 @@ private void writeFullTypeRow(TablePath tablePath, String partition) throws Exce
TimestampNtz.fromMillis(1698235273501L, 8000),
new byte[] {5, 6, 7, 8},
new GenericArray(new float[] {2.1f, 2.2f, 2.3f}),
- partition));
partition,
nestedRow));
writeRows(tablePath, rows, false);
}

@@ -422,7 +437,12 @@ protected long createPkTableFullType(TablePath tablePath, int bucketNum, boolean
.column("c14", DataTypes.TIMESTAMP(6))
.column("c15", DataTypes.BINARY(4))
.column("c16", DataTypes.ARRAY(DataTypes.FLOAT()))
.column("c17", DataTypes.STRING());
.column("c17", DataTypes.STRING())
.column(
"c18",
DataTypes.ROW(
DataTypes.FIELD("a", DataTypes.INT()),
DataTypes.FIELD("b", DataTypes.STRING())));

TableDescriptor.Builder tableBuilder =
TableDescriptor.builder()
@@ -444,6 +464,14 @@
}

private List<InternalRow> generateKvRowsFullType(@Nullable String partition) {
GenericRow nestedRow1 = new GenericRow(2);
nestedRow1.setField(0, 1);
nestedRow1.setField(1, BinaryString.fromString("nested_row1"));

GenericRow nestedRow2 = new GenericRow(2);
nestedRow2.setField(0, 2);
nestedRow2.setField(1, BinaryString.fromString("nested_row2"));

return Arrays.asList(
row(
false,
Expand All @@ -462,7 +490,8 @@ private List<InternalRow> generateKvRowsFullType(@Nullable String partition) {
TimestampNtz.fromMillis(1698235273183L, 6000),
new byte[] {1, 2, 3, 4},
new GenericArray(new float[] {1.1f, 1.2f, 1.3f}),
- partition),
partition,
nestedRow1),
row(
true,
(byte) 10,
Expand All @@ -480,7 +509,8 @@ private List<InternalRow> generateKvRowsFullType(@Nullable String partition) {
TimestampNtz.fromMillis(1698235273201L, 6000),
new byte[] {1, 2, 3, 4},
new GenericArray(new float[] {1.1f, 1.2f, 1.3f}),
- partition));
partition,
nestedRow2));
}

private Map<TableBucket, Long> getBucketLogEndOffset(