Skip to content

Conversation

@XuQianJin-Stars
Copy link
Contributor

Purpose

Linked issue: close #1973

Introduce MAP type support for INDEXED format, achieving feature parity with ARROW and COMPACTED formats for MAP data type handling.

Brief change log

  • Added readMap() method to IndexedRowReader for MAP type deserialization, following the same pattern as readArray()
  • Removed UnsupportedOperationException for MAP type in IndexedRowReader.createFieldReader()
  • Added missing InternalMap import to IndexedRowReader
  • Updated BinaryWriterTest.testCreateValueSetterForMapThrowsException() to testValueSetterWithMapType() - changed from exception test to positive validation test
  • Extended IndexedRowTest.assertAllTypeEquals() to include MAP type assertions (validates size, key array, value array, and null handling)
  • Extended IndexedRowReaderTest.assertAllTypeEqualsForReader() to verify MAP reading

Tests

Unit Tests:

  • IndexedRowTest (7 tests) - Validates MAP read/write operations with IndexedRow
  • IndexedRowReaderTest (2 tests) - Validates MAP deserialization with IndexedRowReader
  • BinaryWriterTest (18 tests) - Validates MAP type ValueWriter creation and usage
  • ArrowReaderWriterTest (2 tests) - Confirms ARROW format MAP support continues to work
  • CompactedRowTest (20 tests) - Confirms COMPACTED format MAP support continues to work

Test Results: All 49 tests passed successfully (0 failures, 0 errors, 0 skipped)

Test Command:
mvn test -pl fluss-common -Dtest=IndexedRowTest,IndexedRowReaderTest,BinaryWriterTest,ArrowReaderWriterTest,CompactedRowTest -Dcheckstyle.skip=true -Dspotless.check.skip=true### API and Format

API: No breaking changes. This is purely additive functionality that removes an UnsupportedOperationException and enables existing MAP type infrastructure for INDEXED format.

Storage Format: No format changes. The MAP storage format in INDEXED rows was already defined and uses the same BinaryMap serialization mechanism as other formats. This change only enables reading MAP data that was already writeable.

Documentation

No documentation changes required. This change brings INDEXED format to feature parity with ARROW and COMPACTED formats for MAP type support. MAP type usage is already documented in existing Fluss documentation. The change completes internal implementation of existing functionality rather than introducing new user-facing features.

@XuQianJin-Stars XuQianJin-Stars force-pushed the feature/issue-1973-support-map-format branch 3 times, most recently from 26bf237 to ed1f8fc Compare December 24, 2025 15:57
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces MAP type support for the INDEXED row format, achieving feature parity with ARROW and COMPACTED formats. The implementation adds MAP serialization/deserialization capabilities across the Fluss common layer, Flink integration, and lake integrations (Paimon, Iceberg placeholder).

Key Changes:

  • Added MAP type support to IndexedRow format with IndexedRowReader.readMap() and IndexedRowWriter.writeMap() methods
  • Introduced GenericMap, BinaryMap, and format-specific map implementations (IndexedMap, CompactedMap, AlignedMap) with corresponding serializers
  • Extended Flink, Paimon, and Arrow integrations to handle MAP type conversions and vector operations

Reviewed changes

