From 48a345513835860b64393374dd7285873dbd294a Mon Sep 17 00:00:00 2001 From: SeungMin Date: Tue, 30 Dec 2025 01:00:21 +0900 Subject: [PATCH 1/9] feat: support tiered nested row type for iceberg --- .../FlussDataTypeToIcebergDataType.java | 29 ++++++++++++++++++- .../source/FlussRowAsIcebergRecord.java | 11 ++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java index 1c093482aa..c0277f4c57 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java @@ -23,6 +23,7 @@ import org.apache.fluss.types.BooleanType; import org.apache.fluss.types.BytesType; import org.apache.fluss.types.CharType; +import org.apache.fluss.types.DataField; import org.apache.fluss.types.DataTypeVisitor; import org.apache.fluss.types.DateType; import org.apache.fluss.types.DecimalType; @@ -41,6 +42,9 @@ import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import java.util.ArrayList; +import java.util.List; + /** Convert from Fluss's data type to Iceberg's data type. */ public class FlussDataTypeToIcebergDataType implements DataTypeVisitor { @@ -168,6 +172,29 @@ public Type visit(MapType mapType) { @Override public Type visit(RowType rowType) { - throw new UnsupportedOperationException("Unsupported row type"); + List fields = new ArrayList<>(); + int fieldId = 0; + + for (DataField field : rowType.getFields()) { + Type fieldType = field.getType().accept(this); + + if (field.getType().isNullable()) { + fields.add( + Types.NestedField.optional( + fieldId++, + field.getName(), + fieldType, + field.getDescription().orElse(null))); + } else { + fields.add( + Types.NestedField.required( + fieldId++, + field.getName(), + fieldType, + field.getDescription().orElse(null))); + } + } + + return Types.StructType.of(fields); } } diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java index 70dc8ea6f8..1fa13cbfa3 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java @@ -17,6 +17,7 @@ package org.apache.fluss.lake.iceberg.source; +import org.apache.fluss.lake.iceberg.FlussDataTypeToIcebergDataType; import org.apache.fluss.row.InternalArray; import org.apache.fluss.row.InternalRow; import org.apache.fluss.types.ArrayType; @@ -39,7 +40,6 @@ import org.apache.fluss.types.TimestampType; import org.apache.fluss.types.TinyIntType; import org.apache.fluss.utils.DateTimeUtils; - import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Types; @@ -179,6 +179,15 @@ private FlussRowToIcebergFieldConverter createTypeConverter(DataType flussType, ? null : new FlussArrayAsIcebergList(array, arrayType.getElementType()); }; + } else if (flussType instanceof RowType) { + RowType rowType = (RowType) flussType; + Types.StructType nestedStructType = + (Types.StructType) rowType.accept(FlussDataTypeToIcebergDataType.INSTANCE); + + return row -> { + InternalRow nestedRow = row.getRow(pos, rowType.getFieldCount()); + return new FlussRowAsIcebergRecord(nestedStructType, rowType, nestedRow); + }; } else { throw new UnsupportedOperationException( "Unsupported data type conversion for Fluss type: " From 3bf4b79481ef825a84a20f6c301e481a49860675 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Thu, 1 Jan 2026 16:54:53 +0900 Subject: [PATCH 2/9] feat: support tiered nested row type for iceberg (iceberg struct to fluss row) --- .../source/FlussRowAsIcebergRecord.java | 1 + .../source/IcebergRecordAsFlussRow.java | 19 +++++++++++++++++-- .../iceberg/utils/IcebergConversions.java | 11 +++++++++++ 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java index 1fa13cbfa3..6d7070001a 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java @@ -40,6 +40,7 @@ import org.apache.fluss.types.TimestampType; import org.apache.fluss.types.TinyIntType; import org.apache.fluss.utils.DateTimeUtils; + import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Types; diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java index 8143433b0f..2a560e8d15 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java @@ -44,6 +44,10 @@ public class IcebergRecordAsFlussRow implements InternalRow { public IcebergRecordAsFlussRow() {} + public IcebergRecordAsFlussRow(Record icebergRecord) { + this.icebergRecord = icebergRecord; + } + public IcebergRecordAsFlussRow replaceIcebergRecord(Record icebergRecord) { this.icebergRecord = icebergRecord; return this; @@ -169,7 +173,18 @@ public InternalMap getMap(int pos) { @Override public InternalRow getRow(int pos, int numFields) { - // TODO: Support Row type conversion from Iceberg to Fluss - throw new UnsupportedOperationException(); + Object value = icebergRecord.get(pos); + if (value == null) { + return null; + } + if (value instanceof Record) { + return new IcebergRecordAsFlussRow((Record) value); + } else { + throw new IllegalArgumentException( + "Expected Iceberg Record for nested row at position " + + pos + + " but found: " + + value.getClass().getName()); + } } } diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java index 038ebd8a7a..b75ce8ad44 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java @@ -21,6 +21,7 @@ import org.apache.fluss.metadata.TablePath; import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.InternalRow; +import org.apache.fluss.types.DataField; import org.apache.fluss.types.DataType; import org.apache.fluss.types.DataTypes; import org.apache.fluss.types.RowType; @@ -38,6 +39,7 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.List; import static org.apache.fluss.metadata.ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR; @@ -128,7 +130,16 @@ private static DataType convertIcebergTypeToFlussType(Type icebergType) { } else if (icebergType instanceof Types.ListType) { Types.ListType listType = (Types.ListType) icebergType; return DataTypes.ARRAY(convertIcebergTypeToFlussType(listType.elementType())); + } else if (icebergType.isStructType()) { + Types.StructType structType = icebergType.asStructType(); + List fields = new ArrayList<>(); + for (Types.NestedField nestedField : structType.fields()) { + DataType fieldType = convertIcebergTypeToFlussType(nestedField.type()); + fields.add(new DataField(nestedField.name(), fieldType)); + } + return DataTypes.ROW(fields.toArray(new DataField[0])); } + throw new UnsupportedOperationException( "Unsupported data type conversion for Iceberg type: " + icebergType.getClass().getName()); From c5b44917a0024fd6b0df64096d3b6e326fc1390c Mon Sep 17 00:00:00 2001 From: SeungMin Date: Thu, 1 Jan 2026 19:13:13 +0900 Subject: [PATCH 3/9] test: added nested row to IcebergRecordAsFlussRowTest --- .../source/IcebergRecordAsFlussRowTest.java | 57 +++++++++++++++++-- 1 file changed, 52 insertions(+), 5 deletions(-) diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java index 42ac4d6fbb..077f4222d3 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java @@ -18,6 +18,7 @@ package org.apache.fluss.lake.iceberg.source; +import org.apache.fluss.row.InternalRow; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; @@ -61,10 +62,27 @@ void setUp() { optional(11, "timestamp_ltz", Types.TimestampType.withZone()), optional(12, "binary_data", Types.BinaryType.get()), optional(13, "char_data", Types.StringType.get()), + optional( + 14, + "nested_row", + Types.StructType.of( + required(15, "city", Types.StringType.get()), + required( + 16, + "address", + Types.StructType.of( + required( + 17, + "subfield1", + Types.StringType.get()), + required( + 18, + "subfield2", + Types.IntegerType.get()))))), // System columns - required(14, "__bucket", Types.IntegerType.get()), - required(15, "__offset", Types.LongType.get()), - required(16, "__timestamp", Types.TimestampType.withZone())); + required(19, "__bucket", Types.IntegerType.get()), + required(20, "__offset", Types.LongType.get()), + required(21, "__timestamp", Types.TimestampType.withZone())); record = GenericRecord.create(schema); } @@ -82,7 +100,7 @@ void testGetFieldCount() { icebergRecordAsFlussRow.replaceIcebergRecord(record); // Should return count excluding system columns (3 system columns) - assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(13); + assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(14); } @Test @@ -145,6 +163,35 @@ void testAllDataTypes() { .isEqualTo("Hello"); // char_data // Test field count (excluding system columns) - assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(13); + assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(14); + } + + @Test + void testNestedRow() { + String cityValue = "Seoul"; + String subfield1Value = "string value"; + Integer subfield2Value = 12345; + Record nestedRecord = + GenericRecord.create(record.struct().fields().get(13).type().asStructType()); + nestedRecord.setField("city", "Seoul"); + + Record deepNestedRecord = + GenericRecord.create(nestedRecord.struct().fields().get(1).type().asStructType()); + deepNestedRecord.setField("subfield1", subfield1Value); + deepNestedRecord.setField("subfield2", subfield2Value); + + nestedRecord.setField("address", deepNestedRecord); + + record.setField("nested_row", nestedRecord); + icebergRecordAsFlussRow.replaceIcebergRecord(record); + + InternalRow nestedRow = icebergRecordAsFlussRow.getRow(13, 2); + assertThat(nestedRow).isNotNull(); + assertThat(nestedRow.getString(0).toString()).isEqualTo(cityValue); + + InternalRow deepNestedRow = nestedRow.getRow(1, 2); + assertThat(deepNestedRow).isNotNull(); + assertThat(deepNestedRow.getString(0).toString()).isEqualTo(subfield1Value); + assertThat(deepNestedRow.getInt(1)).isEqualTo(subfield2Value); } } From ac215c863d6f74ebe83858a48c9d7f5a617a93e8 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Thu, 1 Jan 2026 19:24:41 +0900 Subject: [PATCH 4/9] fix: spotless:apply --- .../fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java index 077f4222d3..2aeb223ce4 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java @@ -19,6 +19,7 @@ package org.apache.fluss.lake.iceberg.source; import org.apache.fluss.row.InternalRow; + import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; From 10fb75cb1f5ecd29968d367effeb4cd73ea0f8d2 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Mon, 5 Jan 2026 20:37:10 +0900 Subject: [PATCH 5/9] test: added nested row tests to FlussRowAsIcebergRecordTest --- .../source/FlussRowAsIcebergRecordTest.java | 121 +++++++++++++++++- 1 file changed, 120 insertions(+), 1 deletion(-) diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java index 0c3ec679c3..c739000e83 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java @@ -23,7 +23,7 @@ import org.apache.fluss.row.GenericRow; import org.apache.fluss.types.DataTypes; import org.apache.fluss.types.RowType; - +import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Test; @@ -293,4 +293,123 @@ void testArrayWithAllTypes() { // Test null array assertThat(record.get(14)).isNull(); } + + @Test + void testNestedRow() { + Types.StructType structType = + Types.StructType.of( + // simple row + Types.NestedField.required( + 0, + "simple_row", + Types.StructType.of( + Types.NestedField.required( + 1, "id", Types.IntegerType.get()), + Types.NestedField.required( + 2, "name", Types.StringType.get()))), + // nested row + Types.NestedField.required( + 3, + "nested_row", + Types.StructType.of( + Types.NestedField.required( + 4, "id", Types.IntegerType.get()), + Types.NestedField.required( + 5, + "inner", + Types.StructType.of( + Types.NestedField.required( + 6, "val", Types.DoubleType.get()), + Types.NestedField.required( + 7, + "flag", + Types.BooleanType.get()))))), + // array row + Types.NestedField.required( + 8, + "array_row", + Types.StructType.of( + Types.NestedField.required( + 9, + "ids", + Types.ListType.ofRequired( + 10, Types.IntegerType.get())))), + // nullable row + Types.NestedField.optional( + 11, + "nullable_row", + Types.StructType.of( + Types.NestedField.required( + 12, "id", Types.IntegerType.get())))); + + RowType flussRowType = + RowType.of( + // simple row + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("name", DataTypes.STRING())), + // nested row + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD( + "inner", + DataTypes.ROW( + DataTypes.FIELD("val", DataTypes.DOUBLE()), + DataTypes.FIELD("flag", DataTypes.BOOLEAN())))), + // row_with array + DataTypes.ROW(DataTypes.FIELD("ids", DataTypes.ARRAY(DataTypes.INT()))), + // nullable row + DataTypes.ROW(DataTypes.FIELD("id", DataTypes.INT()))); + + GenericRow genericRow = new GenericRow(4); + + // Simple Row + GenericRow simpleRow = new GenericRow(2); + simpleRow.setField(0, 100); + simpleRow.setField(1, BinaryString.fromString("fluss")); + genericRow.setField(0, simpleRow); + + // Nested Row + GenericRow innerRow = new GenericRow(2); + innerRow.setField(0, 3.14); + innerRow.setField(1, true); + GenericRow nestedRow = new GenericRow(2); + nestedRow.setField(0, 200); + nestedRow.setField(1, innerRow); + genericRow.setField(1, nestedRow); + + // Array Row + GenericRow rowWithArray = new GenericRow(1); + rowWithArray.setField(0, new GenericArray(new int[] {1, 2, 3})); + genericRow.setField(2, rowWithArray); + + // Nullable Row + genericRow.setField(3, null); + + FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType); + record.internalRow = genericRow; + + // Verify Simple Row + Record icebergSimpleRow = (Record) record.get(0); + assertThat(icebergSimpleRow.get(0)).isEqualTo(100); + assertThat(icebergSimpleRow.get(1)).isEqualTo("fluss"); + + // Verify Nested Row + Record icebergNestedRow = (Record) record.get(1); + assertThat(icebergNestedRow.get(0)).isEqualTo(200); + Record icebergInnerRow = (Record) icebergNestedRow.get(1); + assertThat(icebergInnerRow.get(0)).isEqualTo(3.14); + assertThat(icebergInnerRow.get(1)).isEqualTo(true); + + // Verify Row with Array + Record icebergRowWithArray = (Record) record.get(2); + List ids = (List) icebergRowWithArray.get(0); + assertThat(ids.size()).isEqualTo(3); + assertThat(ids.get(0)).isEqualTo(1); + assertThat(ids.get(1)).isEqualTo(2); + assertThat(ids.get(2)).isEqualTo(3); + + // Verify Nullable Row + assertThat(record.get(3)).isNull(); + } } From 98575d871d8cd8a15525b1578239258af90ae16c Mon Sep 17 00:00:00 2001 From: SeungMin Date: Mon, 5 Jan 2026 20:45:57 +0900 Subject: [PATCH 6/9] fix: set nested field id with getNextID() --- .../fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java index c0277f4c57..bd01d08afd 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java @@ -173,7 +173,6 @@ public Type visit(MapType mapType) { @Override public Type visit(RowType rowType) { List fields = new ArrayList<>(); - int fieldId = 0; for (DataField field : rowType.getFields()) { Type fieldType = field.getType().accept(this); @@ -181,14 +180,14 @@ public Type visit(RowType rowType) { if (field.getType().isNullable()) { fields.add( Types.NestedField.optional( - fieldId++, + getNextId(), field.getName(), fieldType, field.getDescription().orElse(null))); } else { fields.add( Types.NestedField.required( - fieldId++, + getNextId(), field.getName(), fieldType, field.getDescription().orElse(null))); From ae3a2b5b47898c66920261c35a278e45a580ef3d Mon Sep 17 00:00:00 2001 From: SeungMin Date: Mon, 5 Jan 2026 20:46:54 +0900 Subject: [PATCH 7/9] fix: spotless:apply --- .../fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java index c739000e83..247c33d128 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java @@ -23,6 +23,7 @@ import org.apache.fluss.row.GenericRow; import org.apache.fluss.types.DataTypes; import org.apache.fluss.types.RowType; + import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Test; From 373035c3299377ea46835c9d97bcf2e19c7adf17 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Mon, 5 Jan 2026 20:51:44 +0900 Subject: [PATCH 8/9] test: added nested row to union read test --- .../FlinkUnionReadPrimaryKeyTableITCase.java | 54 ++++++++++++++----- 1 file changed, 42 insertions(+), 12 deletions(-) diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java index a8b62e7736..6cc48ede0d 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java @@ -24,8 +24,10 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.Decimal; import org.apache.fluss.row.GenericArray; +import org.apache.fluss.row.GenericRow; import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; @@ -100,7 +102,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, - partition)); + partition, + Row.of(1, "nested_row1"))); expectedRows.add( Row.of( true, @@ -119,7 +122,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, - partition)); + partition, + Row.of(2, "nested_row2"))); } } else { expectedRows = @@ -141,7 +145,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, - null), + null, + Row.of(1, "nested_row1")), Row.of( true, (byte) 10, @@ -159,7 +164,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, - null)); + null, + Row.of(2, "nested_row2"))); } String query = "select * from " + tableName; @@ -202,7 +208,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, - partition)); + partition, + Row.of(2, "nested_row2"))); expectedRows2.add( Row.ofKind( RowKind.UPDATE_AFTER, @@ -222,7 +229,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, new float[] {2.1f, 2.2f, 2.3f}, - partition)); + partition, + Row.of(3, "nested_update"))); } } else { expectedRows2.add( @@ -244,7 +252,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, new float[] {1.1f, 1.2f, 1.3f}, - null)); + null, + Row.of(2, "nested_row2"))); expectedRows2.add( Row.ofKind( RowKind.UPDATE_AFTER, @@ -264,7 +273,8 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, new float[] {2.1f, 2.2f, 2.3f}, - null)); + null, + Row.of(3, "nested_update"))); } if (isPartitioned) { @@ -349,6 +359,10 @@ void testReadIcebergLakeTable(boolean isPartitioned) throws Exception { } private void writeFullTypeRow(TablePath tablePath, String partition) throws Exception { + GenericRow nestedRow = new GenericRow(2); + nestedRow.setField(0, 3); + nestedRow.setField(1, BinaryString.fromString("nested_update")); + List rows = Collections.singletonList( row( @@ -368,7 +382,8 @@ private void writeFullTypeRow(TablePath tablePath, String partition) throws Exce TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, new GenericArray(new float[] {2.1f, 2.2f, 2.3f}), - partition)); + partition, + nestedRow)); writeRows(tablePath, rows, false); } @@ -422,7 +437,12 @@ protected long createPkTableFullType(TablePath tablePath, int bucketNum, boolean .column("c14", DataTypes.TIMESTAMP(6)) .column("c15", DataTypes.BINARY(4)) .column("c16", DataTypes.ARRAY(DataTypes.FLOAT())) - .column("c17", DataTypes.STRING()); + .column("c17", DataTypes.STRING()) + .column( + "c18", + DataTypes.ROW( + DataTypes.FIELD("a", DataTypes.INT()), + DataTypes.FIELD("b", DataTypes.STRING()))); TableDescriptor.Builder tableBuilder = TableDescriptor.builder() @@ -444,6 +464,14 @@ protected long createPkTableFullType(TablePath tablePath, int bucketNum, boolean } private List generateKvRowsFullType(@Nullable String partition) { + GenericRow nestedRow1 = new GenericRow(2); + nestedRow1.setField(0, 1); + nestedRow1.setField(1, BinaryString.fromString("nested_row1")); + + GenericRow nestedRow2 = new GenericRow(2); + nestedRow2.setField(0, 2); + nestedRow2.setField(1, BinaryString.fromString("nested_row2")); + return Arrays.asList( row( false, @@ -462,7 +490,8 @@ private List generateKvRowsFullType(@Nullable String partition) { TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, new GenericArray(new float[] {1.1f, 1.2f, 1.3f}), - partition), + partition, + nestedRow1), row( true, (byte) 10, @@ -480,7 +509,8 @@ private List generateKvRowsFullType(@Nullable String partition) { TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, new GenericArray(new float[] {1.1f, 1.2f, 1.3f}), - partition)); + partition, + nestedRow2)); } private Map getBucketLogEndOffset( From d954ee73a74737caf51be230249167c1a33b7cf4 Mon Sep 17 00:00:00 2001 From: SeungMin Date: Mon, 5 Jan 2026 23:45:55 +0900 Subject: [PATCH 9/9] test: added nested row to union read log table test --- .../flink/FlinkUnionReadLogTableITCase.java | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java index 168f51cd2a..ef353d989d 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java @@ -126,12 +126,12 @@ void testReadLogTableFullType(boolean isPartitioned) throws Exception { // check filter push down assertThat(plan) .contains("TableSourceScan(") - .contains("LogicalFilter(condition=[=($15, _UTF-16LE'" + partition + "'") + .contains("LogicalFilter(condition=[=($16, _UTF-16LE'" + partition + "'") .contains("filter=[=(p, _UTF-16LE'" + partition + "'"); List expectedFiltered = writtenRows.stream() - .filter(r -> partition.equals(r.getField(15))) + .filter(r -> partition.equals(r.getField(16))) .collect(Collectors.toList()); List actualFiltered = @@ -295,7 +295,12 @@ protected long createFullTypeLogTable( .column("f_timestamp_ltz2", DataTypes.TIMESTAMP_LTZ(6)) .column("f_timestamp_ntz1", DataTypes.TIMESTAMP(3)) .column("f_timestamp_ntz2", DataTypes.TIMESTAMP(6)) - .column("f_binary", DataTypes.BINARY(4)); + .column("f_binary", DataTypes.BINARY(4)) + .column( + "f_row", + DataTypes.ROW( + DataTypes.FIELD("f_nested_int", DataTypes.INT()), + DataTypes.FIELD("f_nested_string", DataTypes.STRING()))); TableDescriptor.Builder tableBuilder = TableDescriptor.builder() @@ -342,7 +347,8 @@ private List writeFullTypeRows( TimestampLtz.fromEpochMillis(1698235273400L, 7000), TimestampNtz.fromMillis(1698235273501L), TimestampNtz.fromMillis(1698235273501L, 8000), - new byte[] {5, 6, 7, 8})); + new byte[] {5, 6, 7, 8}, + row(10, "nested_string"))); flinkRows.add( Row.of( @@ -364,7 +370,8 @@ private List writeFullTypeRows( Instant.ofEpochMilli(1698235273501L), ZoneId.of("UTC")) .plusNanos(8000), - new byte[] {5, 6, 7, 8})); + new byte[] {5, 6, 7, 8}, + Row.of(10, "nested_string"))); } else { rows.add( row( @@ -383,6 +390,7 @@ private List writeFullTypeRows( TimestampNtz.fromMillis(1698235273501L), TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, + row(10, "nested_string"), partition)); flinkRows.add( @@ -406,6 +414,7 @@ private List writeFullTypeRows( ZoneId.of("UTC")) .plusNanos(8000), new byte[] {5, 6, 7, 8}, + Row.of(10, "nested_string"), partition)); } }