diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
index f4be12593a..3b9f29534a 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
@@ -17,6 +17,7 @@
 package org.apache.fluss.flink.tiering.committer;
 
+import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.client.metadata.MetadataUpdater;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
@@ -37,6 +38,10 @@
 /** Committer to commit {@link FlussTableLakeSnapshot} of lake to Fluss. */
 public class FlussTableLakeSnapshotCommitter implements AutoCloseable {
 
+    // the current version for committing lake snapshots is 2;
+    // the coordinator should use v2 to serialize the lake snapshot
+    private static final int CURRENT_VERSION_FOR_COMMIT_LAKE_SNAPSHOT = 2;
+
     private final Configuration flussConf;
 
     private CoordinatorGateway coordinatorGateway;
@@ -63,6 +68,7 @@ void commit(FlussTableLakeSnapshot flussTableLakeSnapshot) throws IOException {
         try {
             CommitLakeTableSnapshotRequest request =
                     toCommitLakeTableSnapshotRequest(flussTableLakeSnapshot);
+            request.setLakeSnapshotSerializationVersion(CURRENT_VERSION_FOR_COMMIT_LAKE_SNAPSHOT);
             coordinatorGateway.commitLakeTableSnapshot(request).get();
         } catch (Exception e) {
             throw new IOException(
@@ -84,7 +90,8 @@ public void commit(long tableId, long snapshotId, Map<TableBucket, Long> logEndOffsets)
         commit(flussTableLakeSnapshot);
     }
 
-    private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
+    @VisibleForTesting
+    static CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
             FlussTableLakeSnapshot flussTableLakeSnapshot) {
         CommitLakeTableSnapshotRequest commitLakeTableSnapshotRequest =
                 new CommitLakeTableSnapshotRequest();
@@ -106,6 +113,11 @@ private CommitLakeTableSnapshotRequest toCommitLakeTableSnapshotRequest(
         return commitLakeTableSnapshotRequest;
     }
 
+    @VisibleForTesting
+    CoordinatorGateway getCoordinatorGateway() {
+        return coordinatorGateway;
+    }
+
     @Override
     public void close() throws Exception {
         if (rpcClient != null) {
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
index f8d71e6cf0..cc8b647670 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
@@ -21,18 +21,25 @@
 import org.apache.fluss.flink.utils.FlinkTestBase;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.ZkData;
+import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
+import org.apache.fluss.server.zk.data.lake.LakeTableSnapshotJsonSerde;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.fluss.utils.types.Tuple2;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
+import static org.apache.fluss.flink.tiering.committer.FlussTableLakeSnapshotCommitter.toCommitLakeTableSnapshotRequest;
 import static org.apache.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -61,44 +68,94 @@ void afterEach() throws Exception {
     void testCommit(boolean isPartitioned) throws Exception {
         TablePath tablePath =
                 TablePath.of("fluss", "test_commit" + (isPartitioned ? "_partitioned" : ""));
-        long tableId =
-                createTable(
-                        tablePath,
-                        isPartitioned
-                                ? DATA1_PARTITIONED_TABLE_DESCRIPTOR
-                                : DATA1_TABLE_DESCRIPTOR);
+        Tuple2<Long, Collection<Long>> tableIdAndPartitions = createTable(tablePath, isPartitioned);
+        long tableId = tableIdAndPartitions.f0;
+        Collection<Long> partitions = tableIdAndPartitions.f1;
 
-        List<String> partitions;
-        Map<String, Long> partitionNameAndIds = new HashMap<>();
-        if (!isPartitioned) {
-            FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
-            partitions = Collections.singletonList(null);
-        } else {
-            partitionNameAndIds = FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
-            partitions = new ArrayList<>(partitionNameAndIds.keySet());
+        Map<TableBucket, Long> logEndOffsets = mockLogEndOffsets(tableId, partitions);
+
+        long snapshotId = 3;
+        // commit offsets
+        flussTableLakeSnapshotCommitter.commit(tableId, snapshotId, logEndOffsets);
+        LakeSnapshot lakeSnapshot = admin.getLatestLakeSnapshot(tablePath).get();
+        assertThat(lakeSnapshot.getSnapshotId()).isEqualTo(3);
+
+        // get and check the offsets
+        Map<TableBucket, Long> bucketLogOffsets = lakeSnapshot.getTableBucketsOffset();
+        assertThat(bucketLogOffsets).isEqualTo(logEndOffsets);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testCompatibilityWithoutSerializationVersion(boolean isPartitioned) throws Exception {
+        TablePath tablePath =
+                TablePath.of(
+                        "fluss",
+                        "test_legacy_version_commit" + (isPartitioned ? "_partitioned" : ""));
+        Tuple2<Long, Collection<Long>> tableIdAndPartitions = createTable(tablePath, isPartitioned);
+        long tableId = tableIdAndPartitions.f0;
+        Collection<Long> partitions = tableIdAndPartitions.f1;
+
+        Map<TableBucket, Long> logEndOffsets = mockLogEndOffsets(tableId, partitions);
+
+        long snapshotId = 3;
+        // commit offsets
+        FlussTableLakeSnapshot flussTableLakeSnapshot =
+                new FlussTableLakeSnapshot(tableId, snapshotId);
+        for (Map.Entry<TableBucket, Long> entry : logEndOffsets.entrySet()) {
+            flussTableLakeSnapshot.addBucketOffset(entry.getKey(), entry.getValue());
         }
+        // do not set the commit lake snapshot version, to mock the old version behavior
+        flussTableLakeSnapshotCommitter
+                .getCoordinatorGateway()
+                .commitLakeTableSnapshot(toCommitLakeTableSnapshotRequest(flussTableLakeSnapshot))
+                .get();
+
+        // test deserializing with the old version deserializer
+        ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
+        // read the json node from the lake table node
+        JsonNode jsonNode =
+                new ObjectMapper()
+                        .readTree(zkClient.getOrEmpty(ZkData.LakeTableZNode.path(tableId)).get());
+        LakeTableSnapshot lakeTableSnapshot =
+                LakeTableSnapshotJsonSerde.INSTANCE.deserializeVersion1(jsonNode);
+
+        // verify the deserialized lakeTableSnapshot
+        assertThat(lakeTableSnapshot.getSnapshotId()).isEqualTo(3);
+        assertThat(lakeTableSnapshot.getBucketLogEndOffset()).isEqualTo(logEndOffsets);
+    }
+
+    private Map<TableBucket, Long> mockLogEndOffsets(long tableId, Collection<Long> partitionIds) {
         Map<TableBucket, Long> logEndOffsets = new HashMap<>();
         for (int bucket = 0; bucket < 3; bucket++) {
             long bucketOffset = bucket * bucket;
-            for (String partitionName : partitions) {
-                if (partitionName == null) {
+            for (Long partitionId : partitionIds) {
+                if (partitionId == null) {
                     logEndOffsets.put(new TableBucket(tableId, bucket), bucketOffset);
                 } else {
-                    long partitionId = partitionNameAndIds.get(partitionName);
                     logEndOffsets.put(new TableBucket(tableId, partitionId, bucket), bucketOffset);
                 }
            }
         }
+        return logEndOffsets;
+    }
 
-        long snapshotId = 3;
-        // commit offsets
-        flussTableLakeSnapshotCommitter.commit(tableId, snapshotId, logEndOffsets);
-        LakeSnapshot lakeSnapshot = admin.getLatestLakeSnapshot(tablePath).get();
-        assertThat(lakeSnapshot.getSnapshotId()).isEqualTo(3);
-
-        // get and check the offsets
-        Map<TableBucket, Long> bucketLogOffsets = lakeSnapshot.getTableBucketsOffset();
-        assertThat(bucketLogOffsets).isEqualTo(logEndOffsets);
+    private Tuple2<Long, Collection<Long>> createTable(TablePath tablePath, boolean isPartitioned)
+            throws Exception {
+        long tableId =
+                createTable(
+                        tablePath,
+                        isPartitioned
+                                ? DATA1_PARTITIONED_TABLE_DESCRIPTOR
+                                : DATA1_TABLE_DESCRIPTOR);
+        Collection<Long> partitions;
+        if (!isPartitioned) {
+            partitions = Collections.singletonList(null);
+            FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
+        } else {
+            partitions = FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath).values();
+        }
+        return new Tuple2<>(tableId, partitions);
     }
 }
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto
index d2917734ce..2accea59c5 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -451,6 +451,19 @@ message NotifyRemoteLogOffsetsResponse {
 message CommitLakeTableSnapshotRequest {
   repeated PbLakeTableSnapshotInfo tables_req = 1;
+  // The version number for serializing lake_snapshot. This field tells the coordinator server
+  // which version to use when serializing the lake snapshot data.
+  //
+  // Legacy tiering services (before this field was introduced) do not set this field. This field
+  // is primarily used to handle compatibility during Fluss upgrades:
+  //
+  // - During upgrade: Fluss may use a new serialization format, but some tablet servers may not
+  //   have been upgraded yet and cannot deserialize the new format. The coordinator server can
+  //   check this field to determine which serialization format to use.
+  // - After upgrade: Once all Fluss components are upgraded, tiering services can be updated to
+  //   use the new format. The coordinator server will recognize this field and use the new
+  //   serialization method, and all tablet servers will be able to deserialize the new format.
+  optional int32 lake_snapshot_serialization_version = 2;
 }
 
 message PbLakeTableSnapshotInfo {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index 4a6fd89f06..e90b3001eb 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -1260,10 +1260,14 @@ private void tryProcessCommitLakeTableSnapshot(
                                 + tableId
                                 + " not found in coordinator context.");
             }
-
-            // this involves IO operation (ZK), so we do it in ioExecutor
-            lakeTableHelper.upsertLakeTable(
-                    tableId, tablePath, lakeTableSnapshotEntry.getValue());
+            if (commitLakeTableSnapshotData.getSerializationVersion() == null) {
+                lakeTableHelper.upsertLakeTableV1(
+                        tableId, lakeTableSnapshotEntry.getValue());
+            } else {
+                // this involves an IO operation (ZK), so we do it in the ioExecutor
+                lakeTableHelper.upsertLakeTable(
+                        tableId, tablePath, lakeTableSnapshotEntry.getValue());
+            }
         } catch (Exception e) {
             ApiError error = ApiError.fromThrowable(e);
             tableResp.setError(error.error().code(), error.message());
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/entity/CommitLakeTableSnapshotData.java b/fluss-server/src/main/java/org/apache/fluss/server/entity/CommitLakeTableSnapshotData.java
index 21ac1b340f..abd209848f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/entity/CommitLakeTableSnapshotData.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/entity/CommitLakeTableSnapshotData.java
@@ -21,6 +21,8 @@
 import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
 import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
 
+import javax.annotation.Nullable;
+
 import java.util.Map;
 import java.util.Objects;
 
@@ -30,11 +32,17 @@ public class CommitLakeTableSnapshotData {
     private final Map<Long, LakeTableSnapshot> lakeTableSnapshots;
     private final Map<TableBucket, Long> tableBucketsMaxTieredTimestamp;
 
+    // the serialization version for the lake table snapshot; null for requests
+    // from versions before 0.8
+    private final Integer serializationVersion;
+
     public CommitLakeTableSnapshotData(
             Map<Long, LakeTableSnapshot> lakeTableSnapshots,
-            Map<TableBucket, Long> tableBucketsMaxTieredTimestamp) {
+            Map<TableBucket, Long> tableBucketsMaxTieredTimestamp,
+            @Nullable Integer serializationVersion) {
         this.lakeTableSnapshots = lakeTableSnapshots;
         this.tableBucketsMaxTieredTimestamp = tableBucketsMaxTieredTimestamp;
+        this.serializationVersion = serializationVersion;
     }
 
     public Map<Long, LakeTableSnapshot> getLakeTableSnapshot() {
@@ -45,6 +53,10 @@ public Map<TableBucket, Long> getTableBucketsMaxTieredTimestamp() {
         return tableBucketsMaxTieredTimestamp;
     }
 
+    public Integer getSerializationVersion() {
+        return serializationVersion;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -56,12 +68,14 @@
         CommitLakeTableSnapshotData that = (CommitLakeTableSnapshotData) o;
         return Objects.equals(lakeTableSnapshots, that.lakeTableSnapshots)
                 && Objects.equals(
-                        tableBucketsMaxTieredTimestamp, that.tableBucketsMaxTieredTimestamp);
+                        tableBucketsMaxTieredTimestamp, that.tableBucketsMaxTieredTimestamp)
+                && Objects.equals(serializationVersion, that.serializationVersion);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(lakeTableSnapshots, tableBucketsMaxTieredTimestamp);
+        return Objects.hash(
+                lakeTableSnapshots, tableBucketsMaxTieredTimestamp, serializationVersion);
     }
 
     @Override
@@ -71,6 +85,8 @@ public String toString() {
                 + lakeTableSnapshots
                 + ", tableBucketsMaxTieredTimestamp="
                 + tableBucketsMaxTieredTimestamp
+                + ", serializationVersion="
+                + serializationVersion
                 + '}';
     }
 }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
index ed1051af94..c0b645d918 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
@@ -1574,7 +1574,13 @@ public static CommitLakeTableSnapshotData getCommitLakeTableSnapshotData(
             lakeTableInfoByTableId.put(
                     tableId, new LakeTableSnapshot(snapshotId, bucketLogEndOffset));
         }
-        return new CommitLakeTableSnapshotData(lakeTableInfoByTableId, tableBucketsMaxTimestamp);
+
+        Integer serializationVersion =
+                request.hasLakeSnapshotSerializationVersion()
+                        ? request.getLakeSnapshotSerializationVersion()
+                        : null;
+        return new CommitLakeTableSnapshotData(
+                lakeTableInfoByTableId, tableBucketsMaxTimestamp, serializationVersion);
     }
 
     public static PbNotifyLakeTableOffsetReqForBucket makeNotifyLakeTableOffsetForBucket(
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
index dfc2c004d0..ee04a155fb 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
@@ -1027,9 +1027,15 @@ public Optional<RemoteLogManifestHandle> getRemoteLogManifestHandle(TableBucket
     }
 
     /** Upsert the {@link LakeTable} to Zk Node. */
-    public void upsertLakeTable(long tableId, LakeTable lakeTable, boolean isUpdate)
+    public void upsertLakeTable(
+            long tableId, LakeTable lakeTable, boolean isUpdate, boolean isLegacyVersion)
             throws Exception {
-        byte[] zkData = LakeTableZNode.encode(lakeTable);
+        byte[] zkData;
+        if (isLegacyVersion) {
+            zkData = LakeTableZNode.encodeV1(tableId, lakeTable);
+        } else {
+            zkData = LakeTableZNode.encode(lakeTable);
+        }
         String zkPath = LakeTableZNode.path(tableId);
         if (isUpdate) {
             zkClient.setData().forPath(zkPath, zkData);
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java
index bb33e17ff5..a00fab8b3b 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java
@@ -31,6 +31,7 @@
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
@@ -589,6 +590,14 @@ public static String path(long tableId) {
             return TableIdZNode.path(tableId) + "/laketable";
         }
 
+        /**
+         * Encodes a {@link LakeTable} to JSON bytes using Version 1 format (legacy) for storage in
+         * ZooKeeper.
+         */
+        public static byte[] encodeV1(long tableId, LakeTable lakeTable) throws IOException {
+            return LakeTableJsonSerde.serializeV1(tableId, lakeTable);
+        }
+
         /**
         * Encodes a LakeTable to JSON bytes for storage in ZK.
         *
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTable.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTable.java
index 5df57acd57..661c063f01 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTable.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTable.java
@@ -112,7 +112,7 @@ public List<LakeSnapshotMetadata> getLakeSnapshotMetadatas() {
      *
      * @return the LakeTableSnapshot
      */
-    public LakeTableSnapshot getLatestTableSnapshot() throws Exception {
+    public LakeTableSnapshot getLatestTableSnapshot() throws IOException {
         if (lakeTableSnapshot != null) {
             return lakeTableSnapshot;
         }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java
index 8031b9ddeb..c5514b26b5 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java
@@ -43,6 +43,26 @@ public LakeTableHelper(ZooKeeperClient zkClient, String remoteDataDir) {
         this.remoteDataDir = remoteDataDir;
     }
 
+    /**
+     * Upserts a lake table snapshot for the given table, stored in v1 format.
+     *
+     * @param tableId the table ID
+     * @param lakeTableSnapshot the new snapshot to upsert
+     * @throws Exception if the operation fails
+     */
+    public void upsertLakeTableV1(long tableId, LakeTableSnapshot lakeTableSnapshot)
+            throws Exception {
+        Optional<LakeTable> optPreviousLakeTable = zkClient.getLakeTable(tableId);
+        // merge with the previous snapshot if it exists
+        if (optPreviousLakeTable.isPresent()) {
+            lakeTableSnapshot =
+                    mergeLakeTable(
+                            optPreviousLakeTable.get().getLatestTableSnapshot(), lakeTableSnapshot);
+        }
+        zkClient.upsertLakeTable(
+                tableId, new LakeTable(lakeTableSnapshot), optPreviousLakeTable.isPresent(), true);
+    }
+
     /**
      * Upserts a lake table snapshot for the given table.
     *
@@ -87,7 +107,7 @@ public void upsertLakeTable(
         // metadata
         LakeTable lakeTable = new LakeTable(lakeSnapshotMetadata);
         try {
-            zkClient.upsertLakeTable(tableId, lakeTable, optPreviousLakeTable.isPresent());
+            zkClient.upsertLakeTable(tableId, lakeTable, optPreviousLakeTable.isPresent(), false);
         } catch (Exception e) {
             LOG.warn("Failed to upsert lake table snapshot to zk.", e);
             // discard the new lake snapshot metadata
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableJsonSerde.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableJsonSerde.java
index f272361bc6..f195997fd7 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableJsonSerde.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableJsonSerde.java
@@ -85,6 +85,17 @@ public void serialize(LakeTable lakeTable, JsonGenerator generator) throws IOException {
         generator.writeEndObject();
     }
 
+    /**
+     * Serializes a {@link LakeTable} to JSON bytes using Version 1 format (legacy).
+     *
+     * <p>This method is used for backward compatibility when the coordinator receives a commit
+     * request without a serialization version (from legacy tiering services).
+     */
+    public static byte[] serializeV1(long tableId, LakeTable lakeTable) throws IOException {
+        return LakeTableSnapshotJsonSerde.toJsonVersion1(
+                lakeTable.getLatestTableSnapshot(), tableId);
+    }
+
     @Override
     public LakeTable deserialize(JsonNode node) {
         int version = node.get(VERSION_KEY).asInt();
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableSnapshotJsonSerde.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableSnapshotJsonSerde.java
index 2543dff59e..5730ff0679 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableSnapshotJsonSerde.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableSnapshotJsonSerde.java
@@ -18,7 +18,6 @@
 package org.apache.fluss.server.zk.data.lake;
 
-import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -173,7 +172,7 @@ public LakeTableSnapshot deserialize(JsonNode node) {
     }
 
     /** Deserialize Version 1 format (legacy). */
-    private LakeTableSnapshot deserializeVersion1(JsonNode node) {
+    public LakeTableSnapshot deserializeVersion1(JsonNode node) {
         long snapshotId = node.get(SNAPSHOT_ID).asLong();
         long tableId = node.get(TABLE_ID).asLong();
         Iterator<JsonNode> buckets = node.get(BUCKETS).elements();
@@ -263,7 +262,6 @@ public static byte[] toJson(LakeTableSnapshot lakeTableSnapshot) {
     }
 
     /** Serialize the {@link LakeTableSnapshot} to json bytes using Version 1 format. */
-    @VisibleForTesting
     public static byte[] toJsonVersion1(LakeTableSnapshot lakeTableSnapshot, long tableId) {
         return JsonSerdeUtils.writeValueAsBytes(lakeTableSnapshot, new Version1Serializer(tableId));
     }
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableHelperTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableHelperTest.java
index 6dd8497d58..28939e0796 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableHelperTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/lake/LakeTableHelperTest.java
@@ -30,7 +30,6 @@
 import org.apache.fluss.server.zk.ZooKeeperExtension;
 import org.apache.fluss.server.zk.ZooKeeperUtils;
 import org.apache.fluss.server.zk.data.TableRegistration;
-import org.apache.fluss.server.zk.data.ZkData;
 import org.apache.fluss.testutils.common.AllCallbackWrapper;
 
 import org.junit.jupiter.api.AfterAll;
@@ -110,16 +109,9 @@ void testUpsertLakeTableCompatible(@TempDir Path tempDir) throws Exception {
         LakeTableSnapshot lakeTableSnapshot =
                 new LakeTableSnapshot(snapshotId, bucketLogEndOffset);
 
-        // Write version 1 format data directly to ZK (simulating old system behavior)
-        String zkPath = ZkData.LakeTableZNode.path(tableId);
-        byte[] version1Data =
-                LakeTableSnapshotJsonSerde.toJsonVersion1(lakeTableSnapshot, tableId);
-        zooKeeperClient
-                .getCuratorClient()
-                .create()
-                .creatingParentsIfNeeded()
-                .forPath(zkPath, version1Data);
+        // Write version 1 format (simulating old system behavior)
+        lakeTableHelper.upsertLakeTableV1(tableId, lakeTableSnapshot);
 
         // Verify version 1 data can be read
         Optional<LakeTable> optionalLakeTable = zooKeeperClient.getLakeTable(tableId);
         assertThat(optionalLakeTable).isPresent();