Copilot reviewed 66 out of 66 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
IndexedRowReader.java Added readMap() method and MAP case in field reader factory
IndexedRowWriter.java Added writeMap() method for MAP serialization
BinaryMap.java New binary representation of MAP type with key/value arrays
GenericMap.java Generic MAP implementation wrapping Java maps
MapSerializer.java Serializer for converting InternalMap to BinaryMap
FlinkAsFlussMap.java Adapter for Flink MapData to Fluss InternalMap
PaimonMapAsFlussMap.java Adapter for Paimon map to Fluss InternalMap
ArrowMapWriter.java / ArrowMapColumnVector.java Arrow format MAP support
Various test files Comprehensive test coverage for MAP operations

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Object arr, int offset, int length, int elementSize) {
final long headerInBytes = calculateHeaderInBytes(length);
final long valueRegionInBytes = elementSize * length;
final long valueRegionInBytes = (long) elementSize * length;
Copy link

Copilot AI Dec 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cast to (long) is applied only to elementSize, not to the entire multiplication result. This could still cause integer overflow when elementSize * length exceeds Integer.MAX_VALUE before the cast is applied. The cast should wrap the entire multiplication expression: (long) elementSize * length.

Copilot uses AI. Check for mistakes.

java.util.Map<Object, Object> javaMap = new java.util.LinkedHashMap<>();
for (int i = 0; i < values.length; i += 2) {
javaMap.put(values[i], (i + 1 < values.length) ? values[i + 1] : null);
Copy link

Copilot AI Dec 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test is always true, because of this condition.

Suggested change
javaMap.put(values[i], (i + 1 < values.length) ? values[i + 1] : null);
javaMap.put(values[i], values[i + 1]);

Copilot uses AI. Check for mistakes.
* @since 0.9
*/
@PublicEvolving
public class BinaryMap extends BinarySection implements InternalMap {
Copy link

Copilot AI Dec 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Class BinaryMap overrides hashCode but not equals.

Copilot uses AI. Check for mistakes.
Copy link
Member

@wuchong wuchong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @XuQianJin-Stars for the contribution. I left some comments.

Comment on lines +359 to +370

assertThat(row.getMap(4)).isInstanceOf(GenericMap.class);
GenericMap simpleMap = (GenericMap) row.getMap(4);
assertThat(simpleMap.size()).isGreaterThan(0);

assertThat(row.getMap(5)).isInstanceOf(GenericMap.class);
GenericMap nestedMap = (GenericMap) row.getMap(5);
assertThat(nestedMap.size()).isGreaterThan(0);

assertThat(row.getMap(6)).isInstanceOf(GenericMap.class);
GenericMap mapWithArray = (GenericMap) row.getMap(6);
assertThat(mapWithArray.size()).isGreaterThan(0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this approach doesn’t verify the actual field values, and it significantly inflates the line count, making the code harder to maintain.

Moreover, we’ll soon need to add assertions for column projection here as well.

My suggestion: prepare a list of expected rows upfront and perform a direct assert equals comparison with the actual result.

This keeps the test concise, expressive, and much easier to maintain.

public InternalMap getMap(int pos) {
assertIndexIsValid(pos);
int fieldOffset = getFieldOffset(pos);
final long offsetAndSize = BinarySegmentUtils.getLong(segments, fieldOffset);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep align with others to use final long offsetAndSize = segments[0].getLong(fieldOffset) as we don't support multi-segments yet.

Comment on lines +46 to +56
private void init() {
if (!inited) {
FieldVector mapVector = ((MapVector) vector).getDataVector();
StructVector structVector = (StructVector) mapVector;
FieldVector keyVector = structVector.getChildrenFromFields().get(0);
FieldVector valueVector = structVector.getChildrenFromFields().get(1);
this.keyColumnVector = ArrowUtils.createArrowColumnVector(keyVector, keyType);
this.valueColumnVector = ArrowUtils.createArrowColumnVector(valueVector, valueType);
inited = true;
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

initialize in the constructor and remove this method.

Comment on lines +68 to +76
public ColumnVector getKeyColumnVector() {
init();
return keyColumnVector;
}

public ColumnVector getValueColumnVector() {
init();
return valueColumnVector;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsed, remove

@Override
public InternalMap getMap(int i) {
init();
MapVector mapVector = (MapVector) vector;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Direct store vector as MapVector in ArrowMapColumnVector to avoid casting per record.

BinaryWriter.ValueWriter setter =
BinaryWriter.createValueWriter(
DataTypes.MAP(DataTypes.INT().copy(false), DataTypes.STRING()),
BinaryRow.BinaryRowFormat.ALIGNED);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use COMPACTED format in tests by default as it is more widely used than ALIGNED and INDEXED.

}

@Test
public void testSize() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove! as this has been tested in other test methods.

InternalArray keyArray = flussMap.keyArray();
InternalArray valueArray = flussMap.valueArray();
int size = flussMap.size();
java.util.Map<Object, Object> javaMap = new java.util.HashMap<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}

@Test
void testMapTypesInLogTable() throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found that the current tests not cover all the cases, like missing PK table tests for Map and Row type, missing partitioned table test for Row type.

Could you merge the map type tests and row type tests into the existing testArrayTypesInPrimaryKeyTable, testArrayTypesInPartitionedLogTable, and testArrayTypesInLogTable, and rename them to testComplexTypesInPrimaryKeyTable, testComplexTypesInPartitionedLogTable, and testComplexTypesInLogTable? This can also reduce the test time.

Besides, could you add more nested type tests? Like

  • array<map<string, double>>
  • row<map<int, array<float>>
  • map<bigint, row<string, array<string>, array<int>>


// map
assertThat(flinkRow.getMap(22)).isNotNull();
assertThat(flinkRow.getMap(22).size()).isEqualTo(3);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert map values.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support Map type in log table (Arrow row format)

2 participants