From 597355816f199c98fb049781eedbcbf3073d53cc Mon Sep 17 00:00:00 2001 From: forwardxu Date: Sun, 28 Dec 2025 09:20:56 +0800 Subject: [PATCH 1/4] [lake/iceberg] Support tiering array type for Iceberg This commit adds support for array type conversion between Fluss and Iceberg, enabling tiering of tables with array columns to the Iceberg lakehouse. Key changes: - Updated FlussDataTypeToIcebergDataType to convert Fluss ARRAY to Iceberg LIST type - Created FlussArrayAsIcebergList adapter to convert Fluss InternalArray to a Java List - Updated FlussRowAsIcebergRecord to handle array field conversion - Added array type support in IcebergConversions for bidirectional type mapping - Added comprehensive unit tests for array type conversion with various element types - Added integration tests for array type tiering with both primary key and log tables - Updated documentation to include ARRAY -> LIST type mapping --- .../FlussDataTypeToIcebergDataType.java | 2 +- .../source/FlussArrayAsIcebergList.java | 122 ++++++ .../source/FlussRowAsIcebergRecord.java | 10 + .../iceberg/utils/IcebergConversions.java | 3 + .../source/FlussRowAsIcebergRecordTest.java | 394 ++++++++++++++++++ .../iceberg/tiering/IcebergTieringTest.java | 253 +++++++++++ .../integrate-data-lakes/iceberg.md | 1 + 7 files changed, 784 insertions(+), 1 deletion(-) create mode 100644 fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java create mode 100644 fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java index aa1a97b443..9d01e744a4 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java @@ -129,7 +129,7 @@ public Type visit(LocalZonedTimestampType localZonedTimestampType) { @Override public Type visit(ArrayType arrayType) { - throw new UnsupportedOperationException("Unsupported array type"); + return Types.ListType.ofOptional(0, arrayType.getElementType().accept(this)); } @Override diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java new file mode 100644 index 0000000000..2671e045cf --- /dev/null +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.iceberg.source; + +import org.apache.fluss.row.InternalArray; +import org.apache.fluss.types.ArrayType; +import org.apache.fluss.types.BigIntType; +import org.apache.fluss.types.BinaryType; +import org.apache.fluss.types.BooleanType; +import org.apache.fluss.types.BytesType; +import org.apache.fluss.types.CharType; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DateType; +import org.apache.fluss.types.DecimalType; +import org.apache.fluss.types.DoubleType; +import org.apache.fluss.types.FloatType; +import org.apache.fluss.types.IntType; +import org.apache.fluss.types.LocalZonedTimestampType; +import org.apache.fluss.types.SmallIntType; +import org.apache.fluss.types.StringType; +import org.apache.fluss.types.TimeType; +import org.apache.fluss.types.TimestampType; +import org.apache.fluss.types.TinyIntType; +import org.apache.fluss.utils.DateTimeUtils; + +import java.nio.ByteBuffer; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.AbstractList; + +/** Adapter class for converting Fluss InternalArray to a Java List for Iceberg. */ +public class FlussArrayAsIcebergList extends AbstractList { + + private final InternalArray flussArray; + private final DataType elementType; + + public FlussArrayAsIcebergList(InternalArray flussArray, DataType elementType) { + this.flussArray = flussArray; + this.elementType = elementType; + } + + @Override + public Object get(int index) { + if (flussArray.isNullAt(index)) { + return null; + } + + if (elementType instanceof BooleanType) { + return flussArray.getBoolean(index); + } else if (elementType instanceof TinyIntType) { + return (int) flussArray.getByte(index); + } else if (elementType instanceof SmallIntType) { + return (int) flussArray.getShort(index); + } else if (elementType instanceof IntType) { + return flussArray.getInt(index); + } else if (elementType instanceof BigIntType) { + return flussArray.getLong(index); + } else if (elementType instanceof FloatType) { + return flussArray.getFloat(index); + } else if (elementType instanceof DoubleType) { + return flussArray.getDouble(index); + } else if (elementType instanceof StringType) { + return flussArray.getString(index).toString(); + } else if (elementType instanceof CharType) { + CharType charType = (CharType) elementType; + return flussArray.getChar(index, charType.getLength()).toString(); + } else if (elementType instanceof BytesType || elementType instanceof BinaryType) { + return ByteBuffer.wrap(flussArray.getBytes(index)); + } else if (elementType instanceof DecimalType) { + DecimalType decimalType = (DecimalType) elementType; + return flussArray + .getDecimal(index, decimalType.getPrecision(), decimalType.getScale()) + .toBigDecimal(); + } else if (elementType instanceof LocalZonedTimestampType) { + LocalZonedTimestampType ltzType = (LocalZonedTimestampType) elementType; + return toIcebergTimestampLtz( + flussArray.getTimestampLtz(index, ltzType.getPrecision()).toInstant()); + } else if (elementType instanceof TimestampType) { + TimestampType tsType = (TimestampType) elementType; + return flussArray.getTimestampNtz(index, tsType.getPrecision()).toLocalDateTime(); + } else if (elementType instanceof DateType) { + return DateTimeUtils.toLocalDate(flussArray.getInt(index)); + } else if (elementType instanceof TimeType) { + return 
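+ // Fluss encodes TIME values as an int holding millis of the day, hence the int accessor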
DateTimeUtils.toLocalTime(flussArray.getInt(index)); + } else if (elementType instanceof ArrayType) { + InternalArray innerArray = flussArray.getArray(index); + return innerArray == null + ? null + : new FlussArrayAsIcebergList( + innerArray, ((ArrayType) elementType).getElementType()); + } else { + throw new UnsupportedOperationException( + "Unsupported array element type conversion for Fluss type: " + + elementType.getClass().getSimpleName()); + } + } + + @Override + public int size() { + return flussArray.size(); + } + + private OffsetDateTime toIcebergTimestampLtz(Instant instant) { + return OffsetDateTime.ofInstant(instant, ZoneOffset.UTC); + } +} diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java index 805bddcb6c..70dc8ea6f8 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java @@ -17,7 +17,9 @@ package org.apache.fluss.lake.iceberg.source; +import org.apache.fluss.row.InternalArray; import org.apache.fluss.row.InternalRow; +import org.apache.fluss.types.ArrayType; import org.apache.fluss.types.BigIntType; import org.apache.fluss.types.BinaryType; import org.apache.fluss.types.BooleanType; @@ -169,6 +171,14 @@ private FlussRowToIcebergFieldConverter createTypeConverter(DataType flussType, return row -> DateTimeUtils.toLocalDate(row.getInt(pos)); } else if (flussType instanceof TimeType) { return row -> DateTimeUtils.toLocalTime(row.getInt(pos)); + } else if (flussType instanceof ArrayType) { + ArrayType arrayType = (ArrayType) flussType; + return row -> { + InternalArray array = row.getArray(pos); + return array == null + ? 
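+ // a NULL array column is passed through as NULL rather than as an empty list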
null + : new FlussArrayAsIcebergList(array, arrayType.getElementType()); + }; } else { throw new UnsupportedOperationException( "Unsupported data type conversion for Fluss type: " diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java index 4d7582c53e..038ebd8a7a 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java @@ -125,6 +125,9 @@ private static DataType convertIcebergTypeToFlussType(Type icebergType) { } else if (icebergType instanceof Types.DecimalType) { Types.DecimalType decimalType = (Types.DecimalType) icebergType; return DataTypes.DECIMAL(decimalType.precision(), decimalType.scale()); + } else if (icebergType instanceof Types.ListType) { + Types.ListType listType = (Types.ListType) icebergType; + return DataTypes.ARRAY(convertIcebergTypeToFlussType(listType.elementType())); } throw new UnsupportedOperationException( "Unsupported data type conversion for Iceberg type: " diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java new file mode 100644 index 0000000000..c778fd4f82 --- /dev/null +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.iceberg.source; + +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.GenericArray; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.types.RowType; + +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for {@link FlussRowAsIcebergRecord} with array types. 
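+ * Covers primitive, string, decimal, timestamp, binary, nested, and null-element arrays.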
*/ +class FlussRowAsIcebergRecordTest { + + @Test + void testArrayWithIntElements() { + Types.StructType structType = + Types.StructType.of( + Types.NestedField.required(0, "id", Types.IntegerType.get()), + Types.NestedField.required( + 1, + "int_array", + Types.ListType.ofRequired(2, Types.IntegerType.get()))); + + RowType flussRowType = RowType.of(DataTypes.INT(), DataTypes.ARRAY(DataTypes.INT())); + + GenericRow genericRow = new GenericRow(2); + genericRow.setField(0, 42); + genericRow.setField(1, new GenericArray(new int[] {1, 2, 3, 4, 5})); + + FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType); + record.internalRow = genericRow; + + assertThat(record.get(0)).isEqualTo(42); + List array = (List) record.get(1); + assertThat(array).isNotNull(); + assertThat(array.size()).isEqualTo(5); + assertThat(array.get(0)).isEqualTo(1); + assertThat(array.get(1)).isEqualTo(2); + assertThat(array.get(2)).isEqualTo(3); + assertThat(array.get(3)).isEqualTo(4); + assertThat(array.get(4)).isEqualTo(5); + } + + @Test + void testArrayWithStringElements() { + Types.StructType structType = + Types.StructType.of( + Types.NestedField.required( + 0, + "string_array", + Types.ListType.ofRequired(1, Types.StringType.get()))); + + RowType flussRowType = RowType.of(DataTypes.ARRAY(DataTypes.STRING())); + + GenericRow genericRow = new GenericRow(1); + genericRow.setField( + 0, + new GenericArray( + new Object[] { + BinaryString.fromString("hello"), BinaryString.fromString("world") + })); + + FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType); + record.internalRow = genericRow; + + List array = (List) record.get(0); + assertThat(array).isNotNull(); + assertThat(array.size()).isEqualTo(2); + assertThat(array.get(0)).isEqualTo("hello"); + assertThat(array.get(1)).isEqualTo("world"); + } + + @Test + void testNestedArrayType() { + Types.StructType structType = + Types.StructType.of( + Types.NestedField.required( + 0, + "nested_array", + Types.ListType.ofRequired( + 1, Types.ListType.ofRequired(2, Types.IntegerType.get())))); + + RowType flussRowType = RowType.of(DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT()))); + + GenericRow genericRow = new GenericRow(1); + genericRow.setField( + 0, + new GenericArray( + new Object[] { + new GenericArray(new int[] {1, 2}), + new GenericArray(new int[] {3, 4, 5}) + })); + + FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType); + record.internalRow = genericRow; + + List outerArray = (List) record.get(0); + assertThat(outerArray).isNotNull(); + assertThat(outerArray.size()).isEqualTo(2); + + List innerArray1 = (List) outerArray.get(0); + assertThat(innerArray1.size()).isEqualTo(2); + assertThat(innerArray1.get(0)).isEqualTo(1); + assertThat(innerArray1.get(1)).isEqualTo(2); + + List innerArray2 = (List) outerArray.get(1); + assertThat(innerArray2.size()).isEqualTo(3); + assertThat(innerArray2.get(0)).isEqualTo(3); + assertThat(innerArray2.get(1)).isEqualTo(4); + assertThat(innerArray2.get(2)).isEqualTo(5); + } + + @Test + void testArrayWithAllPrimitiveTypes() { + Types.StructType structType = + Types.StructType.of( + Types.NestedField.required( + 0, + "bool_array", + Types.ListType.ofRequired(1, Types.BooleanType.get())), + Types.NestedField.required( + 2, + "byte_array", + Types.ListType.ofRequired(3, Types.IntegerType.get())), + Types.NestedField.required( + 4, + "short_array", + Types.ListType.ofRequired(5, Types.IntegerType.get())), + Types.NestedField.required( + 6, + 
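+ // Iceberg field ids must be unique across the whole schema, including list element ids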
"int_array", + Types.ListType.ofRequired(7, Types.IntegerType.get())), + Types.NestedField.required( + 8, + "long_array", + Types.ListType.ofRequired(9, Types.LongType.get())), + Types.NestedField.required( + 10, + "float_array", + Types.ListType.ofRequired(11, Types.FloatType.get())), + Types.NestedField.required( + 12, + "double_array", + Types.ListType.ofRequired(13, Types.DoubleType.get()))); + + RowType flussRowType = + RowType.of( + DataTypes.ARRAY(DataTypes.BOOLEAN()), + DataTypes.ARRAY(DataTypes.TINYINT()), + DataTypes.ARRAY(DataTypes.SMALLINT()), + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY(DataTypes.BIGINT()), + DataTypes.ARRAY(DataTypes.FLOAT()), + DataTypes.ARRAY(DataTypes.DOUBLE())); + + GenericRow genericRow = new GenericRow(7); + genericRow.setField(0, new GenericArray(new boolean[] {true, false, true})); + genericRow.setField(1, new GenericArray(new byte[] {1, 2, 3})); + genericRow.setField(2, new GenericArray(new short[] {100, 200, 300})); + genericRow.setField(3, new GenericArray(new int[] {1000, 2000, 3000})); + genericRow.setField(4, new GenericArray(new long[] {10000L, 20000L, 30000L})); + genericRow.setField(5, new GenericArray(new float[] {1.1f, 2.2f, 3.3f})); + genericRow.setField(6, new GenericArray(new double[] {1.11, 2.22, 3.33})); + + FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType); + record.internalRow = genericRow; + + List boolArray = (List) record.get(0); + assertThat(boolArray.size()).isEqualTo(3); + assertThat(boolArray.get(0)).isEqualTo(true); + assertThat(boolArray.get(1)).isEqualTo(false); + assertThat(boolArray.get(2)).isEqualTo(true); + + List byteArray = (List) record.get(1); + assertThat(byteArray.size()).isEqualTo(3); + assertThat(byteArray.get(0)).isEqualTo(1); + assertThat(byteArray.get(1)).isEqualTo(2); + assertThat(byteArray.get(2)).isEqualTo(3); + + List shortArray = (List) record.get(2); + assertThat(shortArray.size()).isEqualTo(3); + assertThat(shortArray.get(0)).isEqualTo(100); + assertThat(shortArray.get(1)).isEqualTo(200); + assertThat(shortArray.get(2)).isEqualTo(300); + + List intArray = (List) record.get(3); + assertThat(intArray.size()).isEqualTo(3); + assertThat(intArray.get(0)).isEqualTo(1000); + assertThat(intArray.get(1)).isEqualTo(2000); + assertThat(intArray.get(2)).isEqualTo(3000); + + List longArray = (List) record.get(4); + assertThat(longArray.size()).isEqualTo(3); + assertThat(longArray.get(0)).isEqualTo(10000L); + assertThat(longArray.get(1)).isEqualTo(20000L); + assertThat(longArray.get(2)).isEqualTo(30000L); + + List floatArray = (List) record.get(5); + assertThat(floatArray.size()).isEqualTo(3); + assertThat(floatArray.get(0)).isEqualTo(1.1f); + assertThat(floatArray.get(1)).isEqualTo(2.2f); + assertThat(floatArray.get(2)).isEqualTo(3.3f); + + List doubleArray = (List) record.get(6); + assertThat(doubleArray.size()).isEqualTo(3); + assertThat(doubleArray.get(0)).isEqualTo(1.11); + assertThat(doubleArray.get(1)).isEqualTo(2.22); + assertThat(doubleArray.get(2)).isEqualTo(3.33); + } + + @Test + void testArrayWithDecimalElements() { + Types.StructType structType = + Types.StructType.of( + Types.NestedField.required( + 0, + "decimal_array", + Types.ListType.ofRequired(1, Types.DecimalType.of(10, 2)))); + + RowType flussRowType = RowType.of(DataTypes.ARRAY(DataTypes.DECIMAL(10, 2))); + + GenericRow genericRow = new GenericRow(1); + genericRow.setField( + 0, + new GenericArray( + new Object[] { + Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2), + 
Decimal.fromBigDecimal(new BigDecimal("678.90"), 10, 2) + })); + + FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType); + record.internalRow = genericRow; + + List array = (List) record.get(0); + assertThat(array.size()).isEqualTo(2); + assertThat(array.get(0)).isEqualTo(new BigDecimal("123.45")); + assertThat(array.get(1)).isEqualTo(new BigDecimal("678.90")); + } + + @Test + void testArrayWithTimestampElements() { + Types.StructType structType = + Types.StructType.of( + Types.NestedField.required( + 0, + "timestamp_ntz_array", + Types.ListType.ofRequired(1, Types.TimestampType.withoutZone())), + Types.NestedField.required( + 2, + "timestamp_ltz_array", + Types.ListType.ofRequired(3, Types.TimestampType.withZone()))); + + RowType flussRowType = + RowType.of( + DataTypes.ARRAY(DataTypes.TIMESTAMP(6)), + DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(6))); + + GenericRow genericRow = new GenericRow(2); + genericRow.setField( + 0, + new GenericArray( + new Object[] { + org.apache.fluss.row.TimestampNtz.fromLocalDateTime( + LocalDateTime.now()), + org.apache.fluss.row.TimestampNtz.fromLocalDateTime( + LocalDateTime.now().plusSeconds(1)) + })); + genericRow.setField( + 1, + new GenericArray( + new Object[] { + org.apache.fluss.row.TimestampLtz.fromEpochMillis( + System.currentTimeMillis()), + org.apache.fluss.row.TimestampLtz.fromEpochMillis( + System.currentTimeMillis() + 1000) + })); + + FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType); + record.internalRow = genericRow; + + List timestampNtzArray = (List) record.get(0); + assertThat(timestampNtzArray).isNotNull(); + assertThat(timestampNtzArray.size()).isEqualTo(2); + assertThat(timestampNtzArray.get(0)).isInstanceOf(LocalDateTime.class); + assertThat(timestampNtzArray.get(1)).isInstanceOf(LocalDateTime.class); + + List timestampLtzArray = (List) record.get(1); + assertThat(timestampLtzArray).isNotNull(); + assertThat(timestampLtzArray.size()).isEqualTo(2); + assertThat(timestampLtzArray.get(0)).isInstanceOf(OffsetDateTime.class); + assertThat(timestampLtzArray.get(1)).isInstanceOf(OffsetDateTime.class); + } + + @Test + void testArrayWithNullElements() { + Types.StructType structType = + Types.StructType.of( + Types.NestedField.required( + 0, + "nullable_int_array", + Types.ListType.ofOptional(1, Types.IntegerType.get()))); + + RowType flussRowType = RowType.of(DataTypes.ARRAY(DataTypes.INT())); + + GenericRow genericRow = new GenericRow(1); + genericRow.setField(0, new GenericArray(new Object[] {1, null, 3})); + + FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType); + record.internalRow = genericRow; + + List array = (List) record.get(0); + assertThat(array).isNotNull(); + assertThat(array.size()).isEqualTo(3); + assertThat(array.get(0)).isEqualTo(1); + assertThat(array.get(1)).isNull(); + assertThat(array.get(2)).isEqualTo(3); + } + + @Test + void testNullArray() { + Types.StructType structType = + Types.StructType.of( + Types.NestedField.optional( + 0, + "nullable_array", + Types.ListType.ofRequired(1, Types.IntegerType.get()))); + + RowType flussRowType = RowType.of(DataTypes.ARRAY(DataTypes.INT())); + + GenericRow genericRow = new GenericRow(1); + genericRow.setField(0, null); + + FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType); + record.internalRow = genericRow; + + assertThat(record.get(0)).isNull(); + } + + @Test + void testArrayWithBinaryElements() { + Types.StructType structType = + 
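+ // BYTES elements are expected to surface as ByteBuffer on the Iceberg side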
Types.StructType.of( + Types.NestedField.required( + 0, + "binary_array", + Types.ListType.ofRequired(1, Types.BinaryType.get()))); + + RowType flussRowType = RowType.of(DataTypes.ARRAY(DataTypes.BYTES())); + + GenericRow genericRow = new GenericRow(1); + genericRow.setField( + 0, + new GenericArray( + new Object[] {"hello".getBytes(), "world".getBytes(), "test".getBytes()})); + + FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType); + record.internalRow = genericRow; + + List<Object> array = (List<Object>) record.get(0); + assertThat(array).isNotNull(); + assertThat(array.size()).isEqualTo(3); + assertThat(array.get(0)).isInstanceOf(ByteBuffer.class); + assertThat(((ByteBuffer) array.get(0)).array()).isEqualTo("hello".getBytes()); + assertThat(((ByteBuffer) array.get(1)).array()).isEqualTo("world".getBytes()); + assertThat(((ByteBuffer) array.get(2)).array()).isEqualTo("test".getBytes()); + } +} diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java index ba59b1b742..a9a8426149 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java @@ -32,6 +32,7 @@ import org.apache.fluss.record.GenericRecord; import org.apache.fluss.record.LogRecord; import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.GenericArray; import org.apache.fluss.row.GenericRow; import org.apache.fluss.types.DataTypes; import org.apache.fluss.utils.types.Tuple2; @@ -441,4 +442,256 @@ private void verifyTableRecords( .isEqualTo(expectRecord.timestamp()); } } + + @ParameterizedTest + @MethodSource("tieringWriteArgs") + void testTieringWriteTableWithArrayType(boolean isPrimaryKeyTable, boolean isPartitionedTable) + throws Exception { + TablePath tablePath = + TablePath.of( + "iceberg", + String.format( + "test_tiering_array_%s_%s", + isPrimaryKeyTable ? "pk" : "log", + isPartitionedTable ? "partitioned" : "unpartitioned")); + createTableWithArrayType(tablePath, isPrimaryKeyTable, isPartitionedTable); + + TableDescriptor descriptor = + TableDescriptor.builder() + .schema( + org.apache.fluss.metadata.Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.ARRAY(DataTypes.INT())) + .column("c3", DataTypes.STRING()) + .build()) + .distributedBy(BUCKET_NUM) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .build(); + TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L); + + Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath)); + + Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new HashMap<>(); + Map<Long, String> partitionIdAndName = + isPartitionedTable + ?
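+ // partition id -> partition name; a single null/null entry models the unpartitioned case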
new HashMap<Long, String>() { + { + put(1L, "p1"); + put(2L, "p2"); + } + } + : Collections.singletonMap(null, null); + + List<IcebergWriteResult> icebergWriteResults = new ArrayList<>(); + SimpleVersionedSerializer<IcebergWriteResult> writeResultSerializer = + icebergLakeTieringFactory.getWriteResultSerializer(); + SimpleVersionedSerializer<IcebergCommittable> committableSerializer = + icebergLakeTieringFactory.getCommittableSerializer(); + + // write data + for (int bucket = 0; bucket < BUCKET_NUM; bucket++) { + for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) { + String partition = entry.getValue(); + try (LakeWriter<IcebergWriteResult> writer = + createLakeWriter(tablePath, bucket, partition, entry.getKey(), tableInfo)) { + Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket); + Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords = + isPrimaryKeyTable + ? genArrayTypePrimaryKeyTableRecords(partition, bucket) + : Tuple2.of( + genArrayTypeLogTableRecords(partition, bucket, 5), + genArrayTypeLogTableRecords(partition, bucket, 5)); + + List<LogRecord> writtenRecords = writeAndExpectRecords.f0; + List<LogRecord> expectRecords = writeAndExpectRecords.f1; + recordsByBucket.put(partitionBucket, expectRecords); + + for (LogRecord record : writtenRecords) { + writer.write(record); + } + IcebergWriteResult result = writer.complete(); + byte[] serialized = writeResultSerializer.serialize(result); + icebergWriteResults.add( + writeResultSerializer.deserialize( + writeResultSerializer.getVersion(), serialized)); + } + } + } + + // commit data + try (LakeCommitter<IcebergWriteResult, IcebergCommittable> lakeCommitter = + createLakeCommitter(tablePath, tableInfo)) { + IcebergCommittable icebergCommittable = + lakeCommitter.toCommittable(icebergWriteResults); + byte[] serialized = committableSerializer.serialize(icebergCommittable); + icebergCommittable = + committableSerializer.deserialize( + committableSerializer.getVersion(), serialized); + long snapshot = + lakeCommitter.commit(icebergCommittable, Collections.singletonMap("k1", "v1")); + icebergTable.refresh(); + Snapshot icebergSnapshot = icebergTable.currentSnapshot(); + assertThat(snapshot).isEqualTo(icebergSnapshot.snapshotId()); + } + + // verify data + for (int bucket = 0; bucket < BUCKET_NUM; bucket++) { + for (String partition : partitionIdAndName.values()) { + Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket); + List<LogRecord> expectRecords = recordsByBucket.get(partitionBucket); + CloseableIterator<Record> actualRecords = + getIcebergRows(icebergTable, partition, bucket); + verifyArrayTypeRecords(actualRecords, expectRecords, bucket, partition); + } + } + } + + private List<LogRecord> genArrayTypeLogTableRecords( + @Nullable String partition, int bucket, int numRecords) { + List<LogRecord> logRecords = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + GenericRow genericRow = new GenericRow(3); + genericRow.setField(0, i); + genericRow.setField(1, new GenericArray(new int[] {i, i + 1, i + 2})); + if (partition != null) { + genericRow.setField(2, BinaryString.fromString(partition)); + } else { + genericRow.setField(2, BinaryString.fromString("bucket" + bucket)); + } + + LogRecord logRecord = + new GenericRecord( + i, System.currentTimeMillis(), ChangeType.APPEND_ONLY, genericRow); + logRecords.add(logRecord); + } + return logRecords; + } + + private Tuple2<List<LogRecord>, List<LogRecord>> genArrayTypePrimaryKeyTableRecords( + @Nullable String partition, int bucket) { + int offset = -1; + List<LogRecord> writtenLogRecords = new ArrayList<>(); + List<LogRecord> expectLogRecords = new ArrayList<>(); + + // gen +I + GenericRow insertRow = genArrayTypeKvRow(partition, bucket, 0, 0); + writtenLogRecords.add(toRecord(++offset, insertRow, INSERT)); +
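+ // the +I record is expected in the lake until a later +U overwrites it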
expectLogRecords.add(writtenLogRecords.get(writtenLogRecords.size() - 1)); + + // gen +U (update) + GenericRow updateRow = genArrayTypeKvRow(partition, bucket, 0, 1); + writtenLogRecords.add(toRecord(++offset, updateRow, UPDATE_AFTER)); + expectLogRecords.set(0, writtenLogRecords.get(writtenLogRecords.size() - 1)); + + // gen +I (another key) + GenericRow insertRow2 = genArrayTypeKvRow(partition, bucket, 1, 2); + writtenLogRecords.add(toRecord(++offset, insertRow2, INSERT)); + expectLogRecords.add(writtenLogRecords.get(writtenLogRecords.size() - 1)); + + return Tuple2.of(writtenLogRecords, expectLogRecords); + } + + private GenericRow genArrayTypeKvRow( + @Nullable String partition, int bucket, int key, int arrayBase) { + GenericRow genericRow = new GenericRow(3); + genericRow.setField(0, key); + genericRow.setField( + 1, new GenericArray(new int[] {arrayBase, arrayBase + 1, arrayBase + 2})); + if (partition != null) { + genericRow.setField(2, BinaryString.fromString(partition)); + } else { + genericRow.setField(2, BinaryString.fromString("bucket" + bucket)); + } + return genericRow; + } + + private void createTableWithArrayType( + TablePath tablePath, boolean isPrimaryTable, boolean isPartitionedTable) { + Namespace namespace = Namespace.of(tablePath.getDatabaseName()); + if (icebergCatalog instanceof SupportsNamespaces) { + SupportsNamespaces ns = (SupportsNamespaces) icebergCatalog; + if (!ns.namespaceExists(namespace)) { + ns.createNamespace(namespace); + } + } + + Set identifierFieldIds = new HashSet<>(); + if (isPrimaryTable) { + identifierFieldIds.add(1); + if (isPartitionedTable) { + identifierFieldIds.add(3); + } + } + + org.apache.iceberg.Schema schema = + new org.apache.iceberg.Schema( + Arrays.asList( + Types.NestedField.required(1, "c1", Types.IntegerType.get()), + Types.NestedField.optional( + 2, + "c2", + Types.ListType.ofRequired(7, Types.IntegerType.get())), + Types.NestedField.required(3, "c3", Types.StringType.get()), + Types.NestedField.required( + 4, BUCKET_COLUMN_NAME, Types.IntegerType.get()), + Types.NestedField.required( + 5, OFFSET_COLUMN_NAME, Types.LongType.get()), + Types.NestedField.required( + 6, TIMESTAMP_COLUMN_NAME, Types.TimestampType.withZone())), + identifierFieldIds); + + PartitionSpec.Builder builder = PartitionSpec.builderFor(schema); + if (isPartitionedTable) { + builder.identity("c3"); + } + + PartitionSpec partitionSpec; + if (isPrimaryTable) { + partitionSpec = builder.bucket("c1", BUCKET_NUM).build(); + } else { + partitionSpec = builder.identity(BUCKET_COLUMN_NAME).build(); + } + + TableIdentifier tableId = + TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName()); + icebergCatalog.createTable(tableId, schema, partitionSpec); + } + + private void verifyArrayTypeRecords( + CloseableIterator actualRecords, + List expectRecords, + int expectBucket, + @Nullable String partition) { + for (LogRecord expectRecord : expectRecords) { + Record actualRecord = actualRecords.next(); + // check business columns: + assertThat(actualRecord.get(0)).isEqualTo(expectRecord.getRow().getInt(0)); + + // check array column + List actualArray = (List) actualRecord.get(1); + org.apache.fluss.row.InternalArray expectedArray = expectRecord.getRow().getArray(1); + assertThat(actualArray).isNotNull(); + assertThat(actualArray.size()).isEqualTo(expectedArray.size()); + for (int i = 0; i < expectedArray.size(); i++) { + assertThat(actualArray.get(i)).isEqualTo(expectedArray.getInt(i)); + } + + assertThat(actualRecord.get(2, String.class)) + 
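+ // c3 holds the partition name for partitioned tables, otherwise a per-bucket marker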
.isEqualTo(expectRecord.getRow().getString(2).toString()); + if (partition != null) { + assertThat(actualRecord.get(2, String.class)).isEqualTo(partition); + } + + // check system columns: __bucket, __offset, __timestamp + assertThat(actualRecord.get(3)).isEqualTo(expectBucket); + assertThat(actualRecord.get(4)).isEqualTo(expectRecord.logOffset()); + assertThat( + actualRecord + .get(5, OffsetDateTime.class) + .atZoneSameInstant(ZoneOffset.UTC) + .toInstant() + .toEpochMilli()) + .isEqualTo(expectRecord.timestamp()); + } + } } diff --git a/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md b/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md index cd2926caf3..8cadf2c2af 100644 --- a/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md +++ b/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md @@ -500,6 +500,7 @@ When integrating with Iceberg, Fluss automatically converts between Fluss data t | TIMESTAMP WITH LOCAL TIMEZONE | TIMESTAMP (with timezone) | | | BINARY | BINARY | | | BYTES | BINARY | Converted to BINARY | +| ARRAY | LIST | | ## Maintenance and Optimization From ccefc4985af5f9a3c67070fb7aa102c1ffd39bd8 Mon Sep 17 00:00:00 2001 From: forwardxu Date: Tue, 30 Dec 2025 08:23:04 +0800 Subject: [PATCH 2/4] [lake/iceberg] Support tier array type for iceberg --- .../FlussDataTypeToIcebergDataType.java | 31 +- .../lake/iceberg/IcebergLakeCatalog.java | 11 +- .../source/IcebergArrayAsFlussArray.java | 206 +++++++++ .../source/IcebergRecordAsFlussRow.java | 9 +- .../FlinkUnionReadPrimaryKeyTableITCase.java | 19 +- .../source/FlussRowAsIcebergRecordTest.java | 392 +++++++----------- 6 files changed, 413 insertions(+), 255 deletions(-) create mode 100644 fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java index 9d01e744a4..1c093482aa 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java @@ -47,6 +47,30 @@ public class FlussDataTypeToIcebergDataType implements DataTypeVisitor { public static final FlussDataTypeToIcebergDataType INSTANCE = new FlussDataTypeToIcebergDataType(); + private final RowType root; + private int nextId; + + FlussDataTypeToIcebergDataType() { + this.root = null; + this.nextId = 0; + } + + FlussDataTypeToIcebergDataType(int startId) { + this.root = null; + this.nextId = startId; + } + + FlussDataTypeToIcebergDataType(RowType root) { + this.root = root; + this.nextId = root.getFieldCount(); + } + + private int getNextId() { + int next = nextId; + nextId += 1; + return next; + } + @Override public Type visit(CharType charType) { return Types.StringType.get(); @@ -129,7 +153,12 @@ public Type visit(LocalZonedTimestampType localZonedTimestampType) { @Override public Type visit(ArrayType arrayType) { - return Types.ListType.ofOptional(0, arrayType.getElementType().accept(this)); + Type elementType = arrayType.getElementType().accept(this); + if (arrayType.getElementType().isNullable()) { + return Types.ListType.ofOptional(getNextId(), elementType); + } else { + return Types.ListType.ofRequired(getNextId(), elementType); + } } @Override diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java index 03afa011c6..9bd9d54e6c 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java @@ -178,6 +178,11 @@ public Schema convertToIcebergSchema(TableDescriptor tableDescriptor, boolean is List fields = new ArrayList<>(); int fieldId = 0; + int totalTopLevelFields = + tableDescriptor.getSchema().getColumns().size() + SYSTEM_COLUMNS.size(); + FlussDataTypeToIcebergDataType converter = + new FlussDataTypeToIcebergDataType(totalTopLevelFields); + // general columns for (org.apache.fluss.metadata.Schema.Column column : tableDescriptor.getSchema().getColumns()) { @@ -192,16 +197,14 @@ public Schema convertToIcebergSchema(TableDescriptor tableDescriptor, boolean is Types.NestedField.optional( fieldId++, colName, - column.getDataType() - .accept(FlussDataTypeToIcebergDataType.INSTANCE), + column.getDataType().accept(converter), column.getComment().orElse(null)); } else { field = Types.NestedField.required( fieldId++, colName, - column.getDataType() - .accept(FlussDataTypeToIcebergDataType.INSTANCE), + column.getDataType().accept(converter), column.getComment().orElse(null)); } fields.add(field); diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java new file mode 100644 index 0000000000..23aecefec7 --- /dev/null +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.iceberg.source; + +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.InternalArray; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.TimestampLtz; +import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.utils.BytesUtils; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.util.List; + +/** Adapter for Iceberg List as Fluss InternalArray. 
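+ * Element accessors lazily convert the Java values produced by Iceberg readers (e.g. String, ByteBuffer, OffsetDateTime) to Fluss internal representations.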
*/ +public class IcebergArrayAsFlussArray implements InternalArray { + + private final List<Object> icebergList; + + public IcebergArrayAsFlussArray(List<Object> icebergList) { + this.icebergList = icebergList; + } + + @Override + public int size() { + return icebergList.size(); + } + + @Override + public boolean isNullAt(int pos) { + return icebergList.get(pos) == null; + } + + @Override + public boolean getBoolean(int pos) { + return (boolean) icebergList.get(pos); + } + + @Override + public byte getByte(int pos) { + Object value = icebergList.get(pos); + return ((Integer) value).byteValue(); + } + + @Override + public short getShort(int pos) { + Object value = icebergList.get(pos); + return ((Integer) value).shortValue(); + } + + @Override + public int getInt(int pos) { + return (Integer) icebergList.get(pos); + } + + @Override + public long getLong(int pos) { + return (Long) icebergList.get(pos); + } + + @Override + public float getFloat(int pos) { + return (float) icebergList.get(pos); + } + + @Override + public double getDouble(int pos) { + return (double) icebergList.get(pos); + } + + @Override + public BinaryString getChar(int pos, int length) { + String value = (String) icebergList.get(pos); + return BinaryString.fromString(value); + } + + @Override + public BinaryString getString(int pos) { + String value = (String) icebergList.get(pos); + return BinaryString.fromString(value); + } + + @Override + public Decimal getDecimal(int pos, int precision, int scale) { + BigDecimal bigDecimal = (BigDecimal) icebergList.get(pos); + return Decimal.fromBigDecimal(bigDecimal, precision, scale); + } + + @Override + public TimestampNtz getTimestampNtz(int pos, int precision) { + LocalDateTime localDateTime = (LocalDateTime) icebergList.get(pos); + return TimestampNtz.fromLocalDateTime(localDateTime); + } + + @Override + public TimestampLtz getTimestampLtz(int pos, int precision) { + OffsetDateTime offsetDateTime = (OffsetDateTime) icebergList.get(pos); + return TimestampLtz.fromInstant(offsetDateTime.toInstant()); + } + + @Override + public byte[] getBinary(int pos, int length) { + ByteBuffer byteBuffer = (ByteBuffer) icebergList.get(pos); + return BytesUtils.toArray(byteBuffer); + } + + @Override + public byte[] getBytes(int pos) { + ByteBuffer byteBuffer = (ByteBuffer) icebergList.get(pos); + return BytesUtils.toArray(byteBuffer); + } + + @Override + public InternalArray getArray(int pos) { + List<Object> nestedList = (List<Object>) icebergList.get(pos); + return nestedList == null ?
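+ // nested lists are wrapped lazily as well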
null : new IcebergArrayAsFlussArray(nestedList); + } + + @Override + public InternalRow getRow(int pos, int numFields) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean[] toBooleanArray() { + boolean[] result = new boolean[icebergList.size()]; + for (int i = 0; i < icebergList.size(); i++) { + result[i] = (boolean) icebergList.get(i); + } + return result; + } + + @Override + public byte[] toByteArray() { + byte[] result = new byte[icebergList.size()]; + for (int i = 0; i < icebergList.size(); i++) { + result[i] = ((Integer) icebergList.get(i)).byteValue(); + } + return result; + } + + @Override + public short[] toShortArray() { + short[] result = new short[icebergList.size()]; + for (int i = 0; i < icebergList.size(); i++) { + result[i] = ((Integer) icebergList.get(i)).shortValue(); + } + return result; + } + + @Override + public int[] toIntArray() { + int[] result = new int[icebergList.size()]; + for (int i = 0; i < icebergList.size(); i++) { + result[i] = (int) icebergList.get(i); + } + return result; + } + + @Override + public long[] toLongArray() { + long[] result = new long[icebergList.size()]; + for (int i = 0; i < icebergList.size(); i++) { + result[i] = (long) icebergList.get(i); + } + return result; + } + + @Override + public float[] toFloatArray() { + float[] result = new float[icebergList.size()]; + for (int i = 0; i < icebergList.size(); i++) { + result[i] = (float) icebergList.get(i); + } + return result; + } + + @Override + public double[] toDoubleArray() { + double[] result = new double[icebergList.size()]; + for (int i = 0; i < icebergList.size(); i++) { + result[i] = (double) icebergList.get(i); + } + return result; + } +} diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java index dd76bd2ec2..2bd80893f0 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java @@ -32,6 +32,7 @@ import java.nio.ByteBuffer; import java.time.LocalDateTime; import java.time.OffsetDateTime; +import java.util.List; import static org.apache.fluss.lake.iceberg.IcebergLakeCatalog.SYSTEM_COLUMNS; @@ -151,8 +152,12 @@ public byte[] getBytes(int pos) { @Override public InternalArray getArray(int pos) { - // TODO: Support Array type conversion from Iceberg to Fluss - throw new UnsupportedOperationException(); + Object value = icebergRecord.get(pos); + if (value == null) { + return null; + } + List icebergList = (List) value; + return new IcebergArrayAsFlussArray(icebergList); } @Override diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java index 0518290a5f..a8b62e7736 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java @@ -25,6 +25,7 @@ import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.GenericArray; 
import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; @@ -98,6 +99,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L), TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, partition)); expectedRows.add( Row.of( @@ -116,6 +118,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, partition)); } } else { @@ -137,6 +140,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L), TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, null), Row.of( true, @@ -154,6 +158,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, null)); } @@ -196,6 +201,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, partition)); expectedRows2.add( Row.ofKind( @@ -215,6 +221,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273501L), TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, + new float[] {2.1f, 2.2f, 2.3f}, partition)); } } else { @@ -236,6 +243,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, null)); expectedRows2.add( Row.ofKind( @@ -255,6 +263,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273501L), TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, + new float[] {2.1f, 2.2f, 2.3f}, null)); } @@ -358,6 +367,7 @@ private void writeFullTypeRow(TablePath tablePath, String partition) throws Exce TimestampNtz.fromMillis(1698235273501L), TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, + new GenericArray(new float[] {2.1f, 2.2f, 2.3f}), partition)); writeRows(tablePath, rows, false); } @@ -411,7 +421,8 @@ protected long createPkTableFullType(TablePath tablePath, int bucketNum, boolean .column("c13", DataTypes.TIMESTAMP(3)) .column("c14", DataTypes.TIMESTAMP(6)) .column("c15", DataTypes.BINARY(4)) - .column("c16", DataTypes.STRING()); + .column("c16", DataTypes.ARRAY(DataTypes.FLOAT())) + .column("c17", DataTypes.STRING()); TableDescriptor.Builder tableBuilder = TableDescriptor.builder() @@ -421,8 +432,8 @@ protected long createPkTableFullType(TablePath tablePath, int bucketNum, boolean if (isPartitioned) { tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true); - tableBuilder.partitionedBy("c16"); - schemaBuilder.primaryKey("c4", "c16"); + tableBuilder.partitionedBy("c17"); + schemaBuilder.primaryKey("c4", "c17"); tableBuilder.property( ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, AutoPartitionTimeUnit.YEAR); } else { @@ -450,6 +461,7 @@ private List generateKvRowsFullType(@Nullable String partition) { 
TimestampNtz.fromMillis(1698235273183L), TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, + new GenericArray(new float[] {1.1f, 1.2f, 1.3f}), partition), row( true, @@ -467,6 +479,7 @@ private List generateKvRowsFullType(@Nullable String partition) { TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new GenericArray(new float[] {1.1f, 1.2f, 1.3f}), partition)); } diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java index c778fd4f82..0c3ec679c3 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java @@ -39,106 +39,7 @@ class FlussRowAsIcebergRecordTest { @Test - void testArrayWithIntElements() { - Types.StructType structType = - Types.StructType.of( - Types.NestedField.required(0, "id", Types.IntegerType.get()), - Types.NestedField.required( - 1, - "int_array", - Types.ListType.ofRequired(2, Types.IntegerType.get()))); - - RowType flussRowType = RowType.of(DataTypes.INT(), DataTypes.ARRAY(DataTypes.INT())); - - GenericRow genericRow = new GenericRow(2); - genericRow.setField(0, 42); - genericRow.setField(1, new GenericArray(new int[] {1, 2, 3, 4, 5})); - - FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType); - record.internalRow = genericRow; - - assertThat(record.get(0)).isEqualTo(42); - List array = (List) record.get(1); - assertThat(array).isNotNull(); - assertThat(array.size()).isEqualTo(5); - assertThat(array.get(0)).isEqualTo(1); - assertThat(array.get(1)).isEqualTo(2); - assertThat(array.get(2)).isEqualTo(3); - assertThat(array.get(3)).isEqualTo(4); - assertThat(array.get(4)).isEqualTo(5); - } - - @Test - void testArrayWithStringElements() { - Types.StructType structType = - Types.StructType.of( - Types.NestedField.required( - 0, - "string_array", - Types.ListType.ofRequired(1, Types.StringType.get()))); - - RowType flussRowType = RowType.of(DataTypes.ARRAY(DataTypes.STRING())); - - GenericRow genericRow = new GenericRow(1); - genericRow.setField( - 0, - new GenericArray( - new Object[] { - BinaryString.fromString("hello"), BinaryString.fromString("world") - })); - - FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType); - record.internalRow = genericRow; - - List array = (List) record.get(0); - assertThat(array).isNotNull(); - assertThat(array.size()).isEqualTo(2); - assertThat(array.get(0)).isEqualTo("hello"); - assertThat(array.get(1)).isEqualTo("world"); - } - - @Test - void testNestedArrayType() { - Types.StructType structType = - Types.StructType.of( - Types.NestedField.required( - 0, - "nested_array", - Types.ListType.ofRequired( - 1, Types.ListType.ofRequired(2, Types.IntegerType.get())))); - - RowType flussRowType = RowType.of(DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT()))); - - GenericRow genericRow = new GenericRow(1); - genericRow.setField( - 0, - new GenericArray( - new Object[] { - new GenericArray(new int[] {1, 2}), - new GenericArray(new int[] {3, 4, 5}) - })); - - FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType); - record.internalRow = genericRow; - - List outerArray = (List) record.get(0); - 
assertThat(outerArray).isNotNull(); - assertThat(outerArray.size()).isEqualTo(2); - - List innerArray1 = (List) outerArray.get(0); - assertThat(innerArray1.size()).isEqualTo(2); - assertThat(innerArray1.get(0)).isEqualTo(1); - assertThat(innerArray1.get(1)).isEqualTo(2); - - List innerArray2 = (List) outerArray.get(1); - assertThat(innerArray2.size()).isEqualTo(3); - assertThat(innerArray2.get(0)).isEqualTo(3); - assertThat(innerArray2.get(1)).isEqualTo(4); - assertThat(innerArray2.get(2)).isEqualTo(5); - } - - @Test - void testArrayWithAllPrimitiveTypes() { + void testArrayWithAllTypes() { Types.StructType structType = Types.StructType.of( Types.NestedField.required( @@ -168,7 +69,41 @@ void testArrayWithAllPrimitiveTypes() { Types.NestedField.required( 12, "double_array", - Types.ListType.ofRequired(13, Types.DoubleType.get()))); + Types.ListType.ofRequired(13, Types.DoubleType.get())), + Types.NestedField.required( + 14, + "string_array", + Types.ListType.ofRequired(15, Types.StringType.get())), + Types.NestedField.required( + 16, + "decimal_array", + Types.ListType.ofRequired(17, Types.DecimalType.of(10, 2))), + Types.NestedField.required( + 18, + "timestamp_ntz_array", + Types.ListType.ofRequired(19, Types.TimestampType.withoutZone())), + Types.NestedField.required( + 20, + "timestamp_ltz_array", + Types.ListType.ofRequired(21, Types.TimestampType.withZone())), + Types.NestedField.required( + 22, + "binary_array", + Types.ListType.ofRequired(23, Types.BinaryType.get())), + Types.NestedField.required( + 24, + "nested_array", + Types.ListType.ofRequired( + 25, + Types.ListType.ofRequired(26, Types.IntegerType.get()))), + Types.NestedField.required( + 27, + "nullable_int_array", + Types.ListType.ofOptional(28, Types.IntegerType.get())), + Types.NestedField.optional( + 29, + "null_array", + Types.ListType.ofRequired(30, Types.IntegerType.get()))); RowType flussRowType = RowType.of( @@ -178,9 +113,17 @@ void testArrayWithAllPrimitiveTypes() { DataTypes.ARRAY(DataTypes.INT()), DataTypes.ARRAY(DataTypes.BIGINT()), DataTypes.ARRAY(DataTypes.FLOAT()), - DataTypes.ARRAY(DataTypes.DOUBLE())); + DataTypes.ARRAY(DataTypes.DOUBLE()), + DataTypes.ARRAY(DataTypes.STRING()), + DataTypes.ARRAY(DataTypes.DECIMAL(10, 2)), + DataTypes.ARRAY(DataTypes.TIMESTAMP(6)), + DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(6)), + DataTypes.ARRAY(DataTypes.BYTES()), + DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())), + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY(DataTypes.INT())); - GenericRow genericRow = new GenericRow(7); + GenericRow genericRow = new GenericRow(15); genericRow.setField(0, new GenericArray(new boolean[] {true, false, true})); genericRow.setField(1, new GenericArray(new byte[] {1, 2, 3})); genericRow.setField(2, new GenericArray(new short[] {100, 200, 300})); @@ -188,207 +131,166 @@ void testArrayWithAllPrimitiveTypes() { genericRow.setField(4, new GenericArray(new long[] {10000L, 20000L, 30000L})); genericRow.setField(5, new GenericArray(new float[] {1.1f, 2.2f, 3.3f})); genericRow.setField(6, new GenericArray(new double[] {1.11, 2.22, 3.33})); + genericRow.setField( + 7, + new GenericArray( + new Object[] { + BinaryString.fromString("hello"), + BinaryString.fromString("world"), + BinaryString.fromString("test") + })); + genericRow.setField( + 8, + new GenericArray( + new Object[] { + Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2), + Decimal.fromBigDecimal(new BigDecimal("678.90"), 10, 2) + })); + genericRow.setField( + 9, + new GenericArray( + new Object[] { + 
org.apache.fluss.row.TimestampNtz.fromLocalDateTime( + LocalDateTime.now()), + org.apache.fluss.row.TimestampNtz.fromLocalDateTime( + LocalDateTime.now().plusSeconds(1)) + })); + genericRow.setField( + 10, + new GenericArray( + new Object[] { + org.apache.fluss.row.TimestampLtz.fromEpochMillis( + System.currentTimeMillis()), + org.apache.fluss.row.TimestampLtz.fromEpochMillis( + System.currentTimeMillis() + 1000) + })); + genericRow.setField( + 11, + new GenericArray( + new Object[] {"hello".getBytes(), "world".getBytes(), "test".getBytes()})); + genericRow.setField( + 12, + new GenericArray( + new Object[] { + new GenericArray(new int[] {1, 2}), + new GenericArray(new int[] {3, 4, 5}) + })); + genericRow.setField(13, new GenericArray(new Object[] {1, null, 3})); + genericRow.setField(14, null); FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType); record.internalRow = genericRow; + // Test boolean array List boolArray = (List) record.get(0); assertThat(boolArray.size()).isEqualTo(3); assertThat(boolArray.get(0)).isEqualTo(true); assertThat(boolArray.get(1)).isEqualTo(false); assertThat(boolArray.get(2)).isEqualTo(true); + // Test byte array List byteArray = (List) record.get(1); assertThat(byteArray.size()).isEqualTo(3); assertThat(byteArray.get(0)).isEqualTo(1); assertThat(byteArray.get(1)).isEqualTo(2); assertThat(byteArray.get(2)).isEqualTo(3); + // Test short array List shortArray = (List) record.get(2); assertThat(shortArray.size()).isEqualTo(3); assertThat(shortArray.get(0)).isEqualTo(100); assertThat(shortArray.get(1)).isEqualTo(200); assertThat(shortArray.get(2)).isEqualTo(300); + // Test int array List intArray = (List) record.get(3); assertThat(intArray.size()).isEqualTo(3); assertThat(intArray.get(0)).isEqualTo(1000); assertThat(intArray.get(1)).isEqualTo(2000); assertThat(intArray.get(2)).isEqualTo(3000); + // Test long array List longArray = (List) record.get(4); assertThat(longArray.size()).isEqualTo(3); assertThat(longArray.get(0)).isEqualTo(10000L); assertThat(longArray.get(1)).isEqualTo(20000L); assertThat(longArray.get(2)).isEqualTo(30000L); + // Test float array List floatArray = (List) record.get(5); assertThat(floatArray.size()).isEqualTo(3); assertThat(floatArray.get(0)).isEqualTo(1.1f); assertThat(floatArray.get(1)).isEqualTo(2.2f); assertThat(floatArray.get(2)).isEqualTo(3.3f); + // Test double array List doubleArray = (List) record.get(6); assertThat(doubleArray.size()).isEqualTo(3); assertThat(doubleArray.get(0)).isEqualTo(1.11); assertThat(doubleArray.get(1)).isEqualTo(2.22); assertThat(doubleArray.get(2)).isEqualTo(3.33); - } - - @Test - void testArrayWithDecimalElements() { - Types.StructType structType = - Types.StructType.of( - Types.NestedField.required( - 0, - "decimal_array", - Types.ListType.ofRequired(1, Types.DecimalType.of(10, 2)))); - - RowType flussRowType = RowType.of(DataTypes.ARRAY(DataTypes.DECIMAL(10, 2))); - - GenericRow genericRow = new GenericRow(1); - genericRow.setField( - 0, - new GenericArray( - new Object[] { - Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2), - Decimal.fromBigDecimal(new BigDecimal("678.90"), 10, 2) - })); - - FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType); - record.internalRow = genericRow; - - List array = (List) record.get(0); - assertThat(array.size()).isEqualTo(2); - assertThat(array.get(0)).isEqualTo(new BigDecimal("123.45")); - assertThat(array.get(1)).isEqualTo(new BigDecimal("678.90")); - } - - @Test - void 
-    void testArrayWithTimestampElements() {
-        Types.StructType structType =
-                Types.StructType.of(
-                        Types.NestedField.required(
-                                0,
-                                "timestamp_ntz_array",
-                                Types.ListType.ofRequired(1, Types.TimestampType.withoutZone())),
-                        Types.NestedField.required(
-                                2,
-                                "timestamp_ltz_array",
-                                Types.ListType.ofRequired(3, Types.TimestampType.withZone())));
-        RowType flussRowType =
-                RowType.of(
-                        DataTypes.ARRAY(DataTypes.TIMESTAMP(6)),
-                        DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(6)));
-
-        GenericRow genericRow = new GenericRow(2);
-        genericRow.setField(
-                0,
-                new GenericArray(
-                        new Object[] {
-                            org.apache.fluss.row.TimestampNtz.fromLocalDateTime(
-                                    LocalDateTime.now()),
-                            org.apache.fluss.row.TimestampNtz.fromLocalDateTime(
-                                    LocalDateTime.now().plusSeconds(1))
-                        }));
-        genericRow.setField(
-                1,
-                new GenericArray(
-                        new Object[] {
-                            org.apache.fluss.row.TimestampLtz.fromEpochMillis(
-                                    System.currentTimeMillis()),
-                            org.apache.fluss.row.TimestampLtz.fromEpochMillis(
-                                    System.currentTimeMillis() + 1000)
-                        }));
-
-        FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType);
-        record.internalRow = genericRow;
-
-        List<?> timestampNtzArray = (List<?>) record.get(0);
+        // Test string array
+        List<?> stringArray = (List<?>) record.get(7);
+        assertThat(stringArray.size()).isEqualTo(3);
+        assertThat(stringArray.get(0)).isEqualTo("hello");
+        assertThat(stringArray.get(1)).isEqualTo("world");
+        assertThat(stringArray.get(2)).isEqualTo("test");
+
+        // Test decimal array
+        List<?> decimalArray = (List<?>) record.get(8);
+        assertThat(decimalArray.size()).isEqualTo(2);
+        assertThat(decimalArray.get(0)).isEqualTo(new BigDecimal("123.45"));
+        assertThat(decimalArray.get(1)).isEqualTo(new BigDecimal("678.90"));
+
+        // Test timestamp array
+        List<?> timestampNtzArray = (List<?>) record.get(9);
         assertThat(timestampNtzArray).isNotNull();
         assertThat(timestampNtzArray.size()).isEqualTo(2);
         assertThat(timestampNtzArray.get(0)).isInstanceOf(LocalDateTime.class);
         assertThat(timestampNtzArray.get(1)).isInstanceOf(LocalDateTime.class);
 
-        List<?> timestampLtzArray = (List<?>) record.get(1);
+        // Test timestamp_ltz array
+        List<?> timestampLtzArray = (List<?>) record.get(10);
         assertThat(timestampLtzArray).isNotNull();
         assertThat(timestampLtzArray.size()).isEqualTo(2);
         assertThat(timestampLtzArray.get(0)).isInstanceOf(OffsetDateTime.class);
         assertThat(timestampLtzArray.get(1)).isInstanceOf(OffsetDateTime.class);
-    }
-
-    @Test
-    void testArrayWithNullElements() {
-        Types.StructType structType =
-                Types.StructType.of(
-                        Types.NestedField.required(
-                                0,
-                                "nullable_int_array",
-                                Types.ListType.ofOptional(1, Types.IntegerType.get())));
-
-        RowType flussRowType = RowType.of(DataTypes.ARRAY(DataTypes.INT()));
-
-        GenericRow genericRow = new GenericRow(1);
-        genericRow.setField(0, new GenericArray(new Object[] {1, null, 3}));
-
-        FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType);
-        record.internalRow = genericRow;
-
-        List<?> array = (List<?>) record.get(0);
-        assertThat(array).isNotNull();
-        assertThat(array.size()).isEqualTo(3);
-        assertThat(array.get(0)).isEqualTo(1);
-        assertThat(array.get(1)).isNull();
-        assertThat(array.get(2)).isEqualTo(3);
-    }
-
-    @Test
-    void testNullArray() {
-        Types.StructType structType =
-                Types.StructType.of(
-                        Types.NestedField.optional(
-                                0,
-                                "nullable_array",
-                                Types.ListType.ofRequired(1, Types.IntegerType.get())));
-
-        RowType flussRowType = RowType.of(DataTypes.ARRAY(DataTypes.INT()));
-
-        GenericRow genericRow = new GenericRow(1);
-        genericRow.setField(0, null);
-
-        FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType);
-        record.internalRow = genericRow;
-
-        assertThat(record.get(0)).isNull();
-    }
-
-    @Test
-    void testArrayWithBinaryElements() {
-        Types.StructType structType =
-                Types.StructType.of(
-                        Types.NestedField.required(
-                                0,
-                                "binary_array",
-                                Types.ListType.ofRequired(1, Types.BinaryType.get())));
+        // Test binary array
+        List<?> binaryArray = (List<?>) record.get(11);
+        assertThat(binaryArray).isNotNull();
+        assertThat(binaryArray.size()).isEqualTo(3);
+        assertThat(binaryArray.get(0)).isInstanceOf(ByteBuffer.class);
+        assertThat(((ByteBuffer) binaryArray.get(0)).array()).isEqualTo("hello".getBytes());
+        assertThat(((ByteBuffer) binaryArray.get(1)).array()).isEqualTo("world".getBytes());
+        assertThat(((ByteBuffer) binaryArray.get(2)).array()).isEqualTo("test".getBytes());
+
+        // Test nested array (array<array<int>>)
+        List<?> outerArray = (List<?>) record.get(12);
+        assertThat(outerArray).isNotNull();
+        assertThat(outerArray.size()).isEqualTo(2);
 
-        RowType flussRowType = RowType.of(DataTypes.ARRAY(DataTypes.BYTES()));
+        List<?> innerArray1 = (List<?>) outerArray.get(0);
+        assertThat(innerArray1.size()).isEqualTo(2);
+        assertThat(innerArray1.get(0)).isEqualTo(1);
+        assertThat(innerArray1.get(1)).isEqualTo(2);
 
-        GenericRow genericRow = new GenericRow(1);
-        genericRow.setField(
-                0,
-                new GenericArray(
-                        new Object[] {"hello".getBytes(), "world".getBytes(), "test".getBytes()}));
+        List<?> innerArray2 = (List<?>) outerArray.get(1);
+        assertThat(innerArray2.size()).isEqualTo(3);
+        assertThat(innerArray2.get(0)).isEqualTo(3);
+        assertThat(innerArray2.get(1)).isEqualTo(4);
+        assertThat(innerArray2.get(2)).isEqualTo(5);
 
-        FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType);
-        record.internalRow = genericRow;
+        // Test array with null elements
+        List<?> nullableArray = (List<?>) record.get(13);
+        assertThat(nullableArray).isNotNull();
+        assertThat(nullableArray.size()).isEqualTo(3);
+        assertThat(nullableArray.get(0)).isEqualTo(1);
+        assertThat(nullableArray.get(1)).isNull();
+        assertThat(nullableArray.get(2)).isEqualTo(3);
 
-        List<?> array = (List<?>) record.get(0);
-        assertThat(array).isNotNull();
-        assertThat(array.size()).isEqualTo(3);
-        assertThat(array.get(0)).isInstanceOf(ByteBuffer.class);
-        assertThat(((ByteBuffer) array.get(0)).array()).isEqualTo("hello".getBytes());
-        assertThat(((ByteBuffer) array.get(1)).array()).isEqualTo("world".getBytes());
-        assertThat(((ByteBuffer) array.get(2)).array()).isEqualTo("test".getBytes());
+        // Test null array
+        assertThat(record.get(14)).isNull();
     }
 }

From 908dcb92fc8deb05b55679e131ab47451110ac00 Mon Sep 17 00:00:00 2001
From: forwardxu
Date: Tue, 30 Dec 2025 08:53:31 +0800
Subject: [PATCH 3/4] [lake/iceberg] Support tier array type for iceberg
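
Use fixed, offset-derived timestamps instead of System.currentTimeMillis()
when generating test records, so the tiered records can be asserted
deterministically across runs.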
---
 .../fluss/lake/iceberg/tiering/IcebergTieringTest.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
index a9a8426149..7420bc96d2 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
@@ -348,7 +348,7 @@ private List<LogRecord> genKvRow(
     }
 
     private GenericRecord toRecord(long offset, GenericRow row, ChangeType changeType) {
-        return new GenericRecord(offset, System.currentTimeMillis(), changeType, row);
+        return new GenericRecord(offset, 1000000000L + offset, changeType, row);
     }
 
     private void createTable(
@@ -549,6 +549,7 @@ void testTieringWriteTableWithArrayType(boolean isPrimaryKeyTable, boolean isPar
     private List<LogRecord> genArrayTypeLogTableRecords(
             @Nullable String partition, int bucket, int numRecords) {
         List<LogRecord> logRecords = new ArrayList<>();
+        long timestamp = 1000000000L;
         for (int i = 0; i < numRecords; i++) {
             GenericRow genericRow = new GenericRow(3);
             genericRow.setField(0, i);
@@ -560,8 +561,7 @@ private List<LogRecord> genArrayTypeLogTableRecords(
             }
 
             LogRecord logRecord =
-                    new GenericRecord(
-                            i, System.currentTimeMillis(), ChangeType.APPEND_ONLY, genericRow);
+                    new GenericRecord(i, timestamp + i, ChangeType.APPEND_ONLY, genericRow);
             logRecords.add(logRecord);
         }
         return logRecords;

From 9eee7dcf763167533400a9382155f5a7e642d099 Mon Sep 17 00:00:00 2001
From: forwardxu
Date: Mon, 5 Jan 2026 09:52:11 +0800
Subject: [PATCH 4/4] [lake/iceberg] Support tier array type for iceberg
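
Remove the array-type tiering integration test and its helpers from
IcebergTieringTest, reverting the 253 lines that test added earlier in
this series.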
---
 .../iceberg/tiering/IcebergTieringTest.java | 253 ------------------
 1 file changed, 253 deletions(-)

diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
index 7420bc96d2..6443e85294 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
@@ -32,7 +32,6 @@
 import org.apache.fluss.record.GenericRecord;
 import org.apache.fluss.record.LogRecord;
 import org.apache.fluss.row.BinaryString;
-import org.apache.fluss.row.GenericArray;
 import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.utils.types.Tuple2;
@@ -442,256 +441,4 @@ private void verifyTableRecords(
                         .isEqualTo(expectRecord.timestamp());
         }
     }
-
-    @ParameterizedTest
-    @MethodSource("tieringWriteArgs")
-    void testTieringWriteTableWithArrayType(boolean isPrimaryKeyTable, boolean isPartitionedTable)
-            throws Exception {
-        TablePath tablePath =
-                TablePath.of(
-                        "iceberg",
-                        String.format(
-                                "test_tiering_array_%s_%s",
-                                isPrimaryKeyTable ? "pk" : "log",
-                                isPartitionedTable ? "partitioned" : "unpartitioned"));
-        createTableWithArrayType(tablePath, isPrimaryKeyTable, isPartitionedTable);
-
-        TableDescriptor descriptor =
-                TableDescriptor.builder()
-                        .schema(
-                                org.apache.fluss.metadata.Schema.newBuilder()
-                                        .column("c1", DataTypes.INT())
-                                        .column("c2", DataTypes.ARRAY(DataTypes.INT()))
-                                        .column("c3", DataTypes.STRING())
-                                        .build())
-                        .distributedBy(BUCKET_NUM)
-                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
-                        .build();
-        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
-
-        Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
-
-        Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new HashMap<>();
-        Map<Long, String> partitionIdAndName =
-                isPartitionedTable
-                        ? new HashMap<Long, String>() {
-                            {
-                                put(1L, "p1");
-                                put(2L, "p2");
-                            }
-                        }
-                        : Collections.singletonMap(null, null);
-
-        List<IcebergWriteResult> icebergWriteResults = new ArrayList<>();
-        SimpleVersionedSerializer<IcebergWriteResult> writeResultSerializer =
-                icebergLakeTieringFactory.getWriteResultSerializer();
-        SimpleVersionedSerializer<IcebergCommittable> committableSerializer =
-                icebergLakeTieringFactory.getCommittableSerializer();
-
-        // write data
-        for (int bucket = 0; bucket < BUCKET_NUM; bucket++) {
-            for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
-                String partition = entry.getValue();
-                try (LakeWriter<IcebergWriteResult> writer =
-                        createLakeWriter(tablePath, bucket, partition, entry.getKey(), tableInfo)) {
-                    Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
-                    Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
-                            isPrimaryKeyTable
-                                    ? genArrayTypePrimaryKeyTableRecords(partition, bucket)
-                                    : Tuple2.of(
-                                            genArrayTypeLogTableRecords(partition, bucket, 5),
-                                            genArrayTypeLogTableRecords(partition, bucket, 5));
-
-                    List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
-                    List<LogRecord> expectRecords = writeAndExpectRecords.f1;
-                    recordsByBucket.put(partitionBucket, expectRecords);
-
-                    for (LogRecord record : writtenRecords) {
-                        writer.write(record);
-                    }
-                    IcebergWriteResult result = writer.complete();
-                    byte[] serialized = writeResultSerializer.serialize(result);
-                    icebergWriteResults.add(
-                            writeResultSerializer.deserialize(
-                                    writeResultSerializer.getVersion(), serialized));
-                }
-            }
-        }
-
-        // commit data
-        try (LakeCommitter<IcebergWriteResult, IcebergCommittable> lakeCommitter =
-                createLakeCommitter(tablePath, tableInfo)) {
-            IcebergCommittable icebergCommittable =
-                    lakeCommitter.toCommittable(icebergWriteResults);
-            byte[] serialized = committableSerializer.serialize(icebergCommittable);
-            icebergCommittable =
-                    committableSerializer.deserialize(
-                            committableSerializer.getVersion(), serialized);
-            long snapshot =
-                    lakeCommitter.commit(icebergCommittable, Collections.singletonMap("k1", "v1"));
-            icebergTable.refresh();
-            Snapshot icebergSnapshot = icebergTable.currentSnapshot();
-            assertThat(snapshot).isEqualTo(icebergSnapshot.snapshotId());
-        }
-
-        // verify data
-        for (int bucket = 0; bucket < BUCKET_NUM; bucket++) {
-            for (String partition : partitionIdAndName.values()) {
-                Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
-                List<LogRecord> expectRecords = recordsByBucket.get(partitionBucket);
-                CloseableIterator<Record> actualRecords =
-                        getIcebergRows(icebergTable, partition, bucket);
-                verifyArrayTypeRecords(actualRecords, expectRecords, bucket, partition);
-            }
-        }
-    }
-
-    private List<LogRecord> genArrayTypeLogTableRecords(
-            @Nullable String partition, int bucket, int numRecords) {
-        List<LogRecord> logRecords = new ArrayList<>();
-        long timestamp = 1000000000L;
-        for (int i = 0; i < numRecords; i++) {
-            GenericRow genericRow = new GenericRow(3);
-            genericRow.setField(0, i);
-            genericRow.setField(1, new GenericArray(new int[] {i, i + 1, i + 2}));
-            if (partition != null) {
-                genericRow.setField(2, BinaryString.fromString(partition));
-            } else {
-                genericRow.setField(2, BinaryString.fromString("bucket" + bucket));
-            }
-
-            LogRecord logRecord =
-                    new GenericRecord(i, timestamp + i, ChangeType.APPEND_ONLY, genericRow);
-            logRecords.add(logRecord);
-        }
-        return logRecords;
-    }
-
-    private Tuple2<List<LogRecord>, List<LogRecord>> genArrayTypePrimaryKeyTableRecords(
-            @Nullable String partition, int bucket) {
-        int offset = -1;
-        List<LogRecord> writtenLogRecords = new ArrayList<>();
-        List<LogRecord> expectLogRecords = new ArrayList<>();
-
-        // gen +I
-        GenericRow insertRow = genArrayTypeKvRow(partition, bucket, 0, 0);
-        writtenLogRecords.add(toRecord(++offset, insertRow, INSERT));
-        expectLogRecords.add(writtenLogRecords.get(writtenLogRecords.size() - 1));
-
-        // gen +U (update)
-        GenericRow updateRow = genArrayTypeKvRow(partition, bucket, 0, 1);
-        writtenLogRecords.add(toRecord(++offset, updateRow, UPDATE_AFTER));
-        expectLogRecords.set(0, writtenLogRecords.get(writtenLogRecords.size() - 1));
-
-        // gen +I (another key)
-        GenericRow insertRow2 = genArrayTypeKvRow(partition, bucket, 1, 2);
-        writtenLogRecords.add(toRecord(++offset, insertRow2, INSERT));
-        expectLogRecords.add(writtenLogRecords.get(writtenLogRecords.size() - 1));
-
-        return Tuple2.of(writtenLogRecords, expectLogRecords);
-    }
-
-    private GenericRow genArrayTypeKvRow(
-            @Nullable String partition, int bucket, int key, int arrayBase) {
-        GenericRow genericRow = new GenericRow(3);
-        genericRow.setField(0, key);
-        genericRow.setField(
-                1, new GenericArray(new int[] {arrayBase, arrayBase + 1, arrayBase + 2}));
-        if (partition != null) {
-            genericRow.setField(2, BinaryString.fromString(partition));
-        } else {
-            genericRow.setField(2, BinaryString.fromString("bucket" + bucket));
-        }
-        return genericRow;
-    }
-
-    private void createTableWithArrayType(
-            TablePath tablePath, boolean isPrimaryTable, boolean isPartitionedTable) {
-        Namespace namespace = Namespace.of(tablePath.getDatabaseName());
-        if (icebergCatalog instanceof SupportsNamespaces) {
-            SupportsNamespaces ns = (SupportsNamespaces) icebergCatalog;
-            if (!ns.namespaceExists(namespace)) {
-                ns.createNamespace(namespace);
-            }
-        }
-
-        Set<Integer> identifierFieldIds = new HashSet<>();
-        if (isPrimaryTable) {
-            identifierFieldIds.add(1);
-            if (isPartitionedTable) {
-                identifierFieldIds.add(3);
-            }
-        }
-
-        org.apache.iceberg.Schema schema =
-                new org.apache.iceberg.Schema(
-                        Arrays.asList(
-                                Types.NestedField.required(1, "c1", Types.IntegerType.get()),
-                                Types.NestedField.optional(
-                                        2,
-                                        "c2",
-                                        Types.ListType.ofRequired(7, Types.IntegerType.get())),
-                                Types.NestedField.required(3, "c3", Types.StringType.get()),
-                                Types.NestedField.required(
-                                        4, BUCKET_COLUMN_NAME, Types.IntegerType.get()),
-                                Types.NestedField.required(
-                                        5, OFFSET_COLUMN_NAME, Types.LongType.get()),
-                                Types.NestedField.required(
-                                        6, TIMESTAMP_COLUMN_NAME, Types.TimestampType.withZone())),
-                        identifierFieldIds);
-
-        PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
-        if (isPartitionedTable) {
-            builder.identity("c3");
-        }
-
-        PartitionSpec partitionSpec;
-        if (isPrimaryTable) {
-            partitionSpec = builder.bucket("c1", BUCKET_NUM).build();
-        } else {
-            partitionSpec = builder.identity(BUCKET_COLUMN_NAME).build();
-        }
-
-        TableIdentifier tableId =
-                TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName());
-        icebergCatalog.createTable(tableId, schema, partitionSpec);
-    }
-
-    private void verifyArrayTypeRecords(
-            CloseableIterator<Record> actualRecords,
-            List<LogRecord> expectRecords,
-            int expectBucket,
-            @Nullable String partition) {
-        for (LogRecord expectRecord : expectRecords) {
-            Record actualRecord = actualRecords.next();
-            // check business columns:
-            assertThat(actualRecord.get(0)).isEqualTo(expectRecord.getRow().getInt(0));
-
-            // check array column
-            List<?> actualArray = (List<?>) actualRecord.get(1);
-            org.apache.fluss.row.InternalArray expectedArray = expectRecord.getRow().getArray(1);
-            assertThat(actualArray).isNotNull();
-            assertThat(actualArray.size()).isEqualTo(expectedArray.size());
-            for (int i = 0; i < expectedArray.size(); i++) {
-                assertThat(actualArray.get(i)).isEqualTo(expectedArray.getInt(i));
-            }
-
-            assertThat(actualRecord.get(2, String.class))
-                    .isEqualTo(expectRecord.getRow().getString(2).toString());
-            if (partition != null) {
-                assertThat(actualRecord.get(2, String.class)).isEqualTo(partition);
-            }
-
-            // check system columns: __bucket, __offset, __timestamp
-            assertThat(actualRecord.get(3)).isEqualTo(expectBucket);
-            assertThat(actualRecord.get(4)).isEqualTo(expectRecord.logOffset());
-            assertThat(
-                            actualRecord
-                                    .get(5, OffsetDateTime.class)
-                                    .atZoneSameInstant(ZoneOffset.UTC)
-                                    .toInstant()
-                                    .toEpochMilli())
-                    .isEqualTo(expectRecord.timestamp());
-        }
-    }
 }