From 4c8c8e66535a1aa63caa5b963796f10618871854 Mon Sep 17 00:00:00 2001
From: Hongshun Wang
Date: Thu, 11 Dec 2025 16:23:24 +0800
Subject: [PATCH 1/5] [flink] Flink Source needs to check tableId on recovery in case the table was dropped and recreated.

---
 .../source/reader/FlinkSourceSplitReader.java | 17 ++++++-----------
 .../reader/FlinkSourceSplitReaderTest.java    | 19 +++++++++++++++++++
 2 files changed, 25 insertions(+), 11 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
index 9a2ab7aeba..c0c4acf45f 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
@@ -104,8 +104,7 @@ public class FlinkSourceSplitReader implements SplitReader lakeSource;
-    // table id, will be null when haven't received any split
-    private Long tableId;
+    private final Long tableId;
     private final Map stoppingOffsets;
     private LakeSplitReaderGenerator lakeSplitReaderGenerator;
@@ -127,6 +126,7 @@ public FlinkSourceSplitReader(
                 new FlinkMetricRegistry(flinkSourceReaderMetrics.getSourceReaderMetricGroup());
         this.connection = ConnectionFactory.createConnection(flussConf, flinkMetricRegistry);
         this.table = connection.getTable(tablePath);
+        this.tableId = table.getTableInfo().getTableId();
         this.sourceOutputType = sourceOutputType;
         this.boundedSplits = new ArrayDeque<>();
         this.subscribedBuckets = new HashMap<>();
@@ -187,15 +187,10 @@ public void handleSplitsChanges(SplitsChange splitsChanges) {
         }
         for (SourceSplitBase sourceSplitBase : splitsChanges.splits()) {
             LOG.info("add split {}", sourceSplitBase.splitId());
-            // init table id
-            if (tableId == null) {
-                tableId = sourceSplitBase.getTableBucket().getTableId();
-            } else {
-                checkArgument(
-                        tableId.equals(sourceSplitBase.getTableBucket().getTableId()),
-                        "table id not equal across splits {}",
-                        splitsChanges.splits());
-            }
+            checkArgument(
+                    tableId.equals(sourceSplitBase.getTableBucket().getTableId()),
+                    "table id not equal across splits {}",
+                    splitsChanges.splits());
 
             if (sourceSplitBase.isHybridSnapshotLogSplit()) {
                 HybridSnapshotLogSplit hybridSnapshotLogSplit =
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
index 44bd1e98d8..e98a062ba5 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
@@ -158,6 +158,25 @@ void testHandleHybridSnapshotLogSplitChangesAndFetch() throws Exception {
         }
     }
 
+    @Test
+    void testTableIdChange() throws Exception {
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "test-only-snapshot-table");
+        long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR);
+        try (FlinkSourceSplitReader splitReader =
+                createSplitReader(tablePath, DEFAULT_PK_TABLE_SCHEMA.getRowType())) {
+            assertThatThrownBy(
+                            () ->
+                                    splitReader.handleSplitsChanges(
+                                            new SplitsAddition<>(
+                                                    Collections.singletonList(
+                                                            new LogSplit(
+                                                                    new TableBucket(tableId + 1, 0),
+                                                                    null,
+                                                                    0)))))
+                    .hasMessageContaining("table id not equal across splits");
+        }
+    }
+
     private Map> constructRecords(
             Map> rows) {
         Map> expectedRecords = new HashMap<>();

From 50247a4fe45ed7368ccff7f3a5b64ec770a19660 Mon Sep 17 00:00:00 2001
From: Hongshun Wang
Date: Mon, 29 Dec 2025 15:42:27 +0800
Subject: [PATCH 2/5] modified based on cr

---
 .../source/reader/FlinkSourceSplitReader.java |   6 +-
 .../FlinkTableSourceFailOverITCase.java       | 215 ++++++++++--------
 .../reader/FlinkSourceSplitReaderTest.java    |   5 +-
 3 files changed, 132 insertions(+), 94 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
index c0c4acf45f..dcf6f032c4 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
@@ -189,8 +189,10 @@ public void handleSplitsChanges(SplitsChange splitsChanges) {
             LOG.info("add split {}", sourceSplitBase.splitId());
             checkArgument(
                     tableId.equals(sourceSplitBase.getTableBucket().getTableId()),
-                    "table id not equal across splits {}",
-                    splitsChanges.splits());
+                    "table id %s is different with table id %s from splits, the same name table `%s` is dropped and recreated before the job restarted, which maybe cause inconsistency, please restarted the job without savepoint or checkpoint ",
+                    tableId,
+                    sourceSplitBase.getTableBucket().getTableId(),
+                    table.getTableInfo().getTablePath());
 
             if (sourceSplitBase.isHybridSnapshotLogSplit()) {
                 HybridSnapshotLogSplit hybridSnapshotLogSplit =
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
index d228000ac4..52e91c020c 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
@@ -25,6 +25,7 @@
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.utils.types.Tuple2;
 
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
@@ -60,6 +61,7 @@
 import static org.apache.fluss.flink.utils.FlinkTestBase.dropPartitions;
 import static org.apache.fluss.flink.utils.FlinkTestBase.waitUntilPartitions;
 import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** IT case for flink table source fail over. */
 abstract class FlinkTableSourceFailOverITCase {
@@ -86,16 +88,27 @@ abstract class FlinkTableSourceFailOverITCase {
     org.apache.fluss.config.Configuration clientConf;
     ZooKeeperClient zkClient;
     Connection conn;
+    MiniClusterWithClientResource cluster;
 
     @BeforeEach
-    protected void beforeEach() {
+    protected void beforeEach() throws Exception {
         clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
         zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
        conn = ConnectionFactory.createConnection(clientConf);
+
+        cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(getFileBasedCheckpointsConfig(savepointDir))
+                                .setNumberTaskManagers(2)
+                                .setNumberSlotsPerTaskManager(2)
+                                .build());
+        cluster.before();
     }
 
     @AfterEach
     protected void afterEach() throws Exception {
+        cluster.after();
         conn.close();
     }
 
@@ -121,100 +134,120 @@ private StreamTableEnvironment initTableEnvironment(@Nullable String savepointPa
 
     @Test
     void testRestore() throws Exception {
-        final int numTaskManagers = 2;
-        final int numSlotsPerTaskManager = 2;
+        TablePath tablePath = TablePath.of("fluss", "test_recreate_table");
+        Tuple2> savepointPathAndResults =
+                runWithSavepoint(tablePath);
+        StreamTableEnvironment tEnv = initTableEnvironment(savepointPathAndResults.f0);
+        CloseableIterator results = savepointPathAndResults.f1;
+        TableResult insertResult =
+                tEnv.executeSql(
+                        String.format(
+                                "insert into result_table select * from %s",
+                                tablePath.getTableName()));
+        // append a new row again to check if the source can restore the state correctly
+        Table table = conn.getTable(tablePath);
+        AppendWriter writer = table.newAppend().createWriter();
+        writer.append(row(5, "5000")).get();
+        List expected = new ArrayList<>();
+        expected.add("+I[5, 5000]");
+        assertResultsIgnoreOrder(results, expected, true);
+        // cancel the insert job
+        insertResult.getJobClient().get().cancel().get();
+    }
 
-        // Start Flink
-        MiniClusterWithClientResource cluster =
-                new MiniClusterWithClientResource(
-                        new MiniClusterResourceConfiguration.Builder()
-                                .setConfiguration(getFileBasedCheckpointsConfig(savepointDir))
-                                .setNumberTaskManagers(numTaskManagers)
-                                .setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
-                                .build());
+    @Test
+    void testRestoreWithRecreateTable() throws Exception {
+        TablePath tablePath = TablePath.of("fluss", "test_recreate_table");
+        Tuple2> savepointPathAndResults =
+                runWithSavepoint(tablePath);
+        StreamTableEnvironment tEnv = initTableEnvironment(savepointPathAndResults.f0);
 
-        cluster.before();
+        // drop and recreate the table.
+        tEnv.executeSql(String.format("drop table %s", tablePath.getTableName()));
+        tEnv.executeSql(
+                String.format("create table %s (a int, b varchar)", tablePath.getTableName()));
+
+        TableResult insertResult =
+                tEnv.executeSql(
+                        String.format(
+                                "insert into result_table select * from %s",
+                                tablePath.getTableName()));
+        assertThatThrownBy(() -> insertResult.getJobClient().get().getJobExecutionResult().get())
+                .rootCause()
+                .hasMessageContaining(
+                        "the same name table `fluss.test_recreate_table` is dropped and recreated before the job restarted, which maybe cause inconsistency, please restarted the job without savepoint or checkpoint");
+    }
+
+    private Tuple2> runWithSavepoint(TablePath tablePath)
+            throws Exception {
+        StreamTableEnvironment tEnv = initTableEnvironment(null);
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + "a int, b varchar"
+                                + ") partitioned by (b) "
+                                + "with ("
+                                + "'table.auto-partition.enabled' = 'true',"
+                                + "'table.auto-partition.time-unit' = 'year',"
+                                + "'scan.partition.discovery.interval' = '100ms',"
+                                + "'table.auto-partition.num-precreate' = '1')",
+                        tablePath.getTableName()));
+        tEnv.executeSql("create table result_table (a int, b varchar)");
+
+        // create a partition manually
+        createPartitions(zkClient, tablePath, Collections.singletonList("4000"));
+        waitUntilPartitions(zkClient, tablePath, 2);
 
-        try {
-            StreamTableEnvironment tEnv = initTableEnvironment(null);
-            tEnv.executeSql(
-                    "create table test_partitioned ("
-                            + "a int, b varchar"
-                            + ") partitioned by (b) "
-                            + "with ("
-                            + "'table.auto-partition.enabled' = 'true',"
-                            + "'table.auto-partition.time-unit' = 'year',"
-                            + "'scan.partition.discovery.interval' = '100ms',"
-                            + "'table.auto-partition.num-precreate' = '1')");
-            tEnv.executeSql("create table result_table (a int, b varchar)");
-
-            TablePath tablePath = TablePath.of("fluss", "test_partitioned");
-
-            // create a partition manually
-            createPartitions(zkClient, tablePath, Collections.singletonList("4000"));
-            waitUntilPartitions(zkClient, tablePath, 2);
-
-            // append 3 records for each partition
-            Table table = conn.getTable(tablePath);
-            AppendWriter writer = table.newAppend().createWriter();
-            String thisYear = String.valueOf(Year.now().getValue());
-            List expected = new ArrayList<>();
-            for (int i = 0; i < 3; i++) {
-                writer.append(row(i, thisYear));
-                writer.append(row(i, "4000"));
-                expected.add("+I[" + i + ", " + thisYear + "]");
-                expected.add("+I[" + i + ", 4000]");
-            }
-            writer.flush();
-
-            // execute the query to fetch logs from the table
-            TableResult insertResult =
-                    tEnv.executeSql("insert into result_table select * from test_partitioned");
-            // we have to create a intermediate table to collect result,
-            // because CollectSink can't be restored from savepoint
-            CloseableIterator results =
-                    tEnv.executeSql("select * from result_table").collect();
-            assertResultsIgnoreOrder(results, expected, false);
-            expected.clear();
-
-            // drop the partition manually
-            dropPartitions(zkClient, tablePath, Collections.singleton("4000"));
-            waitUntilPartitions(zkClient, tablePath, 1);
-
-            // create a new partition again and append records into it
-            createPartitions(zkClient, tablePath, Collections.singletonList("5000"));
-            waitUntilPartitions(zkClient, tablePath, 2);
-            writer.append(row(4, "5000")).get();
-            expected.add("+I[4, 5000]");
-            // if the source subscribes the new partition successfully,
-            // it should have removed the old partition successfully
-            assertResultsIgnoreOrder(results, expected, false);
-            expected.clear();
-
-            // now, stop the job with save point
-            String savepointPath =
-                    insertResult
-                            .getJobClient()
-                            .get()
-                            .stopWithSavepoint(
-                                    false,
-                                    savepointDir.getAbsolutePath(),
-                                    SavepointFormatType.CANONICAL)
-                            .get();
-
-            tEnv = initTableEnvironment(savepointPath);
-            insertResult =
-                    tEnv.executeSql("insert into result_table select * from test_partitioned");
-            // append a new row again to check if the source can restore the state correctly
-            writer.append(row(5, "5000")).get();
-            expected.add("+I[5, 5000]");
-            assertResultsIgnoreOrder(results, expected, true);
-            // cancel the insert job
-            insertResult.getJobClient().get().cancel().get();
-        } finally {
-            // stop the cluster and thereby cancel the job
-            cluster.after();
+        // append 3 records for each partition
+        Table table = conn.getTable(tablePath);
+        AppendWriter writer = table.newAppend().createWriter();
+        String thisYear = String.valueOf(Year.now().getValue());
+        List expected = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            writer.append(row(i, thisYear));
+            writer.append(row(i, "4000"));
+            expected.add("+I[" + i + ", " + thisYear + "]");
+            expected.add("+I[" + i + ", 4000]");
         }
         writer.flush();
+
+        // execute the query to fetch logs from the table
+        TableResult insertResult =
+                tEnv.executeSql(
+                        String.format(
+                                "insert into result_table select * from %s",
+                                tablePath.getTableName()));
+        // we have to create an intermediate table to collect result,
+        // because CollectSink can't be restored from savepoint
+        CloseableIterator results = tEnv.executeSql("select * from result_table").collect();
+        assertResultsIgnoreOrder(results, expected, false);
+        expected.clear();
+
+        // drop the partition manually
+        dropPartitions(zkClient, tablePath, Collections.singleton("4000"));
+        waitUntilPartitions(zkClient, tablePath, 1);
+
+        // create a new partition again and append records into it
+        createPartitions(zkClient, tablePath, Collections.singletonList("5000"));
+        waitUntilPartitions(zkClient, tablePath, 2);
+        writer.append(row(4, "5000")).get();
+        expected.add("+I[4, 5000]");
+        // if the source subscribes the new partition successfully,
+        // it should have removed the old partition successfully
+        assertResultsIgnoreOrder(results, expected, false);
+        expected.clear();
+
+        // now, stop the job with save point
+        String savepointPath =
+                insertResult
+                        .getJobClient()
+                        .get()
+                        .stopWithSavepoint(
+                                false,
+                                savepointDir.getAbsolutePath(),
+                                SavepointFormatType.CANONICAL)
+                        .get();
+        return Tuple2.of(savepointPath, results);
     }
 
     private static Configuration getFileBasedCheckpointsConfig(File savepointDir) {
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
index e98a062ba5..a691deb1ff 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
@@ -173,7 +173,10 @@ void testTableIdChange() throws Exception {
                                                                     new TableBucket(tableId + 1, 0),
                                                                     null,
                                                                     0)))))
