@@ -104,8 +104,7 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS

@Nullable private final LakeSource<LakeSplit> lakeSource;

// table id, will be null when haven't received any split
private Long tableId;
private final Long tableId;

private final Map<TableBucket, Long> stoppingOffsets;
private LakeSplitReaderGenerator lakeSplitReaderGenerator;
@@ -127,6 +126,7 @@ public FlinkSourceSplitReader(
new FlinkMetricRegistry(flinkSourceReaderMetrics.getSourceReaderMetricGroup());
this.connection = ConnectionFactory.createConnection(flussConf, flinkMetricRegistry);
this.table = connection.getTable(tablePath);
this.tableId = table.getTableInfo().getTableId();
this.sourceOutputType = sourceOutputType;
this.boundedSplits = new ArrayDeque<>();
this.subscribedBuckets = new HashMap<>();
@@ -187,15 +187,15 @@ public void handleSplitsChanges(SplitsChange<SourceSplitBase> splitsChanges) {
}
for (SourceSplitBase sourceSplitBase : splitsChanges.splits()) {
LOG.info("add split {}", sourceSplitBase.splitId());
// init table id
if (tableId == null) {
tableId = sourceSplitBase.getTableBucket().getTableId();
} else {
checkArgument(
tableId.equals(sourceSplitBase.getTableBucket().getTableId()),
"table id not equal across splits {}",
splitsChanges.splits());
}
checkArgument(
tableId.equals(sourceSplitBase.getTableBucket().getTableId()),
"Table ID mismatch: expected %s, but split contains %s for table '%s'. "
+ "This usually happens when a table with the same name was dropped and recreated "
+ "between job runs, causing metadata inconsistency. "
+ "To resolve this, please restart the job **without** using the previous savepoint or checkpoint.",
tableId,
sourceSplitBase.getTableBucket().getTableId(),
table.getTableInfo().getTablePath());

if (sourceSplitBase.isHybridSnapshotLogSplit()) {
HybridSnapshotLogSplit hybridSnapshotLogSplit =
@@ -25,6 +25,7 @@
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.testutils.FlussClusterExtension;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.utils.types.Tuple2;

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
@@ -60,6 +61,7 @@
import static org.apache.fluss.flink.utils.FlinkTestBase.dropPartitions;
import static org.apache.fluss.flink.utils.FlinkTestBase.waitUntilPartitions;
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** IT case for flink table source fail over. */
abstract class FlinkTableSourceFailOverITCase {
@@ -86,16 +88,27 @@ abstract class FlinkTableSourceFailOverITCase {
org.apache.fluss.config.Configuration clientConf;
ZooKeeperClient zkClient;
Connection conn;
MiniClusterWithClientResource cluster;

@BeforeEach
protected void beforeEach() {
protected void beforeEach() throws Exception {
clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
conn = ConnectionFactory.createConnection(clientConf);

cluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getFileBasedCheckpointsConfig(savepointDir))
.setNumberTaskManagers(2)
.setNumberSlotsPerTaskManager(2)
.build());
cluster.before();
}

@AfterEach
protected void afterEach() throws Exception {
cluster.after();
conn.close();
}

@@ -121,100 +134,125 @@ private StreamTableEnvironment initTableEnvironment(@Nullable String savepointPa

@Test
void testRestore() throws Exception {
final int numTaskManagers = 2;
final int numSlotsPerTaskManager = 2;
TablePath tablePath = TablePath.of("fluss", "test_recreate_table");
Tuple2<String, CloseableIterator<Row>> savepointPathAndResults =
runWithSavepoint(tablePath);
StreamTableEnvironment tEnv = initTableEnvironment(savepointPathAndResults.f0);
CloseableIterator<Row> results = savepointPathAndResults.f1;
TableResult insertResult =
tEnv.executeSql(
String.format(
"insert into result_table select * from %s",
tablePath.getTableName()));
// append a new row again to check if the source can restore the state correctly
Table table = conn.getTable(tablePath);
AppendWriter writer = table.newAppend().createWriter();
writer.append(row(5, "5000")).get();
List<String> expected = new ArrayList<>();
expected.add("+I[5, 5000]");
assertResultsIgnoreOrder(results, expected, true);
// cancel the insert job
insertResult.getJobClient().get().cancel().get();
}

// Start Flink
MiniClusterWithClientResource cluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getFileBasedCheckpointsConfig(savepointDir))
.setNumberTaskManagers(numTaskManagers)
.setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
.build());
@Test
void testRestoreWithRecreateTable() throws Exception {
TablePath tablePath = TablePath.of("fluss", "test_recreate_table");
Tuple2<String, CloseableIterator<Row>> savepointPathAndResults =
runWithSavepoint(tablePath);
StreamTableEnvironment tEnv = initTableEnvironment(savepointPathAndResults.f0);

cluster.before();
// drop and recreate the table.
tEnv.executeSql(String.format("drop table %s", tablePath.getTableName()));
tEnv.executeSql(
String.format(
"create table %s (" + "a int, b varchar" + ") partitioned by (b) ",
tablePath.getTableName()));

TableResult insertResult =
tEnv.executeSql(
String.format(
"insert into result_table select * from %s",
tablePath.getTableName()));
assertThatThrownBy(() -> insertResult.getJobClient().get().getJobExecutionResult().get())
.rootCause()
.hasMessageContaining(
"Table ID mismatch: expected 2, but split contains 0 for table 'fluss.test_recreate_table'. "
+ "This usually happens when a table with the same name was dropped and recreated between job runs, "
+ "causing metadata inconsistency. To resolve this, please restart the job **without** "
+ "using the previous savepoint or checkpoint.");
}

private Tuple2<String, CloseableIterator<Row>> runWithSavepoint(TablePath tablePath)
throws Exception {
StreamTableEnvironment tEnv = initTableEnvironment(null);
tEnv.executeSql(
String.format(
"create table %s ("
+ "a int, b varchar"
+ ") partitioned by (b) "
+ "with ("
+ "'table.auto-partition.enabled' = 'true',"
+ "'table.auto-partition.time-unit' = 'year',"
+ "'scan.partition.discovery.interval' = '100ms',"
+ "'table.auto-partition.num-precreate' = '1')",
tablePath.getTableName()));
tEnv.executeSql("create table result_table (a int, b varchar)");

// create a partition manually
createPartitions(zkClient, tablePath, Collections.singletonList("4000"));
waitUntilPartitions(zkClient, tablePath, 2);

try {
StreamTableEnvironment tEnv = initTableEnvironment(null);
tEnv.executeSql(
"create table test_partitioned ("
+ "a int, b varchar"
+ ") partitioned by (b) "
+ "with ("
+ "'table.auto-partition.enabled' = 'true',"
+ "'table.auto-partition.time-unit' = 'year',"
+ "'scan.partition.discovery.interval' = '100ms',"
+ "'table.auto-partition.num-precreate' = '1')");
tEnv.executeSql("create table result_table (a int, b varchar)");

TablePath tablePath = TablePath.of("fluss", "test_partitioned");

// create a partition manually
createPartitions(zkClient, tablePath, Collections.singletonList("4000"));
waitUntilPartitions(zkClient, tablePath, 2);

// append 3 records for each partition
Table table = conn.getTable(tablePath);
AppendWriter writer = table.newAppend().createWriter();
String thisYear = String.valueOf(Year.now().getValue());
List<String> expected = new ArrayList<>();
for (int i = 0; i < 3; i++) {
writer.append(row(i, thisYear));
writer.append(row(i, "4000"));
expected.add("+I[" + i + ", " + thisYear + "]");
expected.add("+I[" + i + ", 4000]");
}
writer.flush();

// execute the query to fetch logs from the table
TableResult insertResult =
tEnv.executeSql("insert into result_table select * from test_partitioned");
// we have to create a intermediate table to collect result,
// because CollectSink can't be restored from savepoint
CloseableIterator<Row> results =
tEnv.executeSql("select * from result_table").collect();
assertResultsIgnoreOrder(results, expected, false);
expected.clear();

// drop the partition manually
dropPartitions(zkClient, tablePath, Collections.singleton("4000"));
waitUntilPartitions(zkClient, tablePath, 1);

// create a new partition again and append records into it
createPartitions(zkClient, tablePath, Collections.singletonList("5000"));
waitUntilPartitions(zkClient, tablePath, 2);
writer.append(row(4, "5000")).get();
expected.add("+I[4, 5000]");
// if the source subscribes the new partition successfully,
// it should have removed the old partition successfully
assertResultsIgnoreOrder(results, expected, false);
expected.clear();

// now, stop the job with save point
String savepointPath =
insertResult
.getJobClient()
.get()
.stopWithSavepoint(
false,
savepointDir.getAbsolutePath(),
SavepointFormatType.CANONICAL)
.get();

tEnv = initTableEnvironment(savepointPath);
insertResult =
tEnv.executeSql("insert into result_table select * from test_partitioned");
// append a new row again to check if the source can restore the state correctly
writer.append(row(5, "5000")).get();
expected.add("+I[5, 5000]");
assertResultsIgnoreOrder(results, expected, true);
// cancel the insert job
insertResult.getJobClient().get().cancel().get();
} finally {
// stop the cluster and thereby cancel the job
cluster.after();
// append 3 records for each partition
Table table = conn.getTable(tablePath);
AppendWriter writer = table.newAppend().createWriter();
String thisYear = String.valueOf(Year.now().getValue());
List<String> expected = new ArrayList<>();
for (int i = 0; i < 3; i++) {
writer.append(row(i, thisYear));
writer.append(row(i, "4000"));
expected.add("+I[" + i + ", " + thisYear + "]");
expected.add("+I[" + i + ", 4000]");
}
writer.flush();

// execute the query to fetch logs from the table
TableResult insertResult =
tEnv.executeSql(
String.format(
"insert into result_table select * from %s",
tablePath.getTableName()));
// we have to create an intermediate table to collect the result,
// because CollectSink can't be restored from a savepoint
CloseableIterator<Row> results = tEnv.executeSql("select * from result_table").collect();
assertResultsIgnoreOrder(results, expected, false);
expected.clear();

// drop the partition manually
dropPartitions(zkClient, tablePath, Collections.singleton("4000"));
waitUntilPartitions(zkClient, tablePath, 1);

// create a new partition again and append records into it
createPartitions(zkClient, tablePath, Collections.singletonList("5000"));
waitUntilPartitions(zkClient, tablePath, 2);
writer.append(row(4, "5000")).get();
expected.add("+I[4, 5000]");
// if the source subscribes to the new partition successfully,
// it should also have removed the old partition
assertResultsIgnoreOrder(results, expected, false);
expected.clear();

// now, stop the job with a savepoint
String savepointPath =
insertResult
.getJobClient()
.get()
.stopWithSavepoint(
false,
savepointDir.getAbsolutePath(),
SavepointFormatType.CANONICAL)
.get();
return Tuple2.of(savepointPath, results);
}

private static Configuration getFileBasedCheckpointsConfig(File savepointDir) {
@@ -158,6 +158,29 @@ void testHandleHybridSnapshotLogSplitChangesAndFetch() throws Exception {
}
}

@Test
void testTableIdChange() throws Exception {
TablePath tablePath = TablePath.of(DEFAULT_DB, "test-only-snapshot-table");
long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR);
try (FlinkSourceSplitReader splitReader =
createSplitReader(tablePath, DEFAULT_PK_TABLE_SCHEMA.getRowType())) {
assertThatThrownBy(
() ->
splitReader.handleSplitsChanges(
new SplitsAddition<>(
Collections.singletonList(
new LogSplit(
new TableBucket(tableId + 1, 0),
null,
0)))))
.hasMessageContaining(
"Table ID mismatch: expected 0, but split contains 1 for table 'test-flink-db.test-only-snapshot-table'. "
+ "This usually happens when a table with the same name was dropped and recreated between job runs, "
+ "causing metadata inconsistency. To resolve this, please restart the job **without** using "
+ "the previous savepoint or checkpoint.");
}
}

private Map<String, List<RecordAndPos>> constructRecords(
Map<TableBucket, List<InternalRow>> rows) {
Map<String, List<RecordAndPos>> expectedRecords = new HashMap<>();