[lake/flink] Fix lake tiering doesn't work in flink 2.2#2657
[lake/flink] Fix lake tiering doesn't work in flink 2.2#2657wuchong merged 2 commits intoapache:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR updates Fluss’s Flink tiering integration to remain compatible across Flink versions (notably Flink 2.2), primarily by adapting TypeInformation#createSerializer to Flink’s newer SerializerConfig-based signature and by adjusting IT test class structure to run per supported Flink version.
Changes:
- Update several
TypeInformationimplementations to supportcreateSerializer(SerializerConfig)while retaining olderExecutionConfigentrypoints. - Make the shared
TieringITCaseabstract and add version-specific concrete ITCase subclasses (1.18/1.19/1.20/2.2) to ensure test discovery/execution per module. - Add a Flink 1.18-only
SerializerConfigplaceholder type to avoid linkage issues when running shaded common code on Flink 1.18.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java | Make base tiering IT case abstract so it can be inherited by per-version concrete test classes. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultTypeInfo.java | Add createSerializer(SerializerConfig) to align with newer Flink API while keeping legacy path. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/CommittableMessageTypeInfo.java | Same serializer-signature compatibility update for committer messages. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordTypeInformation.java | Add createSerializer(SerializerConfig) alongside deprecated ExecutionConfig serializer creation. |
| fluss-flink/fluss-flink-2.2/src/test/java/org/apache/fluss/flink/tiering/Flink22TieringITCase.java | New Flink 2.2 concrete tiering IT test entrypoint. |
| fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/tiering/Flink20TieringITCase.java | New Flink 1.20 concrete tiering IT test entrypoint. |
| fluss-flink/fluss-flink-1.19/src/test/java/org/apache/fluss/flink/tiering/Flink19TieringITCase.java | New Flink 1.19 concrete tiering IT test entrypoint. |
| fluss-flink/fluss-flink-1.18/src/test/java/org/apache/fluss/flink/tiering/Flink18TieringITCase.java | New Flink 1.18 concrete tiering IT test entrypoint. |
| fluss-flink/fluss-flink-1.18/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java | Add Flink 1.18 placeholder SerializerConfig to prevent classloading/linkage issues. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...mon/src/main/java/org/apache/fluss/flink/sink/shuffle/StatisticsOrRecordTypeInformation.java
Outdated
Show resolved
Hide resolved
...uss-flink-1.18/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java
Outdated
Show resolved
Hide resolved
...mmon/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultTypeInfo.java
Outdated
Show resolved
Hide resolved
...ommon/src/main/java/org/apache/fluss/flink/tiering/committer/CommittableMessageTypeInfo.java
Outdated
Show resolved
Hide resolved
wuchong
left a comment
There was a problem hiding this comment.
I suggest to introduce a TypeInformationAdapter that handles createSerializer for different flink versions, rather than relying on overriding the SerializerConfig class.
Thanks for the suggestion. Working on it. |
Purpose
Linked issue: close #2658
Brief change log
implements createSerializer(org.apache.flink.api.common.serialization.SerializerConfig)
Tests
API and Format
Documentation