diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java index aa1a97b443..1c093482aa 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java @@ -47,6 +47,30 @@ public class FlussDataTypeToIcebergDataType implements DataTypeVisitor { public static final FlussDataTypeToIcebergDataType INSTANCE = new FlussDataTypeToIcebergDataType(); + private final RowType root; + private int nextId; + + FlussDataTypeToIcebergDataType() { + this.root = null; + this.nextId = 0; + } + + FlussDataTypeToIcebergDataType(int startId) { + this.root = null; + this.nextId = startId; + } + + FlussDataTypeToIcebergDataType(RowType root) { + this.root = root; + this.nextId = root.getFieldCount(); + } + + private int getNextId() { + int next = nextId; + nextId += 1; + return next; + } + @Override public Type visit(CharType charType) { return Types.StringType.get(); @@ -129,7 +153,12 @@ public Type visit(LocalZonedTimestampType localZonedTimestampType) { @Override public Type visit(ArrayType arrayType) { - throw new UnsupportedOperationException("Unsupported array type"); + Type elementType = arrayType.getElementType().accept(this); + if (arrayType.getElementType().isNullable()) { + return Types.ListType.ofOptional(getNextId(), elementType); + } else { + return Types.ListType.ofRequired(getNextId(), elementType); + } } @Override diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java index 03afa011c6..9bd9d54e6c 100644 --- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java @@ -178,6 +178,11 @@ public Schema convertToIcebergSchema(TableDescriptor tableDescriptor, boolean is List fields = new ArrayList<>(); int fieldId = 0; + int totalTopLevelFields = + tableDescriptor.getSchema().getColumns().size() + SYSTEM_COLUMNS.size(); + FlussDataTypeToIcebergDataType converter = + new FlussDataTypeToIcebergDataType(totalTopLevelFields); + // general columns for (org.apache.fluss.metadata.Schema.Column column : tableDescriptor.getSchema().getColumns()) { @@ -192,16 +197,14 @@ public Schema convertToIcebergSchema(TableDescriptor tableDescriptor, boolean is Types.NestedField.optional( fieldId++, colName, - column.getDataType() - .accept(FlussDataTypeToIcebergDataType.INSTANCE), + column.getDataType().accept(converter), column.getComment().orElse(null)); } else { field = Types.NestedField.required( fieldId++, colName, - column.getDataType() - .accept(FlussDataTypeToIcebergDataType.INSTANCE), + column.getDataType().accept(converter), column.getComment().orElse(null)); } fields.add(field); diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java new file mode 100644 index 0000000000..2671e045cf --- /dev/null +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.iceberg.source; + +import org.apache.fluss.row.InternalArray; +import org.apache.fluss.types.ArrayType; +import org.apache.fluss.types.BigIntType; +import org.apache.fluss.types.BinaryType; +import org.apache.fluss.types.BooleanType; +import org.apache.fluss.types.BytesType; +import org.apache.fluss.types.CharType; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.DateType; +import org.apache.fluss.types.DecimalType; +import org.apache.fluss.types.DoubleType; +import org.apache.fluss.types.FloatType; +import org.apache.fluss.types.IntType; +import org.apache.fluss.types.LocalZonedTimestampType; +import org.apache.fluss.types.SmallIntType; +import org.apache.fluss.types.StringType; +import org.apache.fluss.types.TimeType; +import org.apache.fluss.types.TimestampType; +import org.apache.fluss.types.TinyIntType; +import org.apache.fluss.utils.DateTimeUtils; + +import java.nio.ByteBuffer; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.AbstractList; + +/** Adapter class for converting Fluss InternalArray to a Java List for Iceberg. 
*/ +public class FlussArrayAsIcebergList extends AbstractList { + + private final InternalArray flussArray; + private final DataType elementType; + + public FlussArrayAsIcebergList(InternalArray flussArray, DataType elementType) { + this.flussArray = flussArray; + this.elementType = elementType; + } + + @Override + public Object get(int index) { + if (flussArray.isNullAt(index)) { + return null; + } + + if (elementType instanceof BooleanType) { + return flussArray.getBoolean(index); + } else if (elementType instanceof TinyIntType) { + return (int) flussArray.getByte(index); + } else if (elementType instanceof SmallIntType) { + return (int) flussArray.getShort(index); + } else if (elementType instanceof IntType) { + return flussArray.getInt(index); + } else if (elementType instanceof BigIntType) { + return flussArray.getLong(index); + } else if (elementType instanceof FloatType) { + return flussArray.getFloat(index); + } else if (elementType instanceof DoubleType) { + return flussArray.getDouble(index); + } else if (elementType instanceof StringType) { + return flussArray.getString(index).toString(); + } else if (elementType instanceof CharType) { + CharType charType = (CharType) elementType; + return flussArray.getChar(index, charType.getLength()).toString(); + } else if (elementType instanceof BytesType || elementType instanceof BinaryType) { + return ByteBuffer.wrap(flussArray.getBytes(index)); + } else if (elementType instanceof DecimalType) { + DecimalType decimalType = (DecimalType) elementType; + return flussArray + .getDecimal(index, decimalType.getPrecision(), decimalType.getScale()) + .toBigDecimal(); + } else if (elementType instanceof LocalZonedTimestampType) { + LocalZonedTimestampType ltzType = (LocalZonedTimestampType) elementType; + return toIcebergTimestampLtz( + flussArray.getTimestampLtz(index, ltzType.getPrecision()).toInstant()); + } else if (elementType instanceof TimestampType) { + TimestampType tsType = (TimestampType) elementType; + return 
flussArray.getTimestampNtz(index, tsType.getPrecision()).toLocalDateTime(); + } else if (elementType instanceof DateType) { + return DateTimeUtils.toLocalDate(flussArray.getInt(index)); + } else if (elementType instanceof TimeType) { + return DateTimeUtils.toLocalTime(flussArray.getInt(index)); + } else if (elementType instanceof ArrayType) { + InternalArray innerArray = flussArray.getArray(index); + return innerArray == null + ? null + : new FlussArrayAsIcebergList( + innerArray, ((ArrayType) elementType).getElementType()); + } else { + throw new UnsupportedOperationException( + "Unsupported array element type conversion for Fluss type: " + + elementType.getClass().getSimpleName()); + } + } + + @Override + public int size() { + return flussArray.size(); + } + + private OffsetDateTime toIcebergTimestampLtz(Instant instant) { + return OffsetDateTime.ofInstant(instant, ZoneOffset.UTC); + } +} diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java index 805bddcb6c..70dc8ea6f8 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java @@ -17,7 +17,9 @@ package org.apache.fluss.lake.iceberg.source; +import org.apache.fluss.row.InternalArray; import org.apache.fluss.row.InternalRow; +import org.apache.fluss.types.ArrayType; import org.apache.fluss.types.BigIntType; import org.apache.fluss.types.BinaryType; import org.apache.fluss.types.BooleanType; @@ -169,6 +171,14 @@ private FlussRowToIcebergFieldConverter createTypeConverter(DataType flussType, return row -> DateTimeUtils.toLocalDate(row.getInt(pos)); } else if (flussType instanceof TimeType) { return row -> DateTimeUtils.toLocalTime(row.getInt(pos)); + } else if 
(flussType instanceof ArrayType) { + ArrayType arrayType = (ArrayType) flussType; + return row -> { + InternalArray array = row.getArray(pos); + return array == null + ? null + : new FlussArrayAsIcebergList(array, arrayType.getElementType()); + }; } else { throw new UnsupportedOperationException( "Unsupported data type conversion for Fluss type: " diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java new file mode 100644 index 0000000000..23aecefec7 --- /dev/null +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergArrayAsFlussArray.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.lake.iceberg.source; + +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.InternalArray; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.TimestampLtz; +import org.apache.fluss.row.TimestampNtz; +import org.apache.fluss.utils.BytesUtils; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.util.List; + +/** Adapter for Iceberg List as Fluss InternalArray. */ +public class IcebergArrayAsFlussArray implements InternalArray { + + private final List icebergList; + + public IcebergArrayAsFlussArray(List icebergList) { + this.icebergList = icebergList; + } + + @Override + public int size() { + return icebergList.size(); + } + + @Override + public boolean isNullAt(int pos) { + return icebergList.get(pos) == null; + } + + @Override + public boolean getBoolean(int pos) { + return (boolean) icebergList.get(pos); + } + + @Override + public byte getByte(int pos) { + Object value = icebergList.get(pos); + return ((Integer) value).byteValue(); + } + + @Override + public short getShort(int pos) { + Object value = icebergList.get(pos); + return ((Integer) value).shortValue(); + } + + @Override + public int getInt(int pos) { + return (Integer) icebergList.get(pos); + } + + @Override + public long getLong(int pos) { + return (Long) icebergList.get(pos); + } + + @Override + public float getFloat(int pos) { + return (float) icebergList.get(pos); + } + + @Override + public double getDouble(int pos) { + return (double) icebergList.get(pos); + } + + @Override + public BinaryString getChar(int pos, int length) { + String value = (String) icebergList.get(pos); + return BinaryString.fromString(value); + } + + @Override + public BinaryString getString(int pos) { + String value = (String) icebergList.get(pos); + return BinaryString.fromString(value); + } + + @Override + 
public Decimal getDecimal(int pos, int precision, int scale) { + BigDecimal bigDecimal = (BigDecimal) icebergList.get(pos); + return Decimal.fromBigDecimal(bigDecimal, precision, scale); + } + + @Override + public TimestampNtz getTimestampNtz(int pos, int precision) { + LocalDateTime localDateTime = (LocalDateTime) icebergList.get(pos); + return TimestampNtz.fromLocalDateTime(localDateTime); + } + + @Override + public TimestampLtz getTimestampLtz(int pos, int precision) { + OffsetDateTime offsetDateTime = (OffsetDateTime) icebergList.get(pos); + return TimestampLtz.fromInstant(offsetDateTime.toInstant()); + } + + @Override + public byte[] getBinary(int pos, int length) { + ByteBuffer byteBuffer = (ByteBuffer) icebergList.get(pos); + return BytesUtils.toArray(byteBuffer); + } + + @Override + public byte[] getBytes(int pos) { + ByteBuffer byteBuffer = (ByteBuffer) icebergList.get(pos); + return BytesUtils.toArray(byteBuffer); + } + + @Override + public InternalArray getArray(int pos) { + List nestedList = (List) icebergList.get(pos); + return nestedList == null ? 
null : new IcebergArrayAsFlussArray(nestedList); + } + + @Override + public InternalRow getRow(int pos, int numFields) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean[] toBooleanArray() { + boolean[] result = new boolean[icebergList.size()]; + for (int i = 0; i < icebergList.size(); i++) { + result[i] = (boolean) icebergList.get(i); + } + return result; + } + + @Override + public byte[] toByteArray() { + byte[] result = new byte[icebergList.size()]; + for (int i = 0; i < icebergList.size(); i++) { + result[i] = ((Integer) icebergList.get(i)).byteValue(); + } + return result; + } + + @Override + public short[] toShortArray() { + short[] result = new short[icebergList.size()]; + for (int i = 0; i < icebergList.size(); i++) { + result[i] = ((Integer) icebergList.get(i)).shortValue(); + } + return result; + } + + @Override + public int[] toIntArray() { + int[] result = new int[icebergList.size()]; + for (int i = 0; i < icebergList.size(); i++) { + result[i] = (int) icebergList.get(i); + } + return result; + } + + @Override + public long[] toLongArray() { + long[] result = new long[icebergList.size()]; + for (int i = 0; i < icebergList.size(); i++) { + result[i] = (long) icebergList.get(i); + } + return result; + } + + @Override + public float[] toFloatArray() { + float[] result = new float[icebergList.size()]; + for (int i = 0; i < icebergList.size(); i++) { + result[i] = (float) icebergList.get(i); + } + return result; + } + + @Override + public double[] toDoubleArray() { + double[] result = new double[icebergList.size()]; + for (int i = 0; i < icebergList.size(); i++) { + result[i] = (double) icebergList.get(i); + } + return result; + } +} diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java index dd76bd2ec2..2bd80893f0 100644 --- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java @@ -32,6 +32,7 @@ import java.nio.ByteBuffer; import java.time.LocalDateTime; import java.time.OffsetDateTime; +import java.util.List; import static org.apache.fluss.lake.iceberg.IcebergLakeCatalog.SYSTEM_COLUMNS; @@ -151,8 +152,12 @@ public byte[] getBytes(int pos) { @Override public InternalArray getArray(int pos) { - // TODO: Support Array type conversion from Iceberg to Fluss - throw new UnsupportedOperationException(); + Object value = icebergRecord.get(pos); + if (value == null) { + return null; + } + List icebergList = (List) value; + return new IcebergArrayAsFlussArray(icebergList); } @Override diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java index 4d7582c53e..038ebd8a7a 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java @@ -125,6 +125,9 @@ private static DataType convertIcebergTypeToFlussType(Type icebergType) { } else if (icebergType instanceof Types.DecimalType) { Types.DecimalType decimalType = (Types.DecimalType) icebergType; return DataTypes.DECIMAL(decimalType.precision(), decimalType.scale()); + } else if (icebergType instanceof Types.ListType) { + Types.ListType listType = (Types.ListType) icebergType; + return DataTypes.ARRAY(convertIcebergTypeToFlussType(listType.elementType())); } throw new UnsupportedOperationException( "Unsupported data type conversion for Iceberg type: " diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java index 0518290a5f..a8b62e7736 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java @@ -25,6 +25,7 @@ import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.GenericArray; import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; @@ -98,6 +99,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L), TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, partition)); expectedRows.add( Row.of( @@ -116,6 +118,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, partition)); } } else { @@ -137,6 +140,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273183L), TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, null), Row.of( true, @@ -154,6 +158,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, null)); } @@ -196,6 +201,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { 
TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, partition)); expectedRows2.add( Row.ofKind( @@ -215,6 +221,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273501L), TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, + new float[] {2.1f, 2.2f, 2.3f}, partition)); } } else { @@ -236,6 +243,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new float[] {1.1f, 1.2f, 1.3f}, null)); expectedRows2.add( Row.ofKind( @@ -255,6 +263,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception { TimestampNtz.fromMillis(1698235273501L), TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, + new float[] {2.1f, 2.2f, 2.3f}, null)); } @@ -358,6 +367,7 @@ private void writeFullTypeRow(TablePath tablePath, String partition) throws Exce TimestampNtz.fromMillis(1698235273501L), TimestampNtz.fromMillis(1698235273501L, 8000), new byte[] {5, 6, 7, 8}, + new GenericArray(new float[] {2.1f, 2.2f, 2.3f}), partition)); writeRows(tablePath, rows, false); } @@ -411,7 +421,8 @@ protected long createPkTableFullType(TablePath tablePath, int bucketNum, boolean .column("c13", DataTypes.TIMESTAMP(3)) .column("c14", DataTypes.TIMESTAMP(6)) .column("c15", DataTypes.BINARY(4)) - .column("c16", DataTypes.STRING()); + .column("c16", DataTypes.ARRAY(DataTypes.FLOAT())) + .column("c17", DataTypes.STRING()); TableDescriptor.Builder tableBuilder = TableDescriptor.builder() @@ -421,8 +432,8 @@ protected long createPkTableFullType(TablePath tablePath, int bucketNum, boolean if (isPartitioned) { tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true); - tableBuilder.partitionedBy("c16"); - schemaBuilder.primaryKey("c4", "c16"); + 
tableBuilder.partitionedBy("c17"); + schemaBuilder.primaryKey("c4", "c17"); tableBuilder.property( ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, AutoPartitionTimeUnit.YEAR); } else { @@ -450,6 +461,7 @@ private List generateKvRowsFullType(@Nullable String partition) { TimestampNtz.fromMillis(1698235273183L), TimestampNtz.fromMillis(1698235273183L, 6000), new byte[] {1, 2, 3, 4}, + new GenericArray(new float[] {1.1f, 1.2f, 1.3f}), partition), row( true, @@ -467,6 +479,7 @@ private List generateKvRowsFullType(@Nullable String partition) { TimestampNtz.fromMillis(1698235273201L), TimestampNtz.fromMillis(1698235273201L, 6000), new byte[] {1, 2, 3, 4}, + new GenericArray(new float[] {1.1f, 1.2f, 1.3f}), partition)); } diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java new file mode 100644 index 0000000000..0c3ec679c3 --- /dev/null +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.lake.iceberg.source; + +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.GenericArray; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.types.RowType; + +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for {@link FlussRowAsIcebergRecord} with array types. */ +class FlussRowAsIcebergRecordTest { + + @Test + void testArrayWithAllTypes() { + Types.StructType structType = + Types.StructType.of( + Types.NestedField.required( + 0, + "bool_array", + Types.ListType.ofRequired(1, Types.BooleanType.get())), + Types.NestedField.required( + 2, + "byte_array", + Types.ListType.ofRequired(3, Types.IntegerType.get())), + Types.NestedField.required( + 4, + "short_array", + Types.ListType.ofRequired(5, Types.IntegerType.get())), + Types.NestedField.required( + 6, + "int_array", + Types.ListType.ofRequired(7, Types.IntegerType.get())), + Types.NestedField.required( + 8, + "long_array", + Types.ListType.ofRequired(9, Types.LongType.get())), + Types.NestedField.required( + 10, + "float_array", + Types.ListType.ofRequired(11, Types.FloatType.get())), + Types.NestedField.required( + 12, + "double_array", + Types.ListType.ofRequired(13, Types.DoubleType.get())), + Types.NestedField.required( + 14, + "string_array", + Types.ListType.ofRequired(15, Types.StringType.get())), + Types.NestedField.required( + 16, + "decimal_array", + Types.ListType.ofRequired(17, Types.DecimalType.of(10, 2))), + Types.NestedField.required( + 18, + "timestamp_ntz_array", + Types.ListType.ofRequired(19, Types.TimestampType.withoutZone())), + Types.NestedField.required( + 20, + "timestamp_ltz_array", + 
Types.ListType.ofRequired(21, Types.TimestampType.withZone())), + Types.NestedField.required( + 22, + "binary_array", + Types.ListType.ofRequired(23, Types.BinaryType.get())), + Types.NestedField.required( + 24, + "nested_array", + Types.ListType.ofRequired( + 25, + Types.ListType.ofRequired(26, Types.IntegerType.get()))), + Types.NestedField.required( + 27, + "nullable_int_array", + Types.ListType.ofOptional(28, Types.IntegerType.get())), + Types.NestedField.optional( + 29, + "null_array", + Types.ListType.ofRequired(30, Types.IntegerType.get()))); + + RowType flussRowType = + RowType.of( + DataTypes.ARRAY(DataTypes.BOOLEAN()), + DataTypes.ARRAY(DataTypes.TINYINT()), + DataTypes.ARRAY(DataTypes.SMALLINT()), + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY(DataTypes.BIGINT()), + DataTypes.ARRAY(DataTypes.FLOAT()), + DataTypes.ARRAY(DataTypes.DOUBLE()), + DataTypes.ARRAY(DataTypes.STRING()), + DataTypes.ARRAY(DataTypes.DECIMAL(10, 2)), + DataTypes.ARRAY(DataTypes.TIMESTAMP(6)), + DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(6)), + DataTypes.ARRAY(DataTypes.BYTES()), + DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())), + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY(DataTypes.INT())); + + GenericRow genericRow = new GenericRow(15); + genericRow.setField(0, new GenericArray(new boolean[] {true, false, true})); + genericRow.setField(1, new GenericArray(new byte[] {1, 2, 3})); + genericRow.setField(2, new GenericArray(new short[] {100, 200, 300})); + genericRow.setField(3, new GenericArray(new int[] {1000, 2000, 3000})); + genericRow.setField(4, new GenericArray(new long[] {10000L, 20000L, 30000L})); + genericRow.setField(5, new GenericArray(new float[] {1.1f, 2.2f, 3.3f})); + genericRow.setField(6, new GenericArray(new double[] {1.11, 2.22, 3.33})); + genericRow.setField( + 7, + new GenericArray( + new Object[] { + BinaryString.fromString("hello"), + BinaryString.fromString("world"), + BinaryString.fromString("test") + })); + genericRow.setField( + 8, + new 
GenericArray( + new Object[] { + Decimal.fromBigDecimal(new BigDecimal("123.45"), 10, 2), + Decimal.fromBigDecimal(new BigDecimal("678.90"), 10, 2) + })); + genericRow.setField( + 9, + new GenericArray( + new Object[] { + org.apache.fluss.row.TimestampNtz.fromLocalDateTime( + LocalDateTime.now()), + org.apache.fluss.row.TimestampNtz.fromLocalDateTime( + LocalDateTime.now().plusSeconds(1)) + })); + genericRow.setField( + 10, + new GenericArray( + new Object[] { + org.apache.fluss.row.TimestampLtz.fromEpochMillis( + System.currentTimeMillis()), + org.apache.fluss.row.TimestampLtz.fromEpochMillis( + System.currentTimeMillis() + 1000) + })); + genericRow.setField( + 11, + new GenericArray( + new Object[] {"hello".getBytes(), "world".getBytes(), "test".getBytes()})); + genericRow.setField( + 12, + new GenericArray( + new Object[] { + new GenericArray(new int[] {1, 2}), + new GenericArray(new int[] {3, 4, 5}) + })); + genericRow.setField(13, new GenericArray(new Object[] {1, null, 3})); + genericRow.setField(14, null); + + FlussRowAsIcebergRecord record = new FlussRowAsIcebergRecord(structType, flussRowType); + record.internalRow = genericRow; + + // Test boolean array + List boolArray = (List) record.get(0); + assertThat(boolArray.size()).isEqualTo(3); + assertThat(boolArray.get(0)).isEqualTo(true); + assertThat(boolArray.get(1)).isEqualTo(false); + assertThat(boolArray.get(2)).isEqualTo(true); + + // Test byte array + List byteArray = (List) record.get(1); + assertThat(byteArray.size()).isEqualTo(3); + assertThat(byteArray.get(0)).isEqualTo(1); + assertThat(byteArray.get(1)).isEqualTo(2); + assertThat(byteArray.get(2)).isEqualTo(3); + + // Test short array + List shortArray = (List) record.get(2); + assertThat(shortArray.size()).isEqualTo(3); + assertThat(shortArray.get(0)).isEqualTo(100); + assertThat(shortArray.get(1)).isEqualTo(200); + assertThat(shortArray.get(2)).isEqualTo(300); + + // Test int array + List intArray = (List) record.get(3); + 
assertThat(intArray.size()).isEqualTo(3); + assertThat(intArray.get(0)).isEqualTo(1000); + assertThat(intArray.get(1)).isEqualTo(2000); + assertThat(intArray.get(2)).isEqualTo(3000); + + // Test long array + List longArray = (List) record.get(4); + assertThat(longArray.size()).isEqualTo(3); + assertThat(longArray.get(0)).isEqualTo(10000L); + assertThat(longArray.get(1)).isEqualTo(20000L); + assertThat(longArray.get(2)).isEqualTo(30000L); + + // Test float array + List floatArray = (List) record.get(5); + assertThat(floatArray.size()).isEqualTo(3); + assertThat(floatArray.get(0)).isEqualTo(1.1f); + assertThat(floatArray.get(1)).isEqualTo(2.2f); + assertThat(floatArray.get(2)).isEqualTo(3.3f); + + // Test double array + List doubleArray = (List) record.get(6); + assertThat(doubleArray.size()).isEqualTo(3); + assertThat(doubleArray.get(0)).isEqualTo(1.11); + assertThat(doubleArray.get(1)).isEqualTo(2.22); + assertThat(doubleArray.get(2)).isEqualTo(3.33); + + // Test string array + List stringArray = (List) record.get(7); + assertThat(stringArray.size()).isEqualTo(3); + assertThat(stringArray.get(0)).isEqualTo("hello"); + assertThat(stringArray.get(1)).isEqualTo("world"); + assertThat(stringArray.get(2)).isEqualTo("test"); + + // Test decimal array + List decimalArray = (List) record.get(8); + assertThat(decimalArray.size()).isEqualTo(2); + assertThat(decimalArray.get(0)).isEqualTo(new BigDecimal("123.45")); + assertThat(decimalArray.get(1)).isEqualTo(new BigDecimal("678.90")); + + // Test timestamp array + List timestampNtzArray = (List) record.get(9); + assertThat(timestampNtzArray).isNotNull(); + assertThat(timestampNtzArray.size()).isEqualTo(2); + assertThat(timestampNtzArray.get(0)).isInstanceOf(LocalDateTime.class); + assertThat(timestampNtzArray.get(1)).isInstanceOf(LocalDateTime.class); + + // Test timestamp_ltz array + List timestampLtzArray = (List) record.get(10); + assertThat(timestampLtzArray).isNotNull(); + 
assertThat(timestampLtzArray.size()).isEqualTo(2); + assertThat(timestampLtzArray.get(0)).isInstanceOf(OffsetDateTime.class); + assertThat(timestampLtzArray.get(1)).isInstanceOf(OffsetDateTime.class); + + // Test binary array + List binaryArray = (List) record.get(11); + assertThat(binaryArray).isNotNull(); + assertThat(binaryArray.size()).isEqualTo(3); + assertThat(binaryArray.get(0)).isInstanceOf(ByteBuffer.class); + assertThat(((ByteBuffer) binaryArray.get(0)).array()).isEqualTo("hello".getBytes()); + assertThat(((ByteBuffer) binaryArray.get(1)).array()).isEqualTo("world".getBytes()); + assertThat(((ByteBuffer) binaryArray.get(2)).array()).isEqualTo("test".getBytes()); + + // Test nested array (array>) + List outerArray = (List) record.get(12); + assertThat(outerArray).isNotNull(); + assertThat(outerArray.size()).isEqualTo(2); + + List innerArray1 = (List) outerArray.get(0); + assertThat(innerArray1.size()).isEqualTo(2); + assertThat(innerArray1.get(0)).isEqualTo(1); + assertThat(innerArray1.get(1)).isEqualTo(2); + + List innerArray2 = (List) outerArray.get(1); + assertThat(innerArray2.size()).isEqualTo(3); + assertThat(innerArray2.get(0)).isEqualTo(3); + assertThat(innerArray2.get(1)).isEqualTo(4); + assertThat(innerArray2.get(2)).isEqualTo(5); + + // Test array with null elements + List nullableArray = (List) record.get(13); + assertThat(nullableArray).isNotNull(); + assertThat(nullableArray.size()).isEqualTo(3); + assertThat(nullableArray.get(0)).isEqualTo(1); + assertThat(nullableArray.get(1)).isNull(); + assertThat(nullableArray.get(2)).isEqualTo(3); + + // Test null array + assertThat(record.get(14)).isNull(); + } +} diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java index ba59b1b742..6443e85294 100644 --- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java @@ -347,7 +347,7 @@ private List genKvRow( } private GenericRecord toRecord(long offset, GenericRow row, ChangeType changeType) { - return new GenericRecord(offset, System.currentTimeMillis(), changeType, row); + return new GenericRecord(offset, 1000000000L + offset, changeType, row); } private void createTable( diff --git a/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md b/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md index cd2926caf3..8cadf2c2af 100644 --- a/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md +++ b/website/docs/streaming-lakehouse/integrate-data-lakes/iceberg.md @@ -500,6 +500,7 @@ When integrating with Iceberg, Fluss automatically converts between Fluss data t | TIMESTAMP WITH LOCAL TIMEZONE | TIMESTAMP (with timezone) | | | BINARY | BINARY | | | BYTES | BINARY | Converted to BINARY | +| ARRAY | LIST | | ## Maintenance and Optimization