diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
index 165ae7b844..8b1619bc95 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
@@ -221,6 +221,9 @@ public void removeLakeTable(long tableId) {
                     tieringStates.remove(tableId);
                     liveTieringTableIds.remove(tableId);
                     tableTierEpoch.remove(tableId);
+                    // pendingTieringTables could potentially contain duplicates as
+                    // it's just a queue, so drop every occurrence in a single pass
+                    pendingTieringTables.removeIf(id -> id == tableId);
                 });
     }
 
@@ -256,19 +259,33 @@ public LakeTieringTableInfo requestTable() {
         return inLock(
                 lock,
                 () -> {
-                    Long tableId = pendingTieringTables.poll();
-                    // no any pending table, return directly
-                    if (tableId == null) {
-                        return null;
+                    while (true) {
+                        Long tableId = pendingTieringTables.poll();
+                        // no pending table left, return directly
+                        if (tableId == null) {
+                            return null;
+                        }
+
+                        TablePath tablePath = tablePaths.get(tableId);
+                        // the table has been dropped, try the next queue entry
+                        if (tablePath == null) {
+                            continue;
+                        }
+
+                        TieringState state = tieringStates.get(tableId);
+                        if (state != TieringState.Pending) {
+                            // stale queue entry, ignore
+                            LOG.debug(
+                                    "requestTable: skipping table {} because state is {}",
+                                    tableId,
+                                    state);
+                            continue;
+                        }
+
+                        doHandleStateChange(tableId, TieringState.Tiering);
+                        long tieringEpoch = tableTierEpoch.get(tableId);
+                        return new LakeTieringTableInfo(tableId, tablePath, tieringEpoch);
                     }
-                    TablePath tablePath = tablePaths.get(tableId);
-                    // the table has been dropped, request again
-                    if (tablePath == null) {
-                        return requestTable();
-                    }
-                    doHandleStateChange(tableId, TieringState.Tiering);
-                    long tieringEpoch = tableTierEpoch.get(tableId);
-                    return new LakeTieringTableInfo(tableId, tablePath, tieringEpoch);
                 });
     }
 
@@ -374,6 +391,8 @@ private void doHandleStateChange(long tableId, TieringState targetState) {
                     targetState);
             return;
         }