-                    .hasMessageContaining("table id not equal across splits");
+                    .hasMessageContaining(
+                            String.format(
+                                    "the same name table `%s` is dropped and recreated before the job restarted, which maybe cause inconsistency, please restarted the job without savepoint or checkpoint",
+                                    tablePath));
         }
     }

From 5b50b0c5ff48c134ff074c285e1225e7e01fb7b9 Mon Sep 17 00:00:00 2001
From: Hongshun Wang
Date: Mon, 29 Dec 2025 18:03:34 +0800
Subject: [PATCH 3/5] fix test.

---
 .../fluss/flink/source/FlinkTableSourceFailOverITCase.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
index 52e91c020c..04c49191ba 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
@@ -165,7 +165,9 @@ void testRestoreWithRecreateTable() throws Exception {
         // drop and recreate the table.
         tEnv.executeSql(String.format("drop table %s", tablePath.getTableName()));
         tEnv.executeSql(
-                String.format("create table %s (a int, b varchar)", tablePath.getTableName()));
+                String.format(
+                        "create table %s (" + "a int, b varchar" + ") partitioned by (b) ",
+                        tablePath.getTableName()));
 
         TableResult insertResult =
                 tEnv.executeSql(

From eb78f84a35d79825f70c2141574ebd5ae6623cd7 Mon Sep 17 00:00:00 2001
From: Jark Wu
Date: Mon, 29 Dec 2025 19:14:26 +0800
Subject: [PATCH 4/5] improve error message

---
 .../fluss/flink/source/reader/FlinkSourceSplitReader.java  | 5 ++++-
 .../fluss/flink/source/FlinkTableSourceFailOverITCase.java | 5 ++++-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
index dcf6f032c4..bd5305a363 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
@@ -189,7 +189,10 @@ public void handleSplitsChanges(SplitsChange splitsChanges) {
             LOG.info("add split {}", sourceSplitBase.splitId());
             checkArgument(
                     tableId.equals(sourceSplitBase.getTableBucket().getTableId()),
-                    "table id %s is different with table id %s from splits, the same name table `%s` is dropped and recreated before the job restarted, which maybe cause inconsistency, please restarted the job without savepoint or checkpoint ",
+                    "Table ID mismatch: expected %s, but split contains %s for table '%s'. "
+                            + "This usually happens when a table with the same name was dropped and recreated "
+                            + "between job runs, causing metadata inconsistency. "
+                            + "To resolve this, please restart the job **without** using the previous savepoint or checkpoint.",
                     tableId,
                     sourceSplitBase.getTableBucket().getTableId(),
                     table.getTableInfo().getTablePath());
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
index 04c49191ba..916b10654f 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java
@@ -177,7 +177,10 @@ void testRestoreWithRecreateTable() throws Exception {
         assertThatThrownBy(() -> insertResult.getJobClient().get().getJobExecutionResult().get())
                 .rootCause()
                 .hasMessageContaining(
-                        "the same name table `fluss.test_recreate_table` is dropped and recreated before the job restarted, which maybe cause inconsistency, please restarted the job without savepoint or checkpoint");
+                        "Table ID mismatch: expected 2, but split contains 0 for table 'fluss.test_recreate_table'. "
+                                + "This usually happens when a table with the same name was dropped and recreated between job runs, "
+                                + "causing metadata inconsistency. To resolve this, please restart the job **without** "
+                                + "using the previous savepoint or checkpoint.");
     }

From c5dce01550b20735edbdb4b98f80c2e9dff2e536 Mon Sep 17 00:00:00 2001
From: Jark Wu
Date: Mon, 29 Dec 2025 20:16:38 +0800
Subject: [PATCH 5/5] fix test case

---
 .../flink/source/reader/FlinkSourceSplitReaderTest.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
index a691deb1ff..ffdd956529 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
@@ -174,9 +174,10 @@ void testTableIdChange() throws Exception {
                                                                     null,
                                                                     0)))))
                     .hasMessageContaining(
-                            String.format(
-                                    "the same name table `%s` is dropped and recreated before the job restarted, which maybe cause inconsistency, please restarted the job without savepoint or checkpoint",
-                                    tablePath));
+                            "Table ID mismatch: expected 0, but split contains 1 for table 'test-flink-db.test-only-snapshot-table'. "
+                                    + "This usually happens when a table with the same name was dropped and recreated between job runs, "
+                                    + "causing metadata inconsistency. To resolve this, please restart the job **without** using "
+                                    + "the previous savepoint or checkpoint.");
         }
     }