Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions docs/content/maintenance/rescale-bucket.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,6 @@ Please note that
```
- During overwrite period, make sure there are no other jobs writing the same table/partition.

{{< hint info >}}
__Note:__ For the table which enables log system(*e.g.* Kafka), please rescale the topic's partition as well to keep consistency.
{{< /hint >}}

## Use Case

Rescale bucket helps to handle sudden spikes in throughput. Suppose there is a daily streaming ETL task to sync transaction data. The table's DDL and pipeline
Expand Down
6 changes: 0 additions & 6 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -1272,12 +1272,6 @@
<td>Boolean</td>
<td>Whether to read the delta from append table's overwrite commit in streaming mode.</td>
</tr>
<tr>
<td><h5>streaming-read-mode</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td><p>Enum</p></td>
<td>The mode of streaming read that specifies to read the data of table file or log.<br /><br />Possible values:<ul><li>"log": Read from the data of table log store.</li><li>"file": Read from the data of table file store.</li></ul></td>
</tr>
<tr>
<td><h5>streaming-read-overwrite</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
42 changes: 0 additions & 42 deletions docs/layouts/shortcodes/generated/kafka_log_configuration.html

This file was deleted.

156 changes: 0 additions & 156 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.paimon.annotation.Documentation;
import org.apache.paimon.annotation.Documentation.ExcludeFromDocumentation;
import org.apache.paimon.annotation.Documentation.Immutable;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.fs.Path;
Expand Down Expand Up @@ -969,42 +968,6 @@ public InlineElement getDescription() {
.withDescription(
"The delay duration of stream read when scan incremental snapshots.");

@ExcludeFromDocumentation("Confused without log system")
public static final ConfigOption<LogConsistency> LOG_CONSISTENCY =
key("log.consistency")
.enumType(LogConsistency.class)
.defaultValue(LogConsistency.TRANSACTIONAL)
.withDescription("Specify the log consistency mode for table.");

@ExcludeFromDocumentation("Confused without log system")
public static final ConfigOption<LogChangelogMode> LOG_CHANGELOG_MODE =
key("log.changelog-mode")
.enumType(LogChangelogMode.class)
.defaultValue(LogChangelogMode.AUTO)
.withDescription("Specify the log changelog mode for table.");

@ExcludeFromDocumentation("Confused without log system")
public static final ConfigOption<String> LOG_KEY_FORMAT =
key("log.key.format")
.stringType()
.defaultValue("json")
.withDescription(
"Specify the key message format of log system with primary key.");

@ExcludeFromDocumentation("Confused without log system")
public static final ConfigOption<String> LOG_FORMAT =
key("log.format")
.stringType()
.defaultValue("debezium-json")
.withDescription("Specify the message format of log system.");

@ExcludeFromDocumentation("Confused without log system")
public static final ConfigOption<Boolean> LOG_IGNORE_DELETE =
key("log.ignore-delete")
.booleanType()
.defaultValue(false)
.withDescription("Specify whether the log system ignores delete records.");

public static final ConfigOption<Boolean> AUTO_CREATE =
key("auto-create")
.booleanType()
Expand Down Expand Up @@ -1272,13 +1235,6 @@ public InlineElement getDescription() {
"Only used to force TableScan to construct suitable 'StartingUpScanner' and 'FollowUpScanner' "
+ "dedicated internal streaming scan.");

public static final ConfigOption<StreamingReadMode> STREAMING_READ_MODE =
key("streaming-read-mode")
.enumType(StreamingReadMode.class)
.noDefaultValue()
.withDescription(
"The mode of streaming read that specifies to read the data of table file or log.");

@ExcludeFromDocumentation("Internal use only")
public static final ConfigOption<BatchScanMode> BATCH_SCAN_MODE =
key("batch-scan-mode")
Expand Down Expand Up @@ -2770,10 +2726,6 @@ public boolean scanPlanSortPartition() {
}

public StartupMode startupMode() {
return startupMode(options);
}

public static StartupMode startupMode(Options options) {
StartupMode mode = options.get(SCAN_MODE);
if (mode == StartupMode.DEFAULT) {
if (options.getOptional(SCAN_TIMESTAMP_MILLIS).isPresent()
Expand Down Expand Up @@ -3001,10 +2953,6 @@ public Integer fullCompactionDeltaCommits() {
return options.get(FULL_COMPACTION_DELTA_COMMITS);
}

public static StreamingReadMode streamReadType(Options options) {
return options.get(STREAMING_READ_MODE);
}

public Duration consumerExpireTime() {
return options.get(CONSUMER_EXPIRATION_TIME);
}
Expand Down Expand Up @@ -3471,67 +3419,6 @@ public InlineElement getDescription() {
}
}

/** Specifies the log consistency mode for table. */
public enum LogConsistency implements DescribedEnum {
TRANSACTIONAL(
"transactional",
"Only the data after the checkpoint can be seen by readers, the latency depends on checkpoint interval."),

EVENTUAL(
"eventual",
"Immediate data visibility, you may see some intermediate states, "
+ "but eventually the right results will be produced, only works for table with primary key.");

private final String value;
private final String description;

LogConsistency(String value, String description) {
this.value = value;
this.description = description;
}

@Override
public String toString() {
return value;
}

@Override
public InlineElement getDescription() {
return text(description);
}
}

/** Specifies the log changelog mode for table. */
public enum LogChangelogMode implements DescribedEnum {
AUTO("auto", "Upsert for table with primary key, all for table without primary key."),

ALL("all", "The log system stores all changes including UPDATE_BEFORE."),

UPSERT(
"upsert",
"The log system does not store the UPDATE_BEFORE changes, the log consumed job"
+ " will automatically add the normalized node, relying on the state"
+ " to generate the required update_before.");

private final String value;
private final String description;

LogChangelogMode(String value, String description) {
this.value = value;
this.description = description;
}

@Override
public String toString() {
return value;
}

@Override
public InlineElement getDescription() {
return text(description);
}
}

/** Specifies the changelog producer for table. */
public enum ChangelogProducer implements DescribedEnum {
NONE("none", "No changelog file."),
Expand Down Expand Up @@ -3563,49 +3450,6 @@ public InlineElement getDescription() {
}
}

/** Specifies the type for streaming read. */
public enum StreamingReadMode implements DescribedEnum {
LOG("log", "Read from the data of table log store."),
FILE("file", "Read from the data of table file store.");

private final String value;
private final String description;

StreamingReadMode(String value, String description) {
this.value = value;
this.description = description;
}

@Override
public String toString() {
return value;
}

@Override
public InlineElement getDescription() {
return text(description);
}

public String getValue() {
return value;
}

@VisibleForTesting
public static StreamingReadMode fromValue(String value) {
for (StreamingReadMode formatType : StreamingReadMode.values()) {
if (formatType.value.equals(value)) {
return formatType;
}
}
throw new IllegalArgumentException(
String.format(
"Invalid format type %s, only support [%s]",
value,
StringUtils.join(
Arrays.stream(StreamingReadMode.values()).iterator(), ",")));
}
}

/** Inner stream scan mode for some internal requirements. */
public enum StreamScanMode implements DescribedEnum {
NONE("none", "No requirement."),
Expand Down
Loading
Loading