[common] Introduce MAP type for ARROW, COMPACTED and INDEXED formats #2190
Conversation
Pull request overview
This PR introduces MAP type support for the INDEXED row format, achieving feature parity with ARROW and COMPACTED formats. The implementation adds MAP serialization/deserialization capabilities across the Fluss common layer, Flink integration, and lake integrations (Paimon, Iceberg placeholder).
Key Changes:
- Added MAP type support to the IndexedRow format with `IndexedRowReader.readMap()` and `IndexedRowWriter.writeMap()` methods
- Introduced `GenericMap`, `BinaryMap`, and format-specific map implementations (IndexedMap, CompactedMap, AlignedMap) with corresponding serializers (a conceptual sketch of the key/value-array layout follows this list)
- Extended the Flink, Paimon, and Arrow integrations to handle MAP type conversions and vector operations
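For orientation, here is a conceptual sketch in plain Java (no Fluss classes; the class and variable names are purely illustrative) of the key/value-array shape of a map that the summary above describes for `BinaryMap` and the `InternalMap` accessors:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Conceptual illustration only: a map stored as two parallel arrays, which is the
// general shape that BinaryMap's key/value arrays and InternalMap.keyArray()/valueArray()
// expose according to the summary above. No Fluss APIs are used here.
public class KeyValueArraySketch {
    public static void main(String[] args) {
        Map<Integer, String> source = new LinkedHashMap<>();
        source.put(1, "a");
        source.put(2, "b");
        source.put(3, "c");

        // Decompose into parallel arrays, preserving entry order.
        Object[] keys = source.keySet().toArray();
        Object[] values = source.values().toArray();

        // Reassemble: entry i pairs keys[i] with values[i].
        Map<Object, Object> rebuilt = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            rebuilt.put(keys[i], values[i]);
        }

        System.out.println(Arrays.toString(keys) + " / " + Arrays.toString(values));
        System.out.println(rebuilt.equals(source)); // true
    }
}
```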
Reviewed changes
Copilot reviewed 66 out of 66 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| IndexedRowReader.java | Added readMap() method and MAP case in field reader factory |
| IndexedRowWriter.java | Added writeMap() method for MAP serialization |
| BinaryMap.java | New binary representation of MAP type with key/value arrays |
| GenericMap.java | Generic MAP implementation wrapping Java maps |
| MapSerializer.java | Serializer for converting InternalMap to BinaryMap |
| FlinkAsFlussMap.java | Adapter for Flink MapData to Fluss InternalMap |
| PaimonMapAsFlussMap.java | Adapter for Paimon map to Fluss InternalMap |
| ArrowMapWriter.java / ArrowMapColumnVector.java | Arrow format MAP support |
| Various test files | Comprehensive test coverage for MAP operations |
```diff
 Object arr, int offset, int length, int elementSize) {
     final long headerInBytes = calculateHeaderInBytes(length);
-    final long valueRegionInBytes = elementSize * length;
+    final long valueRegionInBytes = (long) elementSize * length;
```
Copilot AI commented on Dec 26, 2025:
The cast to (long) is applied only to elementSize, not to the entire multiplication result. This could still cause integer overflow when elementSize * length exceeds Integer.MAX_VALUE before the cast is applied. The cast should wrap the entire multiplication expression: (long) elementSize * length.
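For reference, a minimal, self-contained Java sketch (hypothetical values, not from the PR) of how int multiplication overflows before it is widened to long, and how casting an operand changes the arithmetic:

```java
public class OverflowSketch {
    public static void main(String[] args) {
        int elementSize = 8;
        int length = 300_000_000; // hypothetical: large enough that the int product overflows

        // int * int is evaluated in 32-bit arithmetic; the overflowed result is then widened.
        long overflowed = elementSize * length;

        // Casting one operand first forces the multiplication to be done in 64-bit arithmetic.
        long widened = (long) elementSize * length;

        System.out.println(overflowed); // negative value due to int overflow
        System.out.println(widened);    // 2400000000
    }
}
```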
```java
java.util.Map<Object, Object> javaMap = new java.util.LinkedHashMap<>();
for (int i = 0; i < values.length; i += 2) {
    javaMap.put(values[i], (i + 1 < values.length) ? values[i + 1] : null);
```
Copilot AI commented on Dec 26, 2025:
This test is always true because of the loop condition, so the ternary can be simplified:

```diff
-javaMap.put(values[i], (i + 1 < values.length) ? values[i + 1] : null);
+javaMap.put(values[i], values[i + 1]);
```
```java
 * @since 0.9
 */
@PublicEvolving
public class BinaryMap extends BinarySection implements InternalMap {
```
Copilot AI commented on Dec 26, 2025:
Class BinaryMap overrides hashCode but not equals.
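For context, the conventional fix is to override `equals` and `hashCode` together so that equal instances hash equally; a generic sketch with placeholder fields (not BinaryMap's actual state, which lives in `BinarySection`):

```java
import java.util.Arrays;
import java.util.Objects;

// Generic illustration of keeping equals and hashCode consistent; the fields here are
// placeholders, not BinaryMap's real state.
final class Section {
    private final byte[] bytes;
    private final int offset;
    private final int sizeInBytes;

    Section(byte[] bytes, int offset, int sizeInBytes) {
        this.bytes = bytes;
        this.offset = offset;
        this.sizeInBytes = sizeInBytes;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Section)) {
            return false;
        }
        Section that = (Section) o;
        return offset == that.offset
                && sizeInBytes == that.sizeInBytes
                && Arrays.equals(bytes, that.bytes);
    }

    @Override
    public int hashCode() {
        // Derived from the same fields as equals, so equal sections hash equally.
        return Objects.hash(offset, sizeInBytes, Arrays.hashCode(bytes));
    }
}
```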
wuchong left a comment:
Thanks @XuQianJin-Stars for the contribution. I left some comments.
```java
assertThat(row.getMap(4)).isInstanceOf(GenericMap.class);
GenericMap simpleMap = (GenericMap) row.getMap(4);
assertThat(simpleMap.size()).isGreaterThan(0);

assertThat(row.getMap(5)).isInstanceOf(GenericMap.class);
GenericMap nestedMap = (GenericMap) row.getMap(5);
assertThat(nestedMap.size()).isGreaterThan(0);

assertThat(row.getMap(6)).isInstanceOf(GenericMap.class);
GenericMap mapWithArray = (GenericMap) row.getMap(6);
assertThat(mapWithArray.size()).isGreaterThan(0);
```
Actually, this approach doesn’t verify the actual field values, and it significantly inflates the line count, making the code harder to maintain.
Moreover, we’ll soon need to add assertions for column projection here as well.
My suggestion: prepare a list of expected rows upfront and perform a direct assert equals comparison with the actual result.
This keeps the test concise, expressive, and much easier to maintain.
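As a purely illustrative sketch of that pattern, with plain Java lists and maps standing in for the Fluss row and map types (the class name and values are hypothetical):

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExpectedRowsSketch {
    public static void main(String[] args) {
        Map<Integer, String> expectedMap = new LinkedHashMap<>();
        expectedMap.put(1, "a");
        expectedMap.put(2, "b");

        // Prepare the full expected rows up front...
        List<List<Object>> expectedRows =
                Arrays.asList(
                        Arrays.asList(100, "alice", expectedMap),
                        Arrays.asList(200, "bob", expectedMap));

        // ...then compare the whole result in one assertion instead of asserting
        // only sizes field by field.
        List<List<Object>> actualRows =
                Arrays.asList(
                        Arrays.asList(100, "alice", expectedMap),
                        Arrays.asList(200, "bob", expectedMap));

        assertThat(actualRows).isEqualTo(expectedRows);
    }
}
```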
```java
public InternalMap getMap(int pos) {
    assertIndexIsValid(pos);
    int fieldOffset = getFieldOffset(pos);
    final long offsetAndSize = BinarySegmentUtils.getLong(segments, fieldOffset);
```
Keep this aligned with the other accessors and use `final long offsetAndSize = segments[0].getLong(fieldOffset)`, since we don't support multiple segments yet.
```java
private void init() {
    if (!inited) {
        FieldVector mapVector = ((MapVector) vector).getDataVector();
        StructVector structVector = (StructVector) mapVector;
        FieldVector keyVector = structVector.getChildrenFromFields().get(0);
        FieldVector valueVector = structVector.getChildrenFromFields().get(1);
        this.keyColumnVector = ArrowUtils.createArrowColumnVector(keyVector, keyType);
        this.valueColumnVector = ArrowUtils.createArrowColumnVector(valueVector, valueType);
        inited = true;
    }
}
```
Initialize these in the constructor and remove this method.
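A possible shape for that refactoring, reusing the names from the snippet above; the constructor signature and the Fluss-side types (`DataType`, `ArrowUtils`, the `keyType`/`valueType` parameters) are assumptions rather than the actual API, so treat this as a sketch, not the implementation:

```java
// Sketch only: field and helper names are taken from the snippet above; the constructor
// shape is an assumption to be aligned with the existing ArrowColumnVector constructors.
public ArrowMapColumnVector(MapVector mapVector, DataType keyType, DataType valueType) {
    this.mapVector = mapVector; // keep the typed MapVector to avoid per-record casts
    StructVector structVector = (StructVector) mapVector.getDataVector();
    this.keyColumnVector =
            ArrowUtils.createArrowColumnVector(
                    structVector.getChildrenFromFields().get(0), keyType);
    this.valueColumnVector =
            ArrowUtils.createArrowColumnVector(
                    structVector.getChildrenFromFields().get(1), valueType);
}
```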
```java
public ColumnVector getKeyColumnVector() {
    init();
    return keyColumnVector;
}

public ColumnVector getValueColumnVector() {
    init();
    return valueColumnVector;
}
```
Unused, remove.
```java
@Override
public InternalMap getMap(int i) {
    init();
    MapVector mapVector = (MapVector) vector;
```
Store the vector directly as a MapVector field in ArrowMapColumnVector to avoid casting on every record.
```java
BinaryWriter.ValueWriter setter =
        BinaryWriter.createValueWriter(
                DataTypes.MAP(DataTypes.INT().copy(false), DataTypes.STRING()),
                BinaryRow.BinaryRowFormat.ALIGNED);
```
Use COMPACTED format in tests by default as it is more widely used than ALIGNED and INDEXED.
```java
}

@Test
public void testSize() {
```
Remove; this is already covered by other test methods.
```java
InternalArray keyArray = flussMap.keyArray();
InternalArray valueArray = flussMap.valueArray();
int size = flussMap.size();
java.util.Map<Object, Object> javaMap = new java.util.HashMap<>();
```
ditto
```java
}

@Test
void testMapTypesInLogTable() throws Exception {
```
I found that the current tests don't cover all the cases: PK table tests for the Map and Row types are missing, as is the partitioned table test for the Row type.
Could you merge the map type tests and row type tests into the existing testArrayTypesInPrimaryKeyTable, testArrayTypesInPartitionedLogTable, and testArrayTypesInLogTable, and rename them to testComplexTypesInPrimaryKeyTable, testComplexTypesInPartitionedLogTable, and testComplexTypesInLogTable? This would also reduce the test time.
Besides, could you add more nested type tests? For example:
- `array<map<string, double>>`
- `row<map<int, array<float>>>`
- `map<bigint, row<string, array<string>, array<int>>>`
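A hedged sketch of how those nested types might be declared with the `DataTypes` factory seen elsewhere in this PR; only `DataTypes.MAP`, `DataTypes.INT`, and `DataTypes.STRING` appear in the diff above, so the other factory methods are assumptions to verify against the DataTypes class:

```java
// Hedged sketch: DataTypes.ARRAY, ROW, DOUBLE, FLOAT, and BIGINT are assumed to exist
// by analogy with DataTypes.MAP/INT/STRING used in this PR's tests.
DataType t1 = DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING(), DataTypes.DOUBLE()));
DataType t2 = DataTypes.ROW(DataTypes.MAP(DataTypes.INT(), DataTypes.ARRAY(DataTypes.FLOAT())));
DataType t3 =
        DataTypes.MAP(
                DataTypes.BIGINT(),
                DataTypes.ROW(
                        DataTypes.STRING(),
                        DataTypes.ARRAY(DataTypes.STRING()),
                        DataTypes.ARRAY(DataTypes.INT())));
```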
```java
// map
assertThat(flinkRow.getMap(22)).isNotNull();
assertThat(flinkRow.getMap(22).size()).isEqualTo(3);
```
Assert the map values as well, not just the size.
Purpose
Linked issue: close #1973
Introduce MAP type support for INDEXED format, achieving feature parity with ARROW and COMPACTED formats for MAP data type handling.
Brief change log
- Added a `readMap()` method to `IndexedRowReader` for MAP type deserialization, following the same pattern as `readArray()`
- Removed the `UnsupportedOperationException` for MAP type in `IndexedRowReader.createFieldReader()`
- Added the `InternalMap` import to `IndexedRowReader`
- Renamed `BinaryWriterTest.testCreateValueSetterForMapThrowsException()` to `testValueSetterWithMapType()` - changed from an exception test to a positive validation test
- Updated `IndexedRowTest.assertAllTypeEquals()` to include MAP type assertions (validates size, key array, value array, and null handling)
- Updated `IndexedRowReaderTest.assertAllTypeEqualsForReader()` to verify MAP reading

Tests
Unit Tests:
- `IndexedRowTest` (7 tests) - Validates MAP read/write operations with IndexedRow
- `IndexedRowReaderTest` (2 tests) - Validates MAP deserialization with IndexedRowReader
- `BinaryWriterTest` (18 tests) - Validates MAP type ValueWriter creation and usage
- `ArrowReaderWriterTest` (2 tests) - Confirms ARROW format MAP support continues to work
- `CompactedRowTest` (20 tests) - Confirms COMPACTED format MAP support continues to work

Test Results: All 49 tests passed successfully (0 failures, 0 errors, 0 skipped)
Test Command:
mvn test -pl fluss-common -Dtest=IndexedRowTest,IndexedRowReaderTest,BinaryWriterTest,ArrowReaderWriterTest,CompactedRowTest -Dcheckstyle.skip=true -Dspotless.check.skip=true### API and Format
API: No breaking changes. This is purely additive functionality that removes an `UnsupportedOperationException` and enables the existing MAP type infrastructure for the INDEXED format.

Storage Format: No format changes. The MAP storage format in INDEXED rows was already defined and uses the same `BinaryMap` serialization mechanism as the other formats. This change only enables reading MAP data that was already writable.

Documentation
No documentation changes required. This change brings the INDEXED format to feature parity with the ARROW and COMPACTED formats for MAP type support. MAP type usage is already covered in the existing Fluss documentation. The change completes the internal implementation of existing functionality rather than introducing new user-facing features.