diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index c1059ac062..aa1397d105 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1811,6 +1811,23 @@ public class ConfigOptions { + ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT + " is false."); + public static final ConfigOption LAKE_TIERING_TABLE_DURATION_MAX = + key("lake.tiering.table.duration.max") + .durationType() + .defaultValue(Duration.ofMinutes(30)) + .withDescription( + "The maximum duration for tiering a single table. If tiering a table exceeds this duration, " + + "it will be force completed: the tiering will be finalized and committed to the data lake " + + "(e.g., Paimon) immediately, even if they haven't reached their desired stopping offsets."); + + public static final ConfigOption LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL = + key("lake.tiering.table.duration.detect-interval") + .durationType() + .defaultValue(Duration.ofSeconds(30)) + .withDescription( + "The interval to check if a table tiering operation has reached the maximum duration. 
" + + "The enumerator will periodically check tiering tables and force complete those that exceed the maximum duration."); + // ------------------------------------------------------------------------ // ConfigOptions for fluss kafka // ------------------------------------------------------------------------ diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java index 74cf91e0e0..70774c8a60 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java @@ -34,6 +34,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; +import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL; +import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_MAX; import static org.apache.fluss.flink.tiering.source.TieringSource.TIERING_SOURCE_TRANSFORMATION_UID; import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL; import static org.apache.fluss.utils.Preconditions.checkNotNull; @@ -89,6 +91,17 @@ public JobClient build() throws Exception { tieringSourceBuilder.withPollTieringTableIntervalMs( flussConfig.get(POLL_TIERING_TABLE_INTERVAL).toMillis()); } + + if (lakeTieringConfig.get(LAKE_TIERING_TABLE_DURATION_MAX) != null) { + tieringSourceBuilder.withTieringTableDurationMax( + lakeTieringConfig.get(LAKE_TIERING_TABLE_DURATION_MAX).toMillis()); + } + + if (lakeTieringConfig.get(LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL) != null) { + tieringSourceBuilder.withTieringTableDurationDetectInterval( + 
lakeTieringConfig.get(LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL).toMillis()); + } + TieringSource tieringSource = tieringSourceBuilder.build(); DataStreamSource source = env.fromSource( diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringReachMaxDurationEvent.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringReachMaxDurationEvent.java new file mode 100644 index 0000000000..dd3f32e262 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringReachMaxDurationEvent.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering.event; + +import org.apache.flink.api.connector.source.SourceEvent; + +import java.util.Objects; + +/** + * SourceEvent used to notify TieringSourceReader that a table has reached the maximum tiering + * duration and should be force completed. 
+ */ +public class TieringReachMaxDurationEvent implements SourceEvent { + + private static final long serialVersionUID = 1L; + + private final long tableId; + + public TieringReachMaxDurationEvent(long tableId) { + this.tableId = tableId; + } + + public long getTableId() { + return tableId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TieringReachMaxDurationEvent)) { + return false; + } + TieringReachMaxDurationEvent that = (TieringReachMaxDurationEvent) o; + return tableId == that.tableId; + } + + @Override + public int hashCode() { + return Objects.hashCode(tableId); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java index b2d8c28dc9..5d637c9071 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java @@ -36,12 +36,16 @@ import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.streaming.api.graph.StreamGraphHasherV2; import java.nio.charset.StandardCharsets; +import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL; +import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_MAX; import static 
org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL; /** @@ -61,14 +65,20 @@ public class TieringSource private final Configuration flussConf; private final LakeTieringFactory lakeTieringFactory; private final long pollTieringTableIntervalMs; + private final long tieringTableDurationMaxMs; + private final long tieringTableDurationDetectIntervalMs; public TieringSource( Configuration flussConf, LakeTieringFactory lakeTieringFactory, - long pollTieringTableIntervalMs) { + long pollTieringTableIntervalMs, + long tieringTableDurationMaxMs, + long tieringTableDurationDetectIntervalMs) { this.flussConf = flussConf; this.lakeTieringFactory = lakeTieringFactory; this.pollTieringTableIntervalMs = pollTieringTableIntervalMs; + this.tieringTableDurationMaxMs = tieringTableDurationMaxMs; + this.tieringTableDurationDetectIntervalMs = tieringTableDurationDetectIntervalMs; } @Override @@ -78,19 +88,26 @@ public Boundedness getBoundedness() { @Override public SplitEnumerator createEnumerator( - SplitEnumeratorContext splitEnumeratorContext) throws Exception { + SplitEnumeratorContext splitEnumeratorContext) { return new TieringSourceEnumerator( - flussConf, splitEnumeratorContext, pollTieringTableIntervalMs); + flussConf, + splitEnumeratorContext, + pollTieringTableIntervalMs, + tieringTableDurationMaxMs, + tieringTableDurationDetectIntervalMs); } @Override public SplitEnumerator restoreEnumerator( SplitEnumeratorContext splitEnumeratorContext, - TieringSourceEnumeratorState tieringSourceEnumeratorState) - throws Exception { + TieringSourceEnumeratorState tieringSourceEnumeratorState) { // stateless operator return new TieringSourceEnumerator( - flussConf, splitEnumeratorContext, pollTieringTableIntervalMs); + flussConf, + splitEnumeratorContext, + pollTieringTableIntervalMs, + tieringTableDurationMaxMs, + tieringTableDurationDetectIntervalMs); } @Override @@ -107,8 +124,11 @@ public SimpleVersionedSerializer getSplitSerializer() { @Override 
public SourceReader, TieringSplit> createReader( SourceReaderContext sourceReaderContext) { + FutureCompletingBlockingQueue>> + elementsQueue = new FutureCompletingBlockingQueue<>(); Connection connection = ConnectionFactory.createConnection(flussConf); - return new TieringSourceReader<>(sourceReaderContext, connection, lakeTieringFactory); + return new TieringSourceReader<>( + elementsQueue, sourceReaderContext, connection, lakeTieringFactory); } /** This follows the operator uid hash generation logic of flink {@link StreamGraphHasherV2}. */ @@ -126,6 +146,10 @@ public static class Builder { private final LakeTieringFactory lakeTieringFactory; private long pollTieringTableIntervalMs = POLL_TIERING_TABLE_INTERVAL.defaultValue().toMillis(); + private long tieringTableDurationMaxMs = + LAKE_TIERING_TABLE_DURATION_MAX.defaultValue().toMillis(); + private long tieringTableDurationDetectIntervalMs = + LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL.defaultValue().toMillis(); public Builder( Configuration flussConf, LakeTieringFactory lakeTieringFactory) { @@ -138,8 +162,24 @@ public Builder withPollTieringTableIntervalMs(long pollTieringTable return this; } + public Builder withTieringTableDurationMax(long tieringTableDurationMaxMs) { + this.tieringTableDurationMaxMs = tieringTableDurationMaxMs; + return this; + } + + public Builder withTieringTableDurationDetectInterval( + long tieringTableDurationDetectIntervalMs) { + this.tieringTableDurationDetectIntervalMs = tieringTableDurationDetectIntervalMs; + return this; + } + public TieringSource build() { - return new TieringSource<>(flussConf, lakeTieringFactory, pollTieringTableIntervalMs); + return new TieringSource<>( + flussConf, + lakeTieringFactory, + pollTieringTableIntervalMs, + tieringTableDurationMaxMs, + tieringTableDurationDetectIntervalMs); } } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java new file mode 100644 index 0000000000..68db87671f --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering.source; + +import org.apache.fluss.flink.adapter.SingleThreadFetcherManagerAdapter; +import org.apache.fluss.flink.tiering.source.split.TieringSplit; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher; +import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; + +import java.util.Collection; +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * The SplitFetcherManager for tiering source. 
This class is needed to notify the + * {@link TieringSplitReader} that a table has reached its maximum tiering duration. + */ +public class TieringSourceFetcherManager + extends SingleThreadFetcherManagerAdapter< + TableBucketWriteResult, TieringSplit> { + + public TieringSourceFetcherManager( + FutureCompletingBlockingQueue>> + elementsQueue, + Supplier, TieringSplit>> + splitReaderSupplier, + Configuration configuration, + Consumer> splitFinishedHook) { + super(elementsQueue, splitReaderSupplier, configuration, splitFinishedHook); + } + + public void markTableReachTieringMaxDuration(long tableId) { + if (!fetchers.isEmpty()) { + // The fetcher thread is still running. This should be the majority of the cases. + fetchers.values() + .forEach( + splitFetcher -> + enqueueMarkTableReachTieringMaxDurationTask( + splitFetcher, tableId)); + } else { + SplitFetcher, TieringSplit> splitFetcher = + createSplitFetcher(); + enqueueMarkTableReachTieringMaxDurationTask(splitFetcher, tableId); + startFetcher(splitFetcher); + } + } + + private void enqueueMarkTableReachTieringMaxDurationTask( + SplitFetcher, TieringSplit> splitFetcher, + long reachTieringDeadlineTable) { + splitFetcher.enqueueTask( + new SplitFetcherTask() { + @Override + public boolean run() { + ((TieringSplitReader) splitFetcher.getSplitReader()) + .handleTableReachTieringMaxDuration(reachTieringDeadlineTable); + return true; + } + + @Override + public void wakeUp() { + // do nothing + } + }); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java index a6b14b320d..63e73bab97 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java @@ -18,23 +18,31 @@ package 
org.apache.fluss.flink.tiering.source; import org.apache.fluss.annotation.Internal; +import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.client.Connection; +import org.apache.fluss.flink.adapter.SingleThreadMultiplexSourceReaderBaseAdapter; +import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; import org.apache.fluss.flink.tiering.source.split.TieringSplit; import org.apache.fluss.flink.tiering.source.state.TieringSplitState; import org.apache.fluss.lake.writer.LakeTieringFactory; +import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; -import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; +import static org.apache.fluss.flink.tiering.source.TieringSplitReader.DEFAULT_POLL_TIMEOUT; + /** A {@link SourceReader} that read records from Fluss and write to lake. 
*/ @Internal public final class TieringSourceReader - extends SingleThreadMultiplexSourceReaderBase< + extends SingleThreadMultiplexSourceReaderBaseAdapter< TableBucketWriteResult, TableBucketWriteResult, TieringSplit, @@ -43,11 +51,29 @@ public final class TieringSourceReader private final Connection connection; public TieringSourceReader( + FutureCompletingBlockingQueue>> + elementsQueue, SourceReaderContext context, Connection connection, LakeTieringFactory lakeTieringFactory) { + this(elementsQueue, context, connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT); + } + + @VisibleForTesting + TieringSourceReader( + FutureCompletingBlockingQueue>> + elementsQueue, + SourceReaderContext context, + Connection connection, + LakeTieringFactory lakeTieringFactory, + Duration pollTimeout) { super( - () -> new TieringSplitReader<>(connection, lakeTieringFactory), + elementsQueue, + new TieringSourceFetcherManager<>( + elementsQueue, + () -> new TieringSplitReader<>(connection, lakeTieringFactory, pollTimeout), + context.getConfiguration(), + (ignore) -> {}), new TableBucketWriteResultEmitter<>(), context.getConfiguration(), context); @@ -89,6 +115,17 @@ protected TieringSplit toSplitType(String splitId, TieringSplitState splitState) return splitState.toSourceSplit(); } + @Override + public void handleSourceEvents(SourceEvent sourceEvent) { + if (sourceEvent instanceof TieringReachMaxDurationEvent) { + TieringReachMaxDurationEvent reachMaxDurationEvent = + (TieringReachMaxDurationEvent) sourceEvent; + long tableId = reachMaxDurationEvent.getTableId(); + ((TieringSourceFetcherManager) splitFetcherManager) + .markTableReachTieringMaxDuration(tableId); + } + } + @Override public void close() throws Exception { super.close(); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java index 
b9fe79e3d3..2a797add33 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java @@ -57,6 +57,7 @@ import static org.apache.fluss.utils.Preconditions.checkArgument; import static org.apache.fluss.utils.Preconditions.checkNotNull; +import static org.apache.fluss.utils.Preconditions.checkState; /** The {@link SplitReader} implementation which will read Fluss and write to lake. */ public class TieringSplitReader @@ -64,18 +65,25 @@ public class TieringSplitReader private static final Logger LOG = LoggerFactory.getLogger(TieringSplitReader.class); - private static final Duration POLL_TIMEOUT = Duration.ofMillis(10000L); + public static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(10_000L); // unknown bucket timestamp for empty split or snapshot split private static final long UNKNOWN_BUCKET_TIMESTAMP = -1; + // unknown bucket offset for empty split or snapshot split + private static final long UNKNOWN_BUCKET_OFFSET = -1; + private final LakeTieringFactory lakeTieringFactory; + private final Duration pollTimeout; + // the id for the pending tables to be tiered private final Queue pendingTieringTables; // the table_id to the pending splits private final Map> pendingTieringSplits; + private final Set reachTieringMaxDurationTables; + private final Map> lakeWriters; private final Connection connection; @@ -92,38 +100,54 @@ public class TieringSplitReader // map from table bucket to split id private final Map currentTableSplitsByBucket; private final Map currentTableStoppingOffsets; - private final Set currentTableEmptyLogSplits; + + private final Map currentTableTieredOffsetAndTimestamp; + + private final Set currentEmptySplits; public TieringSplitReader( Connection connection, LakeTieringFactory lakeTieringFactory) { + this(connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT); + } + + 
protected TieringSplitReader( + Connection connection, + LakeTieringFactory lakeTieringFactory, + Duration pollTimeout) { this.lakeTieringFactory = lakeTieringFactory; // owned by TieringSourceReader this.connection = connection; this.pendingTieringTables = new ArrayDeque<>(); this.pendingTieringSplits = new HashMap<>(); this.currentTableStoppingOffsets = new HashMap<>(); - this.currentTableEmptyLogSplits = new HashSet<>(); + this.currentTableTieredOffsetAndTimestamp = new HashMap<>(); + this.currentEmptySplits = new HashSet<>(); this.currentTableSplitsByBucket = new HashMap<>(); this.lakeWriters = new HashMap<>(); this.currentPendingSnapshotSplits = new ArrayDeque<>(); + this.reachTieringMaxDurationTables = new HashSet<>(); + this.pollTimeout = pollTimeout; } @Override public RecordsWithSplitIds> fetch() throws IOException { // check empty splits - if (!currentTableEmptyLogSplits.isEmpty()) { - LOG.info("Empty split(s) {} finished.", currentTableEmptyLogSplits); - TableBucketWriteResultWithSplitIds records = forEmptySplits(currentTableEmptyLogSplits); - currentTableEmptyLogSplits.forEach( + if (!currentEmptySplits.isEmpty()) { + LOG.info("Empty split(s) {} finished.", currentEmptySplits); + TableBucketWriteResultWithSplitIds records = forEmptySplits(currentEmptySplits); + currentEmptySplits.forEach( split -> currentTableSplitsByBucket.remove(split.getTableBucket())); mayFinishCurrentTable(); - currentTableEmptyLogSplits.clear(); + currentEmptySplits.clear(); return records; } checkSplitOrStartNext(); // may read snapshot firstly if (currentSnapshotSplitReader != null) { + // for snapshot split, we don't force to complete it + // since we rely on the log offset for the snapshot to + // do next tiering, if force to complete, we can't get the log offset CloseableIterator recordIterator = currentSnapshotSplitReader.readBatch(); if (recordIterator == null) { LOG.info("Split {} is finished", currentSnapshotSplit.splitId()); @@ -134,7 +158,11 @@ public 
RecordsWithSplitIds> fetch() throws I } } else { if (currentLogScanner != null) { - ScanRecords scanRecords = currentLogScanner.poll(POLL_TIMEOUT); + // force to complete records + if (reachTieringMaxDurationTables.contains(currentTableId)) { + return forceCompleteTieringLogRecords(); + } + ScanRecords scanRecords = currentLogScanner.poll(pollTimeout); return forLogRecords(scanRecords); } else { return emptyTableBucketWriteResultWithSplitIds(); @@ -152,6 +180,15 @@ public void handleSplitsChanges(SplitsChange splitsChange) { } for (TieringSplit split : splitsChange.splits()) { LOG.info("add split {}", split.splitId()); + if (split.isForceIgnore()) { + // if the split is forced to ignore, + // mark it as empty + LOG.info( + "ignore split {} since the split is set to force to ignore", + split.splitId()); + currentEmptySplits.add(split); + continue; + } long tableId = split.getTableBucket().getTableId(); // the split belongs to the current table if (currentTableId != null && currentTableId == tableId) { @@ -248,6 +285,58 @@ private void mayCreateLogScanner() { } } + private RecordsWithSplitIds> + forceCompleteTieringLogRecords() throws IOException { + Map> writeResults = new HashMap<>(); + Map finishedSplitIds = new HashMap<>(); + + // force finish all splits + Iterator> currentTieringSplitsIterator = + currentTableSplitsByBucket.entrySet().iterator(); + while (currentTieringSplitsIterator.hasNext()) { + Map.Entry entry = currentTieringSplitsIterator.next(); + TableBucket bucket = entry.getKey(); + TieringSplit split = entry.getValue(); + if (split != null && split.isTieringLogSplit()) { + // get the current offset, timestamp that tiered so far + LogOffsetAndTimestamp logOffsetAndTimestamp = + currentTableTieredOffsetAndTimestamp.get(bucket); + long logEndOffset = + logOffsetAndTimestamp == null + ? UNKNOWN_BUCKET_OFFSET + // logEndOffset is equal to offset tiered + 1 + : logOffsetAndTimestamp.logOffset + 1; + long timestamp = + logOffsetAndTimestamp == null + ? 
UNKNOWN_BUCKET_TIMESTAMP + : logOffsetAndTimestamp.timestamp; + TableBucketWriteResult bucketWriteResult = + completeLakeWriter( + bucket, split.getPartitionName(), logEndOffset, timestamp); + + if (logEndOffset == UNKNOWN_BUCKET_OFFSET) { + // when the log end offset is unknown, the write result must be + // null, otherwise, we should throw exception directly to avoid data + // inconsistent + checkState( + bucketWriteResult.writeResult() == null, + "bucketWriteResult must be null when log end offset is unknown when tiering " + + split); + } + + writeResults.put(bucket, bucketWriteResult); + finishedSplitIds.put(bucket, split.splitId()); + LOG.info( + "Split {} is forced to be finished due to tiering reach max duration.", + split.splitId()); + currentTieringSplitsIterator.remove(); + } + } + reachTieringMaxDurationTables.remove(this.currentTableId); + mayFinishCurrentTable(); + return new TableBucketWriteResultWithSplitIds(writeResults, finishedSplitIds); + } + private RecordsWithSplitIds> forLogRecords( ScanRecords scanRecords) throws IOException { Map> writeResults = new HashMap<>(); @@ -272,6 +361,9 @@ private RecordsWithSplitIds> forLogRecords( } } ScanRecord lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1); + currentTableTieredOffsetAndTimestamp.put( + bucket, + new LogOffsetAndTimestamp(lastRecord.logOffset(), lastRecord.timestamp())); // has arrived into the end of the split, if (lastRecord.logOffset() >= stoppingOffset - 1) { currentTableStoppingOffsets.remove(bucket); @@ -293,7 +385,11 @@ private RecordsWithSplitIds> forLogRecords( lastRecord.timestamp())); // put split of the bucket finishedSplitIds.put(bucket, currentSplitId); - LOG.info("Split {} has been finished.", currentSplitId); + LOG.info( + "Finish tier bucket {} for table {}, split: {}.", + bucket, + currentTablePath, + currentSplitId); } } @@ -327,8 +423,11 @@ private TableBucketWriteResult completeLakeWriter( long maxTimestamp) throws IOException { LakeWriter lakeWriter = 
lakeWriters.remove(bucket); - WriteResult writeResult = lakeWriter.complete(); - lakeWriter.close(); + WriteResult writeResult = null; + if (lakeWriter != null) { + writeResult = lakeWriter.complete(); + lakeWriter.close(); + } return toTableBucketWriteResult( currentTablePath, bucket, @@ -339,22 +438,22 @@ private TableBucketWriteResult completeLakeWriter( checkNotNull(currentTableNumberOfSplits)); } - private TableBucketWriteResultWithSplitIds forEmptySplits(Set emptySplits) { + private TableBucketWriteResultWithSplitIds forEmptySplits(Set emptySplits) { Map> writeResults = new HashMap<>(); Map finishedSplitIds = new HashMap<>(); - for (TieringLogSplit logSplit : emptySplits) { - TableBucket tableBucket = logSplit.getTableBucket(); - finishedSplitIds.put(tableBucket, logSplit.splitId()); + for (TieringSplit tieringSplit : emptySplits) { + TableBucket tableBucket = tieringSplit.getTableBucket(); + finishedSplitIds.put(tableBucket, tieringSplit.splitId()); writeResults.put( tableBucket, toTableBucketWriteResult( - logSplit.getTablePath(), + tieringSplit.getTablePath(), tableBucket, - logSplit.getPartitionName(), + tieringSplit.getPartitionName(), null, - logSplit.getStoppingOffset(), + UNKNOWN_BUCKET_OFFSET, UNKNOWN_BUCKET_TIMESTAMP, - logSplit.getNumberOfSplits())); + tieringSplit.getNumberOfSplits())); } return new TableBucketWriteResultWithSplitIds(writeResults, finishedSplitIds); } @@ -362,7 +461,6 @@ private TableBucketWriteResultWithSplitIds forEmptySplits(Set e private void mayFinishCurrentTable() throws IOException { // no any pending splits for the table, just finish the table if (currentTableSplitsByBucket.isEmpty()) { - LOG.info("Finish tier table {} of table id {}.", currentTablePath, currentTableId); finishCurrentTable(); } } @@ -377,6 +475,11 @@ private TableBucketWriteResultWithSplitIds finishCurrentSnapshotSplit() throws I currentSnapshotSplit.getPartitionName(), logEndOffset, UNKNOWN_BUCKET_TIMESTAMP); + LOG.info( + "Finish tier bucket {} for table 
{}, split: {}.", + tableBucket, + currentTablePath, + splitId); closeCurrentSnapshotSplit(); mayFinishCurrentTable(); return new TableBucketWriteResultWithSplitIds( @@ -437,10 +540,21 @@ private void finishCurrentTable() throws IOException { currentTableNumberOfSplits = null; currentPendingSnapshotSplits.clear(); currentTableStoppingOffsets.clear(); - currentTableEmptyLogSplits.clear(); + currentTableTieredOffsetAndTimestamp.clear(); currentTableSplitsByBucket.clear(); } + /** + * Handles a table reaching its max tiering duration. The table is marked only if it is the + * current table or has pending splits; a marked current table is force completed by fetch. + */ + public void handleTableReachTieringMaxDuration(long tableId) { + if ((currentTableId != null && currentTableId.equals(tableId)) + || pendingTieringSplits.containsKey(tableId)) { + reachTieringMaxDurationTables.add(tableId); + } + } + @Override public void wakeUp() { if (currentLogScanner != null) { @@ -466,7 +580,7 @@ private void subscribeLog(TieringLogSplit logSplit) { long stoppingOffset = logSplit.getStoppingOffset(); long startingOffset = logSplit.getStartingOffset(); if (startingOffset >= stoppingOffset || stoppingOffset <= 0) { - currentTableEmptyLogSplits.add(logSplit); + currentEmptySplits.add(logSplit); return; } else { currentTableStoppingOffsets.put(tableBucket, stoppingOffset); @@ -559,4 +673,15 @@ public Set finishedSplits() { return new HashSet<>(bucketSplits.values()); } } + + private static final class LogOffsetAndTimestamp { + + private final long logOffset; + private final long timestamp; + + public LogOffsetAndTimestamp(long logOffset, long timestamp) { + this.logOffset = logOffset; + this.timestamp = timestamp; + } + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java index 
1ef73eb39a..d13aa9e2f6 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java @@ -27,6 +27,7 @@ import org.apache.fluss.flink.tiering.event.FailedTieringEvent; import org.apache.fluss.flink.tiering.event.FinishedTieringEvent; import org.apache.fluss.flink.tiering.event.TieringFailOverEvent; +import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; import org.apache.fluss.flink.tiering.source.split.TieringSplit; import org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator; import org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorState; @@ -40,6 +41,8 @@ import org.apache.fluss.rpc.messages.PbLakeTieringTableInfo; import org.apache.fluss.rpc.metrics.ClientMetricGroup; import org.apache.fluss.utils.MapUtils; +import org.apache.fluss.utils.clock.Clock; +import org.apache.fluss.utils.clock.SystemClock; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; @@ -54,7 +57,9 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -91,12 +96,18 @@ public class TieringSourceEnumerator private final SplitEnumeratorContext context; private final SplitEnumeratorMetricGroup enumeratorMetricGroup; private final long pollTieringTableIntervalMs; + private final long tieringTableDurationMaxMs; + private final long tieringTableDurationDetectIntervalMs; private final List pendingSplits; private final Set readersAwaitingSplit; + + private final Map tieringTablesDeadline; private final Map tieringTableEpochs; private final Map failedTableEpochs; private final Map finishedTableEpochs; + private final Clock clock; + // 
lazily instantiated private RpcClient rpcClient; private CoordinatorGateway coordinatorGateway; @@ -110,16 +121,38 @@ public class TieringSourceEnumerator public TieringSourceEnumerator( Configuration flussConf, SplitEnumeratorContext context, - long pollTieringTableIntervalMs) { + long pollTieringTableIntervalMs, + long tieringTableDurationMaxMs, + long tieringTableDurationDetectIntervalMs) { + this( + flussConf, + context, + pollTieringTableIntervalMs, + tieringTableDurationMaxMs, + tieringTableDurationDetectIntervalMs, + SystemClock.getInstance()); + } + + public TieringSourceEnumerator( + Configuration flussConf, + SplitEnumeratorContext context, + long pollTieringTableIntervalMs, + long tieringTableDurationMaxMs, + long tieringTableDurationDetectIntervalMs, + Clock clock) { this.flussConf = flussConf; this.context = context; this.enumeratorMetricGroup = context.metricGroup(); this.pollTieringTableIntervalMs = pollTieringTableIntervalMs; + this.tieringTableDurationMaxMs = tieringTableDurationMaxMs; + this.tieringTableDurationDetectIntervalMs = tieringTableDurationDetectIntervalMs; this.pendingSplits = new ArrayList<>(); this.readersAwaitingSplit = new TreeSet<>(); this.tieringTableEpochs = MapUtils.newConcurrentHashMap(); this.finishedTableEpochs = MapUtils.newConcurrentHashMap(); this.failedTableEpochs = MapUtils.newConcurrentHashMap(); + this.tieringTablesDeadline = MapUtils.newConcurrentHashMap(); + this.clock = clock; } @Override @@ -155,6 +188,12 @@ public void start() { this::generateAndAssignSplits, 0, pollTieringTableIntervalMs); + + this.context.callAsync( + this::checkTableReachMaxTieringDuration, + this::handleReachMaxTieringDurationTables, + 0, + tieringTableDurationDetectIntervalMs); } @Override @@ -198,6 +237,7 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { } else { finishedTableEpochs.put(finishedTableId, tieringEpoch); } + tieringTablesDeadline.remove(finishedTableId); } if (sourceEvent instanceof FailedTieringEvent) 
{ @@ -216,6 +256,7 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { } else { failedTableEpochs.put(failedTableId, tieringEpoch); } + tieringTablesDeadline.remove(failedTableId); } if (sourceEvent instanceof TieringFailOverEvent) { @@ -236,6 +277,53 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { } } + private Set checkTableReachMaxTieringDuration() { + Set tieringReachMaxDurationTables = new HashSet<>(); + long currentTime = clock.milliseconds(); + for (Map.Entry tieringTableDeadline : tieringTablesDeadline.entrySet()) { + long tableId = tieringTableDeadline.getKey(); + long deadline = tieringTableDeadline.getValue(); + if (deadline < currentTime) { + tieringReachMaxDurationTables.add(tableId); + } + } + return tieringReachMaxDurationTables; + } + + private void handleReachMaxTieringDurationTables( + Set tieringReachMaxDurationTables, Throwable throwable) { + if (throwable != null) { + LOG.error("Fail to check tiering timeout tables.", throwable); + return; + } + + for (Long reachMaxDurationTable : tieringReachMaxDurationTables) { + for (TieringSplit tieringSplit : pendingSplits) { + if (tieringSplit.getTableBucket().getTableId() == reachMaxDurationTable) { + // force ignore this tiering split since the tiering for this table is timeout, + // we have to force to set to ignore the tiering split so that the + // tiering source reader can ignore them directly + tieringSplit.forceIgnore(); + } else { + // we can break directly, if found any one split's table id is not equal to the + // timeout + // table, the following split must be not equal to the table id + break; + } + } + + LOG.debug("Found the table {} reach max tiering duration.", reachMaxDurationTable); + + // broadcast the tiering reach max duration event to all readers, + // we broadcast all for simplicity + Set readers = new HashSet<>(context.registeredReaders().keySet()); + for (int reader : readers) { + context.sendEventToSourceReader( + reader, new 
TieringReachMaxDurationEvent(reachMaxDurationTable)); + } + } + } + private void generateAndAssignSplits( @Nullable Tuple3 tieringTable, Throwable throwable) { if (throwable != null) { @@ -249,6 +337,7 @@ private void generateAndAssignSplits( private void assignSplits() { /* This method may be called from both addSplitsBack and handleSplitRequest, make it thread safe. */ + // todo: do we need to add lock? synchronized (readersAwaitingSplit) { if (!readersAwaitingSplit.isEmpty()) { final Integer[] readers = readersAwaitingSplit.toArray(new Integer[0]); @@ -324,6 +413,9 @@ private void generateTieringSplits(Tuple3 tieringTable) List tieringSplits = populateNumberOfTieringSplits( splitGenerator.generateTableSplits(tieringTable.f2)); + // shuffle tiering split to avoid splits tiering skew + // after introduce tiering max duration + Collections.shuffle(tieringSplits); LOG.info( "Generate Tiering {} splits for table {} with cost {}ms.", tieringSplits.size(), @@ -337,6 +429,8 @@ private void generateTieringSplits(Tuple3 tieringTable) } else { tieringTableEpochs.put(tieringTable.f0, tieringTable.f1); pendingSplits.addAll(tieringSplits); + tieringTablesDeadline.put( + tieringTable.f0, clock.milliseconds() + tieringTableDurationMaxMs); } } catch (Exception e) { LOG.warn("Fail to generate Tiering splits for table {}.", tieringTable.f2, e); @@ -352,7 +446,7 @@ private List populateNumberOfTieringSplits(List tier } @Override - public TieringSourceEnumeratorState snapshotState(long checkpointId) throws Exception { + public TieringSourceEnumeratorState snapshotState(long checkpointId) { // do nothing, the downstream lake commiter will snapshot the state to Fluss Cluster return new TieringSourceEnumeratorState(); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringLogSplit.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringLogSplit.java index 3751e531f8..ac8c783af2 100644 
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringLogSplit.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringLogSplit.java @@ -54,7 +54,25 @@ public TieringLogSplit( long startingOffset, long stoppingOffset, int numberOfSplits) { - super(tablePath, tableBucket, partitionName, numberOfSplits); + this( + tablePath, + tableBucket, + partitionName, + startingOffset, + stoppingOffset, + false, + numberOfSplits); + } + + public TieringLogSplit( + TablePath tablePath, + TableBucket tableBucket, + @Nullable String partitionName, + long startingOffset, + long stoppingOffset, + boolean forceIgnore, + int numberOfSplits) { + super(tablePath, tableBucket, partitionName, forceIgnore, numberOfSplits); this.startingOffset = startingOffset; this.stoppingOffset = stoppingOffset; } @@ -82,12 +100,14 @@ public String toString() { + ", partitionName='" + partitionName + '\'' + + ", numberOfSplits=" + + numberOfSplits + + ", forceIgnore=" + + forceIgnore + ", startingOffset=" + startingOffset + ", stoppingOffset=" + stoppingOffset - + ", numberOfSplits=" - + numberOfSplits + '}'; } @@ -99,6 +119,7 @@ public TieringLogSplit copy(int numberOfSplits) { partitionName, startingOffset, stoppingOffset, + forceIgnore, numberOfSplits); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSnapshotSplit.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSnapshotSplit.java index a0a095d578..0044659d2b 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSnapshotSplit.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSnapshotSplit.java @@ -43,10 +43,16 @@ public TieringSnapshotSplit( TableBucket tableBucket, @Nullable String partitionName, long snapshotId, - long 
logOffsetOfSnapshot) { - super(tablePath, tableBucket, partitionName, UNKNOWN_NUMBER_OF_SPLITS); - this.snapshotId = snapshotId; - this.logOffsetOfSnapshot = logOffsetOfSnapshot; + long logOffsetOfSnapshot, + int numberOfSplits) { + this( + tablePath, + tableBucket, + partitionName, + snapshotId, + logOffsetOfSnapshot, + numberOfSplits, + false); } public TieringSnapshotSplit( @@ -55,8 +61,9 @@ public TieringSnapshotSplit( @Nullable String partitionName, long snapshotId, long logOffsetOfSnapshot, - int numberOfSplits) { - super(tablePath, tableBucket, partitionName, numberOfSplits); + int numberOfSplits, + boolean forceIgnore) { + super(tablePath, tableBucket, partitionName, forceIgnore, numberOfSplits); this.snapshotId = snapshotId; this.logOffsetOfSnapshot = logOffsetOfSnapshot; } @@ -84,12 +91,14 @@ public String toString() { + ", partitionName='" + partitionName + '\'' + + ", numberOfSplits=" + + numberOfSplits + + ", forceIgnore=" + + forceIgnore + ", snapshotId=" + snapshotId + ", logOffsetOfSnapshot=" + logOffsetOfSnapshot - + ", numberOfSplits=" - + numberOfSplits + '}'; } @@ -101,7 +110,8 @@ public TieringSnapshotSplit copy(int numberOfSplits) { partitionName, snapshotId, logOffsetOfSnapshot, - numberOfSplits); + numberOfSplits, + forceIgnore); } @Override diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplit.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplit.java index 9da98b8387..cbd3482a8c 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplit.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplit.java @@ -38,6 +38,7 @@ public abstract class TieringSplit implements SourceSplit { protected final TableBucket tableBucket; @Nullable protected final String partitionName; + protected boolean forceIgnore; // the total number of 
splits in one round of tiering protected final int numberOfSplits; @@ -45,6 +46,7 @@ public TieringSplit( TablePath tablePath, TableBucket tableBucket, @Nullable String partitionName, + boolean forceIgnore, int numberOfSplits) { this.tablePath = tablePath; this.tableBucket = tableBucket; @@ -54,6 +56,7 @@ public TieringSplit( throw new IllegalArgumentException( "Partition name and partition id must be both null or both not null."); } + this.forceIgnore = forceIgnore; this.numberOfSplits = numberOfSplits; } @@ -72,6 +75,14 @@ public final boolean isTieringLogSplit() { return getClass() == TieringLogSplit.class; } + public void forceIgnore() { + this.forceIgnore = true; + } + + public boolean isForceIgnore() { + return forceIgnore; + } + /** Casts this split into a {@link TieringLogSplit}. */ public TieringLogSplit asTieringLogSplit() { return (TieringLogSplit) this; @@ -128,11 +139,12 @@ public boolean equals(Object object) { return Objects.equals(tablePath, that.tablePath) && Objects.equals(tableBucket, that.tableBucket) && Objects.equals(partitionName, that.partitionName) + && forceIgnore == that.forceIgnore && numberOfSplits == that.numberOfSplits; } @Override public int hashCode() { - return Objects.hash(tablePath, tableBucket, partitionName, numberOfSplits); + return Objects.hash(tablePath, tableBucket, partitionName, forceIgnore, numberOfSplits); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java index eb207f8ea9..2d39f93a39 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java @@ -57,7 +57,6 @@ public TieringSplitGenerator(Admin flussAdmin) { } public List 
generateTableSplits(TablePath tablePath) throws Exception { - final TableInfo tableInfo = flussAdmin.getTableInfo(tablePath).get(); final BucketOffsetsRetriever bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(flussAdmin, tablePath); @@ -287,7 +286,7 @@ private Optional generateSplitForPrimaryKeyTableBucket( latestBucketOffset, 0)); } else { - LOG.info( + LOG.debug( "The lastCommittedBucketOffset {} is equals or bigger than latestBucketOffset {}, skip generating split for bucket {}", lastCommittedBucketOffset, latestBucketOffset, @@ -320,8 +319,7 @@ private Optional generateSplitForLogTableBucket( tableBucket, partitionName, EARLIEST_OFFSET, - latestBucketOffset, - 0)); + latestBucketOffset)); } else { // the bucket has been tiered, scan remain fluss log if (lastCommittedBucketOffset < latestBucketOffset) { @@ -331,11 +329,10 @@ private Optional generateSplitForLogTableBucket( tableBucket, partitionName, lastCommittedBucketOffset, - latestBucketOffset, - 0)); + latestBucketOffset)); } } - LOG.info( + LOG.debug( "The lastCommittedBucketOffset {} is equals or bigger than latestBucketOffset {}, skip generating split for bucket {}", lastCommittedBucketOffset, latestBucketOffset, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializer.java index 1c2997a540..85b3429afe 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializer.java @@ -74,6 +74,9 @@ public byte[] serialize(TieringSplit split) throws IOException { out.writeBoolean(false); } + // write force ignore + out.writeBoolean(split.isForceIgnore()); + // write number of splits out.writeInt(split.getNumberOfSplits()); if 
(split.isTieringSnapshotSplit()) { @@ -126,6 +129,8 @@ public TieringSplit deserialize(int version, byte[] serialized) throws IOExcepti } TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId); + boolean forceIgnore = in.readBoolean(); + // deserialize number of splits int numberOfSplits = in.readInt(); @@ -140,7 +145,8 @@ public TieringSplit deserialize(int version, byte[] serialized) throws IOExcepti partitionName, snapshotId, logOffsetOfSnapshot, - numberOfSplits); + numberOfSplits, + forceIgnore); } else { // deserialize starting offset long startingOffset = in.readLong(); @@ -152,6 +158,7 @@ public TieringSplit deserialize(int version, byte[] serialized) throws IOExcepti partitionName, startingOffset, stoppingOffset, + forceIgnore, numberOfSplits); } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java new file mode 100644 index 0000000000..ce8c20c41e --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.flink.tiering; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.client.metadata.LakeSnapshot; +import org.apache.fluss.client.table.Table; +import org.apache.fluss.client.table.writer.AppendWriter; +import org.apache.fluss.client.table.writer.TableWriter; +import org.apache.fluss.client.table.writer.UpsertWriter; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.exception.LakeTableSnapshotNotExistException; +import org.apache.fluss.metadata.DataLakeFormat; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.server.testutils.FlussClusterExtension; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.utils.ExceptionUtils; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.nio.file.Files; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL; +import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_MAX; +import static 
org.apache.fluss.testutils.common.CommonTestUtils.waitValue; +import static org.assertj.core.api.Assertions.assertThat; + +/** The IT case for tiering. */ +class TieringITCase { + + @RegisterExtension + public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = + FlussClusterExtension.builder() + .setClusterConf(initConfig()) + .setNumOfTabletServers(3) + .build(); + + protected static String warehousePath; + protected static Connection conn; + protected static Admin admin; + protected static StreamExecutionEnvironment execEnv; + + protected static Configuration initConfig() { + Configuration conf = new Configuration(); + conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1)) + // not to clean snapshots for test purpose + .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE); + conf.setString("datalake.format", "paimon"); + conf.setString("datalake.paimon.metastore", "filesystem"); + try { + warehousePath = + Files.createTempDirectory("fluss-testing-datalake-tiered") + .resolve("warehouse") + .toString(); + } catch (Exception e) { + throw new FlussRuntimeException("Failed to create warehouse path"); + } + conf.setString("datalake.paimon.warehouse", warehousePath); + return conf; + } + + @BeforeAll + static void beforeAll() { + conn = ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig()); + admin = conn.getAdmin(); + execEnv = + StreamExecutionEnvironment.getExecutionEnvironment() + .setParallelism(1) + .setRuntimeMode(RuntimeExecutionMode.STREAMING); + } + + @Test + void testTieringReachMaxDuration() throws Exception { + TablePath logTablePath = TablePath.of("fluss", "logtable"); + createTable(logTablePath, false); + TablePath pkTablePath = TablePath.of("fluss", "pktable"); + long pkTableId = createTable(pkTablePath, true); + + // write some records to log table + List rows = new ArrayList<>(); + int recordCount = 6; + for (int i = 0; i < recordCount; i++) { + rows.add(GenericRow.of(i, 
BinaryString.fromString("v" + i))); + } + writeRows(logTablePath, rows, true); + + rows = new ArrayList<>(); + // write 6 records to primary key table, each bucket should only contain few record + for (int i = 0; i < recordCount; i++) { + rows.add(GenericRow.of(i, BinaryString.fromString("v" + i))); + } + writeRows(pkTablePath, rows, false); + + waitUntilSnapshot(pkTableId, 3, 0); + + JobClient jobClient = buildTieringJob(execEnv); + + try { + // verify the tiered records is less than the table total record to + // make sure tiering is forced to complete when reach max duration + LakeSnapshot logTableLakeSnapshot = waitLakeSnapshot(logTablePath); + long tieredRecords = countTieredRecords(logTableLakeSnapshot); + assertThat(tieredRecords).isLessThan(recordCount); + + // verify the tiered records is less than the table total record to + // make sure tiering is forced to complete when reach max duration + LakeSnapshot pkTableLakeSnapshot = waitLakeSnapshot(pkTablePath); + tieredRecords = countTieredRecords(pkTableLakeSnapshot); + assertThat(tieredRecords).isLessThan(recordCount); + } finally { + jobClient.cancel(); + } + } + + @AfterAll + static void afterAll() throws Exception { + if (admin != null) { + admin.close(); + admin = null; + } + if (conn != null) { + conn.close(); + conn = null; + } + } + + private long countTieredRecords(LakeSnapshot lakeSnapshot) throws Exception { + return lakeSnapshot.getTableBucketsOffset().values().stream() + .mapToLong(Long::longValue) + .sum(); + } + + private LakeSnapshot waitLakeSnapshot(TablePath tablePath) { + return waitValue( + () -> { + try { + return Optional.of(admin.getLatestLakeSnapshot(tablePath).get()); + } catch (Exception e) { + if (ExceptionUtils.stripExecutionException(e) + instanceof LakeTableSnapshotNotExistException) { + return Optional.empty(); + } + throw e; + } + }, + Duration.ofSeconds(30), + "Fail to wait for one round of tiering finish for table " + tablePath); + } + + private long createTable(TablePath 
tablePath, boolean isPrimaryKeyTable) throws Exception { + Schema.Builder schemaBuilder = + Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.STRING()); + if (isPrimaryKeyTable) { + schemaBuilder.primaryKey("a"); + } + TableDescriptor.Builder tableDescriptorBuilder = + TableDescriptor.builder() + .schema(schemaBuilder.build()) + .distributedBy(3, "a") + .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true") + .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)); + + // see TestingPaimonStoragePlugin#TestingPaimonWriter, we set write-pause + // to 1s to make it easy to mock tiering reach max duration + Map customProperties = Collections.singletonMap("write-pause", "1s"); + tableDescriptorBuilder.customProperties(customProperties); + return createTable(tablePath, tableDescriptorBuilder.build()); + } + + protected long createTable(TablePath tablePath, TableDescriptor tableDescriptor) + throws Exception { + admin.createTable(tablePath, tableDescriptor, true).get(); + return admin.getTableInfo(tablePath).get().getTableId(); + } + + private void writeRows(TablePath tablePath, List rows, boolean append) + throws Exception { + try (Table table = conn.getTable(tablePath)) { + TableWriter tableWriter; + if (append) { + tableWriter = table.newAppend().createWriter(); + } else { + tableWriter = table.newUpsert().createWriter(); + } + for (InternalRow row : rows) { + if (tableWriter instanceof AppendWriter) { + ((AppendWriter) tableWriter).append(row); + } else { + ((UpsertWriter) tableWriter).upsert(row); + } + } + tableWriter.flush(); + } + } + + private JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws Exception { + Configuration lakeTieringConfig = new Configuration(); + lakeTieringConfig.set(LAKE_TIERING_TABLE_DURATION_MAX, Duration.ofSeconds(1)); + lakeTieringConfig.set(LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL, Duration.ofMillis(100)); + + Configuration flussConfig = new Configuration(); + 
flussConfig.setString( + ConfigOptions.BOOTSTRAP_SERVERS.key(), + FLUSS_CLUSTER_EXTENSION.getBootstrapServers()); + return LakeTieringJobBuilder.newBuilder( + execEnv, + flussConfig, + new Configuration(), + lakeTieringConfig, + DataLakeFormat.PAIMON.toString()) + .build(); + } + + protected void waitUntilSnapshot(long tableId, int bucketNum, long snapshotId) { + for (int i = 0; i < bucketNum; i++) { + TableBucket tableBucket = new TableBucket(tableId, i); + FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket, snapshotId); + } + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java new file mode 100644 index 0000000000..8868b29832 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.flink.tiering.source; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.flink.tiering.TestingLakeTieringFactory; +import org.apache.fluss.flink.tiering.TestingWriteResult; +import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; +import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; +import org.apache.fluss.flink.utils.FlinkTestBase; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePath; + +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; +import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; + +import static org.apache.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET; +import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.apache.fluss.testutils.common.CommonTestUtils.retry; +import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link TieringSourceReader}. 
*/ +class TieringSourceReaderTest extends FlinkTestBase { + + @Test + void testHandleTieringReachMaxDurationEvent() throws Exception { + TablePath tablePath = TablePath.of("fluss", "test_tiering_reach_max_duration"); + long tableId = createTable(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR); + Configuration conf = new Configuration(FLUSS_CLUSTER_EXTENSION.getClientConfig()); + conf.set( + ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER, + ConfigOptions.NoKeyAssigner.ROUND_ROBIN); + try (Connection connection = ConnectionFactory.createConnection(conf)) { + FutureCompletingBlockingQueue< + RecordsWithSplitIds>> + elementsQueue = new FutureCompletingBlockingQueue<>(16); + TestingReaderContext readerContext = new TestingReaderContext(); + try (TieringSourceReader reader = + new TieringSourceReader<>( + elementsQueue, + readerContext, + connection, + new TestingLakeTieringFactory(), + Duration.ofMillis(500))) { + + reader.start(); + + // no data, add a split for the table, + // should be force be complete after reach max duration + TieringLogSplit split = + new TieringLogSplit( + tablePath, new TableBucket(tableId, 0), null, EARLIEST_OFFSET, 100); + reader.addSplits(Collections.singletonList(split)); + + // send TieringReachMaxDurationEvent + TieringReachMaxDurationEvent event = new TieringReachMaxDurationEvent(tableId); + reader.handleSourceEvents(event); + + retry( + Duration.ofMinutes(1), + () -> { + TestingReaderOutput> output = + new TestingReaderOutput<>(); + // should force to finish, the write result is null + reader.pollNext(output); + assertThat(output.getEmittedRecords()).hasSize(1); + TableBucketWriteResult result = + output.getEmittedRecords().get(0); + assertThat(result.writeResult()).isNull(); + }); + + // write some data + writeRows( + connection, + tablePath, + Arrays.asList(row(0, "v0"), row(1, "v1"), row(2, "v2")), + true); + split = + new TieringLogSplit( + tablePath, + new TableBucket(tableId, 2), + null, + EARLIEST_OFFSET, + // use 100L as end 
offset, so that + // tiering won't be finished if no tiering reach max duration logic + 100L); + + reader.addSplits(Collections.singletonList(split)); + + // wait to run one round of tiering to do some tiering + FutureCompletingBlockingQueue< + RecordsWithSplitIds>> + blockingQueue = getElementsQueue(reader); + // wait blockingQueue is not empty to make sure we have one fetch + // in tiering source reader + waitUntil( + () -> !blockingQueue.isEmpty(), + Duration.ofSeconds(30), + "Fail to wait element queue is not empty."); + + // send TieringReachMaxDurationEvent + event = new TieringReachMaxDurationEvent(tableId); + reader.handleSourceEvents(event); + + // make sure tiering will be finished, still maintain the result + // of previous tiering + retry( + Duration.ofMinutes(1), + () -> { + TestingReaderOutput> + output1 = new TestingReaderOutput<>(); + + // should force to finish, the write result isn't null + reader.pollNext(output1); + assertThat(output1.getEmittedRecords()).hasSize(1); + TableBucketWriteResult result = + output1.getEmittedRecords().get(0); + TestingWriteResult testingWriteResult = result.writeResult(); + assertThat(testingWriteResult).isNotNull(); + assertThat(result.logEndOffset()).isEqualTo(1); + }); + + // test add split with force ignore + split = + new TieringLogSplit( + tablePath, + new TableBucket(tableId, 1), + null, + EARLIEST_OFFSET, + 100L); + split.forceIgnore(); + reader.addSplits(Collections.singletonList(split)); + // should skip tiering for this split + retry( + Duration.ofMinutes(1), + () -> { + TestingReaderOutput> + output1 = new TestingReaderOutput<>(); + // should force to finish, and the result is null + reader.pollNext(output1); + assertThat(output1.getEmittedRecords()).hasSize(1); + TableBucketWriteResult result = + output1.getEmittedRecords().get(0); + assertThat(result.writeResult()).isNull(); + }); + } + } + } + + /** + * Get the elementsQueue from TieringSourceReader using reflection. 
+ * + * @param reader the TieringSourceReader instance + * @return the elementsQueue field value + */ + @SuppressWarnings("unchecked") + private FutureCompletingBlockingQueue< + RecordsWithSplitIds>> + getElementsQueue(TieringSourceReader reader) throws Exception { + Class clazz = reader.getClass(); + while (clazz != null) { + try { + Field elementsQueueField = clazz.getDeclaredField("elementsQueue"); + elementsQueueField.setAccessible(true); + return (FutureCompletingBlockingQueue< + RecordsWithSplitIds>>) + elementsQueueField.get(reader); + } catch (NoSuchFieldException e) { + // Try parent class + clazz = clazz.getSuperclass(); + } + } + throw new RuntimeException("No elementsQueue field found"); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java index ba8c5b17a8..ac46e35d53 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java @@ -21,6 +21,7 @@ import org.apache.fluss.flink.tiering.event.FailedTieringEvent; import org.apache.fluss.flink.tiering.event.FinishedTieringEvent; import org.apache.fluss.flink.tiering.event.TieringFailOverEvent; +import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; import org.apache.fluss.flink.tiering.source.TieringTestBase; import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit; @@ -31,8 +32,10 @@ import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest; import org.apache.fluss.rpc.messages.PbLakeTableOffsetForBucket; import org.apache.fluss.rpc.messages.PbLakeTableSnapshotInfo; 
+import org.apache.fluss.utils.clock.ManualClock; import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitsAssignment; import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; import org.junit.jupiter.api.BeforeAll; @@ -41,9 +44,9 @@ import javax.annotation.Nullable; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -79,7 +82,7 @@ void testPrimaryKeyTableWithNoSnapshotSplits() throws Throwable { try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(numSubtasks)) { TieringSourceEnumerator enumerator = - new TieringSourceEnumerator(flussConf, context, 500); + createDefaultTieringSourceEnumerator(flussConf, context); enumerator.start(); assertThat(context.getSplitsAssignmentSequence()).isEmpty(); @@ -126,25 +129,23 @@ void testPrimaryKeyTableWithNoSnapshotSplits() throws Throwable { } waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 500L); - Map> expectedLogAssignment = new HashMap<>(); + List expectedLogAssignment = new ArrayList<>(); for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) { - expectedLogAssignment.put( - tableBucket, - Collections.singletonList( - new TieringLogSplit( - tablePath, - new TableBucket(tableId, tableBucket), - null, - bucketOffsetOfInitialWrite.get(tableBucket), - bucketOffsetOfInitialWrite.get(tableBucket) - + bucketOffsetOfSecondWrite.get(tableBucket), - expectNumberOfSplits))); - } - Map> actualLogAssignment = new HashMap<>(); - for (SplitsAssignment a : context.getSplitsAssignmentSequence()) { - actualLogAssignment.putAll(a.assignment()); - } - assertThat(actualLogAssignment).isEqualTo(expectedLogAssignment); + expectedLogAssignment.add( + new TieringLogSplit( + tablePath, + new TableBucket(tableId, 
tableBucket), + null, + bucketOffsetOfInitialWrite.get(tableBucket), + bucketOffsetOfInitialWrite.get(tableBucket) + + bucketOffsetOfSecondWrite.get(tableBucket), + expectNumberOfSplits)); + } + List actualLogAssignment = new ArrayList<>(); + context.getSplitsAssignmentSequence() + .forEach(a -> a.assignment().values().forEach(actualLogAssignment::addAll)); + assertThat(actualLogAssignment) + .containsExactlyInAnyOrderElementsOf(expectedLogAssignment); } } @@ -164,7 +165,7 @@ void testPrimaryKeyTableWithSnapshotSplits() throws Throwable { try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(numSubtasks)) { TieringSourceEnumerator enumerator = - new TieringSourceEnumerator(flussConf, context, 500); + createDefaultTieringSourceEnumerator(flussConf, context); enumerator.start(); assertThat(context.getSplitsAssignmentSequence()).isEmpty(); @@ -176,24 +177,22 @@ void testPrimaryKeyTableWithSnapshotSplits() throws Throwable { } waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 3000L); - Map> expectedSnapshotAssignment = new HashMap<>(); + List expectedSnapshotAssignment = new ArrayList<>(); for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) { - expectedSnapshotAssignment.put( - tableBucket, - Collections.singletonList( - new TieringSnapshotSplit( - tablePath, - new TableBucket(tableId, tableBucket), - null, - snapshotId, - bucketOffsetOfInitialWrite.get(tableBucket), - expectNumberOfSplits))); - } - Map> actualSnapshotAssignment = new HashMap<>(); - for (SplitsAssignment a : context.getSplitsAssignmentSequence()) { - actualSnapshotAssignment.putAll(a.assignment()); - } - assertThat(actualSnapshotAssignment).isEqualTo(expectedSnapshotAssignment); + expectedSnapshotAssignment.add( + new TieringSnapshotSplit( + tablePath, + new TableBucket(tableId, tableBucket), + null, + snapshotId, + bucketOffsetOfInitialWrite.get(tableBucket), + expectNumberOfSplits)); + } + List actualAssignment = new ArrayList<>(); + 
context.getSplitsAssignmentSequence() + .forEach(a -> a.assignment().values().forEach(actualAssignment::addAll)); + assertThat(actualAssignment) + .containsExactlyInAnyOrderElementsOf(expectedSnapshotAssignment); // mock finished tiered this round, check second round context.getSplitsAssignmentSequence().clear(); @@ -223,25 +222,23 @@ void testPrimaryKeyTableWithSnapshotSplits() throws Throwable { // three log splits will be ready soon waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 500L); - Map> expectedLogAssignment = new HashMap<>(); + List expectedLogAssignment = new ArrayList<>(); for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) { - expectedLogAssignment.put( - tableBucket, - Collections.singletonList( - new TieringLogSplit( - tablePath, - new TableBucket(tableId, tableBucket), - null, - bucketOffsetOfInitialWrite.get(tableBucket), - bucketOffsetOfInitialWrite.get(tableBucket) - + bucketOffsetOfSecondWrite.get(tableBucket), - expectNumberOfSplits))); - } - Map> actualLogAssignment = new HashMap<>(); - for (SplitsAssignment a : context.getSplitsAssignmentSequence()) { - actualLogAssignment.putAll(a.assignment()); - } - assertThat(actualLogAssignment).isEqualTo(expectedLogAssignment); + expectedLogAssignment.add( + new TieringLogSplit( + tablePath, + new TableBucket(tableId, tableBucket), + null, + bucketOffsetOfInitialWrite.get(tableBucket), + bucketOffsetOfInitialWrite.get(tableBucket) + + bucketOffsetOfSecondWrite.get(tableBucket), + expectNumberOfSplits)); + } + List actualLogAssignment = new ArrayList<>(); + context.getSplitsAssignmentSequence() + .forEach(a -> a.assignment().values().forEach(actualLogAssignment::addAll)); + assertThat(actualLogAssignment) + .containsExactlyInAnyOrderElementsOf(expectedLogAssignment); } } @@ -255,7 +252,7 @@ void testLogTableSplits() throws Throwable { try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(numSubtasks)) { TieringSourceEnumerator enumerator 
= - new TieringSourceEnumerator(flussConf, context, 500); + createDefaultTieringSourceEnumerator(flussConf, context); enumerator.start(); assertThat(context.getSplitsAssignmentSequence()).isEmpty(); @@ -287,7 +284,7 @@ void testLogTableSplits() throws Throwable { context.getSplitsAssignmentSequence() .forEach(a -> a.assignment().values().forEach(actualAssignment::addAll)); - assertThat(actualAssignment).isEqualTo(expectedAssignment); + assertThat(actualAssignment).containsExactlyInAnyOrderElementsOf(expectedAssignment); // mock finished tiered this round, check second round context.getSplitsAssignmentSequence().clear(); @@ -317,25 +314,23 @@ void testLogTableSplits() throws Throwable { waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 500L); - Map> expectedLogAssignment = new HashMap<>(); + List expectedLogAssignment = new ArrayList<>(); for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) { - expectedLogAssignment.put( - tableBucket, - Collections.singletonList( - new TieringLogSplit( - tablePath, - new TableBucket(tableId, tableBucket), - null, - bucketOffsetOfInitialWrite.get(tableBucket), - bucketOffsetOfInitialWrite.get(tableBucket) - + bucketOffsetOfSecondWrite.get(tableBucket), - expectNumberOfSplits))); - } - Map> actualLogAssignment = new HashMap<>(); - for (SplitsAssignment a : context.getSplitsAssignmentSequence()) { - actualLogAssignment.putAll(a.assignment()); - } - assertThat(actualLogAssignment).isEqualTo(expectedLogAssignment); + expectedLogAssignment.add( + new TieringLogSplit( + tablePath, + new TableBucket(tableId, tableBucket), + null, + bucketOffsetOfInitialWrite.get(tableBucket), + bucketOffsetOfInitialWrite.get(tableBucket) + + bucketOffsetOfSecondWrite.get(tableBucket), + expectNumberOfSplits)); + } + List actualLogAssignment = new ArrayList<>(); + context.getSplitsAssignmentSequence() + .forEach(a -> a.assignment().values().forEach(actualLogAssignment::addAll)); + assertThat(actualLogAssignment) + 
.containsExactlyInAnyOrderElementsOf(expectedLogAssignment); } } @@ -354,7 +349,7 @@ void testPartitionedPrimaryKeyTable() throws Throwable { try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(numSubtasks)) { TieringSourceEnumerator enumerator = - new TieringSourceEnumerator(flussConf, context, 500); + createDefaultTieringSourceEnumerator(flussConf, context); enumerator.start(); assertThat(context.getSplitsAssignmentSequence()).isEmpty(); @@ -438,8 +433,8 @@ void testPartitionedPrimaryKeyTable() throws Throwable { context.getSplitsAssignmentSequence()) { splitsAssignment.assignment().values().forEach(actualLogAssignment::addAll); } - assertThat(sortSplits(actualLogAssignment)) - .isEqualTo(sortSplits(expectedLogAssignment)); + assertThat(actualLogAssignment) + .containsExactlyInAnyOrderElementsOf(expectedLogAssignment); } } @@ -458,7 +453,7 @@ void testPartitionedLogTableSplits() throws Throwable { try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(numSubtasks)) { TieringSourceEnumerator enumerator = - new TieringSourceEnumerator(flussConf, context, 500); + createDefaultTieringSourceEnumerator(flussConf, context); enumerator.start(); assertThat(context.getSplitsAssignmentSequence()).isEmpty(); @@ -498,7 +493,7 @@ void testPartitionedLogTableSplits() throws Throwable { context.getSplitsAssignmentSequence()) { splitsAssignment.assignment().values().forEach(actualAssignment::addAll); } - assertThat(sortSplits(actualAssignment)).isEqualTo(sortSplits(expectedAssignment)); + assertThat(actualAssignment).containsExactlyInAnyOrderElementsOf(expectedAssignment); // mock finished tiered this round, check second round context.getSplitsAssignmentSequence().clear(); @@ -567,8 +562,8 @@ void testPartitionedLogTableSplits() throws Throwable { context.getSplitsAssignmentSequence()) { splitsAssignment.assignment().values().forEach(actualLogAssignment::addAll); } - assertThat(sortSplits(actualLogAssignment)) - 
.isEqualTo(sortSplits(expectedLogAssignment)); + assertThat(actualLogAssignment) + .containsExactlyInAnyOrderElementsOf(expectedLogAssignment); } } @@ -584,7 +579,7 @@ void testHandleFailedTieringTableEvent() throws Throwable { try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(numSubtasks)) { TieringSourceEnumerator enumerator = - new TieringSourceEnumerator(flussConf, context, 500); + createDefaultTieringSourceEnumerator(flussConf, context); enumerator.start(); assertThat(context.getSplitsAssignmentSequence()).isEmpty(); @@ -611,7 +606,7 @@ void testHandleFailedTieringTableEvent() throws Throwable { context.getSplitsAssignmentSequence() .forEach(a -> a.assignment().values().forEach(actualAssignment::addAll)); - assertThat(actualAssignment).isEqualTo(expectedAssignment); + assertThat(actualAssignment).containsExactlyInAnyOrderElementsOf(expectedAssignment); // mock tiering fail by send tiering fail event context.getSplitsAssignmentSequence().clear(); @@ -625,7 +620,7 @@ void testHandleFailedTieringTableEvent() throws Throwable { List actualAssignment1 = new ArrayList<>(); context.getSplitsAssignmentSequence() .forEach(a -> a.assignment().values().forEach(actualAssignment1::addAll)); - assertThat(actualAssignment1).isEqualTo(expectedAssignment); + assertThat(actualAssignment1).containsExactlyInAnyOrderElementsOf(expectedAssignment); } } @@ -643,7 +638,7 @@ void testHandleFailOverEvent() throws Throwable { try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(numSubtasks)) { TieringSourceEnumerator enumerator = - new TieringSourceEnumerator(flussConf, context, 500); + createDefaultTieringSourceEnumerator(flussConf, context); enumerator.start(); assertThat(context.getSplitsAssignmentSequence()).isEmpty(); @@ -719,12 +714,6 @@ private void waitUntilTieringTableSplitAssignmentReady( } } - private static List sortSplits(List splits) { - return splits.stream() - .sorted(Comparator.comparing(Object::toString)) - 
.collect(Collectors.toList()); - } - private void verifyTieringSplitAssignment( MockSplitEnumeratorContext context, int expectedSplitSize, @@ -743,4 +732,99 @@ private void verifyTieringSplitAssignment( assertThat(allTieringSplits) .allMatch(tieringSplit -> tieringSplit.getTablePath().equals(expectedTablePath)); } + + private TieringSourceEnumerator createDefaultTieringSourceEnumerator( + Configuration flussConf, MockSplitEnumeratorContext context) { + return new TieringSourceEnumerator( + flussConf, + context, + 500, + Duration.ofMinutes(10).toMillis(), + Duration.ofSeconds(10).toMillis()); + } + + private TieringSourceEnumerator createTieringSourceEnumeratorWithManualClock( + Configuration flussConf, + MockSplitEnumeratorContext context, + ManualClock clock, + long tieringTableDurationMaxMs, + long tieringTableDurationDetectIntervalMs) { + return new TieringSourceEnumerator( + flussConf, + context, + 500, + tieringTableDurationMaxMs, + tieringTableDurationDetectIntervalMs, + clock); + } + + @Test + void testTableReachMaxTieringDuration() throws Throwable { + TablePath tablePath = TablePath.of(DEFAULT_DB, "tiering-max-duration-test-log-table"); + long tableId = createTable(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR); + int numSubtasks = 2; + + long tieringTableDurationMaxMs = Duration.ofMinutes(10).toMillis(); + long tieringTableDurationDetectIntervalMs = Duration.ofMillis(100).toMillis(); + + ManualClock manualClock = new ManualClock(); + + try (MockSplitEnumeratorContext context = + new MockSplitEnumeratorContext<>(numSubtasks); + TieringSourceEnumerator enumerator = + createTieringSourceEnumeratorWithManualClock( + flussConf, + context, + manualClock, + tieringTableDurationMaxMs, + tieringTableDurationDetectIntervalMs); ) { + enumerator.start(); + + // Register all readers + for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) { + registerReader(context, enumerator, subtaskId, "localhost-" + subtaskId); + } + + appendRow(tablePath, 
DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10); + + for (int subTask = 0; subTask < numSubtasks; subTask++) { + enumerator.handleSplitRequest(subTask, "localhost-" + subTask); + } + + // Wait for initial assignment + waitUntilTieringTableSplitAssignmentReady(context, 2, 200L); + + // Advance time to mock exceed max duration + manualClock.advanceTime(Duration.ofMillis(tieringTableDurationMaxMs + 60_000)); + + // Run periodic callable to trigger max duration check + // Index 0 is for requestTieringTableSplitsViaHeartBeat + // Index 1 is for checkTableReachMaxTieringDuration + context.runPeriodicCallable(1); + + // Verify that TieringReachMaxDurationEvent was sent to all readers + // Use reflection to access events sent to readers + Map> eventsToReaders = context.getSentSourceEvent(); + assertThat(eventsToReaders).hasSize(numSubtasks); + for (Map.Entry> entry : eventsToReaders.entrySet()) { + assertThat(entry.getValue()) + .containsExactly(new TieringReachMaxDurationEvent(tableId)); + } + + // clear split assignment + context.getSplitsAssignmentSequence().clear(); + + // request a split again + enumerator.handleSplitRequest(0, "localhost-0"); + + // the split should be marked as forceIgnore + waitUntilTieringTableSplitAssignmentReady(context, 1, 100L); + + List assignedSplits = new ArrayList<>(); + context.getSplitsAssignmentSequence() + .forEach(a -> a.assignment().values().forEach(assignedSplits::addAll)); + assertThat(assignedSplits).hasSize(1); + assertThat(assignedSplits.get(0).isForceIgnore()).isTrue(); + } + } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializerTest.java index 33356dec40..6c4774870a 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializerTest.java +++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializerTest.java @@ -20,6 +20,7 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -68,8 +69,8 @@ void testTieringSnapshotSplitStringExpression(Boolean isPartitionedTable) throws String expectedSplitString = isPartitionedTable - ? "TieringSnapshotSplit{tablePath=test_db.test_partitioned_table, tableBucket=TableBucket{tableId=1, partitionId=100, bucket=2}, partitionName='1024', snapshotId=0, logOffsetOfSnapshot=200, numberOfSplits=30}" - : "TieringSnapshotSplit{tablePath=test_db.test_table, tableBucket=TableBucket{tableId=1, bucket=2}, partitionName='null', snapshotId=0, logOffsetOfSnapshot=200, numberOfSplits=30}"; + ? "TieringSnapshotSplit{tablePath=test_db.test_partitioned_table, tableBucket=TableBucket{tableId=1, partitionId=100, bucket=2}, partitionName='1024', numberOfSplits=30, forceIgnore=false, snapshotId=0, logOffsetOfSnapshot=200}" + : "TieringSnapshotSplit{tablePath=test_db.test_table, tableBucket=TableBucket{tableId=1, bucket=2}, partitionName='null', numberOfSplits=30, forceIgnore=false, snapshotId=0, logOffsetOfSnapshot=200}"; assertThat(new TieringSnapshotSplit(path, bucket, partitionName, 0L, 200L, 30).toString()) .isEqualTo(expectedSplitString); } @@ -102,9 +103,52 @@ void testTieringLogSplitStringExpression(Boolean isPartitionedTable) throws Exce String expectedSplitString = isPartitionedTable - ? 
"TieringLogSplit{tablePath=test_db.test_partitioned_table, tableBucket=TableBucket{tableId=1, partitionId=100, bucket=2}, partitionName='1024', startingOffset=100, stoppingOffset=200, numberOfSplits=2}" - : "TieringLogSplit{tablePath=test_db.test_table, tableBucket=TableBucket{tableId=1, bucket=2}, partitionName='null', startingOffset=100, stoppingOffset=200, numberOfSplits=2}"; + ? "TieringLogSplit{tablePath=test_db.test_partitioned_table, tableBucket=TableBucket{tableId=1, partitionId=100, bucket=2}, partitionName='1024', numberOfSplits=2, forceIgnore=false, startingOffset=100, stoppingOffset=200}" + : "TieringLogSplit{tablePath=test_db.test_table, tableBucket=TableBucket{tableId=1, bucket=2}, partitionName='null', numberOfSplits=2, forceIgnore=false, startingOffset=100, stoppingOffset=200}"; assertThat(new TieringLogSplit(path, bucket, partitionName, 100, 200, 2).toString()) .isEqualTo(expectedSplitString); } + + @Test + void testForceIgnoreSerde() throws Exception { + // Test TieringSnapshotSplit with forceIgnore set at creation + TieringSnapshotSplit snapshotSplitWithForceIgnore = + new TieringSnapshotSplit(tablePath, tableBucket, null, 0L, 200L, 10, true); + byte[] serialized = serializer.serialize(snapshotSplitWithForceIgnore); + TieringSnapshotSplit deserializedSnapshotSplit = + (TieringSnapshotSplit) serializer.deserialize(serializer.getVersion(), serialized); + assertThat(deserializedSnapshotSplit).isEqualTo(snapshotSplitWithForceIgnore); + + // Test TieringLogSplit with forceIgnore set at creation + TieringLogSplit logSplitWithForceIgnore = + new TieringLogSplit(tablePath, tableBucket, null, 100, 200, true, 40); + serialized = serializer.serialize(logSplitWithForceIgnore); + TieringLogSplit deserializedLogSplit = + (TieringLogSplit) serializer.deserialize(serializer.getVersion(), serialized); + assertThat(deserializedLogSplit).isEqualTo(logSplitWithForceIgnore); + + // Test TieringSnapshotSplit with forceIgnore set after creation + TieringSnapshotSplit 
snapshotSplit = + new TieringSnapshotSplit(tablePath, tableBucket, null, 0L, 200L, 10, false); + assertThat(snapshotSplit.isForceIgnore()).isFalse(); + snapshotSplit.forceIgnore(); + assertThat(snapshotSplit.isForceIgnore()).isTrue(); + + serialized = serializer.serialize(snapshotSplit); + deserializedSnapshotSplit = + (TieringSnapshotSplit) serializer.deserialize(serializer.getVersion(), serialized); + assertThat(deserializedSnapshotSplit).isEqualTo(snapshotSplit); + + // Test TieringLogSplit with forceIgnore set after creation + TieringLogSplit logSplit = + new TieringLogSplit(tablePath, tableBucket, null, 100, 200, false, 40); + assertThat(logSplit.isForceIgnore()).isFalse(); + logSplit.forceIgnore(); + assertThat(logSplit.isForceIgnore()).isTrue(); + + serialized = serializer.serialize(logSplit); + deserializedLogSplit = + (TieringLogSplit) serializer.deserialize(serializer.getVersion(), serialized); + assertThat(deserializedLogSplit).isEqualTo(logSplit); + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java index aa030d0256..d21c7174ce 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java @@ -17,23 +17,42 @@ package org.apache.fluss.server.lakehouse; +import org.apache.fluss.config.ConfigOption; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotExistException; +import org.apache.fluss.lake.committer.CommittedLakeSnapshot; +import org.apache.fluss.lake.committer.CommitterInitContext; +import org.apache.fluss.lake.committer.LakeCommitter; import org.apache.fluss.lake.lakestorage.LakeCatalog; import org.apache.fluss.lake.lakestorage.LakeStorage; import 
org.apache.fluss.lake.lakestorage.LakeStoragePlugin; +import org.apache.fluss.lake.serializer.SimpleVersionedSerializer; import org.apache.fluss.lake.source.LakeSource; import org.apache.fluss.lake.writer.LakeTieringFactory; +import org.apache.fluss.lake.writer.LakeWriter; +import org.apache.fluss.lake.writer.WriterInitContext; import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.LogRecord; +import javax.annotation.Nullable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.fluss.config.ConfigBuilder.key; + /** A plugin of paimon just for testing purpose. 
*/ public class TestingPaimonStoragePlugin implements LakeStoragePlugin { @@ -54,7 +73,7 @@ public static class TestingPaimonLakeStorage implements LakeStorage { @Override public LakeTieringFactory createLakeTieringFactory() { - throw new UnsupportedOperationException("createLakeTieringFactory is not supported."); + return new TestingPaimonTieringFactory(); } @Override @@ -96,4 +115,154 @@ public TableDescriptor getTable(TablePath tablePath) { return tableByPath.get(tablePath); } } + + private static class TestingPaimonTieringFactory + implements LakeTieringFactory { + + @Override + public LakeWriter createLakeWriter( + WriterInitContext writerInitContext) { + return new TestingPaimonWriter(writerInitContext.tableInfo()); + } + + @Override + public SimpleVersionedSerializer getWriteResultSerializer() { + return new TestingPaimonWriteResultSerializer(); + } + + @Override + public LakeCommitter createLakeCommitter( + CommitterInitContext committerInitContext) throws IOException { + return new TestingPaimonCommitter(); + } + + @Override + public SimpleVersionedSerializer getCommittableSerializer() { + return new SimpleVersionedSerializer() { + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(TestPaimonCommittable obj) throws IOException { + return new byte[0]; + } + + @Override + public TestPaimonCommittable deserialize(int version, byte[] serialized) + throws IOException { + return new TestPaimonCommittable(); + } + }; + } + } + + private static class TestingPaimonWriter implements LakeWriter { + + static ConfigOption writePauseOption = + key("write-pause").durationType().noDefaultValue(); + + private int writtenRecords = 0; + private final Duration writePause; + + private TestingPaimonWriter(TableInfo tableInfo) { + this.writePause = tableInfo.getCustomProperties().get(writePauseOption); + } + + @Override + public void write(LogRecord record) throws IOException { + try { + if (writePause != null) { + 
Thread.sleep(writePause.toMillis()); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while pausing before write", e); + } + writtenRecords += 1; + } + + @Override + public TestingPaimonWriteResult complete() throws IOException { + return new TestingPaimonWriteResult(writtenRecords); + } + + @Override + public void close() throws IOException { + // do nothing + } + } + + private static class TestingPaimonWriteResult { + private final int writtenRecords; + + public TestingPaimonWriteResult(int writtenRecords) { + this.writtenRecords = writtenRecords; + } + } + + private static class TestingPaimonWriteResultSerializer + implements SimpleVersionedSerializer { + + @Override + public int getVersion() { + return 1; + } + + @Override + public byte[] serialize(TestingPaimonWriteResult result) throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeInt(result.writtenRecords); + return baos.toByteArray(); + } + } + + @Override + public TestingPaimonWriteResult deserialize(int version, byte[] serialized) + throws IOException { + try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); + ObjectInputStream ois = new ObjectInputStream(bais)) { + return new TestingPaimonWriteResult(ois.readInt()); + } + } + } + + private static class TestPaimonCommittable {} + + private static class TestingPaimonCommitter + implements LakeCommitter { + + @Override + public TestPaimonCommittable toCommittable( + List testingPaimonWriteResults) throws IOException { + return new TestPaimonCommittable(); + } + + @Override + public long commit( + TestPaimonCommittable committable, Map snapshotProperties) + throws IOException { + // do nothing, and always return 1 as committed snapshot + return 1; + } + + @Override + public void abort(TestPaimonCommittable committable) throws IOException { + // do nothing + } + + @Nullable + 
@Override + public CommittedLakeSnapshot getMissingLakeSnapshot( + @Nullable Long latestLakeSnapshotIdOfFluss) throws IOException { + return null; + } + + @Override + public void close() throws Exception { + // do nothing + } + } } diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 7f0a4cc9b1..b11c232212 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -407,6 +407,7 @@ org.apache.fluss.flink.tiering.source.TieringWriterInitContext org.apache.fluss.flink.tiering.source.TieringSourceReader + org.apache.fluss.flink.tiering.source.TieringSourceFetcherManager org.apache.fluss.flink.tiering.source.TableBucketWriteResultEmitter