diff --git a/docs/content/maintenance/rescale-bucket.md b/docs/content/maintenance/rescale-bucket.md
index 96d6e1ee86e5..22b134c2f307 100644
--- a/docs/content/maintenance/rescale-bucket.md
+++ b/docs/content/maintenance/rescale-bucket.md
@@ -66,10 +66,6 @@ Please note that
```
- During overwrite period, make sure there are no other jobs writing the same table/partition.
-{{< hint info >}}
-__Note:__ For the table which enables log system(*e.g.* Kafka), please rescale the topic's partition as well to keep consistency.
-{{< /hint >}}
-
## Use Case
Rescale bucket helps to handle sudden spikes in throughput. Suppose there is a daily streaming ETL task to sync transaction data. The table's DDL and pipeline
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 23491cc468a7..f4e0a6d1cd43 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1272,12 +1272,6 @@
            <td>Boolean</td>
            <td>Whether to read the delta from append table's overwrite commit in streaming mode.</td>
        </tr>
-        <tr>
-            <td><h5>streaming-read-mode</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td><p>Enum</p></td>
-            <td>The mode of streaming read that specifies to read the data of table file or log.<br /><br />Possible values:<ul><li>"log": Read from the data of table log store.</li><li>"file": Read from the data of table file store.</li></ul></td>
-        </tr>
        <tr>
            <td><h5>streaming-read-overwrite</h5></td>
            <td style="word-wrap: break-word;">false</td>
diff --git a/docs/layouts/shortcodes/generated/kafka_log_configuration.html b/docs/layouts/shortcodes/generated/kafka_log_configuration.html
deleted file mode 100644
index 663ec9848872..000000000000
--- a/docs/layouts/shortcodes/generated/kafka_log_configuration.html
+++ /dev/null
@@ -1,42 +0,0 @@
-{{/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/}}
-<table class="configuration table table-bordered">
-    <thead>
-        <tr>
-            <th class="text-left" style="width: 20%">Key</th>
-            <th class="text-left" style="width: 15%">Default</th>
-            <th class="text-left" style="width: 10%">Type</th>
-            <th class="text-left" style="width: 55%">Description</th>
-        </tr>
-    </thead>
-    <tbody>
-        <tr>
-            <td><h5>kafka.bootstrap.servers</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>String</td>
-            <td>Required Kafka server connection string.</td>
-        </tr>
-        <tr>
-            <td><h5>kafka.topic</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>String</td>
-            <td>Topic of this kafka table.</td>
-        </tr>
-    </tbody>
-</table>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index a5e0e1919e4f..78270d7523bc 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -21,7 +21,6 @@
import org.apache.paimon.annotation.Documentation;
import org.apache.paimon.annotation.Documentation.ExcludeFromDocumentation;
import org.apache.paimon.annotation.Documentation.Immutable;
-import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.fs.Path;
@@ -969,42 +968,6 @@ public InlineElement getDescription() {
.withDescription(
"The delay duration of stream read when scan incremental snapshots.");
- @ExcludeFromDocumentation("Confused without log system")
- public static final ConfigOption<LogConsistency> LOG_CONSISTENCY =
- key("log.consistency")
- .enumType(LogConsistency.class)
- .defaultValue(LogConsistency.TRANSACTIONAL)
- .withDescription("Specify the log consistency mode for table.");
-
- @ExcludeFromDocumentation("Confused without log system")
- public static final ConfigOption<LogChangelogMode> LOG_CHANGELOG_MODE =
- key("log.changelog-mode")
- .enumType(LogChangelogMode.class)
- .defaultValue(LogChangelogMode.AUTO)
- .withDescription("Specify the log changelog mode for table.");
-
- @ExcludeFromDocumentation("Confused without log system")
- public static final ConfigOption<String> LOG_KEY_FORMAT =
- key("log.key.format")
- .stringType()
- .defaultValue("json")
- .withDescription(
- "Specify the key message format of log system with primary key.");
-
- @ExcludeFromDocumentation("Confused without log system")
- public static final ConfigOption<String> LOG_FORMAT =
- key("log.format")
- .stringType()
- .defaultValue("debezium-json")
- .withDescription("Specify the message format of log system.");
-
- @ExcludeFromDocumentation("Confused without log system")
- public static final ConfigOption<Boolean> LOG_IGNORE_DELETE =
- key("log.ignore-delete")
- .booleanType()
- .defaultValue(false)
- .withDescription("Specify whether the log system ignores delete records.");
-
public static final ConfigOption<Boolean> AUTO_CREATE =
key("auto-create")
.booleanType()
@@ -1272,13 +1235,6 @@ public InlineElement getDescription() {
"Only used to force TableScan to construct suitable 'StartingUpScanner' and 'FollowUpScanner' "
+ "dedicated internal streaming scan.");
- public static final ConfigOption<StreamingReadMode> STREAMING_READ_MODE =
- key("streaming-read-mode")
- .enumType(StreamingReadMode.class)
- .noDefaultValue()
- .withDescription(
- "The mode of streaming read that specifies to read the data of table file or log.");
-
@ExcludeFromDocumentation("Internal use only")
public static final ConfigOption<BatchScanMode> BATCH_SCAN_MODE =
key("batch-scan-mode")
@@ -2770,10 +2726,6 @@ public boolean scanPlanSortPartition() {
}
public StartupMode startupMode() {
- return startupMode(options);
- }
-
- public static StartupMode startupMode(Options options) {
StartupMode mode = options.get(SCAN_MODE);
if (mode == StartupMode.DEFAULT) {
if (options.getOptional(SCAN_TIMESTAMP_MILLIS).isPresent()
@@ -3001,10 +2953,6 @@ public Integer fullCompactionDeltaCommits() {
return options.get(FULL_COMPACTION_DELTA_COMMITS);
}
- public static StreamingReadMode streamReadType(Options options) {
- return options.get(STREAMING_READ_MODE);
- }
-
public Duration consumerExpireTime() {
return options.get(CONSUMER_EXPIRATION_TIME);
}
@@ -3471,67 +3419,6 @@ public InlineElement getDescription() {
}
}
- /** Specifies the log consistency mode for table. */
- public enum LogConsistency implements DescribedEnum {
- TRANSACTIONAL(
- "transactional",
- "Only the data after the checkpoint can be seen by readers, the latency depends on checkpoint interval."),
-
- EVENTUAL(
- "eventual",
- "Immediate data visibility, you may see some intermediate states, "
- + "but eventually the right results will be produced, only works for table with primary key.");
-
- private final String value;
- private final String description;
-
- LogConsistency(String value, String description) {
- this.value = value;
- this.description = description;
- }
-
- @Override
- public String toString() {
- return value;
- }
-
- @Override
- public InlineElement getDescription() {
- return text(description);
- }
- }
-
- /** Specifies the log changelog mode for table. */
- public enum LogChangelogMode implements DescribedEnum {
- AUTO("auto", "Upsert for table with primary key, all for table without primary key."),
-
- ALL("all", "The log system stores all changes including UPDATE_BEFORE."),
-
- UPSERT(
- "upsert",
- "The log system does not store the UPDATE_BEFORE changes, the log consumed job"
- + " will automatically add the normalized node, relying on the state"
- + " to generate the required update_before.");
-
- private final String value;
- private final String description;
-
- LogChangelogMode(String value, String description) {
- this.value = value;
- this.description = description;
- }
-
- @Override
- public String toString() {
- return value;
- }
-
- @Override
- public InlineElement getDescription() {
- return text(description);
- }
- }
-
/** Specifies the changelog producer for table. */
public enum ChangelogProducer implements DescribedEnum {
NONE("none", "No changelog file."),
@@ -3563,49 +3450,6 @@ public InlineElement getDescription() {
}
}
- /** Specifies the type for streaming read. */
- public enum StreamingReadMode implements DescribedEnum {
- LOG("log", "Read from the data of table log store."),
- FILE("file", "Read from the data of table file store.");
-
- private final String value;
- private final String description;
-
- StreamingReadMode(String value, String description) {
- this.value = value;
- this.description = description;
- }
-
- @Override
- public String toString() {
- return value;
- }
-
- @Override
- public InlineElement getDescription() {
- return text(description);
- }
-
- public String getValue() {
- return value;
- }
-
- @VisibleForTesting
- public static StreamingReadMode fromValue(String value) {
- for (StreamingReadMode formatType : StreamingReadMode.values()) {
- if (formatType.value.equals(value)) {
- return formatType;
- }
- }
- throw new IllegalArgumentException(
- String.format(
- "Invalid format type %s, only support [%s]",
- value,
- StringUtils.join(
- Arrays.stream(StreamingReadMode.values()).iterator(), ",")));
- }
- }
-
/** Inner stream scan mode for some internal requirements. */
public enum StreamScanMode implements DescribedEnum {
NONE("none", "No requirement."),
diff --git a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java
index a318a72e40a3..894b8724484e 100644
--- a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java
@@ -64,7 +64,6 @@ public class Snapshot implements Serializable {
public static final long FIRST_SNAPSHOT_ID = 1;
- public static final int TABLE_STORE_02_VERSION = 1;
protected static final int CURRENT_VERSION = 3;
protected static final String FIELD_VERSION = "version";
@@ -81,7 +80,6 @@ public class Snapshot implements Serializable {
protected static final String FIELD_COMMIT_IDENTIFIER = "commitIdentifier";
protected static final String FIELD_COMMIT_KIND = "commitKind";
protected static final String FIELD_TIME_MILLIS = "timeMillis";
- protected static final String FIELD_LOG_OFFSETS = "logOffsets";
protected static final String FIELD_TOTAL_RECORD_COUNT = "totalRecordCount";
protected static final String FIELD_DELTA_RECORD_COUNT = "deltaRecordCount";
protected static final String FIELD_CHANGELOG_RECORD_COUNT = "changelogRecordCount";
@@ -91,10 +89,8 @@ public class Snapshot implements Serializable {
protected static final String FIELD_NEXT_ROW_ID = "nextRowId";
// version of snapshot
- // null for paimon <= 0.2
@JsonProperty(FIELD_VERSION)
- @Nullable
- protected final Integer version;
+ protected final int version;
@JsonProperty(FIELD_ID)
protected final long id;
@@ -106,6 +102,7 @@ public class Snapshot implements Serializable {
@JsonProperty(FIELD_BASE_MANIFEST_LIST)
protected final String baseManifestList;
+ // null for paimon <= 1.0
@JsonProperty(FIELD_BASE_MANIFEST_LIST_SIZE)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
@@ -116,17 +113,20 @@ public class Snapshot implements Serializable {
@JsonProperty(FIELD_DELTA_MANIFEST_LIST)
protected final String deltaManifestList;
+ // null for paimon <= 1.0
@JsonProperty(FIELD_DELTA_MANIFEST_LIST_SIZE)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
protected final Long deltaManifestListSize;
// a manifest list recording all changelog produced in this snapshot
- // null if no changelog is produced, or for paimon <= 0.2
+ // null if no changelog is produced
@JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST)
+ @JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
protected final String changelogManifestList;
+ // null for paimon <= 1.0 or no changelog
@JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST_SIZE)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
@@ -157,32 +157,22 @@ public class Snapshot implements Serializable {
@JsonProperty(FIELD_TIME_MILLIS)
protected final long timeMillis;
- @JsonProperty(FIELD_LOG_OFFSETS)
- @JsonInclude(JsonInclude.Include.NON_NULL)
- @Nullable
- protected final Map<Integer, Long> logOffsets;
-
// record count of all changes occurred in this snapshot
- // null for paimon <= 0.3
@JsonProperty(FIELD_TOTAL_RECORD_COUNT)
- @Nullable
- protected final Long totalRecordCount;
+ protected final long totalRecordCount;
// record count of all new changes occurred in this snapshot
- // null for paimon <= 0.3
@JsonProperty(FIELD_DELTA_RECORD_COUNT)
- @Nullable
- protected final Long deltaRecordCount;
+ protected final long deltaRecordCount;
// record count of all changelog produced in this snapshot
- // null for paimon <= 0.3
+ // null if no changelog
@JsonProperty(FIELD_CHANGELOG_RECORD_COUNT)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
protected final Long changelogRecordCount;
// watermark for input records
- // null for paimon <= 0.3
// null if there is no watermark in new committing, and the previous snapshot does not have a
// watermark
@JsonProperty(FIELD_WATERMARK)
@@ -198,15 +188,15 @@ public class Snapshot implements Serializable {
protected final String statistics;
// properties
- // null for paimon <= 1.1 or empty properties
+ // null for empty properties
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty(FIELD_PROPERTIES)
@Nullable
protected final Map<String, String> properties;
- @Nullable
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty(FIELD_NEXT_ROW_ID)
+ @Nullable
protected final Long nextRowId;
public Snapshot(
@@ -223,9 +213,8 @@ public Snapshot(
long commitIdentifier,
CommitKind commitKind,
long timeMillis,
- Map<Integer, Long> logOffsets,
- @Nullable Long totalRecordCount,
- @Nullable Long deltaRecordCount,
+ long totalRecordCount,
+ long deltaRecordCount,
@Nullable Long changelogRecordCount,
@Nullable Long watermark,
@Nullable String statistics,
@@ -246,7 +235,6 @@ public Snapshot(
commitIdentifier,
commitKind,
timeMillis,
- logOffsets,
totalRecordCount,
deltaRecordCount,
changelogRecordCount,
@@ -258,7 +246,7 @@ public Snapshot(
@JsonCreator
public Snapshot(
- @JsonProperty(FIELD_VERSION) @Nullable Integer version,
+ @JsonProperty(FIELD_VERSION) int version,
@JsonProperty(FIELD_ID) long id,
@JsonProperty(FIELD_SCHEMA_ID) long schemaId,
@JsonProperty(FIELD_BASE_MANIFEST_LIST) String baseManifestList,
@@ -273,9 +261,8 @@ public Snapshot(
@JsonProperty(FIELD_COMMIT_IDENTIFIER) long commitIdentifier,
@JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
@JsonProperty(FIELD_TIME_MILLIS) long timeMillis,
- @JsonProperty(FIELD_LOG_OFFSETS) @Nullable Map<Integer, Long> logOffsets,
- @JsonProperty(FIELD_TOTAL_RECORD_COUNT) @Nullable Long totalRecordCount,
- @JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable Long deltaRecordCount,
+ @JsonProperty(FIELD_TOTAL_RECORD_COUNT) long totalRecordCount,
+ @JsonProperty(FIELD_DELTA_RECORD_COUNT) long deltaRecordCount,
@JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long changelogRecordCount,
@JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
@JsonProperty(FIELD_STATISTICS) @Nullable String statistics,
@@ -295,7 +282,6 @@ public Snapshot(
this.commitIdentifier = commitIdentifier;
this.commitKind = commitKind;
this.timeMillis = timeMillis;
- this.logOffsets = logOffsets;
this.totalRecordCount = totalRecordCount;
this.deltaRecordCount = deltaRecordCount;
this.changelogRecordCount = changelogRecordCount;
@@ -307,8 +293,7 @@ public Snapshot(
@JsonGetter(FIELD_VERSION)
public int version() {
- // there is no version field for paimon <= 0.2
- return version == null ? TABLE_STORE_02_VERSION : version;
+ return version;
}
@JsonGetter(FIELD_ID)
@@ -381,21 +366,13 @@ public long timeMillis() {
return timeMillis;
}
- @JsonGetter(FIELD_LOG_OFFSETS)
- @Nullable
- public Map<Integer, Long> logOffsets() {
- return logOffsets;
- }
-
@JsonGetter(FIELD_TOTAL_RECORD_COUNT)
- @Nullable
- public Long totalRecordCount() {
+ public long totalRecordCount() {
return totalRecordCount;
}
@JsonGetter(FIELD_DELTA_RECORD_COUNT)
- @Nullable
- public Long deltaRecordCount() {
+ public long deltaRecordCount() {
return deltaRecordCount;
}
@@ -450,7 +427,6 @@ public int hashCode() {
commitIdentifier,
commitKind,
timeMillis,
- logOffsets,
totalRecordCount,
deltaRecordCount,
changelogRecordCount,
@@ -483,7 +459,6 @@ public boolean equals(Object o) {
&& commitIdentifier == that.commitIdentifier
&& commitKind == that.commitKind
&& timeMillis == that.timeMillis
- && Objects.equals(logOffsets, that.logOffsets)
&& Objects.equals(totalRecordCount, that.totalRecordCount)
&& Objects.equals(deltaRecordCount, that.deltaRecordCount)
&& Objects.equals(changelogRecordCount, that.changelogRecordCount)
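Taken together, the Snapshot changes split the fields into two groups: values every snapshot must carry (`version`, `totalRecordCount`, `deltaRecordCount`) become primitives, while values that can legitimately be absent stay as `@Nullable` boxed types marked `@JsonInclude(NON_NULL)`. A minimal standalone Jackson sketch of that contrast, using a toy class rather than Paimon's own types:

```java
// Illustrative only -- a toy class, not Paimon's Snapshot.
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SnapshotJsonSketch {

    public static class MiniSnapshot {
        // Mandatory counter: a primitive, so it is always written to JSON.
        @JsonProperty("totalRecordCount")
        public final long totalRecordCount;

        // Optional counter: a boxed Long that is omitted from JSON when null.
        @JsonProperty("changelogRecordCount")
        @JsonInclude(JsonInclude.Include.NON_NULL)
        public final Long changelogRecordCount;

        @JsonCreator
        public MiniSnapshot(
                @JsonProperty("totalRecordCount") long totalRecordCount,
                @JsonProperty("changelogRecordCount") Long changelogRecordCount) {
            this.totalRecordCount = totalRecordCount;
            this.changelogRecordCount = changelogRecordCount;
        }
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Both fields present, e.g. {"totalRecordCount":42,"changelogRecordCount":7}
        System.out.println(mapper.writeValueAsString(new MiniSnapshot(42, 7L)));
        // The null optional counter is dropped entirely, e.g. {"totalRecordCount":42}
        System.out.println(mapper.writeValueAsString(new MiniSnapshot(42, null)));
    }
}
```

A genuinely absent counter thus disappears from the JSON instead of being written as 0 or null, which is why only the optional fields keep their boxed types.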
diff --git a/paimon-core/src/main/java/org/apache/paimon/Changelog.java b/paimon-core/src/main/java/org/apache/paimon/Changelog.java
index a715ab8234c3..0ab3429dfc1f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Changelog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Changelog.java
@@ -57,7 +57,6 @@ public Changelog(Snapshot snapshot) {
snapshot.commitIdentifier(),
snapshot.commitKind(),
snapshot.timeMillis(),
- snapshot.logOffsets(),
snapshot.totalRecordCount(),
snapshot.deltaRecordCount(),
snapshot.changelogRecordCount(),
@@ -69,7 +68,7 @@ public Changelog(Snapshot snapshot) {
@JsonCreator
public Changelog(
- @JsonProperty(FIELD_VERSION) @Nullable Integer version,
+ @JsonProperty(FIELD_VERSION) int version,
@JsonProperty(FIELD_ID) long id,
@JsonProperty(FIELD_SCHEMA_ID) long schemaId,
@JsonProperty(FIELD_BASE_MANIFEST_LIST) String baseManifestList,
@@ -84,9 +83,8 @@ public Changelog(
@JsonProperty(FIELD_COMMIT_IDENTIFIER) long commitIdentifier,
@JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
@JsonProperty(FIELD_TIME_MILLIS) long timeMillis,
- @JsonProperty(FIELD_LOG_OFFSETS) Map<Integer, Long> logOffsets,
- @JsonProperty(FIELD_TOTAL_RECORD_COUNT) @Nullable Long totalRecordCount,
- @JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable Long deltaRecordCount,
+ @JsonProperty(FIELD_TOTAL_RECORD_COUNT) long totalRecordCount,
+ @JsonProperty(FIELD_DELTA_RECORD_COUNT) long deltaRecordCount,
@JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long changelogRecordCount,
@JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
@JsonProperty(FIELD_STATISTICS) @Nullable String statistics,
@@ -107,7 +105,6 @@ public Changelog(
commitIdentifier,
commitKind,
timeMillis,
- logOffsets,
totalRecordCount,
deltaRecordCount,
changelogRecordCount,
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java
index e8e12a668d25..18712efaaed3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java
@@ -33,7 +33,6 @@ public class ManifestCommittable {
private final long identifier;
@Nullable private final Long watermark;
- private final Map<Integer, Long> logOffsets;
private final Map<String, String> properties;
private final List<CommitMessage> commitMessages;
@@ -44,28 +43,22 @@ public ManifestCommittable(long identifier) {
public ManifestCommittable(long identifier, @Nullable Long watermark) {
this.identifier = identifier;
this.watermark = watermark;
- this.logOffsets = new HashMap<>();
this.commitMessages = new ArrayList<>();
this.properties = new HashMap<>();
}
public ManifestCommittable(
- long identifier,
- @Nullable Long watermark,
- Map<Integer, Long> logOffsets,
- List<CommitMessage> commitMessages) {
- this(identifier, watermark, logOffsets, commitMessages, new HashMap<>());
+ long identifier, @Nullable Long watermark, List<CommitMessage> commitMessages) {
+ this(identifier, watermark, commitMessages, new HashMap<>());
}
public ManifestCommittable(
long identifier,
@Nullable Long watermark,
- Map<Integer, Long> logOffsets,
List<CommitMessage> commitMessages,
Map<String, String> properties) {
this.identifier = identifier;
this.watermark = watermark;
- this.logOffsets = logOffsets;
this.commitMessages = commitMessages;
this.properties = properties;
}
@@ -74,16 +67,6 @@ public void addFileCommittable(CommitMessage commitMessage) {
commitMessages.add(commitMessage);
}
- public void addLogOffset(int bucket, long offset, boolean allowDuplicate) {
- if (!allowDuplicate && logOffsets.containsKey(bucket)) {
- throw new RuntimeException(
- String.format(
- "bucket-%d appears multiple times, which is not possible.", bucket));
- }
- long newOffset = Math.max(logOffsets.getOrDefault(bucket, offset), offset);
- logOffsets.put(bucket, newOffset);
- }
-
public void addProperty(String key, String value) {
properties.put(key, value);
}
@@ -97,10 +80,6 @@ public Long watermark() {
return watermark;
}
- public Map<Integer, Long> logOffsets() {
- return logOffsets;
- }
-
public List<CommitMessage> fileCommittables() {
return commitMessages;
}
@@ -120,14 +99,13 @@ public boolean equals(Object o) {
ManifestCommittable that = (ManifestCommittable) o;
return Objects.equals(identifier, that.identifier)
&& Objects.equals(watermark, that.watermark)
- && Objects.equals(logOffsets, that.logOffsets)
&& Objects.equals(commitMessages, that.commitMessages)
&& Objects.equals(properties, that.properties);
}
@Override
public int hashCode() {
- return Objects.hash(identifier, watermark, logOffsets, commitMessages, properties);
+ return Objects.hash(identifier, watermark, commitMessages, properties);
}
@Override
@@ -136,9 +114,8 @@ public String toString() {
"ManifestCommittable {"
+ "identifier = %s, "
+ "watermark = %s, "
- + "logOffsets = %s, "
+ "commitMessages = %s, "
+ "properties = %s}",
- identifier, watermark, logOffsets, commitMessages, properties);
+ identifier, watermark, commitMessages, properties);
}
}
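With the log offsets removed, a `ManifestCommittable` is assembled from just an identifier, an optional watermark, the commit messages and free-form properties. A hedged usage sketch of the resulting call pattern, assuming the usual `CommitMessage` import path and a hypothetical `buildCommittable` helper:

```java
// A sketch against the trimmed-down API shown above; buildCommittable is a
// hypothetical helper and the CommitMessage import path is assumed.
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.sink.CommitMessage;

import java.util.List;

public class CommittableSketch {

    static ManifestCommittable buildCommittable(
            long checkpointId, Long watermark, List<CommitMessage> messages) {
        // Identifier and optional watermark are all that is needed up front;
        // there is no per-bucket log offset to track any more.
        ManifestCommittable committable = new ManifestCommittable(checkpointId, watermark);
        for (CommitMessage message : messages) {
            committable.addFileCommittable(message);
        }
        // Free-form metadata still travels with the commit via properties.
        committable.addProperty("source-job", "example");
        return committable;
    }
}
```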
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
index 67845099e424..7cb6dc0e0fa4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java
@@ -34,7 +34,7 @@
/** {@link VersionedSerializer} for {@link ManifestCommittable}. */
public class ManifestCommittableSerializer implements VersionedSerializer<ManifestCommittable> {
- private static final int CURRENT_VERSION = 4;
+ private static final int CURRENT_VERSION = 5;
private final CommitMessageSerializer commitMessageSerializer;
@@ -61,22 +61,12 @@ public byte[] serialize(ManifestCommittable obj) throws IOException {
view.writeBoolean(false);
view.writeLong(watermark);
}
- serializeOffsets(view, obj.logOffsets());
serializeProperties(view, obj.properties());
view.writeInt(commitMessageSerializer.getVersion());
commitMessageSerializer.serializeList(obj.fileCommittables(), view);
return out.toByteArray();
}
- private void serializeOffsets(DataOutputViewStreamWrapper view, Map<Integer, Long> offsets)
- throws IOException {
- view.writeInt(offsets.size());
- for (Map.Entry<Integer, Long> entry : offsets.entrySet()) {
- view.writeInt(entry.getKey());
- view.writeLong(entry.getValue());
- }
- }
-
private void serializeProperties(
DataOutputViewStreamWrapper view, Map<String, String> properties) throws IOException {
view.writeInt(properties.size());
@@ -100,9 +90,11 @@ public ManifestCommittable deserialize(int version, byte[] serialized) throws IO
DataInputDeserializer view = new DataInputDeserializer(serialized);
long identifier = view.readLong();
Long watermark = view.readBoolean() ? null : view.readLong();
- Map<Integer, Long> offsets = deserializeOffsets(view);
+ if (version <= 4) {
+ skipLegacyLogOffsets(view);
+ }
Map<String, String> properties =
- version == CURRENT_VERSION ? deserializeProperties(view) : new HashMap<>();
+ version >= 4 ? deserializeProperties(view) : new HashMap<>();
int fileCommittableSerializerVersion = view.readInt();
List<CommitMessage> fileCommittables;
try {
@@ -119,7 +111,7 @@ public ManifestCommittable deserialize(int version, byte[] serialized) throws IO
if (!view.readBoolean()) {
view.readLong();
}
- deserializeOffsets(view);
+ skipLegacyLogOffsets(view);
view.readInt();
if (legacyV2CommitMessageSerializer == null) {
@@ -128,17 +120,15 @@ public ManifestCommittable deserialize(int version, byte[] serialized) throws IO
fileCommittables = legacyV2CommitMessageSerializer.deserializeList(view);
}
- return new ManifestCommittable(
- identifier, watermark, offsets, fileCommittables, properties);
+ return new ManifestCommittable(identifier, watermark, fileCommittables, properties);
}
- private Map<Integer, Long> deserializeOffsets(DataInputDeserializer view) throws IOException {
+ private void skipLegacyLogOffsets(DataInputDeserializer view) throws IOException {
int size = view.readInt();
- Map<Integer, Long> offsets = new HashMap<>(size);
for (int i = 0; i < size; i++) {
- offsets.put(view.readInt(), view.readLong());
+ view.readInt();
+ view.readLong();
}
- return offsets;
}
private Map<String, String> deserializeProperties(DataInputDeserializer view)
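The serializer bumps `CURRENT_VERSION` from 4 to 5, yet payloads written by older versions still contain the log-offset block, so the reader must consume and discard it (`skipLegacyLogOffsets`) to keep the remaining fields aligned. A self-contained sketch of this version-gated skip pattern, using a made-up byte layout rather than the real Paimon wire format:

```java
// Standalone sketch of the compatibility idea: new readers must still consume
// the legacy log-offset block written by old versions so later fields line up.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class VersionedSkipSketch {

    // Writes a hypothetical "version 4" payload: identifier, log offsets, payload.
    static byte[] writeV4(long identifier, int[] buckets, long[] offsets) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeLong(identifier);
        out.writeInt(buckets.length);            // legacy log-offset block
        for (int i = 0; i < buckets.length; i++) {
            out.writeInt(buckets[i]);
            out.writeLong(offsets[i]);
        }
        out.writeUTF("payload");                 // fields that still matter today
        return bytes.toByteArray();
    }

    // Reads either version; for version <= 4 it skips the obsolete block.
    static String readAnyVersion(int version, byte[] serialized) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
        long identifier = in.readLong();
        if (version <= 4) {                      // old writers emitted log offsets
            int size = in.readInt();
            for (int i = 0; i < size; i++) {     // skip, keeping the stream aligned
                in.readInt();
                in.readLong();
            }
        }
        return identifier + ":" + in.readUTF();
    }

    public static void main(String[] args) throws IOException {
        byte[] v4 = writeV4(7L, new int[] {0, 1}, new long[] {100L, 200L});
        System.out.println(readAnyVersion(4, v4)); // prints 7:payload
    }
}
```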
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index 756611b07605..f95ba19221a2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -26,6 +26,8 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TinyIntType;
+import javax.annotation.Nullable;
+
import java.util.Arrays;
import java.util.List;
@@ -68,6 +70,14 @@ static long recordCount(List<ManifestEntry> manifestEntries) {
return manifestEntries.stream().mapToLong(manifest -> manifest.file().rowCount()).sum();
}
+ @Nullable
+ static Long nullableRecordCount(List<ManifestEntry> manifestEntries) {
+ if (manifestEntries.isEmpty()) {
+ return null;
+ }
+ return manifestEntries.stream().mapToLong(manifest -> manifest.file().rowCount()).sum();
+ }
+
static long recordCountAdd(List<ManifestEntry> manifestEntries) {
return manifestEntries.stream()
.filter(manifestEntry -> FileKind.ADD.equals(manifestEntry.kind()))
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index e5d4cd2ff20c..1355bb4a3acd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -95,7 +95,7 @@
import static java.util.Collections.emptyList;
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
-import static org.apache.paimon.manifest.ManifestEntry.recordCount;
+import static org.apache.paimon.manifest.ManifestEntry.nullableRecordCount;
import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
import static org.apache.paimon.operation.commit.ConflictDetection.hasConflictChecked;
@@ -383,7 +383,6 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
changes.appendIndexFiles),
committable.identifier(),
committable.watermark(),
- committable.logOffsets(),
committable.properties(),
CommitKindProvider.provider(commitKind),
conflictCheck,
@@ -421,7 +420,6 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
changes.compactIndexFiles),
committable.identifier(),
committable.watermark(),
- committable.logOffsets(),
committable.properties(),
CommitKindProvider.provider(CommitKind.COMPACT),
hasConflictChecked(safeLatestSnapshotId),
@@ -574,7 +572,6 @@ public int overwritePartition(
changes.appendIndexFiles,
committable.identifier(),
committable.watermark(),
- committable.logOffsets(),
committable.properties());
generatedSnapshot += 1;
}
@@ -588,7 +585,6 @@ public int overwritePartition(
changes.compactIndexFiles),
committable.identifier(),
committable.watermark(),
- committable.logOffsets(),
committable.properties(),
CommitKindProvider.provider(CommitKind.COMPACT),
mustConflictCheck(),
@@ -688,25 +684,13 @@ public void dropPartitions(List