From 864fd46691acff667c845428a2764e8bc2958588 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Mon, 29 Dec 2025 11:55:05 +0800 Subject: [PATCH 1/2] [client] Change SchemaNotExistException to a retriable exception. --- .../apache/fluss/client/write/SenderTest.java | 80 ++++++++++++++++--- .../exception/SchemaNotExistException.java | 2 +- .../org/apache/fluss/server/kv/KvTablet.java | 1 - .../tablet/TestTabletServerGateway.java | 4 +- 4 files changed, 74 insertions(+), 13 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java index f1d9481a64..b0f61c9eae 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java @@ -25,13 +25,20 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.config.MemorySize; import org.apache.fluss.exception.TimeoutException; +import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metadata.TablePath; import org.apache.fluss.record.MemoryLogRecords; +import org.apache.fluss.row.BinaryRow; import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.encode.CompactedKeyEncoder; import org.apache.fluss.rpc.entity.ProduceLogResultForBucket; +import org.apache.fluss.rpc.entity.PutKvResultForBucket; import org.apache.fluss.rpc.messages.ApiMessage; import org.apache.fluss.rpc.messages.ProduceLogRequest; import org.apache.fluss.rpc.messages.ProduceLogResponse; +import org.apache.fluss.rpc.messages.PutKvResponse; import org.apache.fluss.rpc.protocol.Errors; import org.apache.fluss.server.tablet.TestTabletServerGateway; import org.apache.fluss.utils.clock.SystemClock; @@ -52,11 +59,19 @@ import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH; +import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE; +import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK; import static org.apache.fluss.record.TestData.DATA1_TABLE_ID; +import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK; import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO; +import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK; import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH; +import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK; +import static org.apache.fluss.rpc.protocol.Errors.SCHEMA_NOT_EXIST; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getProduceLogData; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeProduceLogResponse; +import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makePutKvResponse; +import static org.apache.fluss.testutils.DataTestUtils.compactedRow; import static org.apache.fluss.testutils.DataTestUtils.row; import static org.apache.fluss.testutils.common.CommonTestUtils.retry; import static org.assertj.core.api.Assertions.assertThat; @@ -93,7 +108,7 @@ void testSimple() throws Exception { appendToAccumulator(tb1, row(1, "a"), future::complete); sender.runOnce(); assertThat(sender.numOfInFlightBatches(tb1)).isEqualTo(1); - finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1)); + finishRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1)); sender.runOnce();
assertThat(sender.numOfInFlightBatches(tb1)).isEqualTo(0); @@ -118,7 +133,7 @@ void testRetries() throws Exception { sender1.runOnce(); assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(1); long offset = 0; - finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1)); + finishRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1)); sender1.runOnce(); assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(0); @@ -131,13 +146,13 @@ void testRetries() throws Exception { assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(1); // timeout error can retry send. - finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, Errors.REQUEST_TIME_OUT)); + finishRequest(tb1, 0, createProduceLogResponse(tb1, Errors.REQUEST_TIME_OUT)); sender1.runOnce(); assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(1); // Even if timeout error can retry send, but the retry number > maxRetries, which will // return error. - finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, Errors.REQUEST_TIME_OUT)); + finishRequest(tb1, 0, createProduceLogResponse(tb1, Errors.REQUEST_TIME_OUT)); sender1.runOnce(); assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(0); assertThat(future.get()) @@ -168,12 +183,12 @@ void testCanRetryWithoutIdempotence() throws Exception { assertThat(firstRequest).isInstanceOf(ProduceLogRequest.class); assertThat(hasIdempotentRecords(tb1, (ProduceLogRequest) firstRequest)).isFalse(); // first complete with retriable error. - finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, Errors.REQUEST_TIME_OUT)); + finishRequest(tb1, 0, createProduceLogResponse(tb1, Errors.REQUEST_TIME_OUT)); sender.runOnce(); assertThat(future.isDone()).isFalse(); // second retry complete. - finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, 0L, 1L)); + finishRequest(tb1, 0, createProduceLogResponse(tb1, 0L, 1L)); sender.runOnce(); assertThat(future.isDone()).isTrue(); assertThat(future.get()).isNull(); @@ -690,7 +705,7 @@ void testSendWhenDestinationIsNullInMetadata() throws Exception { sender.runOnce(); assertThat(sender.numOfInFlightBatches(tb1)).isEqualTo(1); - finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1)); + finishRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1)); // send again, should send nothing since no batch in queue sender.runOnce(); @@ -698,9 +713,44 @@ void testSendWhenDestinationIsNullInMetadata() throws Exception { assertThat(future.get()).isNull(); } + @Test + void testRetryPutKvWithSchemaNotExistException() throws Exception { + TableBucket tableBucket = new TableBucket(DATA1_TABLE_ID_PK, 0); + + BinaryRow row = compactedRow(DATA1_ROW_TYPE, new Object[] {1, "a"}); + int[] pkIndex = DATA1_SCHEMA_PK.getPrimaryKeyIndexes(); + byte[] key = new CompactedKeyEncoder(DATA1_ROW_TYPE, pkIndex).encodeKey(row); + CompletableFuture<Exception> future = new CompletableFuture<>(); + accumulator.append( + WriteRecord.forUpsert( + DATA1_TABLE_INFO_PK, + PhysicalTablePath.of(DATA1_TABLE_PATH_PK), + row, + key, + key, + WriteFormat.COMPACTED_KV, + null), + future::complete, + metadataUpdater.getCluster(), + 0, + false); + sender.runOnce(); + finishRequest(tableBucket, 0, createPutKvResponse(tableBucket, SCHEMA_NOT_EXIST)); + assertThat(sender.numOfInFlightBatches(tableBucket)).isEqualTo(0); + + // retry the put kv request again + sender.runOnce(); + assertThat(sender.numOfInFlightBatches(tableBucket)).isEqualTo(1); + finishRequest(tableBucket, 0, createPutKvResponse(tableBucket, 1)); +
assertThat(sender.numOfInFlightBatches(tableBucket)).isEqualTo(0); + assertThat(future.get()).isNull(); + } + private TestingMetadataUpdater initializeMetadataUpdater() { - return new TestingMetadataUpdater( - Collections.singletonMap(DATA1_TABLE_PATH, DATA1_TABLE_INFO)); + Map<TablePath, TableInfo> tableInfos = new HashMap<>(); + tableInfos.put(DATA1_TABLE_PATH, DATA1_TABLE_INFO); + tableInfos.put(DATA1_TABLE_PATH_PK, DATA1_TABLE_INFO_PK); + return new TestingMetadataUpdater(tableInfos); } private void appendToAccumulator(TableBucket tb, GenericRow row, WriteCallback writeCallback) @@ -721,7 +771,7 @@ private ApiMessage getRequest(TableBucket tb, int index) { return gateway.getRequest(index); } - private void finishProduceLogRequest(TableBucket tb, int index, ProduceLogResponse response) { + private void finishRequest(TableBucket tb, int index, ApiMessage response) { TestTabletServerGateway gateway = (TestTabletServerGateway) metadataUpdater.newTabletServerClientForNode( @@ -762,6 +812,16 @@ private ProduceLogResponse createProduceLogResponse(TableBucket tb, Errors error Collections.singletonList(new ProduceLogResultForBucket(tb, error.toApiError()))); } + private PutKvResponse createPutKvResponse(TableBucket tb, long endOffset) { + return makePutKvResponse( + Collections.singletonList(new PutKvResultForBucket(tb, endOffset))); + } + + private PutKvResponse createPutKvResponse(TableBucket tb, Errors error) { + return makePutKvResponse( + Collections.singletonList(new PutKvResultForBucket(tb, error.toApiError()))); + } + private Sender setupWithIdempotenceState() { return setupWithIdempotenceState(createIdempotenceManager(false)); } diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/SchemaNotExistException.java b/fluss-common/src/main/java/org/apache/fluss/exception/SchemaNotExistException.java index 9a73d98f58..5e02e987a8 100644 --- a/fluss-common/src/main/java/org/apache/fluss/exception/SchemaNotExistException.java +++ b/fluss-common/src/main/java/org/apache/fluss/exception/SchemaNotExistException.java @@ -25,7 +25,7 @@ * @since 0.1 */ @PublicEvolving -public class SchemaNotExistException extends ApiException { +public class SchemaNotExistException extends RetriableException { public SchemaNotExistException(String message, Throwable cause) { super(message, cause); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java index 0dde0a53df..d6cc086664 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java @@ -326,7 +326,6 @@ public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] target private void validateSchemaId(short schemaIdOfNewData, short latestSchemaId) { if (schemaIdOfNewData > latestSchemaId || schemaIdOfNewData < 0) { - // TODO: we may need to support retriable exception here throw new SchemaNotExistException( "Invalid schema id: " + schemaIdOfNewData diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java index 8ebcc57489..500d197fcf 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java @@ -184,7 +184,9 @@ public CompletableFuture<FetchLogResponse> fetchLog(FetchLogRequest request) { @Override public CompletableFuture<PutKvResponse>
putKv(PutKvRequest request) { - return null; + CompletableFuture<PutKvResponse> response = new CompletableFuture<>(); + requests.add(Tuple2.of(request, response)); + return response; } @Override From d165682149842d8122b396fe6fae16289d0f2d84 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Mon, 29 Dec 2025 11:48:38 +0800 Subject: [PATCH 2/2] Remove waitAllSchemaSync. --- .../admin/ClientToServerITCaseBase.java | 4 --- .../fluss/client/table/FlussTableITCase.java | 3 -- .../batch/KvSnapshotBatchScannerITCase.java | 1 - .../flink/sink/FlinkTableSinkITCase.java | 4 --- .../lookup/FlinkLookupFunctionTest.java | 1 - .../testutils/FlussClusterExtension.java | 29 ------------------- 6 files changed, 42 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java index 8270ed7882..311d5a94ff 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java @@ -248,10 +248,6 @@ public static void waitAllReplicasReady(long tableId, int expectBucketCount) { } } - public static void waitAllSchemaSync(TablePath tablePath, int schemaId) { - FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(tablePath, schemaId); - } - protected static void verifyRows( RowType rowType, Map> actualRows, diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java index 27b6690a9e..a65529648f 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java @@ -275,7 +275,6 @@ void testPutAndLookup() throws Exception { TableChange.ColumnPosition.last())), false) .get(); - waitAllSchemaSync(tablePath, 2); Table newSchemaTable = conn.getTable(tableInfo.getTablePath()); // schema change case1: read new data with new schema. verifyPutAndLookup(newSchemaTable, new Object[] {2, "b", "bb"}); @@ -363,7 +362,6 @@ void testPutAndPrefixLookup() throws Exception { TableChange.ColumnPosition.last())), false) .get(); - waitAllSchemaSync(tablePath, 2); try (Connection connection = ConnectionFactory.createConnection(clientConf); Table newSchemaTable = connection.getTable(tableInfo.getTablePath())) { // schema change case1: read new data with new schema.
@@ -1056,7 +1054,6 @@ void testPutAndProjectDuringAddColumn() throws Exception { TableChange.ColumnPosition.last())), false) .get(); - waitAllSchemaSync(tablePath, 2); try (Connection connection = ConnectionFactory.createConnection(clientConf); Table newSchemaTable = connection.getTable(tablePath)) { UpsertWriter oldSchemaUpsertWriter = table.newUpsert().createWriter(); diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java index 62b5bdc133..ea1aaf2378 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java @@ -139,7 +139,6 @@ void testScanSnapshotDuringSchemaChange() throws Exception { TableChange.ColumnPosition.last())), false) .get(); - FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(tablePath, 2); Schema newSchema = Schema.newBuilder() diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java index 68fa741d48..15e59295c7 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java @@ -188,7 +188,6 @@ void testAppendLogDuringAddColumn(boolean compressed) throws Exception { CloseableIterator rowIter = tEnv.executeSql("select * from sink_test").collect(); // add new column tEnv.executeSql("alter table sink_test add add_column int").await(); - FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(TablePath.of(DEFAULT_DB, "sink_test"), 2); tEnv.executeSql( "INSERT INTO sink_test " + "VALUES (4, 3504, 'jerry', 4), " @@ -417,7 +416,6 @@ void testPutDuringAddColumn() throws Exception { CloseableIterator rowIter = tEnv.executeSql("select * from sink_test").collect(); // add new column tEnv.executeSql("alter table sink_test add add_column int").await(); - FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(TablePath.of(DEFAULT_DB, "sink_test"), 2); tEnv.executeSql( "INSERT INTO sink_test " + "VALUES (4, 3504, 'jerry', 4), " @@ -505,7 +503,6 @@ void testPartialUpsertDuringAddColumn() throws Exception { CloseableIterator rowIter = tEnv.executeSql("select * from sink_test").collect(); // add new column tEnv.executeSql("alter table sink_test add add_column string").await(); - FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(TablePath.of(DEFAULT_DB, "sink_test"), 2); tEnv.executeSql( "INSERT INTO sink_test(add_column, a ) VALUES ('new_value', 1), ('new_value', 2)") .await(); @@ -819,7 +816,6 @@ void testDeleteAndUpdateStmtOnPkTable() throws Exception { tBatchEnv .executeSql(String.format("alter table %s add new_added_column int", tableName)) .await(); - FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(TablePath.of(DEFAULT_DB, tableName), 2); tBatchEnv .executeSql("UPDATE " + tableName + " SET new_added_column = 2 WHERE a = 4") .await(); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java index 9903e7c45f..4d136c865a 100644 --- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java @@ -173,7 +173,6 @@ void testSchemaChange() throws Exception { TableChange.ColumnPosition.last())), false) .get(); - FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(tablePath, 2); try (Table table = conn.getTable(tablePath)) { UpsertWriter upsertWriter = table.newUpsert().createWriter(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java index a1ee83fa69..bab31f10b2 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java @@ -25,7 +25,6 @@ import org.apache.fluss.config.MemorySize; import org.apache.fluss.fs.local.LocalFileSystem; import org.apache.fluss.metadata.PhysicalTablePath; -import org.apache.fluss.metadata.SchemaInfo; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.metrics.registry.MetricRegistry; @@ -52,7 +51,6 @@ import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; import org.apache.fluss.server.kv.snapshot.CompletedSnapshotHandle; import org.apache.fluss.server.metadata.ServerInfo; -import org.apache.fluss.server.metadata.ServerSchemaCache; import org.apache.fluss.server.metadata.TabletServerMetadataCache; import org.apache.fluss.server.replica.Replica; import org.apache.fluss.server.replica.ReplicaManager; @@ -66,7 +64,6 @@ import org.apache.fluss.server.zk.data.PartitionAssignment; import org.apache.fluss.server.zk.data.RemoteLogManifestHandle; import org.apache.fluss.server.zk.data.TableAssignment; -import org.apache.fluss.server.zk.data.TableRegistration; import org.apache.fluss.utils.FileUtils; import org.apache.fluss.utils.clock.Clock; import org.apache.fluss.utils.clock.SystemClock; @@ -663,32 +660,6 @@ public void waitUntilAllReplicaReady(TableBucket tableBucket) { }); } - public void waitAllSchemaSync(TablePath tablePath, int schemaId) { - ZooKeeperClient zkClient = getZooKeeperClient(); - retry( - Duration.ofMinutes(1), - () -> { - TableRegistration tableRegistration = zkClient.getTable(tablePath).get(); - int bucketCount = tableRegistration.bucketCount; - long tableId = tableRegistration.tableId; - for (int bucketId = 0; bucketId < bucketCount; bucketId++) { - TableBucket tableBucket = new TableBucket(tableId, bucketId); - Optional leaderAndIsrOpt = - zkClient.getLeaderAndIsr(tableBucket); - assertThat(leaderAndIsrOpt).isPresent(); - int leader = leaderAndIsrOpt.get().leader(); - TabletServer tabletServer = getTabletServerById(leader); - ServerSchemaCache serverSchemaCache = - tabletServer.getMetadataCache().getServerSchemaCache(); - Map latestSchemaByTablePath = - serverSchemaCache.getLatestSchemaByTableId(); - assertThat(latestSchemaByTablePath).containsKey(tableId); - assertThat(latestSchemaByTablePath.get(tableId).getSchemaId()) - .isEqualTo(schemaId); - } - }); - } - /** * Wait until some log segments copy to remote. This method can only ensure that there are at * least one log segment has been copied to remote, but it does not ensure that all log segments