+
+        doStateChange(tableId, currentState, targetState);
         switch (targetState) {
             case New:
             case Initialized:
@@ -399,7 +418,6 @@ private void doHandleStateChange(long tableId, TieringState targetState) {
                 // do nothing
                 break;
         }
-        doStateChange(tableId, currentState, targetState);
     }
 
     private boolean isValidStateTransition(
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java
index fb0b75b18a..028bb8349f 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java
@@ -26,6 +26,8 @@
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.entity.LakeTieringTableInfo;
 import org.apache.fluss.server.utils.timer.DefaultTimer;
+import org.apache.fluss.server.utils.timer.Timer;
+import org.apache.fluss.server.utils.timer.TimerTask;
 import org.apache.fluss.testutils.common.ManuallyTriggeredScheduledExecutorService;
 import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.utils.clock.ManualClock;
@@ -34,10 +36,14 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.lang.reflect.Field;
 import java.time.Duration;
+import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.fluss.server.coordinator.LakeTableTieringManager.TIERING_SERVICE_TIMEOUT_MS;
 import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
@@ -238,6 +244,162 @@ void testTieringFail() {
         assertRequestTable(tableId1, tablePath1, 2);
     }
 
+    @Test
+    void testRequestTableSkipsStaleQueueEntries() throws Exception {
+        long tableId1 = 1L;
+        TablePath tablePath1 = TablePath.of("db", "table1");
+        TableInfo tableInfo1 = createTableInfo(tableId1, tablePath1, Duration.ofSeconds(10));
+        tableTieringManager.addNewLakeTable(tableInfo1);
+
+        long tableId2 = 2L;
+        TablePath tablePath2 = TablePath.of("db", "table2");
+        TableInfo tableInfo2 = createTableInfo(tableId2, tablePath2, Duration.ofSeconds(10));
+        tableTieringManager.addNewLakeTable(tableInfo2);
+
+        manualClock.advanceTime(Duration.ofSeconds(10));
+
+        // Wait until we actually get the first assignable table
+        LakeTieringTableInfo first =
+                waitValue(
+                        () -> Optional.ofNullable(tableTieringManager.requestTable()),
+                        Duration.ofSeconds(10),
+                        "First requestTable() timed out");
+
+        long tieringTableId = first.tableId();
+        long pendingTableId = (tieringTableId == tableId1) ? tableId2 : tableId1;
+        TablePath pendingPath = (pendingTableId == tableId1) ? tablePath1 : tablePath2;
+
+        // Verify first table is tiering (so it will be stale if queued again)
+        tableTieringManager.renewTieringHeartbeat(tieringTableId, 1L);
+
+        // Force a stale entry to the head of the pending queue
+        ArrayDeque<Long> pendingDeque =
+                getPrivateField(tableTieringManager, "pendingTieringTables");
+        pendingDeque.addFirst(tieringTableId);
+
+        // Now the next request should skip the stale tiering id and return the other pending table.
+        LakeTieringTableInfo second =
+                waitValue(
+                        () -> Optional.ofNullable(tableTieringManager.requestTable()),
+                        Duration.ofSeconds(10),
+                        "Second requestTable() timed out");
+
+        assertThat(second).isEqualTo(new LakeTieringTableInfo(pendingTableId, pendingPath, 1L));
+    }
+
+    @Test
+    void testScheduledStateRecordedBeforeTimerCallbackRuns() throws Exception {
+        Timer immediateTimer = new ImmediateTimer();
+
+        LakeTableTieringManager manager =
+                new LakeTableTieringManager(
+                        immediateTimer, lakeTieringServiceTimeoutChecker, manualClock);
+
+        try {
+            long tableId = 1L;
+            TablePath tablePath = TablePath.of("db", "table1");
+            TableInfo tableInfo = createTableInfo(tableId, tablePath, Duration.ofMinutes(3));
+
+            // lastTieredTime is a full freshness interval old -> no scheduling delay is expected
+            long lastTieredTime = manualClock.milliseconds() - Duration.ofMinutes(3).toMillis();
+
+            manager.initWithLakeTables(Arrays.asList(Tuple2.of(tableInfo, lastTieredTime)));
+
+            // Table should be immediately requestable
+            LakeTieringTableInfo table = manager.requestTable();
+            assertThat(table).isEqualTo(new LakeTieringTableInfo(tableId, tablePath, 1L));
+        } finally {
+            manager.close();
+        }
+    }
+
+    @Test
+    void testRemoveLakeTableRemovesAllPendingQueueOccurrences() throws Exception {
+        long tableId1 = 1L;
+        TablePath tablePath1 = TablePath.of("db", "table1");
+        TableInfo tableInfo1 = createTableInfo(tableId1, tablePath1, Duration.ofSeconds(10));
+        tableTieringManager.addNewLakeTable(tableInfo1);
+
+        long tableId2 = 2L;
+        TablePath tablePath2 = TablePath.of("db", "table2");
+        TableInfo tableInfo2 = createTableInfo(tableId2, tablePath2, Duration.ofSeconds(10));
+        tableTieringManager.addNewLakeTable(tableInfo2);
+
+        // Trigger scheduling
+        manualClock.advanceTime(Duration.ofSeconds(10));
+
+        // Wait until at least ONE table is pending and can be requested
+        LakeTieringTableInfo first =
+                waitValue(
+                        () -> Optional.ofNullable(tableTieringManager.requestTable()),
+                        Duration.ofSeconds(10),
+                        "Request tiering table timeout");
+
+        // Force it back into the pending state due to failure
+        tableTieringManager.reportTieringFail(first.tableId(), first.tieringEpoch());
+
+        // Inject duplicates into the queue (simulating retries, potential late timers, etc.)
+        Queue<Long> pendingQueue = getPrivateField(tableTieringManager, "pendingTieringTables");
+        pendingQueue.add(tableId1);
+        pendingQueue.add(tableId1);
+        pendingQueue.add(tableId1);
+
+        assertThat(countOccurrences(pendingQueue, tableId1)).isGreaterThanOrEqualTo(2);
+
+        // Purge tableId1 (and any duplicates)
+        tableTieringManager.removeLakeTable(tableId1);
+
+        assertThat(countOccurrences(pendingQueue, tableId1)).isZero();
+
+        // Now table2 should still be requestable eventually (don't assume immediate next poll).
+        LakeTieringTableInfo t2 =
+                waitValue(
+                        () -> Optional.ofNullable(tableTieringManager.requestTable()),
+                        Duration.ofSeconds(10),
+                        "Request tiering table timeout");
+
+        assertThat(t2.tableId()).isEqualTo(tableId2);
+        assertThat(t2.tablePath()).isEqualTo(tablePath2);
+    }
+
+    /** Counts how many non-null entries of {@code queue} equal {@code tableId}. */
+    private static int countOccurrences(Queue<Long> queue, long tableId) {
+        int count = 0;
+        for (Long v : queue) {
+            if (v != null && v == tableId) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    /**
+     * Timer implementation used for testing that immediately executes each task in the caller
+     * thread when it is added.
+     */
+    private static final class ImmediateTimer implements Timer {
+        @Override
+        public void add(TimerTask timerTask) {
+            timerTask.run();
+        }
+
+        @Override
+        public boolean advanceClock(long waitMs) throws InterruptedException {
+            TimeUnit.MILLISECONDS.sleep(Math.min(waitMs, 5));
+            return false;
+        }
+
+        @Override
+        public int numOfTimerTasks() {
+            return 0;
+        }
+
+        @Override
+        public void shutdown() {
+            // no-op
+        }
+    }
+
     private TableInfo createTableInfo(long tableId, TablePath tablePath, Duration freshness) {
         TableDescriptor tableDescriptor =
                 TableDescriptor.builder()
@@ -264,4 +426,12 @@ private void assertRequestTable(long tableId, TablePath tablePath, long tieredEp
                 "Request tiering table timout");
         assertThat(table).isEqualTo(new LakeTieringTableInfo(tableId, tablePath, tieredEpoch));
     }
+
+    /** Reads the declared field {@code fieldName} of {@code target} via reflection. */
+    @SuppressWarnings("unchecked")
+    private static <T> T getPrivateField(Object target, String fieldName) throws Exception {
+        Field f = target.getClass().getDeclaredField(fieldName);
+        f.setAccessible(true);
+        return (T) f.get(target);
+    }
 }